Skip to content

models

CustomGenericRelation

Bases: GenericRelation

Source code in tasks/models.py
181
182
183
class CustomGenericRelation(GenericRelation):
    def bulk_related_objects(self, objs, using="default"):
        return []
Source code in tasks/models.py
182
183
def bulk_related_objects(self, objs, using="default"):
    return []

ModelAsyncResult

Bases: AsyncResult

Source code in tasks/models.py
106
107
108
109
class ModelAsyncResult(AsyncResult):
    def forget(self):
        Task.objects.filter(task_id=self.id).delete()
        return super(ModelAsyncResult, self).forget()

forget()

Source code in tasks/models.py
107
108
109
def forget(self):
    Task.objects.filter(task_id=self.id).delete()
    return super(ModelAsyncResult, self).forget()

Task

Bases: TimeStampMixin

Source code in tasks/models.py
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class Task(TimeStampMixin):
    STATES = (
        (TaskState.PENDING, 'PENDING'),
        (TaskState.STARTED, 'STARTED'),
        (TaskState.RETRY, 'RETRY'),
        (TaskState.FAILURE, 'FAILURE'),
        (TaskState.SUCCESS, 'SUCCESS'),
    )

    task_id = models.UUIDField(primary_key=True)
    state = models.CharField(choices=STATES, default=TaskState.PENDING, max_length=255)
    object_id = models.UUIDField(null=True)
    content_object = GenericForeignKey()
    content_type = models.ForeignKey(ContentType, null=True, on_delete=models.SET_NULL)
    name = models.CharField(max_length=255)
    creator = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True)
    """The user who started the task."""

    objects = TaskManager()

    def __str__(self):
        return '%s: %s' % (self.task_id, dict(self.STATES)[self.state])

    @property
    def result(self):
        return ModelAsyncResult(self.task_id)

    @property
    def short_id(self):
        return str(self.task_id)[:6]

    def delete(self, using=None, keep_parents=False):
        return super().delete(using, keep_parents)

    def can_view(self, user: User):
        return user.is_superuser or self.creator == user or (
            self.content_object and self.content_object.can_view(user)
        )

STATES = ((TaskState.PENDING, 'PENDING'), (TaskState.STARTED, 'STARTED'), (TaskState.RETRY, 'RETRY'), (TaskState.FAILURE, 'FAILURE'), (TaskState.SUCCESS, 'SUCCESS')) class-attribute

content_object = GenericForeignKey() class-attribute

content_type = models.ForeignKey(ContentType, null=True, on_delete=models.SET_NULL) class-attribute

creator = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True) class-attribute

The user who started the task.

name = models.CharField(max_length=255) class-attribute

object_id = models.UUIDField(null=True) class-attribute

objects = TaskManager() class-attribute

state = models.CharField(choices=STATES, default=TaskState.PENDING, max_length=255) class-attribute

task_id = models.UUIDField(primary_key=True) class-attribute

__str__()

Source code in tasks/models.py
86
87
def __str__(self):
    return '%s: %s' % (self.task_id, dict(self.STATES)[self.state])

can_view(user)

Source code in tasks/models.py
100
101
102
103
def can_view(self, user: User):
    return user.is_superuser or self.creator == user or (
        self.content_object and self.content_object.can_view(user)
    )

delete(using=None, keep_parents=False)

Source code in tasks/models.py
97
98
def delete(self, using=None, keep_parents=False):
    return super().delete(using, keep_parents)

result() property

Source code in tasks/models.py
89
90
91
@property
def result(self):
    return ModelAsyncResult(self.task_id)

short_id() property

Source code in tasks/models.py
93
94
95
@property
def short_id(self):
    return str(self.task_id)[:6]

TaskFilterMixin

Bases: object

