本系列的最新代码将持续更新到: http://www.madmalls.com/blog/post/latest-code/

1. Redis Queue
RQ (Redis Queue)任务队列CelerybrokerRedisRabbitMQRedisAmazon SQS

1.1 在 Linux 上启动后端 Flask API

RQfork()rq worker

所以为了方便,我们准备将后端 Flask API 应用迁移到 Linux 中,关于在 Linux 中如何部署需要到第 20 章才讲,本章只是简单介绍在 CentOS 上如何重新运行起我们的后端应用

将后端代码拷贝到 CentOS 上,并执行:

$ cd back-end
$ python -m venv venv
$ source venv/bin/activate
(venv)$ pip install -r requirements.txt

# Flask-Migrate create database
(venv)$ flask db upgrade

# Pre deploy, eg. insert roles
(venv)$ flask deploy

# create back-end/.env file, like this
FLASK_APP=madblog.py
FLASK_DEBUG=1

(venv)$ flask run -h 0.0.0.0 -p 5000
192.168.80.1http://192.168.80.1:5000/api/ping"Pong!"
front-end/src/http.js
import Vue from 'vue'
import axios from 'axios'
import router from './router'
import store from './store'


// 基础配置
if (process.env.NODE_ENV === 'production') {
  axios.defaults.baseURL = 'http://www.madmalls.com:5000';
} else {
  axios.defaults.baseURL = 'http://192.168.80.1:5000';
}

然后重启前端应用

D:\python-code\flask-vuejs-madblog\front-end>npm run dev

如果应用能够正常访问后端 API,则进行下一节

1.2 Linux 上安装 Redis

$ yum install -y epel-release
$ yum install -y redis
$ systemctl start redis
$ systemctl enable redis
Redis
$ redis-cli ping
PONG

$ ss -tunlp | grep redis
tcp    LISTEN     0      128    127.0.0.1:6379                  *:*                   users:(("redis-server",pid=18833,fd=5))
tcp    LISTEN     0      128    192.168.80.1:6379                  *:*                   users:(("redis-server",pid=18833,fd=4))

1.3 安装 RQ

rqredis
(venv)$ pip install rq
(venv)$ pip freeze > requirements.txt

然后重新启动 Flask:

(venv)$ flask run -h 0.0.0.0 -p 5000 

1.4 RQ 连接 Redis

Redisback-end/config.py
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
REDIS_URLexport REDIS_URL=xxxback-end/.envredis://6379
back-end/app/__init__.py
from redis import Redis
import rq
...

def configure_app(app, config_class):
    app.config.from_object(config_class)
    # 不检查路由中最后是否有斜杠/
    app.url_map.strict_slashes = False
    # 整合RQ任务队列
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('madblog-tasks', connection=app.redis, default_timeout=3600)  # 设置任务队列中各任务的执行最大超时时间为 1 小时
rq.Queue()madblog-tasksrq worker madblog-tasksmadblog-tasks立即

1.5 任务进入队列(Enqueueing Jobs)

任务(job/task)
back-end/app/utils/tasks.py
import time


def test_rq(num):
    print('Starting task')
    for i in range(num):
        print(i)
        time.sleep(1)
    print('Task completed')
    return 'Done'
Flask Shell
(venv) $ flask shell
Python 3.6.4 (default, Apr 13 2019, 20:59:14) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
App: app [production]
Instance: /root/flask-vuejs-madblog/back-end/instance
>>> job = app.task_queue.enqueue('app.utils.tasks.test_rq', 60)  # 将一个任务压入任务队列中
>>> job
Job('96231bff-0680-4ebf-aa37-88372946e524', enqueued_at=datetime.datetime(2019, 4, 16, 1, 48, 54, 295339))
>>> job.get_id()  # 获取任务的ID
'96231bff-0680-4ebf-aa37-88372946e524'
>>> job.status  # 任务当前的状态,只是进入队列了,还没有被 rq worker 开始执行,因为我们还没有启动 worker
'queued'
>>> job.func_name  # 任务实际上将要执行的函数
'app.utils.tasks.test_rq'
>>> job.args  # equeue() 方法中第一个参数是要执行的函数名,后面的参数都将传递给要执行的函数中去
(60,)
>>> job.kwargs
{}
rq worker
$ cd /root/flask-vuejs-madblog/back-end
$ source venv/bin/activate
$ rq worker madblog-tasks  # 表示启动了一个 worker,并让它监视名为 madblog-tasks 的任务队列
madblog-tasksrq worker
(venv) $ rq worker madblog-tasks
10:00:01 RQ worker 'rq:worker:CentOS.33041' started, version 0.13.0
10:00:01 *** Listening on madblog-tasks...
10:00:01 Cleaning registries for queue: madblog-tasks
10:00:01 madblog-tasks: app.utils.tasks.test_rq(60) (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617)
Starting task
0
1
2
3
...
58
59
Task completed
10:01:01 madblog-tasks: Job OK (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617)
10:01:01 Result is kept for 500 seconds
rq workerFlask Shell
>>> job.status  # 表示任务已经在执行中了
'started'
>>> job.result  # 任务尚未结束,所以没有返回值
>>> 
>>> job.status  # 1分钟后...  表示任务已经结束了
'finished'
>>> job.result  # 返回了任务的执行结果
'Done'
job/task任务队列(假设为变量 q)
len(q)q.jobsq.job_idsq.fetch_job('abc123')q.empty()
2. 应用RQ实现后台任务
RQ

2.1 增加数据模型Task

Task一对多
back-end/app/models.py
class User(PaginatedAPIMixin, db.Model):
    ...
    # 用户的RQ后台任务
    tasks = db.relationship('Task', backref='user', lazy='dynamic')


class Task(PaginatedAPIMixin, db.Model):
    __tablename__ = 'tasks'
    # 不使用默认的整数主键,而是用 RQ 为每个任务生成的字符串ID
    id = db.Column(db.String(36), primary_key=True)
    # 任务名
    name = db.Column(db.String(128), index=True)
    # 任务描述
    description = db.Column(db.String(128))
    # 任务所属的用户
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    # 是否已执行完成
    complete = db.Column(db.Boolean, default=False)

    def __repr__(self):
        return '<Task {}>'.format(self.id)

2.2 数据库迁移

(venv) $ flask db migrate -m "tasks"
(venv) $ flask db upgrade

2.3 后台任务函数

rq worker
back-end/app/utils/tasks.py
import sys
import time
from rq import get_current_job
from app import create_app
from app.extensions import db
from app.models import User, Message, Task
from app.utils.email import send_email
from config import Config


# RQ worker 在我们的博客Flask应用之外运行,所以需要创建自己的应用实例
app = create_app(Config)
# 后续会使用Flask-SQLAlchemy来查询数据库,所以需要推送一个上下文使应用成为 "当前" 的应用实例
app.app_context().push()


def test_rq(num):
    print('Starting task')
    for i in range(num):
        print(i)
        time.sleep(1)
    print('Task completed')
    return 'Done'


def send_messages(*args, **kwargs):
    '''群发私信'''
    try:  # 由于 rq worker 运行在单独的进程中,当它出现意外错误时,我们需要捕获异常
        # 发送方
        sender = User.