-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathjob.py
83 lines (62 loc) · 2.89 KB
/
job.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import collections
from gearman.constants import PRIORITY_NONE, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
class GearmanJob(object):
"""Represents the basics of a job... used in GearmanClient / GearmanWorker to represent job states"""
def __init__(self, connection, handle, task, unique, data):
self.connection = connection
self.handle = handle
self.task = task
self.unique = unique
self.data = data
def to_dict(self):
return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data)
def __repr__(self):
return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r>' % (self.connection, self.handle, self.task, self.unique, self.data)
class GearmanJobRequest(object):
"""Represents a job request... used in GearmanClient to represent job states"""
def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_attempts=1):
self.gearman_job = gearman_job
self.priority = initial_priority
self.background = background
self.connection_attempts = 0
self.max_connection_attempts = max_attempts
self.initialize_request()
def initialize_request(self):
# Holds WORK_COMPLETE responses
self.result = None
# Holds WORK_EXCEPTION responses
self.exception = None
# Queues to hold WORK_WARNING, WORK_DATA responses
self.warning_updates = collections.deque()
self.data_updates = collections.deque()
# Holds WORK_STATUS / STATUS_REQ responses
self.status = {}
self.state = JOB_UNKNOWN
self.timed_out = False
def reset(self):
self.initialize_request()
self.connection = None
self.handle = None
@property
def status_updates(self):
"""Deprecated since 2.0.1, removing in next major release"""
output_queue = collections.deque()
if self.status:
output_queue.append((self.status.get('numerator', 0), self.status.get('denominator', 0)))
return output_queue
@property
def server_status(self):
"""Deprecated since 2.0.1, removing in next major release"""
return self.status
@property
def job(self):
return self.gearman_job
@property
def complete(self):
background_complete = bool(self.background and self.state in (JOB_CREATED))
foreground_complete = bool(not self.background and self.state in (JOB_FAILED, JOB_COMPLETE))
actually_complete = background_complete or foreground_complete
return actually_complete
def __repr__(self):
formatted_representation = '<GearmanJobRequest task=%r, unique=%r, priority=%r, background=%r, state=%r, timed_out=%r>'
return formatted_representation % (self.job.task, self.job.unique, self.priority, self.background, self.state, self.timed_out)