forked from wayhood/yii2-queue
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Queue.php
143 lines (125 loc) · 3.3 KB
/
Queue.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
<?php
namespace wh\queue;
use yii\base\Component;
use Jeremeamia\SuperClosure\SerializableClosure;
use yii\helpers\StringHelper;
/**
 * Base class for queue components.
 *
 * Wraps each job in a JSON payload tagged with a random id and delegates the
 * actual storage operations to the *Internal() methods implemented by
 * concrete subclasses.
 */
abstract class Queue extends Component
{
    /**
     * @var string Queue name prefix.
     */
    public $queuePrefix;

    /**
     * Builds a prefixed key from the given name.
     *
     * A string consisting only of alphanumeric characters and at most 32 bytes
     * long is used as is; any other string is replaced by its MD5 hash, and a
     * non-string value by the MD5 hash of its JSON encoding. The result is
     * prepended with $queuePrefix.
     *
     * @param mixed $name
     * @return string
     */
    public function buildPrefix($name)
    {
        if (is_string($name)) {
            $name = ctype_alnum($name) && StringHelper::byteLength($name) <= 32 ? $name : md5($name);
        } else {
            $name = md5(json_encode($name));
        }

        return $this->queuePrefix . $name;
    }

    /**
     * Pushes a job onto the queue.
     *
     * @param mixed $job Class or callback that executes the task.
     * @param mixed $data Data stored with the job; must be JSON-encodable.
     * @param string|null $queue Queue name.
     * @return mixed Whatever pushInternal() returns.
     * @throws \InvalidArgumentException If the job/data cannot be JSON encoded.
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->pushInternal($this->createPayload($job, $data), $queue);
    }

    /**
     * Pops a job off the queue.
     *
     * @param string|null $queue Queue name.
     * @return mixed Whatever popInternal() returns.
     */
    public function pop($queue = null)
    {
        return $this->popInternal($queue);
    }

    /**
     * Creates the message body for a job.
     *
     * The body is a JSON object with the keys "job", "data" and a random "id".
     * Closure jobs get no special handling; the closure-specific payload path
     * that used to live here was disabled.
     *
     * @param mixed $job
     * @param mixed $data
     * @param string|null $queue Not used by this implementation.
     * @return string
     * @throws \InvalidArgumentException If the job/data cannot be JSON encoded.
     */
    protected function createPayload($job, $data = '', $queue = null)
    {
        $payload = json_encode([
            'job' => $job,
            'data' => $data,
        ]);

        // json_encode() returns false on failure. Passing that on to setMeta()
        // would silently produce a payload that contains nothing but the id.
        if ($payload === false) {
            throw new \InvalidArgumentException(
                'Unable to JSON encode the queue payload (JSON error code ' . json_last_error() . ').'
            );
        }

        return $this->setMeta($payload, 'id', $this->getRandomId());
    }

    /**
     * Set additional meta on a payload string.
     *
     * @param string $payload JSON-encoded payload.
     * @param string $key
     * @param string $value
     * @return string The re-encoded payload including the new key.
     * @throws \InvalidArgumentException If the payload does not decode to an array.
     */
    protected function setMeta($payload, $key, $value)
    {
        $decoded = json_decode($payload, true);

        // Anything other than an array means the payload was invalid JSON or a
        // bare scalar; adding a key to it would discard or corrupt the message.
        if (!is_array($decoded)) {
            throw new \InvalidArgumentException('The queue payload must be a JSON encoded object or array.');
        }

        $decoded[$key] = $value;

        return json_encode($decoded);
    }

    /**
     * Get a random ID string.
     *
     * Takes the first 32 characters of a shuffled string made of five
     * repetitions of the alphanumeric pool.
     *
     * @return string
     */
    protected function getRandomId()
    {
        $pool = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';

        return substr(str_shuffle(str_repeat($pool, 5)), 0, 32);
    }

    /**
     * Returns the queue name.
     *
     * The name is buildPrefix($queue) followed by getQueueInternal($queue).
     *
     * @param mixed $queue
     * @return string
     */
    protected function getQueue($queue)
    {
        return $this->buildPrefix($queue) . $this->getQueueInternal($queue);
    }

    /**
     * Driver-specific implementation of push().
     *
     * @param string $payload JSON payload produced by createPayload().
     * @param string|null $queue
     * @param array $options
     * @return mixed
     */
    abstract protected function pushInternal($payload, $queue = null, array $options = []);

    /**
     * Driver-specific part of the queue name.
     *
     * @param mixed $queue
     * @return mixed
     */
    abstract protected function getQueueInternal($queue = null);

    /**
     * Driver-specific implementation of pop().
     *
     * @param string|null $queue
     * @return mixed
     */
    abstract protected function popInternal($queue = null);
}