Source code in tasks/models.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
class TaskFilterMixin(object):
    objects: models.Manager()

    def pending(self):
        return self.objects.filter(state=TaskState.PENDING)

    def started(self):
        return self.objects.filter(state=TaskState.STARTED)

    def retrying(self):
        return self.objects.filter(state=TaskState.RETRY)

    def failed(self):
        return self.objects.filter(state=TaskState.FAILURE)

    def successful(self):
        return self.objects.filter(state=TaskState.SUCCESS)

    def running(self):
        return self.objects.filter(Q(state=TaskState.PENDING) |
                                   Q(state=TaskState.STARTED) |
                                   Q(state=TaskState.RETRY))

    def ready(self):
        return self.objects.filter(Q(state=TaskState.FAILURE) |
                                   Q(state=TaskState.SUCCESS))

objects: models.Manager() class-attribute

failed()

Source code in tasks/models.py
39
40
def failed(self):
    return self.objects.filter(state=TaskState.FAILURE)

pending()

Source code in tasks/models.py
30
31
def pending(self):
    return self.objects.filter(state=TaskState.PENDING)

ready()

Source code in tasks/models.py
50
51
52
def ready(self):
    return self.objects.filter(Q(state=TaskState.FAILURE) |
                               Q(state=TaskState.SUCCESS))

retrying()

Source code in tasks/models.py
36
37
def retrying(self):
    return self.objects.filter(state=TaskState.RETRY)

running()

Source code in tasks/models.py
45
46
47
48
def running(self):
    return self.objects.filter(Q(state=TaskState.PENDING) |
                               Q(state=TaskState.STARTED) |
                               Q(state=TaskState.RETRY))

started()

Source code in tasks/models.py
33
34
def started(self):
    return self.objects.filter(state=TaskState.STARTED)

successful()

Source code in tasks/models.py
42
43
def successful(self):
    return self.objects.filter(state=TaskState.SUCCESS)

TaskManager

Bases: TaskFilterMixin, models.Manager

Source code in tasks/models.py
59
60
61
62
63
class TaskManager(TaskFilterMixin, models.Manager):
    use_for_related_fields = True

    def get_queryset(self):
        return TaskQuerySet(self.model, using=self._db)

get_queryset()

Source code in tasks/models.py
62
63
def get_queryset(self):
    return TaskQuerySet(self.model, using=self._db)

TaskMetaFilterMixin

Bases: object

Source code in tasks/models.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class TaskMetaFilterMixin(object):
    objects: models.Manager()

    def with_tasks(self):
        return self.objects.filter(tasks__state__isnull=False)

    def with_pending_tasks(self):
        return self.objects.filter(tasks__state=TaskState.PENDING)

    def with_started_tasks(self):
        return self.objects.filter(tasks__state=TaskState.STARTED)

    def with_retrying_tasks(self):
        return self.objects.filter(tasks__state=TaskState.RETRY)

    def with_failed_tasks(self):
        return self.objects.filter(tasks__state=TaskState.FAILURE)

    def with_successful_tasks(self):
        return self.objects.filter(tasks__state=TaskState.SUCCESS)

    def with_running_tasks(self):
        return self.objects.filter(Q(tasks__state=TaskState.PENDING) |
                                   Q(tasks__state=TaskState.STARTED) |
                                   Q(tasks__state=TaskState.RETRY))

    def with_ready_tasks(self):
        return self.objects.filter(Q(tasks__state=TaskState.FAILURE) |
                                   Q(tasks__state=TaskState.SUCCESS))

    def without_tasks(self):
        return self.objects.exclude(tasks__state__isnull=False)

    def without_pending_tasks(self):
        return self.objects.exclude(tasks__state=TaskState.PENDING)

    def without_started_tasks(self):
        return self.objects.exclude(tasks__state=TaskState.STARTED)

    def without_retrying_tasks(self):
        return self.objects.exclude(tasks__state=TaskState.RETRY)

    def without_failed_tasks(self):
        return self.objects.exclude(tasks__state=TaskState.FAILURE)

    def without_successful_tasks(self):
        return self.objects.exclude(tasks__state=TaskState.SUCCESS)

    def without_running_tasks(self):
        return self.objects.exclude(Q(tasks__state=TaskState.PENDING) |
                                    Q(tasks__state=TaskState.STARTED) |
                                    Q(tasks__state=TaskState.RETRY))

    def without_ready_tasks(self):
        return self.objects.exclude(Q(tasks__state=TaskState.FAILURE) |
                                    Q(tasks__state=TaskState.SUCCESS))

