@shared_task()
def import_dataset(dataset_id: UUID, files: List[str] | None = None) -> None:
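    """Import a dataset from its configured source and update its state."""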
    dataset = Dataset.objects.get(id=dataset_id)
    source = dataset.source
    logger.info(f"Importing dataset {dataset.name}")
    Dataset.objects.filter(id=dataset_id).update(
        state=DatasetState.IMPORTING.value,
        import_task_id=import_dataset.request.id,
    )
    tmp_dir = DOWNLOAD_DIR / random_string(10)
    tmp_dir.mkdir(parents=True)
    files = files or []
    files = list(map(Path, files))
    try:
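        # Dispatch on the dataset mode and the declared source type.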
        source_type = source.get('source_type', None)
        match (dataset.mode, source_type):
            case (Dataset.Mode.LOCAL.value, 'urls'):
                urls = source.get('urls', [])
                if len(urls) == 0:
                    raise Exception("No URLs specified")
                logger.info(f"Downloading {len(urls)} files")
                files = []
                for url in set(urls):
                    file = download_url(url, str(tmp_dir))
                    files.append(file)
                logger.info(f"Importing {len(files)} files")
                dataset.local_database = import_files(files)
                logger.info(f'Created database {dataset.local_database}')
            case (Dataset.Mode.LOCAL.value, 'existing'):
                dataset.local_database = source.get('database', None)
                logger.info(f'Using existing database {dataset.local_database}')
            case (Dataset.Mode.SPARQL.value, 'sparql'):
                dataset.sparql_endpoint = source.get('sparql', None)
                logger.info(f'Using sparql endpoint {dataset.sparql_endpoint}')
            case (Dataset.Mode.LOCAL.value, 'upload'):
                if len(files) == 0:
                    raise Exception("No files specified")
                dataset.local_database = import_files(files)
                logger.info(f'Created database {dataset.local_database}')
            case _:
                raise Exception(f"Unsupported source type {source_type}")
        dataset.save()
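        # Refresh derived dataset metadata and build search indexes.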
logger.info(f"Updating dataset info")
update_dataset_info(dataset_id)
create_default_search_index(path=str(tmp_dir), force=False)
if dataset.search_mode == Dataset.SearchMode.LOCAL.value:
logger.info(f"Creating search index")
create_search_index(dataset_id, path=str(tmp_dir))
logger.info(f"Import finished")
Dataset.objects.filter(id=dataset_id).update(state=DatasetState.IMPORTED.value)
    except Exception as e:
        logger.error(f"Error importing dataset {dataset.name}: {e}")
        Dataset.objects.filter(id=dataset_id).update(state=DatasetState.FAILED.value)
        raise
    finally:
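        # Remove the temporary download directory and any remaining source files.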
logger.info(f"Cleaning up {tmp_dir}")
shutil.rmtree(tmp_dir, ignore_errors=True)
for file in files:
logger.info(f"Cleaning up {file}")
try:
file.unlink()
except Exception as e:
logger.error(f"Error deleting {file}: {e}")