网站首页Python相关

django-celery使用小结

发布时间:2017-02-06编辑:fc不将就阅读(1419

    软件版本重要

        Django==1.9.6

        django-celery==3.1.17

        celery==3.1.25

    注意:celery4暂时与django-celery3.1.17不兼容

    Demo:


    新建一个django项目为celerytest,新建app'app1',目录结构如下:

    celerytest/
    ├── app1
    │   ├── __init__.py
    │   ├── tasks.py
    ├── celerytest
    │   ├── __init__.py
    │   ├── settings.py
    │   ├── urls.py
    │   ├── wsgi.py
    ├── manage.py
    └── templates

    修改celerytest/settings文件:

    #添加app1和celery到app列表中
    INSTALLED_APPS = [
        ……
        'djcelery',
        'app1',
    ]
    #我是用redis作为中间对象的,添加如下配置
    #注意redis需要绑定到0.0.0.0或者网卡地址,关闭保护模式(不需要密码)
    #如redis有密码参考官方文档进行配置
    import djcelery
    djcelery.setup_loader()
    BROKER_URL = 'redis://xxx.xxx.xxx.xxx:6379/0'
    CELERY_RESULT_BACKEND = 'redis://xxx.xxx.xxx.xxx:6379/0'

    新建文件celerytest/celery.py

    from __future__ import absolute_import
    import os
    from celery import Celery
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celerytest.settings')
    from django.conf import settings  # noqa
    app = Celery('celerytest')
    # Using a string here means the worker will not have to
    # pickle the object when using Windows.
    app.config_from_object('django.conf:settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))

    修改文件celerytest/__init__.py:

    from __future__ import absolute_import
    
    # This will make sure the app is always imported when
    # Django starts so that shared_task will use this app.
    from .celery import app as celery_app  # noqa

    修改文件app/task.py:

    from __future__ import absolute_import
    from celery import shared_task
    
    @shared_task
    def add(x, y):
        return x + y
    
    @shared_task
    def mul(x, y):
        return x * y
    
    @shared_task
    def xsum(numbers):
        return sum(numbers)

    启动服务,在目录celerytest下:

    #生成数据库文件
    python manage.py migrate
    #启动
    python manage.py runserver
    #启动celery
    python manage.py  celery worker -c 4

    验证

    在命令行中启动django-shell

    python manage.py shell
    
    In [1]: from app1.tasks import add
    
    In [2]: r = add.delay(2,3)
    
    In [3]: r.ready()
    Out[3]: True
    
    In [4]: r.get()
    Out[4]: 5

    如果登陆到redis服务器上可以看到多了一些键。

    如果在views中使用也可以按照上述所示方法。