Skip to content

models

CellState

Bases: Enum

Source code in reports/models.py
18
19
20
21
22
23
24
class CellState(Enum):
    """Status values for a notebook cell.

    Each member's value is the member name as a string; ``Report.update_cell_state``
    stores ``state.value`` under the cell's ``status`` key in the notebook JSON.
    """
    # Allowed string values: 'FINISHED' | 'ERROR' | 'RUNNING' | 'QUEUED' | 'INITIAL'
    FINISHED = 'FINISHED'
    ERROR = 'ERROR'
    RUNNING = 'RUNNING'
    QUEUED = 'QUEUED'
    INITIAL = 'INITIAL'

ERROR = 'ERROR' class-attribute

FINISHED = 'FINISHED' class-attribute

INITIAL = 'INITIAL' class-attribute

QUEUED = 'QUEUED' class-attribute

RUNNING = 'RUNNING' class-attribute

PacketType

Bases: Enum

Source code in reports/models.py
27
28
29
30
class PacketType(Enum):
    """Type tags for ``Packet`` messages.

    ``Report`` passes ``CELL_STATE.value`` and ``CELL_RESULT.value`` as the first
    argument to ``Packet`` when broadcasting cell updates.
    """
    CELL_RUN = 'CELL_RUN'
    CELL_RESULT = 'CELL_RESULT'
    CELL_STATE = 'CELL_STATE'

CELL_RESULT = 'CELL_RESULT' class-attribute

CELL_RUN = 'CELL_RUN' class-attribute

CELL_STATE = 'CELL_STATE' class-attribute

Report

Bases: TaskMixin, TimeStampMixin

