@shared_task()
def run_cell(report_id: UUID, cell_id: UUID) -> str:
report: Report = Report.objects.get(id=report_id)
state = report.get_cell_state(cell_id)
if state and state['status'] == CellState.RUNNING.value:
raise Exception(f'Cell {cell_id} is already running')
logger.info(f'Running cell {cell_id} in notebook {report_id}')
Report.update_cell_state(report_id, cell_id, CellState.RUNNING)
report: Report = Report.objects.get(id=report_id)
dataset: Dataset = report.dataset
error = False
try:
cell = report.get_cell(cell_id)
if cell is None:
raise Exception(f'Cell {cell_id} not found in notebook {report_id}')
if dataset.state != DatasetState.IMPORTED.value:
raise Exception(f'Dataset {dataset.id} is not imported yet')
timeout = deepget(cell, ['metadata', 'timeout'], None)
limit = deepget(cell, ['metadata', 'limit'], None)
outputs: list = []
cell_type = cell.get('cell_type', '')
match cell_type:
case 'code':
outputs, error = run_sparql(dataset, cell.get('source', ''), timeout, limit)
case _ if cell_type.startswith('widget_'):
snapshot = cell.get('data', {})
for source in cell.get('source', []):
outputs_s, error = run_sparql(dataset, source, timeout, limit)
for output in outputs_s:
if output.get('output_type') == 'execute_result':
output['snapshot'] = snapshot
outputs.extend(outputs_s)
if error:
break
# outputs.
case _:
raise Exception(f'Cell {cell_id} in notebook {report_id} has unknown cell type {cell_type}')
Report.update_cell_outputs(report_id, cell_id, outputs)
except Exception as e:
logger.error(f'Error running cell {cell_id} in notebook {report_id}')
Report.update_cell_state(report_id, cell_id, CellState.ERROR)
raise e
Report.update_cell_state(report_id, cell_id, CellState.ERROR if error else CellState.FINISHED)