Skip to content

tasks

DEFAULT_LIMIT = 100 module-attribute

DEFAULT_TIMEOUT = 5000 module-attribute

logger = get_logger() module-attribute

result_error(e, duration)

Source code in reports/tasks.py
 97
 98
 99
100
101
102
103
104
def result_error(e: QueryExecutionException, duration: float):
    return {
        'output_type': 'error',
        'ename': type(e).__name__,
        'evalue': str(e),
        'traceback': [],
        'execution_time': duration
    }

run_cell(report_id, cell_id)

Source code in reports/tasks.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
@shared_task()
def run_cell(report_id: UUID, cell_id: UUID) -> str:
    report: Report = Report.objects.get(id=report_id)
    state = report.get_cell_state(cell_id)
    if state and state['status'] == CellState.RUNNING.value:
        raise Exception(f'Cell {cell_id} is already running')

    logger.info(f'Running cell {cell_id} in notebook {report_id}')
    Report.update_cell_state(report_id, cell_id, CellState.RUNNING)

    report: Report = Report.objects.get(id=report_id)
    dataset: Dataset = report.dataset
    error = False

    try:
        cell = report.get_cell(cell_id)

        if cell is None:
            raise Exception(f'Cell {cell_id} not found in notebook {report_id}')

        if dataset.state != DatasetState.IMPORTED.value:
            raise Exception(f'Dataset {dataset.id} is not imported yet')

        timeout = deepget(cell, ['metadata', 'timeout'], None)
        limit = deepget(cell, ['metadata', 'limit'], None)

        outputs: list = []
        cell_type = cell.get('cell_type', '')
        match cell_type:
            case 'code':
                outputs, error = run_sparql(dataset, cell.get('source', ''), timeout, limit)
            case _ if cell_type.startswith('widget_'):
                snapshot = cell.get('data', {})
                for source in cell.get('source', []):
                    outputs_s, error = run_sparql(dataset, source, timeout, limit)

                    for output in outputs_s:
                        if output.get('output_type') == 'execute_result':
                            output['snapshot'] = snapshot

                    outputs.extend(outputs_s)
                    if error:
                        break

                # outputs.
            case _:
                raise Exception(f'Cell {cell_id} in notebook {report_id} has unknown cell type {cell_type}')

        Report.update_cell_outputs(report_id, cell_id, outputs)
    except Exception as e:
        logger.error(f'Error running cell {cell_id} in notebook {report_id}')
        Report.update_cell_state(report_id, cell_id, CellState.ERROR)
        raise e

    Report.update_cell_state(report_id, cell_id, CellState.ERROR if error else CellState.FINISHED)

run_sparql(dataset, source, timeout=None, limit=None)

Source code in reports/tasks.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def run_sparql(dataset: Dataset, source: str, timeout: int = None, limit: int = None):
    start_time = timer()
    limit = (limit or DEFAULT_LIMIT) if 'LIMIT ' not in source.upper() else None
    timeout = timeout or DEFAULT_TIMEOUT

    outputs, error = [], False
    try:
        output = dataset.get_query_service().query(source, limit, timeout)
        outputs.append({
            'output_type': 'execute_result',
            'execute_count': 1,
            'data': output,
            'execution_time': float(timer() - start_time)
        })
    except QueryExecutionException as e:
        error = True
        outputs.append(result_error(e, float(timer() - start_time)))

    return outputs, error