pipeline

logger = get_logger() (module attribute)

delete_dataset(dataset_id)

Source code in datasets/tasks/pipeline.py
@shared_task()
def delete_dataset(dataset_id: UUID) -> str:
    dataset = Dataset.objects.get(id=dataset_id)
    logger.info(f"Deleting dataset {dataset.name}")

    # Remove the locally built search index directory, if any.
    if dataset.search_mode == Dataset.SearchMode.LOCAL.value:
        if dataset.search_index_path and dataset.search_index_path.exists():
            logger.info(f"Deleting search index {dataset.search_index_path}")
            shutil.rmtree(dataset.search_index_path)

    # Drop the backing Stardog database for locally imported datasets.
    if dataset.mode == Dataset.Mode.LOCAL.value and dataset.local_database:
        logger.info(f"Deleting database {dataset.local_database}")
        with StardogApi.admin() as admin:
            admin.database(dataset.local_database).drop()

    # Finally remove the Dataset record itself.
    dataset.delete()
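
Usage (a minimal sketch, assuming a configured Celery broker and worker; dataset names an existing Dataset instance and is illustrative):

delete_dataset.delay(dataset.id)  # queue deletion on a Celery worker
delete_dataset(dataset.id)        # or run synchronously, e.g. from a shell or test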

import_dataset(dataset_id, files=None)

Source code in datasets/tasks/pipeline.py
@shared_task()
def import_dataset(dataset_id: UUID, files: List[str] = None) -> str:
    dataset = Dataset.objects.get(id=dataset_id)
    source = dataset.source
    logger.info(f"Importing dataset {dataset.name}")

    # Mark the dataset as importing and record which task is doing the work.
    Dataset.objects.filter(id=dataset_id).update(
        state=DatasetState.IMPORTING.value,
        import_task_id=import_dataset.request.id,
    )
    # Scratch directory for downloads and index building.
    tmp_dir = DOWNLOAD_DIR / random_string(10)
    tmp_dir.mkdir(parents=True)

    files = files or []
    files = list(map(Path, files))

    try:
        # Dispatch on the combination of dataset mode and declared source type.
        source_type = source.get('source_type', None)
        match (dataset.mode, source_type):
            case (Dataset.Mode.LOCAL.value, 'urls'):
                urls = source.get('urls', [])
                if len(urls) == 0:
                    raise Exception("No URLs specified")

                logger.info(f"Downloading {len(urls)} files")
                files = []
                for url in set(urls):
                    file = download_url(url, str(tmp_dir))
                    files.append(file)

                logger.info(f"Importing {len(files)} files")
                dataset.local_database = import_files(files)
                logger.info(f'Created database {dataset.local_database}')
            case (Dataset.Mode.LOCAL.value, 'existing'):
                # Point the dataset at a database that already exists locally.
                dataset.local_database = source.get('database', None)
                logger.info(f'Using existing database {dataset.local_database}')
            case (Dataset.Mode.SPARQL.value, 'sparql'):
                # Remote datasets are queried through a SPARQL endpoint instead.
                dataset.sparql_endpoint = source.get('sparql', None)
                logger.info(f'Using sparql endpoint {dataset.sparql_endpoint}')
            case (Dataset.Mode.LOCAL.value, 'upload'):
                if len(files) == 0:
                    raise Exception("No files specified")

                dataset.local_database = import_files(files)
                logger.info(f'Created database {dataset.local_database}')
            case _:
                raise Exception(f"Unsupported source type {source_type}")

        dataset.save()

        logger.info("Updating dataset info")
        update_dataset_info(dataset_id)

        # Ensure the default search index exists; build a dataset-specific one
        # only when local search is enabled.
        create_default_search_index(path=str(tmp_dir), force=False)
        if dataset.search_mode == Dataset.SearchMode.LOCAL.value:
            logger.info("Creating search index")
            create_search_index(dataset_id, path=str(tmp_dir))

        logger.info("Import finished")
        Dataset.objects.filter(id=dataset_id).update(state=DatasetState.IMPORTED.value)
    except Exception as e:
        logger.error(f"Error importing dataset {dataset.name}: {e}")
        Dataset.objects.filter(id=dataset_id).update(state=DatasetState.FAILED.value)
        raise e
    finally:
        # Clean up the scratch directory and any downloaded or uploaded files.
        logger.info(f"Cleaning up {tmp_dir}")
        shutil.rmtree(tmp_dir, ignore_errors=True)

        for file in files:
            logger.info(f"Cleaning up {file}")
            try:
                file.unlink()
            except Exception as e:
                logger.error(f"Error deleting {file}: {e}")