Source code in reports/models.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class Report(TaskMixin, TimeStampMixin):
    """A notebook report attached to a dataset.

    The ``notebook`` JSON document is accessed here under these paths:

    * ``content.cells.<cell_id>`` - cell definitions (``get_cell``)
    * ``results.states.<cell_id>`` - per-cell state entries, including ``status``
    * ``results.outputs.<cell_id>`` - per-cell outputs
    """

    class ShareModes(models.TextChoices):
        """Sharing levels for a report, as (stored value, translated label) pairs."""
        PRIVATE = 'PRIVATE', _('Private')
        PUBLIC_READONLY = 'PUBLIC_READONLY', _('Public (read-only)')
        PUBLIC_READWRITE = 'PUBLIC_READWRITE', _('Public (read-write)')

    id = models.UUIDField(default=uuid.uuid4, primary_key=True)
    # Kept as NULL (rather than deleting the report) when the creating user is removed.
    creator = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True)
    dataset = models.ForeignKey(Dataset, on_delete=models.CASCADE)
    notebook = models.JSONField()

    share_mode = models.CharField(max_length=255, choices=ShareModes.choices, default=ShareModes.PRIVATE)
    discoverable = models.BooleanField(default=False)

    objects = models.Manager()

    def get_cell(self, cell_id: uuid.UUID):
        """Return the cell stored under ``content.cells.<cell_id>`` in the notebook.

        Raises:
            Exception: if ``deepget`` yields ``None`` for that path.
        """
        cell = deepget(self.notebook, ['content', 'cells', str(cell_id)])
        if cell is None:
            raise Exception(f'Cell {cell_id} not found in notebook {self.id}')

        return cell

    def get_cell_state(self, cell_id: uuid.UUID):
        """Return whatever ``deepget`` finds under ``results.states.<cell_id>``.

        Unlike ``get_cell`` this does not raise when nothing is found; the
        ``deepget`` result is returned unchanged.
        """
        return deepget(self.notebook, ['results', 'states', str(cell_id)])

    @staticmethod
    def update_cell_state(report_id: uuid.UUID, cell_id: uuid.UUID, state: CellState):
        """Store a cell's new status and broadcast the cell's stored state entry.

        Writes ``state.value`` to ``results.states.<cell_id>.status`` with a
        queryset ``update``, re-reads the report, and sends a ``CELL_STATE``
        packet through ``send_to_group_sync`` addressed by ``str(report_id)``.

        Args:
            report_id: primary key of the report to update.
            cell_id: id of the cell whose status changes.
            state: the new status.

        Raises:
            Report.DoesNotExist: from ``objects.get`` when no report has this id.
        """
        Report.objects.filter(id=report_id).update(
            notebook=q_json_update('notebook', ['results', 'states', str(cell_id), 'status'], state.value)
        )
        # Re-read the row so the broadcast carries the whole stored state entry,
        # not only the status that was just written.
        # NOTE(review): the update and this read are separate queries; a concurrent
        # writer could change the entry in between - confirm this is acceptable.
        report = Report.objects.get(id=report_id)
        # Use a separate name instead of rebinding the ``state`` parameter (a CellState).
        cell_state = deepget(report.notebook, ['results', 'states', str(cell_id)])

        send_to_group_sync(str(report_id), {
            'type': 'task_message',
            'message': Packet(PacketType.CELL_STATE.value, {
                'cell_id': str(cell_id),
                'state': cell_state,
            }).dumps()
        })

    @staticmethod
    def update_cell_outputs(report_id: uuid.UUID, cell_id: uuid.UUID, outputs: list):
        """Store a cell's outputs and broadcast the stored outputs.

        Writes ``outputs`` to ``results.outputs.<cell_id>`` with a queryset
        ``update``, re-reads the report, and sends a ``CELL_RESULT`` packet
        through ``send_to_group_sync`` addressed by ``str(report_id)``.

        Args:
            report_id: primary key of the report to update.
            cell_id: id of the cell whose outputs are replaced.
            outputs: the new outputs for the cell.

        Raises:
            Report.DoesNotExist: from ``objects.get`` when no report has this id.
        """
        Report.objects.filter(id=report_id).update(
            notebook=q_json_update('notebook', ['results', 'outputs', str(cell_id)], outputs)
        )
        # Broadcast what is actually stored after the update, read back from the database.
        report = Report.objects.get(id=report_id)
        result = deepget(report.notebook, ['results', 'outputs', str(cell_id)])

        send_to_group_sync(str(report_id), {
            'type': 'task_message',
            'message': Packet(PacketType.CELL_RESULT.value, {
                'cell_id': str(cell_id),
                'outputs': result,
            }).dumps()
        })

    def can_edit(self, user: User) -> bool:
        """Return True if ``user`` may edit this report.

        Editing is allowed for superusers, the creator, and any truthy user
        when the report is shared as ``PUBLIC_READWRITE``. A falsy ``user``
        (e.g. ``None``) is never allowed.
        """
        if not user:
            return False
        # bool() keeps the result a strict True/False for callers.
        return bool(
            user.is_superuser or
            self.creator == user or
            self.share_mode == self.ShareModes.PUBLIC_READWRITE
        )

    def can_view(self, user: User) -> bool:
        """Return True if ``user`` may view this report.

        Viewing is allowed for superusers, the creator, and any truthy user
        when the report is not ``PRIVATE``. A falsy ``user`` (e.g. ``None``)
        is never allowed.
        """
        if not user:
            return False
        # bool() keeps the result a strict True/False for callers.
        return bool(
            user.is_superuser or
            self.creator == user or
            self.share_mode != self.ShareModes.PRIVATE
        )

creator = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True) class-attribute

dataset = models.ForeignKey(Dataset, on_delete=models.CASCADE) class-attribute

discoverable = models.BooleanField(default=False) class-attribute

id = models.UUIDField(default=uuid.uuid4, primary_key=True) class-attribute

notebook = models.JSONField() class-attribute

objects = models.Manager() class-attribute

share_mode = models.CharField(max_length=255, choices=ShareModes.choices, default=ShareModes.PRIVATE) class-attribute

ShareModes

Bases: models.TextChoices

Source code in reports/models.py
34
35
36
37
class ShareModes(models.TextChoices):
    """Sharing levels for a report, as (stored value, translated label) pairs."""
    # can_view / can_edit grant nothing extra to other users in this mode.
    PRIVATE = 'PRIVATE', _('Private')
    # can_view accepts any truthy user; can_edit still requires creator or superuser.
    PUBLIC_READONLY = 'PUBLIC_READONLY', _('Public (read-only)')
    # Both can_view and can_edit accept any truthy user.
    PUBLIC_READWRITE = 'PUBLIC_READWRITE', _('Public (read-write)')

PRIVATE = ('PRIVATE', _('Private')) class-attribute

PUBLIC_READONLY = ('PUBLIC_READONLY', _('Public (read-only)')) class-attribute

PUBLIC_READWRITE = ('PUBLIC_READWRITE', _('Public (read-write)')) class-attribute

can_edit(user)

