As of Celery version 3.0 and above, integrating Celery with Flask should no longer require a third-party extension. That's what they said. However, my experience integrating Celery with Flask, especially when Flask is organized with blueprints, shows that it can be a little bit tricky.
## Challenges
So here is the basic setup for Celery, taken from http://flask.pocoo.org/docs/0.10/patterns/celery/:
```python
from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
```
and you use it like this:
```python
from flask import Flask

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)

@celery.task()
def add_together(a, b):
    return a + b
```
Now, this works fine if your app is simple. The challenge comes when you want to fire up a Celery task from a view: you'll quickly run into a circular import, and that is not allowed in Python.
To illustrate this, assume the following code:
```python
## app/module_one/views.py
from flask import Blueprint, current_app, jsonify

# import our worker
from ..tasks import worker

module_one = Blueprint('module_one', __name__)

@module_one.route('/run', methods=['GET'])
def run_task():
    # run job in celery
    task = worker.run_task_async()
    return jsonify(name=current_app.name, status='Task is running',
                   taskid=task.id), 202

@module_one.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    task = worker.check_status_async(taskid)
    return jsonify(status=task.get('status', None), task=task.get('job', None))
```
```python
## app/app.py
from flask import Flask
from celery import Celery

# This is needed to register the blueprint
from .module_one import module_one

__all__ = ['create_app']

def create_app(config=None, app_name=None):
    """
    Create Flask app
    """
    app = Flask(app_name)
    configure_app(app, config)
    # More configuration here ...
    configure_blueprints(app)
    return app

def create_celery(app=None):
    # ....
    return celery
```
```python
## app/tasks/worker.py
from time import sleep

from celery import chain

from ..app import create_app, create_celery

app = create_app()
celery = create_celery(app)

@celery.task(bind=True)
def long_run_task(self, x, y):
    self.update_state(state='PROGRESS', meta={'job': 'run'})
    sleep(25)
    return x + y

@celery.task(bind=True)
def long_map_task(self, x, y):
    self.update_state(state='PROGRESS', meta={'job': 'map'})
    sleep(20)
    return x + y

def run_task_async():
    task = chain(long_run_task.s(2, 2), long_map_task.s(4)).apply_async()
    return task
```
Notice the circular dependency: `app.py` -> `views.py` -> `worker.py` -> `app.py`. No matter how I tried to refactor the application, there was always a circular import like this, because of how I need to fire up the worker task from the view.
One of the ways I tried was to move the creation of the `celery` instance into `extensions.py`, where I keep all my extensions like `flask-sqlalchemy`, `flask-login`, etc., and have it lazily configured during app creation in `app.py`. This works, but it is not a real solution: although I am instantiating the `celery` instance, I cannot actually have it lazily reconfigured with the config from the Flask `app` instance. Celery will work, but it is not really integrated with Flask.
```python
## app/extensions.py
# Other extensions ...
from celery import Celery

celery = Celery('app', backend='rpc', broker='amqp://127.0.0.1')
```
With this approach, I have to set all the `backend` and `broker` params here to make it work. Then later, during Flask `app` creation, I can import the `celery` instance from `extensions.py` and reconfigure it with `celery.conf.update(...)`. Turns out, it will not pick up the changes.
Now, I think the reason is that when you run the Celery worker with `celery -A app.extensions.celery ...`, the `celery` instance picked up by that command carries only the settings from `extensions.py`. Even if I update the Celery conf to, for example, set `CELERY_TRACK_STARTED` to `True`, you will see that it has no effect, because the `celery` instance tied to the worker is the one from `extensions.py`.
What should happen during `celery -A app.extensions.celery ...` is: first create the Flask `app`, pass it to `celery` for Flask integration, then hand that instance to the worker. What happens now is totally different: create `celery`, pass it to the worker, and only then create the Flask `app` and try to reconfigure `celery`. So `celery` is not aware of the changes, because the worker was already running with the old settings by the time the Flask app was configured.
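In other words, the module handed to `-A` should itself build the Flask app first. A sketch of such an entry point (`app/celery_worker.py` is a hypothetical module name; `create_app` and `create_celery` are the factories from this post):

```python
## app/celery_worker.py
# Importing this module first creates the Flask app, then builds the
# Celery instance from it, so a worker started with
#   celery -A app.celery_worker.celery worker
# gets an instance that is actually configured by Flask.
from .app import create_app, create_celery

app = create_app()
celery = create_celery(app)
```

This is only a wiring fragment, not a standalone script; the point is the import-time order: Flask app first, Celery second, worker last.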
## Solution
Now, making this work is really simple, though it is a somewhat hackish solution. The way I solved it is by breaking the circular import: use a local import instead of importing the package at the top level.
```python
## app/app.py
def create_app(config=None, app_name=None, register_blueprints=True):
    """
    Create Flask app
    """
    app = Flask(app_name)
    configure_app(app, config)

    # So there is no circular import when using Celery
    if register_blueprints:
        # move the import into configure_blueprints
        from .module_one import module_one
        configure_blueprints(app)

    configure_extensions(app)
    configure_error_handlers(app)
    return app
```
```python
## app/tasks/worker.py
# Skip registering blueprints so we don't hit the circular import
app = create_app(register_blueprints=False)
celery = create_celery(app)
```
This breaks the circular import. In my `worker.py`, I can just instantiate the Flask `app` and pass it to my `celery` instance to have it configured with Flask. Now I can keep using Flask blueprints like I used to.
There are still some questions I have yet to figure out. One is whether I should create one `celery` instance per module or just share a single one. The documentation says to share it, but that only works when you're using the decorator method (i.e. `@task`). If I use custom `Task` classes, it seems to break. I'm still trying to figure that one out, and I will share how to create custom `Task` classes in the next post.