Skip to content

query

LocalQueryService

Bases: QueryService

Source code in datasets/services/query.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
class LocalQueryService(QueryService):
    """Query service that runs queries through a ``StardogApi`` connection."""

    # Database name handed to ``StardogApi.connection`` for every query.
    database: str

    def __init__(self, database: str):
        self.database = database

    def query(self, query: str, limit: int = 10, timeout: int = None, **options) -> dict:
        """Run ``query`` against ``self.database`` and return the serialized result.

        Args:
            query: Query text.
            limit: Row limit forwarded to the connection. It is replaced by
                ``None`` when the query text itself contains ``LIMIT`` (in any
                letter case), presumably so it does not compete with the
                clause written in the query.
            timeout: Forwarded unchanged to the connection call.
            **options: Accepted for interface compatibility; not used here.

        Returns:
            A one-entry dict keyed by MIME type: ``'application/n-triples'``
            for queries that ``is_graph_query`` accepts, otherwise
            ``'application/sparql-results+json'`` holding a JSON string.

        Raises:
            QueryExecutionException: If the Stardog client raises a
                ``StardogException``; the original error is chained as cause.
        """
        try:
            with StardogApi.connection(self.database) as conn:
                # SPARQL keywords are case-insensitive, so compare on the
                # upper-cased text (same check SPARQLQueryService performs).
                if 'LIMIT' in query.upper():
                    limit = None

                if is_graph_query(query):
                    # The result is decoded as UTF-8, i.e. it is expected to be bytes.
                    triples = conn.graph(query, 'application/n-triples', limit=limit, timeout=timeout)
                    return {'application/n-triples': triples.decode('utf-8')}

                output = conn.select(query, limit=limit, timeout=timeout)
                return {'application/sparql-results+json': json.dumps(output)}
        except stardog.exceptions.StardogException as e:
            # Keep the original traceback attached for debugging.
            raise QueryExecutionException(str(e)) from e

database = database instance-attribute

__init__(database)

Source code in datasets/services/query.py
28
29
def __init__(self, database: str):
    """Remember which database this service should query.

    Args:
        database: Database name; stored on ``self.database`` and later passed
            to ``StardogApi.connection``.
    """
    self.database = database

query(query, limit=10, timeout=None, **options)

Source code in datasets/services/query.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def query(self, query: str, limit: int = 10, timeout: int = None, **options) -> dict:
    """Run ``query`` against ``self.database`` and return the serialized result.

    Args:
        query: Query text.
        limit: Row limit forwarded to the connection. It is replaced by
            ``None`` when the query text itself contains ``LIMIT`` (in any
            letter case), presumably so it does not compete with the clause
            written in the query.
        timeout: Forwarded unchanged to the connection call.
        **options: Accepted for interface compatibility; not used here.

    Returns:
        A one-entry dict keyed by MIME type: ``'application/n-triples'`` for
        queries that ``is_graph_query`` accepts, otherwise
        ``'application/sparql-results+json'`` holding a JSON string.

    Raises:
        QueryExecutionException: If the Stardog client raises a
            ``StardogException``; the original error is chained as cause.
    """
    try:
        with StardogApi.connection(self.database) as conn:
            # SPARQL keywords are case-insensitive, so compare on the
            # upper-cased text (same check SPARQLQueryService performs).
            if 'LIMIT' in query.upper():
                limit = None

            if is_graph_query(query):
                # The result is decoded as UTF-8, i.e. it is expected to be bytes.
                triples = conn.graph(query, 'application/n-triples', limit=limit, timeout=timeout)
                return {'application/n-triples': triples.decode('utf-8')}

            output = conn.select(query, limit=limit, timeout=timeout)
            return {'application/sparql-results+json': json.dumps(output)}
    except stardog.exceptions.StardogException as e:
        # Keep the original traceback attached for debugging.
        raise QueryExecutionException(str(e)) from e

QueryExecutionException

Bases: Exception

Source code in datasets/services/query.py
11
12
class QueryExecutionException(Exception):
    """Error raised by the query services when a query is rejected or fails to execute."""

QueryService

Bases: ABC

