并发处理之异步任务

我在实习过程中,最常见的一个场景就是高并发,那么如何处理高并发问题呢?

就拿我前几天写的需求来说吧,我需要给一些特定的用户推送特定的客服消息。推送消息这件事看起来简单,但是这里面还包含了很多额外的动作需要做,例如:

用户点击广告,未支付成功的时候:

  • 把这个用户的 open id 塞到 redis 里
  • 推送消息时候,我们需要去拿连接微信的 access token,调用微信的接口或者第三方 SDK 给他推消息
  • 定时推送消息等等…

同步与异步

用户点击广告之后,客户端会传来请求,这时候如果我们同步的去完成上面的动作:进程在收到消息之后,马上执行上面的动作。这样的处理方式会导致服务器响应时间很高,而且有多个请求时,势必会造成阻塞。

事实上,客户端传来这个请求只是为了告诉服务器:『你要给这个 open id 推客服消息』,至于服务器怎么给这个人推消息,什么时候推,和响应结果并不相关,客户端只确认服务器收到这个 open id 就够了。

那么,我们是不是可以异步的去做:

  • 客户端传来请求
  • 服务端把传来的 open id 塞到 redis 里,返回响应
  • 另起一个线程或者进程去取这些 open id,完成给用户推消息的事情

对于这些允许 delay 的事情,叫做任务(Task),我们可以把他们写成一个函数,前端发来请求之后,新起另一个线程或者进程去执行这个任务。

但这样还是存在一定的问题:

  • 如果并发很高,我们需要起成千上万个线程(进程)去执行任务吗?
  • 如果场景改成,项目 A 存下客户端传来的 open id,由项目 B 发送消息,又该如何处理?

消息队列

我们可以使用消息队列中间件来控制 Task 的消费:

拿我们熟悉的生产者-消费者模型来说:

  • 客户端发来 HTTP 请求之后,把这些耗时的 Task 作为一个消息放入消息队列里(生产);
  • 之后有专门来执行这些 Task 的进程进行消费

这样做的好处是:

  • 异步处理
  • 解耦:
    • 消息的发送者和消费者彼此之间相互独立,不存在依赖关系
  • 广播:
    • 发送方的消息被广播,可通知多个下游系统来消费
  • 流量削峰与流控:
    • 当上下游系统处理能力存在差距的时候,利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候,再进行分发

当然,使用消息队列也有成本:

  • 需要额外部署消息队列
  • 需要对消息进行监控

Celery 实现异步任务

在使用 Python 的实际生产中,我们一般使用 Celery + RabbitMQ 实现异步任务

Celery 是一个专注于实时处理消息的任务队列,一般用于处理异步任务和定时任务。它有四个主要的模块:

  • Task(任务模块):
    • 包含异步任务和定时任务
    • 异步任务通常在业务逻辑中被触发并发往任务队列
    • 定时任务由 Celery Beat 进程周期性地将任务发往任务队列
  • Broker(消息中间件):
    • 接受任务生产者发来的消息,将任务存入队列
    • celery 自身不提供队列服务,需要搭配 RabbitMQ 等
  • Worker(任务执行单元):
    • 监控队列
    • 执行队列中的任务
  • Backend(任务结果存储):
    • 存储任务执行的日志

使用 Celery 实现异步任务的步骤也很简单:

  • 创建一个 Celery 实例
  • 启动 Worker
  • 应用程序创建一个 Task 对象,调用其 delay() 方法, 将任务发送到消息中间件 Broker
  • Worker 监控到该任务后,就会由别的进程执行这个 Task

异步任务这块算是说完了… 或许以后会再补充
如有错误,欢迎指正~