@shared_task()
def create_search_index(
    dataset_id: UUID,
    min_term_count: int = 3,
    path: str = None,
    force: bool = True,
):
    """Build the on-disk search index for a dataset's local database.

    Subject, predicate and object search terms are exported from the
    database into three TSV files inside a temporary directory, and the
    ``build-index`` CLI command is then run over those files. The
    temporary directory is always removed afterwards.

    Args:
        dataset_id: Id of the ``Dataset`` to index.
        min_term_count: Value substituted for the ``{min_count}``
            placeholder of ``QUERY_EXPORT_SEARCH``.
        path: Base directory for the temporary export files. Falls back
            to ``DOWNLOAD_DIR`` when empty or ``None``.
        force: When true, an existing index directory is deleted and
            rebuilt. When false and the directory exists, the task logs
            a message and returns without doing any work.

    Raises:
        Exception: If the dataset has no local database. Errors raised by
            the dataset lookup, the export queries or the index build
            propagate unchanged.
    """
    dataset = Dataset.objects.get(id=dataset_id)
    logger.info(f"Creating search index for {dataset.name}")

    database = dataset.local_database
    if database is None:
        raise Exception("Dataset has no database")

    search_index_dir = DATA_DIR / f'search_index_{database}'
    if search_index_dir.exists():
        if not force:
            logger.info(f"Search index already exists for {dataset.name}")
            return
        logger.info(f"Removing existing search index at {search_index_dir}")
        shutil.rmtree(search_index_dir)

    # One entry per triple position:
    # (log label, export file name, '{triple}' pattern, '{pos}' value).
    term_exports = (
        ('subject', 'terms_s.tsv', '{ ?t ?p ?v }', '0'),
        ('predicate', 'terms_p.tsv', '{ ?s ?t ?v }', '1'),
        ('object', 'terms_o.tsv', '{ ?s ?p ?t FILTER(?p != rdfs:label) }', '2'),
    )
    # NOTE(review): presumably milliseconds (i.e. one hour) -- confirm
    # against query_to_file.
    export_timeout = 60 * 60 * 1000

    tmp_dir = (Path(path) if path else DOWNLOAD_DIR) / random_string(10)
    try:
        search_index_dir.mkdir(parents=True, exist_ok=True)
        tmp_dir.mkdir(parents=True, exist_ok=True)

        terms_files = []
        for label, file_name, triple, pos in term_exports:
            terms_file = tmp_dir / file_name
            query = (
                QUERY_EXPORT_SEARCH
                .replace('{triple}', triple)
                .replace('{min_count}', str(min_term_count))
                .replace('{pos}', pos)
            )
            logger.info(f'Exporting {label} search terms {terms_file}')
            query_to_file(database, query, terms_file, timeout=export_timeout)
            terms_files.append(terms_file)

        logger.info('Creating search index from documents')
        consume_print(BoldCli.cmd(
            ['build-index', '--force', *map(str, terms_files), '--index', str(search_index_dir)]
        ))
        logger.info('Search index created')
    except Exception:
        # A failed run must not leave an empty or partial index directory
        # behind: a later call with force=False would treat it as a
        # finished index and skip the build.
        logger.warning(f"Removing incomplete search index at {search_index_dir}")
        shutil.rmtree(search_index_dir, ignore_errors=True)
        raise
    finally:
        logger.info(f"Cleaning up {tmp_dir}")
        shutil.rmtree(tmp_dir, ignore_errors=True)