Source code in datasets/services/query.py
15
16
17
18
19
20
21
22
class QueryService(ABC):
    """Abstract base for objects that can execute a query and return its serialized result."""

    @abstractmethod
    def query(self, query: str, limit: int = 10, timeout: int = None, **options) -> dict:
        """Execute ``query``; subclasses must override this method.

        ``query_select`` expects the returned dict to be keyed by MIME type
        (it reads the ``'application/sparql-results+json'`` entry).
        """

    def query_select(self, query: str, limit: int = 10, timeout: int = None, **options) -> dict:
        """Run ``query`` and return its SPARQL JSON result as parsed Python data.

        Args:
            query: Query text, passed through to ``self.query``.
            limit: Passed through to ``self.query``.
            timeout: Passed through to ``self.query``.
            **options: Extra keyword arguments passed through to ``self.query``.

        Returns:
            The value decoded from the ``'application/sparql-results+json'``
            entry of the dict returned by ``self.query``.

        Raises:
            KeyError: If that entry is missing from the result.
        """
        payload = self.query(query, limit, timeout, **options)
        serialized = payload['application/sparql-results+json']
        return json.loads(serialized)

query(query, limit=10, timeout=None, **options) abstractmethod

Source code in datasets/services/query.py
16
17
18
@abstractmethod
def query(self, query: str, limit: int = 10, timeout: int = None, **options) -> dict:
    """Execute ``query``; abstract, so concrete services must provide the implementation."""

query_select(query, limit=10, timeout=None, **options)

Source code in datasets/services/query.py
20
21
22
def query_select(self, query: str, limit: int = 10, timeout: int = None, **options) -> dict:
    """Run ``query`` via ``self.query`` and return the parsed SPARQL JSON result.

    All arguments are passed through to ``self.query``. The
    ``'application/sparql-results+json'`` entry of its result is decoded with
    ``json.loads``; a missing entry raises ``KeyError``.
    """
    payload = self.query(query, limit, timeout, **options)
    serialized = payload['application/sparql-results+json']
    return json.loads(serialized)

SPARQLQueryService

Bases: QueryService

Source code in datasets/services/query.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class SPARQLQueryService(QueryService):
    """Query service that POSTs queries to a remote SPARQL HTTP endpoint."""

    # URL that every query is POSTed to.
    endpoint: str

    def __init__(self, endpoint: str):
        self.endpoint = endpoint

    def query(self, query: str, limit: int = 10, timeout: int = None, ignore_limit=False, **options) -> dict:
        """POST ``query`` to ``self.endpoint`` and return the response body.

        Args:
            query: Query text; sent as the request body.
            limit: Sent as the ``limit`` URL parameter.
            timeout: Sent as the ``timeout`` URL parameter and also used as
                the HTTP timeout for every request, including redirect replays.
            ignore_limit: When true, skip the check that the query text
                contains ``LIMIT``.
            **options: Accepted for interface compatibility; not used here.

        Returns:
            A one-entry dict mapping the requested MIME type
            (``'application/n-triples'`` for graph queries, otherwise
            ``'application/sparql-results+json'``) to the response text.

        Raises:
            QueryExecutionException: If the query lacks ``LIMIT`` (and
                ``ignore_limit`` is false) or the final response status is
                not 200.
        """
        from urllib.parse import urljoin

        if 'LIMIT' not in query.upper() and not ignore_limit:
            raise QueryExecutionException('SPARQL queries must specify a LIMIT')

        accept = 'application/n-triples' if is_graph_query(query) else 'application/sparql-results+json'

        with Session() as session:
            response = session.post(
                self.endpoint,
                data=query,
                params={
                    'limit': limit,
                    'timeout': timeout,
                },
                headers={
                    'User-Agent': 'https://github.com/EgorDm/BOLD',
                    'Content-Type': 'application/sparql-query',
                    'Accept': accept,
                },
                timeout=timeout,
                allow_redirects=False
            )

            # Redirects are replayed by hand: the prepared request that was
            # just sent is re-sent to the new location, at most three times.
            retry_count = 0
            while response.status_code // 100 == 3 and retry_count < 3:
                location = response.headers.get('Location')
                if not location:
                    # A 3xx without a target cannot be followed; fall through
                    # to the status check below, which reports it.
                    break
                request = response.request
                # Location may be relative, so resolve it against the URL just used.
                request.url = urljoin(request.url, location)
                # Carry the caller's timeout over to the replayed request.
                response = session.send(request, timeout=timeout)
                retry_count += 1

        if response.status_code != 200:
            raise QueryExecutionException(f'{response.status_code} {response.reason}\n{response.text}')

        return {accept: response.text}

