A different Beanstalkd client library.
Library is composed of two layers, Protocol
and Client
. Protocol
is one-to-one mapping of Beanstalkd commands, and Client
is higher level API which decomposes functionality into multiple interfaces.
Job dispatcher/runner functionality is also provided, as well as support for signal handling.
Interface for the protocol is Zlikavac32\BeanstalkdLib\Protocol
and default implementation is provided as Zlikavac32\BeanstalkdLib\Protocol\ProtocolOverSocket
.
Methods represent Beanstalkd commands as they are defined in the protocol. That means that there are no default values in the methods. Every distinct Beanstalkd error has it's own exception to make error recovery easier.
Client is higher level API for Beanstalkd, that consists of three main interfaces:
Zlikavac32\BeanstalkdLib\Client
- interface towards generic server commandsZlikavac32\BeanstalkdLib\TubeHandle
- interface towards tube related commandsZlikavac32\BeanstalkdLib\JobHandle
- interface towards job related commands
To express higher level API, interface methods are no longer directly mapped to Beanstalkd commands.
Status responses have their own classes which expose readable interface to those values, and values themselves are strictly typed.
Default implementations are backend by Protocol
and are provided in:
Zlikavac32\BeanstalkdLib\Client\ProtocolClient
Zlikavac32\BeanstalkdLib\Client\ProtocolTubeHandle
Zlikavac32\BeanstalkdLib\Client\ProtocolJobHandle
Library itself is agnostic to any framework. Adapters are currently provided only for Symfony. Where possible, library is also agnostic to other libraries and extensions. For example, yaml parsing, socket access and json serialization are defined through interfaces required by this library, and adapters are provided for extensions and libraries.
Zlikavac32\BeanstalkdLib\SignalHandlerInstaller
installs injected Zlikavac32\BeanstalkdLib\InterruptHandler
to handle SIGINT
, SIGTERM
and SIGQUIT
.
Existing handling strategies are:
Zlikavac32\BeanstalkdLib\InterruptHandler\DoNothingInterruptHandler
- just ignores signalZlikavac32\BeanstalkdLib\InterruptHandler\GracefulExitInterruptHandler
- mark that graceful exit is in progressZlikavac32\BeanstalkdLib\InterruptHandler\HardInterruptHandler
- on second signal throws interrupt exceptionZlikavac32\BeanstalkdLib\InterruptHandler\TimeoutHardInterruptHandler
- schedules interrupt exception to be thrown in N seconds
Zlikavac32\BeanstalkdLib\GracefulExit
can be used to inspect whether graceful exit is in progress, and if so, no more processing should be done. By default, graceful exit inspection is provided through Zlikavac32\BeanstalkdLib\InterruptHandler\GracefulExitInterruptHandler
.
Job dispatcher and protocol inspect value to break from waiting.
Since the idea is to keep classes thin, more functionality is provided by decorating original service.
For example, sockets can be made lazy with Zlikavac32\BeanstalkdLib\Socket\LazySocket
and have exclusive access (due to the signals) with Zlikavac32\BeanstalkdLib\Socket\ExclusiveAccessSocket
.
Zlikavac32\BeanstalkdLib\Runner\AutoTouchRunner
is runner decorator that will asynchronously touch job if his time to run is running out.
Default values are defined on tube configuration, which client uses when no other value is provided. That means that different tubes can have different default values, like time to run, or serializer used.
Recommended installation is through Composer.
composer require zlikavac32/beanstalkd-lib
Next, we'll explore configuration and usage.
Provided protocol implementation (Zlikavac32\BeanstalkdLib\Protocol\ProtocolOverSocket
) uses Zlikavac32\BeanstalkdLib\Socket
to establish connection with the server. Provided adapter is for sockets
PHP extension.
$socket = new NativePHPSocket(60000000);
Original socket can be decorated so that it's, for example, lazily loadad.
$socket = new LazySocket($socket);
Provided protocol implementation also requires Zlikavac32\BeanstalkdLib\YamlParser
for protocol parsing. Adapter is provided Symfony YAML component.
$yamlParser = new SymfonyYamlParser();
One last thing that protocol requires is graceful exit object (instance of Zlikavac32\BeanstalkdLib\GracefulExit
) to determine whether we should break from waiting.
$gracefulExit = new GracefulExitInterruptHandler();
Now we can create our protocol.
$protocol = new ProtocolOverSocket(
$socket->open('127.0.0.1', 11300),
$gracefulExit,
$yamlParser
);
Protocol on it's own does not bring any great benefit to our code, and that's where Zlikavac32\BeanstalkdLib\Client
comes into play.
Since client abstracts tube access, we can reduce number of use tube commands by decorating protocol to be state aware.
$protocol = new StateAwareProtocol($protocol);
Next we need tube configuration for each tube that we want the system to know about. Every tube configuration requires serializer implemented as Zlikavac32\BeanstalkdLib\Serializer
. For domain objects custom serializers can be implemented, or a generic serializer like Zlikavac32\BeanstalkdLib\Adapter\PHP\Json\NativePHPJsonSerializer
can be used.
$jsonSerializer = new NativePHPJsonSerializer(true);
$fooTubeConfiguration = new StaticTubeConfiguration(0, 1024, 300, 3600, $jsonSerializer)
Now we can create our client. Client requires protocol tube purger instance, and a map of known tube configurations where the key is a tube name and the value is a tube configuration.
$protocolTubePurger = new IterativeProtocolTubePurger();
$client = new ProtocolClient($protocol, $protocolTubePurger, new Map([
'foo' => $fooTubeConfiguration,
]));
To put a job into the queue, we retrieve correct tube and put a job in it.
$fooTube = $client->tube('foo');
$fooTube->put([1, 2, 3, 4]);
To reserve a job, we call reserve()
on the client.
$reservedJob = $client->reserve();
Reserved job can then be manipulated. Configured tube serializer is used to deserialize payload.
$jobId = $reservedJob->id();
$deserializedPayload = $reservedJob->payload();
$reservedJob->delete();
To make things easier, job dispatcher is provided in Zlikavac32\BeanstalkdLib\JobDispatcher\TubeMapJobDispatcher
. Complete configuration, with signal handling, can be found in examples/worker_producer
You can see more examples with code comments in examples.