Celery integration with Flask

As of Celery version 3.0, integrating Celery with Flask should no longer require a third-party extension. That's what they said. My experience, however, shows that integrating Celery with Flask can be a little tricky, especially when the app uses blueprints.

Challenges

So here is the basic setup for Celery, taken from http://flask.pocoo.org/docs/0.10/patterns/celery/:

from celery import Celery

def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Run every task inside the Flask application context so that
            # extensions like Flask-SQLAlchemy work inside the task
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

and you use it like this:

from flask import Flask

app = Flask(__name__)
app.config.update(
    CELERY_BROKER_URL='redis://localhost:6379',
    CELERY_RESULT_BACKEND='redis://localhost:6379'
)
celery = make_celery(app)

@celery.task()
def add_together(a, b):
    return a + b
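
To try this out, start a worker and call the task. A minimal sketch, assuming the snippet above lives in a module named app.py (the module name is my assumption):

# In another terminal: celery -A app.celery worker --loglevel=info
result = add_together.delay(23, 42)
print(result.get())  # blocks until the worker finishes and returns 65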

Now, this works fine if your app is simple. The challenge comes when you want to fire up a Celery task from a view: you'll quickly end up with a circular import, which Python cannot resolve.

To illustrate this, assume the following code:

## app/module_one/views.py

from flask import Blueprint, current_app, jsonify

# import our worker
from ..tasks import worker

module_one = Blueprint('module_one', __name__)

@module_one.route('/run', methods=['GET'])
def run_task():
    # run job in celery
    task = worker.run_task_async()
    return jsonify(name=current_app.name, status='Task is running', taskid=task.id), 202


@module_one.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    task = worker.check_status_async(taskid)
    return jsonify(status=task.get('status', None), task=task.get('job', None))

## app/app.py
from flask import Flask, render_template
from celery import Celery

# Needed to register the blueprint below
from .module_one import module_one

__all__ = ['create_app']


def create_app(config=None, app_name=None):
    """
    Create Flask app
    """

    app = Flask(app_name)

    configure_app(app, config)
    # More configuration here ...
    configure_blueprints(app)

    return app


def create_celery(app=None):
    # same as make_celery from the first snippet
    # ....
    return celery

## app/tasks/worker.py

from time import sleep

from celery import chain
from ..app import create_app, create_celery

app = create_app()
celery = create_celery(app)

@celery.task(bind=True)
def long_run_task(self, x, y):
    self.update_state(state='PROGRESS', meta={'job': 'run'})
    sleep(25)
    return x + y


@celery.task(bind=True)
def long_map_task(self, x, y):
    self.update_state(state='PROGRESS', meta={'job': 'map'})
    sleep(20)
    return x + y

def run_task_async():
    # Chain the tasks; long_map_task receives long_run_task's
    # result as its first argument
    task = chain(long_run_task.s(2, 2), long_map_task.s(4)).apply_async()

    return task
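
The check_status_async helper called from the view isn't defined above. A minimal sketch, assuming it simply looks the task up by id (the helper body is my assumption):

def check_status_async(taskid):
    # Hypothetical helper: fetch the task's state and progress metadata
    task = celery.AsyncResult(taskid)
    meta = task.info if isinstance(task.info, dict) else {}
    return {'status': task.state, 'job': meta.get('job')}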

Notice the circular dependency: app.py -> views.py -> worker.py -> app.py. No matter how I tried to refactor the application, a circular reference like this always remained, because I need to fire up the worker task from the view.

One approach I tried was to move the creation of the Celery instance into extensions.py, where I keep all my extensions (Flask-SQLAlchemy, Flask-Login, etc.), and have it lazily configured during app creation in app.py. This works, but it is not a real solution: although I can instantiate the Celery instance there, I cannot actually make it reconfigure itself later to use the config from the Flask app instance. Celery will work, but it is not really configured for use with Flask.

## app/extensions.py

# Other extensions ...

from celery import Celery
celery = Celery('app', backend='rpc', broker='amqp://127.0.0.1')

If I approach it this way, I have to set all the backend and broker params here to make it work. Then later, during Flask app creation, I can import the celery instance from extensions.py and reconfigure it with celery.conf.update(...). It turns out the worker will not pick up the changes.

I think the reason is that when you run the worker with celery -A app.extensions.celery ..., the instance picked up by that command carries only the settings from extensions.py. Even if I update the celery conf, for example setting CELERY_TRACK_STARTED to True, you will see that it has no effect, since the instance tied to the worker is the one from extensions.py.
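
To make the failure concrete, here is a sketch of that attempt (configure_app is the same placeholder as before; the exact body is my reconstruction):

## app/app.py (the attempt that does not work)

from flask import Flask
from .extensions import celery

def create_app(config=None, app_name=None):
    app = Flask(app_name)
    configure_app(app, config)
    # The worker process only imports app/extensions.py, so create_app()
    # -- and therefore this update -- never runs inside the worker
    celery.conf.update(app.config)
    return app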

What should happen when you run celery -A app.extensions.celery ... is this: first create the Flask app, pass it to Celery for the Flask integration, then hand the configured instance to the worker. What actually happens is completely different: Celery is created, handed to the worker, and only later is the Flask app created and Celery reconfigured. The worker never sees those changes, because the reconfiguration only happens inside the running Flask process.

Solution

Now, making this work is really simple, though it is admittedly a hackish solution. The way to solve it is to break the circular reference, which we can do with a local import instead of importing the package at the top level.

## app/app.py

def create_app(config=None, app_name=None, register_blueprints=True):
    """
    Create Flask app
    """
    app = Flask(app_name)
    configure_app(app, config)

    # Import the blueprint locally so there is no circular import
    # when the Celery worker imports this module
    if register_blueprints:
        from .module_one import module_one
        configure_blueprints(app)

    configure_extensions(app)
    configure_error_handlers(app)

    return app
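
configure_blueprints is not shown above. A minimal sketch of what it could look like (the body is my assumption) is:

def configure_blueprints(app):
    # Hypothetical helper: register the blueprints on the app.
    # Doing the import here keeps app.py importable from the worker
    # without closing the import circle.
    from .module_one import module_one
    app.register_blueprint(module_one)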

## app/tasks/worker.py

from ..app import create_app, create_celery

# Skip blueprint registration so we don't have the circular import issue
app = create_app(register_blueprints=False)
celery = create_celery(app)

This breaks the circular reference. In worker.py, I can just instantiate the Flask app and pass it to my Celery instance to have it configured with Flask. Now I can continue using Flask blueprints like I used to.
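
With this layout, the worker can be pointed at the instance created in worker.py; the exact module path below is my assumption about the project root:

celery -A app.tasks.worker.celery worker --loglevel=info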

There are still some questions I have yet to figure out. One is whether I should create one celery instance per module or just share a single one. The documentation says to share it, but that only works when you use the decorator approach (i.e. @task). When I use custom Task classes, it seems to break. I'm still trying to figure that out, and I will share how to create custom Task classes in the next post.