endpoint = endpoint instance-attribute

__init__(endpoint)

Source code in datasets/services/query.py
54
55
def __init__(self, endpoint: str):
    """Remember which SPARQL endpoint this service should query.

    Args:
        endpoint: Endpoint URL; stored on ``self.endpoint`` and used as the
            target of the POST request in ``query``.
    """
    self.endpoint = endpoint

query(query, limit=10, timeout=None, ignore_limit=False, **options)

Source code in datasets/services/query.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def query(self, query: str, limit: int = 10, timeout: int = None, ignore_limit=False, **options) -> dict:
    """POST ``query`` to ``self.endpoint`` and return the response body.

    Args:
        query: Query text; sent as the request body.
        limit: Sent as the ``limit`` URL parameter.
        timeout: Sent as the ``timeout`` URL parameter and also used as the
            HTTP timeout for every request, including redirect replays.
        ignore_limit: When true, skip the check that the query text contains
            ``LIMIT``.
        **options: Accepted for interface compatibility; not used here.

    Returns:
        A one-entry dict mapping the requested MIME type
        (``'application/n-triples'`` for graph queries, otherwise
        ``'application/sparql-results+json'``) to the response text.

    Raises:
        QueryExecutionException: If the query lacks ``LIMIT`` (and
            ``ignore_limit`` is false) or the final response status is not 200.
    """
    from urllib.parse import urljoin

    if 'LIMIT' not in query.upper() and not ignore_limit:
        raise QueryExecutionException('SPARQL queries must specify a LIMIT')

    accept = 'application/n-triples' if is_graph_query(query) else 'application/sparql-results+json'

    with Session() as session:
        response = session.post(
            self.endpoint,
            data=query,
            params={
                'limit': limit,
                'timeout': timeout,
            },
            headers={
                'User-Agent': 'https://github.com/EgorDm/BOLD',
                'Content-Type': 'application/sparql-query',
                'Accept': accept,
            },
            timeout=timeout,
            allow_redirects=False
        )

        # Redirects are replayed by hand: the prepared request that was just
        # sent is re-sent to the new location, at most three times.
        retry_count = 0
        while response.status_code // 100 == 3 and retry_count < 3:
            location = response.headers.get('Location')
            if not location:
                # A 3xx without a target cannot be followed; fall through to
                # the status check below, which reports it.
                break
            request = response.request
            # Location may be relative, so resolve it against the URL just used.
            request.url = urljoin(request.url, location)
            # Carry the caller's timeout over to the replayed request.
            response = session.send(request, timeout=timeout)
            retry_count += 1

    if response.status_code != 200:
        raise QueryExecutionException(f'{response.status_code} {response.reason}\n{response.text}')

    return {accept: response.text}

is_graph_query(query)

Source code in datasets/services/query.py
96
97
def is_graph_query(query: str) -> bool:
    """Return True when ``query`` contains the keyword CONSTRUCT or DESCRIBE.

    The match is case-insensitive and requires the keyword to stand as a whole
    word, so identifiers that merely contain it (``?constructor``,
    ``?describedBy``) no longer count.

    NOTE(review): this is still a textual heuristic, not a parser. The keyword
    appearing as a separate word inside a string literal, comment or IRI is
    still reported as a graph query.

    Args:
        query: Query text.

    Returns:
        True if either keyword occurs as a standalone word, else False.
    """
    import re

    # Upper-case once, then look for either keyword delimited by word boundaries.
    return re.search(r'\b(?:CONSTRUCT|DESCRIBE)\b', query.upper()) is not None