1. Redis Queue本系列的最新代码将持续更新到: http://www.madmalls.com/blog/post/latest-code/
RQ (Redis Queue)任务队列CelerybrokerRedisRabbitMQRedisAmazon SQS
1.1 在 Linux 上启动后端 Flask API
RQfork()rq worker
所以为了方便,我们准备将后端 Flask API 应用迁移到 Linux 中,关于在 Linux 中如何部署需要到第 20 章才讲,本章只是简单介绍在 CentOS 上如何重新运行起我们的后端应用
将后端代码拷贝到 CentOS 上,并执行:
$ cd back-end $ python -m venv venv $ source venv/bin/activate (venv)$ pip install -r requirements.txt # Flask-Migrate create database (venv)$ flask db upgrade # Pre deploy, eg. insert roles (venv)$ flask deploy # create back-end/.env file, like this FLASK_APP=madblog.py FLASK_DEBUG=1 (venv)$ flask run -h 0.0.0.0 -p 5000
192.168.80.1http://192.168.80.1:5000/api/ping"Pong!"
front-end/src/http.js
import Vue from 'vue'
import axios from 'axios'
import router from './router'
import store from './store'
// 基础配置
if (process.env.NODE_ENV === 'production') {
axios.defaults.baseURL = 'http://www.madmalls.com:5000';
} else {
axios.defaults.baseURL = 'http://192.168.80.1:5000';
}
然后重启前端应用
D:\python-code\flask-vuejs-madblog\front-end>npm run dev
如果应用能够正常访问后端 API,则进行下一节
1.2 Linux 上安装 Redis
$ yum install -y epel-release $ yum install -y redis $ systemctl start redis $ systemctl enable redis
Redis
$ redis-cli ping
PONG
$ ss -tunlp | grep redis
tcp LISTEN 0 128 127.0.0.1:6379 *:* users:(("redis-server",pid=18833,fd=5))
tcp LISTEN 0 128 192.168.80.1:6379 *:* users:(("redis-server",pid=18833,fd=4))
1.3 安装 RQ
rqredis
(venv)$ pip install rq (venv)$ pip freeze > requirements.txt
然后重新启动 Flask:
(venv)$ flask run -h 0.0.0.0 -p 5000
1.4 RQ 连接 Redis
Redisback-end/config.py
REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
REDIS_URLexport REDIS_URL=xxxback-end/.envredis://6379
back-end/app/__init__.py
from redis import Redis
import rq
...
def configure_app(app, config_class):
app.config.from_object(config_class)
# 不检查路由中最后是否有斜杠/
app.url_map.strict_slashes = False
# 整合RQ任务队列
app.redis = Redis.from_url(app.config['REDIS_URL'])
app.task_queue = rq.Queue('madblog-tasks', connection=app.redis, default_timeout=3600) # 设置任务队列中各任务的执行最大超时时间为 1 小时
rq.Queue()madblog-tasksrq worker madblog-tasksmadblog-tasks立即
1.5 任务进入队列(Enqueueing Jobs)
任务(job/task)
back-end/app/utils/tasks.py
import time
def test_rq(num):
print('Starting task')
for i in range(num):
print(i)
time.sleep(1)
print('Task completed')
return 'Done'
Flask Shell
(venv) $ flask shell
Python 3.6.4 (default, Apr 13 2019, 20:59:14)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux
App: app [production]
Instance: /root/flask-vuejs-madblog/back-end/instance
>>> job = app.task_queue.enqueue('app.utils.tasks.test_rq', 60) # 将一个任务压入任务队列中
>>> job
Job('96231bff-0680-4ebf-aa37-88372946e524', enqueued_at=datetime.datetime(2019, 4, 16, 1, 48, 54, 295339))
>>> job.get_id() # 获取任务的ID
'96231bff-0680-4ebf-aa37-88372946e524'
>>> job.status # 任务当前的状态,只是进入队列了,还没有被 rq worker 开始执行,因为我们还没有启动 worker
'queued'
>>> job.func_name # 任务实际上将要执行的函数
'app.utils.tasks.test_rq'
>>> job.args # equeue() 方法中第一个参数是要执行的函数名,后面的参数都将传递给要执行的函数中去
(60,)
>>> job.kwargs
{}
rq worker
$ cd /root/flask-vuejs-madblog/back-end $ source venv/bin/activate $ rq worker madblog-tasks # 表示启动了一个 worker,并让它监视名为 madblog-tasks 的任务队列
madblog-tasksrq worker
(venv) $ rq worker madblog-tasks 10:00:01 RQ worker 'rq:worker:CentOS.33041' started, version 0.13.0 10:00:01 *** Listening on madblog-tasks... 10:00:01 Cleaning registries for queue: madblog-tasks 10:00:01 madblog-tasks: app.utils.tasks.test_rq(60) (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617) Starting task 0 1 2 3 ... 58 59 Task completed 10:01:01 madblog-tasks: Job OK (71a3d64c-dd02-4139-bdf6-5e5e4d3f5617) 10:01:01 Result is kept for 500 seconds
rq workerFlask Shell
>>> job.status # 表示任务已经在执行中了 'started' >>> job.result # 任务尚未结束,所以没有返回值 >>> >>> job.status # 1分钟后... 表示任务已经结束了 'finished' >>> job.result # 返回了任务的执行结果 'Done'
job/task任务队列(假设为变量 q)
len(q)q.jobsq.job_idsq.fetch_job('abc123')q.empty()
2. 应用RQ实现后台任务
RQ
2.1 增加数据模型Task
Task一对多
back-end/app/models.py
class User(PaginatedAPIMixin, db.Model):
...
# 用户的RQ后台任务
tasks = db.relationship('Task', backref='user', lazy='dynamic')
class Task(PaginatedAPIMixin, db.Model):
__tablename__ = 'tasks'
# 不使用默认的整数主键,而是用 RQ 为每个任务生成的字符串ID
id = db.Column(db.String(36), primary_key=True)
# 任务名
name = db.Column(db.String(128), index=True)
# 任务描述
description = db.Column(db.String(128))
# 任务所属的用户
user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
# 是否已执行完成
complete = db.Column(db.Boolean, default=False)
def __repr__(self):
return '<Task {}>'.format(self.id)
2.2 数据库迁移
(venv) $ flask db migrate -m "tasks" (venv) $ flask db upgrade
2.3 后台任务函数
rq worker
back-end/app/utils/tasks.py
import sys
import time
from rq import get_current_job
from app import create_app
from app.extensions import db
from app.models import User, Message, Task
from app.utils.email import send_email
from config import Config
# RQ worker 在我们的博客Flask应用之外运行,所以需要创建自己的应用实例
app = create_app(Config)
# 后续会使用Flask-SQLAlchemy来查询数据库,所以需要推送一个上下文使应用成为 "当前" 的应用实例
app.app_context().push()
def test_rq(num):
print('Starting task')
for i in range(num):
print(i)
time.sleep(1)
print('Task completed')
return 'Done'
def send_messages(*args, **kwargs):
'''群发私信'''
try: # 由于 rq worker 运行在单独的进程中,当它出现意外错误时,我们需要捕获异常
# 发送方
sender = User.