Skip to content

imports

logger = get_logger() module-attribute

download_url(url, path=None)

Source code in datasets/tasks/imports.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@shared_task()
def download_url(url: str, path: Optional[str] = None) -> Optional[Path]:
    """Download a knowledge-graph file from ``url`` into a fresh random folder.

    Args:
        url: Source URL. GitHub web URLs are rewritten to fetch the raw file.
        path: Optional base directory; defaults to ``DOWNLOAD_DIR``.

    Returns:
        Path to the downloaded file inside a newly created random subfolder.

    Raises:
        Exception: if the URL cannot be processed or the download fails.
            The partially created download folder is removed on failure.
    """
    logger.info(f"Downloading knowledge graph from {url}")
    download_folder = (Path(path) if path else DOWNLOAD_DIR) / random_string(10)
    download_folder.mkdir(parents=True)

    try:
        # GitHub web URLs serve an HTML page; appending raw=true yields the file.
        if 'github.com' in url and 'raw' not in url:
            logger.info(f"Downloading from github raw")
            url += ('&raw=true' if urlparse(url).query else '?raw=true')
    except Exception as e:
        logger.error(f"Error preprocessing {url}: {e}")
        raise Exception(e) from e

    try:
        filename = None

        logger.info('Trying to infer file name')
        # Prefer the server-provided name from the Content-Disposition header.
        # HTTPMessage.get_filename() parses it directly; the previously used
        # cgi.parse_header() was removed in Python 3.13 (PEP 594). The `with`
        # block ensures the connection is closed (it leaked before).
        with urlopen(url) as remotefile:
            filename = remotefile.info().get_filename()

        if filename is None:
            # Fall back to the last path component of the URL.
            filename = os.path.basename(urlparse(url).path)

        if not filename:
            # e.g. a URL ending in '/': nothing sensible to name the file;
            # without this guard download_path would equal the folder itself.
            raise ValueError(f"Could not determine a file name for {url}")
    except Exception as e:
        logger.error(f"Failed to parse URL {url}. Error: {e}")
        shutil.rmtree(download_folder)
        raise Exception(e) from e

    download_path = download_folder / filename

    try:
        logger.info(f"Downloading {url} to {download_path}")
        # TODO: add a hook to check if file is not too big
        urlretrieve(url, download_path)
        logger.info(f"Downloaded {url} to {download_path}")
    except Exception as e:
        logger.error(f"Failed to download {url}. Error: {e}")
        shutil.rmtree(download_folder)
        raise Exception(e) from e

    # TODO: We may need to rename the file extension.
    return download_path

import_files(files, database=None)

Source code in datasets/tasks/imports.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@shared_task()
def import_files(files: List[Path], database: Optional[str] = None) -> str:
    """Import RDF files into a (possibly newly created) Stardog database.

    Args:
        files: A list of paths, a single file, or a directory. A plain str is
            coerced to ``Path``; a directory is expanded recursively.
        database: Target database name; a random one is generated when omitted.

    Returns:
        The name of the database the files were imported into.
    """
    if database is None:
        # NOTE(review): the 'a' prefix presumably guarantees the name starts
        # with a letter (valid Stardog identifier) — confirm against Stardog docs.
        database = 'a' + random_string(10)

    logger.info(f"Starting KG import {files} into {database}")
    if isinstance(files, str):
        files = Path(files)

    if isinstance(files, (Path, str)):
        if files.is_dir():
            # Expand recursively, but keep only real files: '**/*' also yields
            # sub-directories, which cannot be loaded as file content.
            files = [f for f in files.glob('**/*') if f.is_file()]
        else:
            files = [files]

    logger.info(f"Loading KG from {files}")
    with StardogApi.admin() as admin:
        # strict.parsing=False lets the import tolerate minor RDF syntax issues.
        admin.new_database(
            database,
            {'strict.parsing': False},
            *[
                stardog.content.File(str(file.absolute()))
                for file in files
            ]
        )

    return database