@shared_task()
def import_files(files: List[Path], database: Optional[str] = None) -> str:
if database is None:
database = 'a' + random_string(10)
logger.info(f"Starting KG import {files} into {database}")
if isinstance(files, str):
files = Path(files)
if isinstance(files, (Path, str)):
if files.is_dir():
files = list(files.glob('**/*'))
else:
files = [files]
logger.info(f"Loading KG from {files}")
with StardogApi.admin() as admin:
admin.new_database(
database,
{'strict.parsing': False},
*[
stardog.content.File(str(file.absolute()))
for file in files
]
)
return database