-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathclient_handler.py
168 lines (127 loc) · 6.98 KB
/
client_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import collections
import time
import logging
from gearman.command_handler import GearmanCommandHandler
from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import InvalidClientState
from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority
# Module-level logger, named after this module via __name__
gearman_logger = logging.getLogger(__name__)
class GearmanClientCommandHandler(GearmanCommandHandler):
    """Maintains the state of this connection on behalf of a GearmanClient

    Two collections are kept per handler:

    * requests_awaiting_handles -- FIFO of submitted requests that have not been given a job handle yet
    * handle_to_request_map     -- requests keyed by their job handle
    """

    def __init__(self, connection_manager=None):
        super(GearmanClientCommandHandler, self).__init__(connection_manager=connection_manager)

        # A freshly submitted job has no handle yet; handles are paired with requests in submission order
        self.requests_awaiting_handles = collections.deque()
        self.handle_to_request_map = {}

    ##################################################################
    ##### Public interface methods to be called by GearmanClient #####
    ##################################################################
    def send_job_request(self, current_request):
        """Submit a new job request and queue it until a job handle arrives

        The request must be in state JOB_UNKNOWN, otherwise InvalidClientState is raised.
        """
        self._assert_request_state(current_request, JOB_UNKNOWN)

        job = current_request.job

        # The submit COMMAND depends on whether the request is backgrounded and on its priority
        submit_command = submit_cmd_for_background_priority(current_request.background, current_request.priority)
        encoded_payload = self.encode_data(job.data)
        self.send_command(submit_command, task=job.task, unique=job.unique, data=encoded_payload)

        # The command is out, so this request is now pending until a handle is received
        current_request.state = JOB_PENDING
        self.requests_awaiting_handles.append(current_request)

    def send_get_status_of_job(self, current_request):
        """Track the request under its job handle and send a GET_STATUS command for that handle"""
        self._register_request(current_request)
        self.send_command(GEARMAN_COMMAND_GET_STATUS, job_handle=current_request.job.handle)

    def on_io_error(self):
        """Mark every tracked request as JOB_UNKNOWN

        Requests still awaiting a handle are updated first, then the requests registered by handle.
        Neither collection is emptied here.
        """
        tracked_groups = (self.requests_awaiting_handles, self.handle_to_request_map.values())
        for request_group in tracked_groups:
            for tracked_request in request_group:
                tracked_request.state = JOB_UNKNOWN

    def _register_request(self, current_request):
        """Remember this request under its job handle"""
        self.handle_to_request_map[current_request.job.handle] = current_request

    def _unregister_request(self, current_request):
        """Stop tracking this request; returns the stored request, or None if its handle was not tracked"""
        return self.handle_to_request_map.pop(current_request.job.handle, None)

    def _assert_request_state(self, current_request, expected_state):
        """Raise InvalidClientState unless current_request.state equals expected_state"""
        if current_request.state != expected_state:
            state_mismatch = 'Expected handle (%s) to be in state %r, got %r' % (current_request.job.handle, expected_state, current_request.state)
            raise InvalidClientState(state_mismatch)

    def _request_for_handle(self, job_handle, expected_state):
        """Fetch the request registered under job_handle and verify its state

        An unregistered handle surfaces as the KeyError of the underlying map lookup;
        a state mismatch raises InvalidClientState.
        """
        tracked_request = self.handle_to_request_map[job_handle]
        self._assert_request_state(tracked_request, expected_state)
        return tracked_request

    ##################################################################
    ## Gearman command callbacks with kwargs defined by protocol.py ##
    ##################################################################
    def recv_job_created(self, job_handle):
        """Assign job_handle to the oldest pending request and start tracking it by handle"""
        if not self.requests_awaiting_handles:
            raise InvalidClientState('Received a job_handle with no pending requests')

        # A JOB_CREATED answers the oldest request that is still waiting for its server handle
        oldest_pending = self.requests_awaiting_handles.popleft()
        self._assert_request_state(oldest_pending, JOB_PENDING)

        # Record the handle, advance the state and index the request by handle
        oldest_pending.job.handle = job_handle
        oldest_pending.state = JOB_CREATED
        self._register_request(oldest_pending)

        return True

    def recv_work_data(self, job_handle, data):
        """Queue a WORK_DATA update (decoded) on the request for job_handle"""
        tracked_request = self._request_for_handle(job_handle, JOB_CREATED)
        tracked_request.data_updates.append(self.decode_data(data))

        return True

    def recv_work_warning(self, job_handle, data):
        """Queue a WORK_WARNING update (decoded) on the request for job_handle"""
        tracked_request = self._request_for_handle(job_handle, JOB_CREATED)
        tracked_request.warning_updates.append(self.decode_data(data))

        return True

    def recv_work_status(self, job_handle, numerator, denominator):
        """Store a WORK_STATUS update as the request's current status"""
        tracked_request = self._request_for_handle(job_handle, JOB_CREATED)

        # The protocol spec is ambiguous as to what type the numerator and denominator is...
        # But according to Eskil, gearmand interprets these as integers
        progress_numerator = int(numerator)
        progress_denominator = int(denominator)

        tracked_request.status = {
            'handle': job_handle,
            'known': True,
            'running': True,
            'numerator': progress_numerator,
            'denominator': progress_denominator,
            'time_received': time.time()
        }

        return True

    def recv_work_complete(self, job_handle, data):
        """Store the decoded result, mark the request JOB_COMPLETE and stop tracking it"""
        tracked_request = self._request_for_handle(job_handle, JOB_CREATED)

        tracked_request.result = self.decode_data(data)
        tracked_request.state = JOB_COMPLETE
        self._unregister_request(tracked_request)

        return True

    def recv_work_fail(self, job_handle):
        """Mark the request JOB_FAILED and stop tracking it"""
        tracked_request = self._request_for_handle(job_handle, JOB_CREATED)

        tracked_request.state = JOB_FAILED
        self._unregister_request(tracked_request)

        return True

    def recv_work_exception(self, job_handle, data):
        """Store the decoded exception payload on the request; state and tracking are left unchanged"""
        # Using GEARMAND_COMMAND_WORK_EXCEPTION is not recommended at time of this writing [2010-02-24]
        # http://groups.google.com/group/gearman/browse_thread/thread/5c91acc31bd10688/529e586405ed37fe
        #
        tracked_request = self._request_for_handle(job_handle, JOB_CREATED)
        tracked_request.exception = self.decode_data(data)

        return True

    def recv_status_res(self, job_handle, known, running, numerator, denominator):
        """Store a STATUS_RES reply as the request's status; drop the request if the server does not know it"""
        # No state assertion here: a status reply is accepted for any request registered under this handle
        tracked_request = self.handle_to_request_map[job_handle]

        # Make our status response Python friendly ('1' means true for the known/running flags)
        job_known = bool(known == '1')
        job_running = bool(running == '1')
        progress_numerator = int(numerator)
        progress_denominator = int(denominator)

        tracked_request.status = {
            'handle': job_handle,
            'known': job_known,
            'running': job_running,
            'numerator': progress_numerator,
            'denominator': progress_denominator,
            'time_received': time.time()
        }

        # A request the server has no record of no longer needs to be tracked
        if not job_known:
            self._unregister_request(tracked_request)

        return True