A Google Cloud Pubsub client for Node.js geared towards queues and jobs. Inspired by ceejbot/fivebeans.
Requires Node.js 8 or later.
yarn add @splitmedialabs/pubsub-queue
- a GCP account and project
- a Pubsub Topic for the main jobs
- a Subscription for this topic
- a Pubsub topic for the failed jobs
- a JSON key file (`keyFilename`) with the correct IAM permissions for PubSub
import PubsubQueue from '@splitmedialabs/pubsub-queue';
const Pubsub = new PubsubQueue(
{
// connection config
projectId: 'my-gcp-project-id',
keyFilename: '~/gcp.json',
},
{
// topics and subscriptions config
topicName: 'worker-test', // name of the default topic for the jobs
subscriptionName: 'test-sub', // name of the subscription under the topic
buriedTopicName: 'worker-test-buried', // Optional: name of the buried topic. When a job fails, it is published here.
}
);
// minimal job publishing. This will publish the job to the default topicName
Pubsub.Publisher.publish({
type: 'hello', // name of the handler
payload: {
hello: 'world! simple',
}, // arbitrary payload. Will be serialized to JSON
});
// all the bells and whistles
Pubsub.Publisher.publish({
type: 'hello-fail', // name of the handler
payload: {
hello: 'world delayed',
}, // arbitrary payload. Will be serialized to JSON
delayed: {
// job will only be executed after this delay
unit: 'seconds',
value: '10',
},
});
// custom topic
Pubsub.Publisher.publish('custom-topic-name', {
type: 'hello-fail', // name of the handler
payload: {
hello: 'world delayed',
}, // arbitrary payload. Will be serialized to JSON
// will only start after this date
delayed: new Date(new Date().getTime() + 10000).toISOString(),
});
// # handlers/hello.ts
export default {
async work(payload) {
console.log('job-handler', { payload });
return; // any return means success
},
};
// # handlers/hello-repeat.ts
export default {
async work(payload) {
console.log('job-handler', { payload });
return 'put'; // the job will be successful but will be put back on the queue
},
};
// # handlers/hello-fail.ts
export default {
retries: {
count: 5, // how many times to retry this job
delay: 1000, // delay between each retry
},
async work(payload) {
console.log('job-handler', { payload });
throw new Error('Fake Error!'); // throwing will fail the job
},
};
// # index.ts
import PubsubQueue from '@splitmedialabs/pubsub-queue';
const Pubsub = new PubsubQueue(
{
// connection config
projectId: 'my-gcp-project-id',
keyFilename: '~/gcp.json',
},
{
// topics and subscriptions config
topicName: 'worker-test',
buriedTopicName: 'worker-test-buried',
subscriptionName: 'test-sub',
}
);
const handlers = {
hello: require('./handlers/hello'),
'hello-repeat': require('./handlers/hello-repeat'),
'hello-fail': require('./handlers/hello-fail'),
};
Pubsub.Worker.start(handlers);
Listening to the worker's job events is useful for statistics:
const handlers = {};
Pubsub.Worker.on('job.reserved', (data) => console.log(data)); // when a job is starting
Pubsub.Worker.on('job.handled', (data) => console.log(data)); // when a job is done
Pubsub.Worker.on('job.buried', (data) => console.log(data)); // when a job has failed
Pubsub.Worker.start(handlers);