Celery CheatSheet

0x00 前言

本文为 Cheatsheet 类型文章,用于记录我在日常编程中经常使用的 Celery 相关和命令。

不定期更新。

本文编写于 2018 年初,仅仅聚焦于 Django2.0 与 Celery 4.1.0

本文需要结合我的开源项目 YaDjangoBlog 来看 https://github.com/twocucao/YaDjangoBlog

Celery 主要用于分布式队列。

分布式队列可以用来做什么呢?

试想一下这么个场景:

  • 假如用户提交一个耗时任务,Django 一般会在 Function Based View 里进行处理这个任务(假设这个任务耗时 30s),我可以先把这个任务放在分布式队列里面,将耗时任务甩锅给 Celery 的 Worker 。这样的话,就可以极大的减少 Function Based View 处理耗时任务的时间。

当然,除此之外,Celery 还可以用于其他的分布式队列场景:

  • 批量发送邮件
  • 定时任务 CronJob
  • 分配爬虫抓取任务

当然,如果不用 Celery 的话也能搞定上面的场景,比如,借用数据库本身就可以直接做分布式任务派发。

只是这样的实现本身代码可维护性,扩展性,性能等等都不是很好罢了。

Celery 的特点如下:

  • 简单易维护
  • 高可用
  • 快:依据官网的资料,单进程可以在若干分钟内处理上百万级数据,亚毫秒级往返延迟 (using RabbitMQ, librabbitmq, and optimized settings)
  • 扩展性好:可定制池,序列器 compression schemes, 日志,调度器,consumers, producers, broker transports, and much more.

除此之外,还有什么特性呢?

  • 监控
  • Work-Flows
  • 时间 / 速度 限制
  • 日程安排 (CronJob)
  • 资源泄露保护
  • 用户组件

本文基于 RabbitMQ 做消息队列,并非 Redis 所以 Broker 默认使用 RabbitMQ

支持下面两个:

  • Monitoring : 比如 Flower
  • Remote Control: 可以通过 celery inspect 和 control 命令进行检查和控制。

0x01 安装,配置,基本 shell 命令

1. 安装

1
pip install "celery[librabbitmq]"

2. 配置

指定 Broker URL 即可

3. 插件

Flower 插件

0x02 Celery 配套工具

0x03 Celery 基础概念

  • Application
  • Tasks
  • Calling Tasks
  • Canvas: Designing Work-flows
  • Workers Guide
  • Daemonization
  • Periodic Tasks
  • Routing Tasks
  • Monitoring and Management Guide
  • Security
  • Optimizing
  • Debugging
  • Concurrency
  • Signals
  • Testing with Celery
  • Extensions and Bootsteps
  • Configuration and defaults

Application

Tasks

Calling Tasks

Canvas: Designing Work-flows

Workers Guide

Daemonization

Periodic Tasks

Routing Tasks

Monitoring and Management Guide

Security

Optimizing

Debugging

Concurrency

Signals

Testing with Celery

Extensions and Bootsteps

Configuration and defaults

0x03 RabbitMQ

1
broker_url = 'amqp://myuser:mypassword@localhost:5672/myvhost'
1
2
3
4
sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

发布者的消息经过交换机,分发到不同的队列,最后由接收方进行处理。

那么问题来了,交换机都是用来干嘛的

  • Direct 单播路由:扔一条消息到一个队列中,依照 routingkey 投递
  • Topic 多播路由:发给某几类队列(通知).
  • Fanout 广播路由:发给全部绑定在该路由上面的队列。
  • Headers
  1. 应用解耦。(平台无关,语言无关)

比如说,但项目足够大的时候,更新一个活动,可能需要更新用户的一些状态,可能要更新一波统计数据,可能要记录一批日志。这个时候原来的代码可能这么写:

1
2
3
4
5
update_activity()
update_user()
update_user_cache()
update_stats()
record_user_log()

现在代码就可能这么写:

1
2
3
4
5
send_task_update_activity()
send_task_update_user()
send_task_update_user_cache()
send_task_update_stats()
send_task_record_user_log()
  1. 异步通信。(减轻请求峰值)

原本一个 webapp 不做异步的话,也能搞定,但做了异步之后,可以大幅度提升吞吐量和响应时间。

  1. 数据持久化。(不丢失消息)
  2. 送达保证。(ack late)

简单步骤

  1. 定义 app, 指定 broker 和 backend
  2. 定义 tasks
  3. 指定 worker
  4. 调用 Task , 调用后返回 AsyncResult 实例,
1
2
3
4
5
6
7
add.delay(2, 2)
add.apply_async((2, 2))
add.apply_async((2, 2), queue='lopri', countdown=10)
add.signature((2, 2), countdown=10)
res = add.delay(2, 2)
res.get(timeout=1)

Groups

1
2
3
4
5
6
# 并行
group(add.s(i, i) for i in xrange(10))().get()
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# partial group
g = group(add.s(i) for i in xrange(10))
g(10).get()

Chains

1
2
3
4
5
chain(add.s(4, 4) | mul.s(8))().get()
(add.s(4, 4) | mul.s(8))().get()
# partial chain
g = chain(add.s(4) | mul.s(8))
g(4).get()

Chords

1
2
3
chord((add.s(i, i) for i in xrange(10)), xsum.s())().get()
(group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
# eg : upload_document.s(file) | group(apply_filter.s() for filter in filters)

0x05 调试

0x06 Python SDK

官方提供了两个 SDK 方便我们进行日常的开发:

6.1 与 Python 集成

前者偏底层一些,后者偏高层一些,高底层关系的有点类似于 sqlalchemy core 和 sqlalchemy orm 之间的关系。

6.2 与 Django 集成

elasticsearch-analysis-ik 的配置

0x07 踩坑集

  • 序列问题

0xEE 参考链接


ChangeLog:

  • 2018-02-20 重修文字