objects: models.Manager() class-attribute

with_failed_tasks()

Source code in tasks/models.py
127
128
def with_failed_tasks(self):
    return self.objects.filter(tasks__state=TaskState.FAILURE)

with_pending_tasks()

Source code in tasks/models.py
118
119
def with_pending_tasks(self):
    return self.objects.filter(tasks__state=TaskState.PENDING)

with_ready_tasks()

Source code in tasks/models.py
138
139
140
def with_ready_tasks(self):
    return self.objects.filter(Q(tasks__state=TaskState.FAILURE) |
                               Q(tasks__state=TaskState.SUCCESS))

with_retrying_tasks()

Source code in tasks/models.py
124
125
def with_retrying_tasks(self):
    return self.objects.filter(tasks__state=TaskState.RETRY)

with_running_tasks()

Source code in tasks/models.py
133
134
135
136
def with_running_tasks(self):
    return self.objects.filter(Q(tasks__state=TaskState.PENDING) |
                               Q(tasks__state=TaskState.STARTED) |
                               Q(tasks__state=TaskState.RETRY))

with_started_tasks()

Source code in tasks/models.py
121
122
def with_started_tasks(self):
    return self.objects.filter(tasks__state=TaskState.STARTED)

with_successful_tasks()

Source code in tasks/models.py
130
131
def with_successful_tasks(self):
    return self.objects.filter(tasks__state=TaskState.SUCCESS)

with_tasks()

Source code in tasks/models.py
115
116
def with_tasks(self):
    return self.objects.filter(tasks__state__isnull=False)

without_failed_tasks()

Source code in tasks/models.py
154
155
def without_failed_tasks(self):
    return self.objects.exclude(tasks__state=TaskState.FAILURE)

without_pending_tasks()

Source code in tasks/models.py
145
146
def without_pending_tasks(self):
    return self.objects.exclude(tasks__state=TaskState.PENDING)

without_ready_tasks()

Source code in tasks/models.py
165
166
167
def without_ready_tasks(self):
    return self.objects.exclude(Q(tasks__state=TaskState.FAILURE) |
                                Q(tasks__state=TaskState.SUCCESS))

without_retrying_tasks()

Source code in tasks/models.py
151
152
def without_retrying_tasks(self):
    return self.objects.exclude(tasks__state=TaskState.RETRY)

without_running_tasks()

Source code in tasks/models.py
160
161
162
163
def without_running_tasks(self):
    return self.objects.exclude(Q(tasks__state=TaskState.PENDING) |
                                Q(tasks__state=TaskState.STARTED) |
                                Q(tasks__state=TaskState.RETRY))

without_started_tasks()

Source code in tasks/models.py
148
149
def without_started_tasks(self):
    return self.objects.exclude(tasks__state=TaskState.STARTED)

without_successful_tasks()

Source code in tasks/models.py
157
158
def without_successful_tasks(self):
    return self.objects.exclude(tasks__state=TaskState.SUCCESS)

without_tasks()

Source code in tasks/models.py
142
143
def without_tasks(self):
    return self.objects.exclude(tasks__state__isnull=False)

TaskMetaManager

Bases: TaskMetaFilterMixin, models.Manager

Source code in tasks/models.py
174
175
176
177
178
class TaskMetaManager(TaskMetaFilterMixin, models.Manager):
    use_for_related_fields = True

    def get_queryset(self):
        return TaskMetaQuerySet(self.model, using=self._db)

get_queryset()

Source code in tasks/models.py
177
178
def get_queryset(self):
    return TaskMetaQuerySet(self.model, using=self._db)

TaskMetaQuerySet

Bases: TaskMetaFilterMixin, QuerySet

Source code in tasks/models.py
170
171
class TaskMetaQuerySet(TaskMetaFilterMixin, QuerySet):
    pass

TaskMixin

Bases: models.Model

