A Flask extension for RQ (Redis Queue).
This is a continuation of Flask-RQ more in spirit than in code. Many thanks to Matt Wright for the inspiration and providing the shoulders to stand on.
pip install Flask-RQ2
To quickly start using Flask-RQ2, simply create an RQ
instance:
from flask import Flask
from flask_rq2 import RQ
app = Flask(__name__)
rq = RQ(app)
Alternatively, if you're using the application factory pattern:
from flask_rq2 import RQ
rq = RQ()
and then later call init_app
where you create your application object:
from flask import Flask
def create_app():
app = Flask(__name__)
from yourapplication.jobs import rq
rq.init_app(app)
# more here..
return app
A decorator to mark a function as an RQ job and to add some helpers to the function to simplify enqueuing:
from flask_rq2 import RQ
rq = RQ()
@rq.job
def add(x, y):
return x + y
Then in your app code:
job = add.queue(1, 2)
A specific queue name can also be passed as argument:
@rq.job('low')
def add(x, y):
return x + y
Some other parameters are available as well:
@rq.job('low', timeout=180, results_ttl=60*60, ttl=60*60*24)
def add(x, y):
return x + y
You can additionally schedule jobs to run at a certain time, after a certain timespan or by a cron-like plan:
@rq.job
def add(x, y):
return x + y
# queue job in 60 seconds
add.schedule(timedelta(seconds=60), 1, 2)
# queue job at a certain datetime (UTC!)
add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2)
# queue job in 14 days and then repeat once 14 days later
add.schedule(timedelta(days=14), 1, 2, repeat=1)
# queue job every day at noon (UTC!)
add.cron('0 0 12 * * ?', 'add-one-two', 1, 2)
See the full API docs for more information.
An optional decorator for custom exception handlers that the RQ worker should call when catching exceptions during job execution.
from flask_rq2 import RQ
rq = RQ()
@rq.exception_handler
def send_alert_to_ops(job, *exc_info):
# call other code to send alert to OPs team
The exception handler will automatically be used when running the worker
from the get_worker
method or the CLI integration.
There are a few useful methods to fetch RQ backend objects for advanced patterns.
They will use the same Flask config values as the decorators and CLI integration and should be used instead of rq's own functions with the same name.
Returns default queue or specific queue for name given as argument:
from flask_rq2 import RQ
rq = RQ()
default_queue = rq.get_queue()
low_queue = rq.get_queue('low')
easy_job = default_queue.enqueue(add, args=(1, 2))
hard_job = low_queue.enqueue(add, args=(1e100, 2e200))
Returns a worker for default queue or specific queues for names given as arguments:
from flask_rq2 import RQ
rq = RQ()
# Creates a worker that handle jobs in ``default`` queue.
default_worker = rq.get_worker()
default_worker.work(burst=True)
# Creates a worker that handle jobs in both ``simple`` and ``low`` queues.
low_n_simple_worker = rq.get_worker('low', 'simple')
low_n_simple_worker.work(burst=True)
Returns an RQ Scheduler instance for periodically enqueuing jobs:
from flask_rq2 import RQ
rq = RQ()
# check every 10 seconds if there are any jobs to enqueue
scheduler = rq.get_scheduler(interval=10)
scheduler.run()
Flask-RQ2 supports both the (upcoming) Click based CLI feature in Flask >= 1.0 (including the backport to Flask < 1.0 in Flask-CLI) as well as Flask-Script.
For the Flask CLI to work it's recommended to install the Flask-CLI package since it contains a import shim to automatically import CLI code from Flask in case >= 1.0 is installed. That means this is the most future proof option for you.
The rest happens automatically: a new rq
subcommand will be added to the
flask
command that wraps RQ's own rq
CLI tool using the Flask
configuration values.
Please call flask rq --help
for more information, assuming
you've set the FLASK_APP
environment variable to the Flask app path.
You can install the dependencies for this using this shortcut:
pip install Flask-RQ2[cli]
Flask-Script works a bit different and requires you to manually register a command manager with the main script manager. For example:
from flask_script import Manager
from flask_rq2.script import RQManager
from app import create_app
from jobs import rq # a flask_rq2.RQ instance
app = create_app()
manager = Manager(app)
manager.add_command('rq', RQManager(rq))
That adds a rq
subcommand to your management script and wraps RQ's own
rq
CLI tools automatically using the Flask configuration values.
Please call python manage.py rq --help
for more information, assuming
your management script is called manage.py
.
You can also install the dependencies for this using this shortcut:
pip install Flask-RQ2[script]
There isn't an official overview of CLI commands in the RQ documentation, but these are the commands that Flask-RQ2 support.
worker
-- Starts an RQ worker (required to run jobs).scheduler
-- Starts an RQ Scheduler (optional for scheduled jobs).info
-- Shows an RQ command-line monitor.empty
-- Empty the given RQ queues.requeue
-- Requeues failed jobs.suspend
-- Suspends all workers.resume
-- Resumes all workers.
Please call each command with the --help
option to learn more about their
required and optional paramaters.
The URL to pass to redis-py's Redis.from_url
classmethod to create a
Redis connetion pool. Defaults to connecting to the local Redis instance,
database 0.
app.config['RQ_REDIS_URL'] = 'redis://localhost:6379/0'
The default queues that the worker and CLI commands (empty
, info
and
worker
) act on by default.
app.config['RQ_QUEUES'] = ['default']
Whether or not to run jobs asynchronously. Defaults to True
meaning
that jobs only run when they are processed by the workers.
app.config['RQ_ASYNC'] = True
Set to False
to run jobs immediatedly upon enqueuing in-process.
This may be useful for testing purposes or other constrained environments.
This is the main switch, use with discretion.
The queue to enqueue scheduled jobs in.
app.config['RQ_SCHEDULER_QUEUE'] = 'scheduled'
Defaults to 'default'
.
The default interval the RQ Scheduler checks for jobs to enqueue.
app.config['RQ_SCHEDULER_INTERVAL'] = 1
Defaults to 60
.