Source code in reports/models.py
92
93
94
95
96
97
def can_edit(self, user: User) -> bool:
    """Return True if ``user`` may edit this report.

    Editing is allowed for superusers, the creator, and any truthy user
    when the report is shared as ``PUBLIC_READWRITE``. A falsy ``user``
    (e.g. ``None``) is never allowed.
    """
    if not user:
        return False
    # bool() keeps the result a strict True/False for callers.
    return bool(
        user.is_superuser or
        self.creator == user or
        self.share_mode == self.ShareModes.PUBLIC_READWRITE
    )

can_view(user)

Source code in reports/models.py
 99
100
101
102
103
104
def can_view(self, user: User) -> bool:
    """Return True if ``user`` may view this report.

    Viewing is allowed for superusers, the creator, and any truthy user
    when the report is not ``PRIVATE``. A falsy ``user`` (e.g. ``None``)
    is never allowed.
    """
    if not user:
        return False
    # bool() keeps the result a strict True/False for callers.
    return bool(
        user.is_superuser or
        self.creator == user or
        self.share_mode != self.ShareModes.PRIVATE
    )

get_cell(cell_id)

Source code in reports/models.py
49
50
51
52
53
54
def get_cell(self, cell_id: uuid.UUID):
    """Return the cell stored under ``content.cells.<cell_id>`` in the notebook.

    Raises:
        Exception: if ``deepget`` yields ``None`` for that path.
    """
    cell_path = ['content', 'cells', str(cell_id)]
    found = deepget(self.notebook, cell_path)
    if found is not None:
        return found

    raise Exception(f'Cell {cell_id} not found in notebook {self.id}')

get_cell_state(cell_id)

Source code in reports/models.py
56
57
58
def get_cell_state(self, cell_id: uuid.UUID):
    """Return whatever ``deepget`` finds under ``results.states.<cell_id>``.

    Nothing is raised here when the entry is missing; the ``deepget`` result
    is handed back unchanged.
    """
    return deepget(self.notebook, ['results', 'states', str(cell_id)])

update_cell_outputs(report_id, cell_id, outputs) staticmethod

Source code in reports/models.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
@staticmethod
def update_cell_outputs(report_id: uuid.UUID, cell_id: uuid.UUID, outputs: list):
    """Store a cell's outputs and broadcast the stored outputs.

    Writes ``outputs`` to ``results.outputs.<cell_id>`` with a queryset
    ``update``, re-reads the report, and sends a ``CELL_RESULT`` packet
    through ``send_to_group_sync`` addressed by ``str(report_id)``.
    """
    cell_key = str(cell_id)

    patched_notebook = q_json_update('notebook', ['results', 'outputs', cell_key], outputs)
    Report.objects.filter(id=report_id).update(notebook=patched_notebook)

    # Broadcast what is actually stored after the update, read back from the database.
    refreshed = Report.objects.get(id=report_id)
    stored_outputs = deepget(refreshed.notebook, ['results', 'outputs', cell_key])

    packet = Packet(PacketType.CELL_RESULT.value, {
        'cell_id': cell_key,
        'outputs': stored_outputs,
    })
    send_to_group_sync(str(report_id), {'type': 'task_message', 'message': packet.dumps()})

update_cell_state(report_id, cell_id, state) staticmethod

Source code in reports/models.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@staticmethod
def update_cell_state(report_id: uuid.UUID, cell_id: uuid.UUID, state: CellState):
    """Store a cell's new status and broadcast the cell's stored state entry.

    Writes ``state.value`` to ``results.states.<cell_id>.status`` with a
    queryset ``update``, re-reads the report, and sends a ``CELL_STATE``
    packet through ``send_to_group_sync`` addressed by ``str(report_id)``.
    """
    cell_key = str(cell_id)

    patched_notebook = q_json_update('notebook', ['results', 'states', cell_key, 'status'], state.value)
    Report.objects.filter(id=report_id).update(notebook=patched_notebook)

    # Re-read the row so the broadcast carries the whole stored state entry,
    # not only the status that was just written.
    refreshed = Report.objects.get(id=report_id)
    stored_state = deepget(refreshed.notebook, ['results', 'states', cell_key])

    packet = Packet(PacketType.CELL_STATE.value, {
        'cell_id': cell_key,
        'state': stored_state,
    })
    send_to_group_sync(str(report_id), {'type': 'task_message', 'message': packet.dumps()})