Source code in tasks/models.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
class TaskMixin(models.Model):
    tasks = CustomGenericRelation(Task)

    objects = TaskMetaManager()

    class Meta:
        abstract = True

    @property
    def has_running_task(self):
        return self.tasks.running().exists()

    @property
    def has_ready_task(self):
        return self.tasks.ready().exists()

    def apply_async(self, task_fn, *args, name=None, creator=None, **kwargs):
        if 'task_id' in kwargs:
            task_id = kwargs['task_id']
        else:
            task_id = kwargs['task_id'] = uuid()

        if name is None:
            name = task_fn.__name__

        forget_if_ready(AsyncResult(task_id))
        try:
            task = Task.objects.get(task_id=task_id)
            task.content_object = self
            task.name = name
            task.creator = creator
        except Task.DoesNotExist:
            task = Task(task_id=task_id, content_object=self, name=name)
            task.creator = creator

        task.save()
        return task_fn.apply_async(*args, **kwargs)

    def get_task_results(self):
        return map(lambda x: x.result, self.tasks.all())

    def get_task_result(self, task_id):
        return self.tasks.get(task_id=task_id).result

    def clear_task_results(self):
        for task_result in self.get_task_results():
            forget_if_ready(task_result)

    def clear_task_result(self, task_id):
        task_result = self.get_task_result(task_id)
        forget_if_ready(task_result)

objects = TaskMetaManager() class-attribute

tasks = CustomGenericRelation(Task) class-attribute

Meta

Source code in tasks/models.py
192
193
class Meta:
    abstract = True

abstract = True class-attribute

apply_async(task_fn, *args, name=None, creator=None, **kwargs)

Source code in tasks/models.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def apply_async(self, task_fn, *args, name=None, creator=None, **kwargs):
    if 'task_id' in kwargs:
        task_id = kwargs['task_id']
    else:
        task_id = kwargs['task_id'] = uuid()

    if name is None:
        name = task_fn.__name__

    forget_if_ready(AsyncResult(task_id))
    try:
        task = Task.objects.get(task_id=task_id)
        task.content_object = self
        task.name = name
        task.creator = creator
    except Task.DoesNotExist:
        task = Task(task_id=task_id, content_object=self, name=name)
        task.creator = creator

    task.save()
    return task_fn.apply_async(*args, **kwargs)

clear_task_result(task_id)

Source code in tasks/models.py
235
236
237
def clear_task_result(self, task_id):
    task_result = self.get_task_result(task_id)
    forget_if_ready(task_result)

clear_task_results()

Source code in tasks/models.py
231
232
233
def clear_task_results(self):
    for task_result in self.get_task_results():
        forget_if_ready(task_result)

get_task_result(task_id)

Source code in tasks/models.py
228
229
def get_task_result(self, task_id):
    return self.tasks.get(task_id=task_id).result

get_task_results()

Source code in tasks/models.py
225
226
def get_task_results(self):
    return map(lambda x: x.result, self.tasks.all())

has_ready_task() property

Source code in tasks/models.py
199
200
201
@property
def has_ready_task(self):
    return self.tasks.ready().exists()

has_running_task() property

Source code in tasks/models.py
195
196
197
@property
def has_running_task(self):
    return self.tasks.running().exists()

TaskQuerySet

Bases: TaskFilterMixin, QuerySet

Source code in tasks/models.py
55
56
class TaskQuerySet(TaskFilterMixin, QuerySet):
    pass

TaskState

Bases: object

Source code in tasks/models.py
15
16
17
18
19
20
21
22
23
24
class TaskState(object):
    PENDING = 'PENDING'
    STARTED = 'STARTED'
    RETRY = 'RETRY'
    FAILURE = 'FAILURE'
    SUCCESS = 'SUCCESS'

    @classmethod
    def lookup(cls, state):
        return getattr(cls, state)

FAILURE = 'FAILURE' class-attribute

PENDING = 'PENDING' class-attribute

RETRY = 'RETRY' class-attribute

STARTED = 'STARTED' class-attribute

SUCCESS = 'SUCCESS' class-attribute

lookup(state) classmethod

Source code in tasks/models.py
22
23
24
@classmethod
def lookup(cls, state):
    return getattr(cls, state)

forget_if_ready(async_result)

Source code in tasks/models.py
240
241
242
def forget_if_ready(async_result):
    if async_result and async_result.ready():
        async_result.forget()