概述
Resque是一个基于Redis的处理后台作业的库,通过将多个作业放在一个或多个队列上进行后台处理.本身是由Ruby编写的,并且被用各种语言实现,原项目访问resque/resque ,php版本访问chrisboulton/php-resque .
角色划分
php-resque包含如下3种角色:
- Job: 需要被执行的任务,例如发送邮件\销库存\配货发货等
- Queue: 消息队列,基于Redis实现
- Worker: 负责消费消息队列中的任务,需要以守护进程运行在后台
一个Worker可以处理一个或多个队列,并且可以通过增加Worker的进程/线程来加快处理速度.
注意事项
- Job运行时,Worker会将参数组装以数组的方式设置到对象的属性,可用
$this->args
访问 - 任何异常都会导致任务失败,所以需要特别注意下异常处理
- Job除了
perform
方法外还拥有两个方法:setUp
和tearDown
,分别在perform
前后执行.
具体实现
安装扩展
由于涉及到进程的开辟与管理,php-resque使用了php的PCNTL函数,所以只能在Linux下运行,并且需要php编译PCNTL函数,具体的运行环境为:
- PHP 5.3+
- Redis 2.2+
- 推荐使用 Composer
vagrant@homestead:~/code/php-resque-mq$ composer require chrisboulton/php-resque -vvv
创建Job
将需要处理的任务编写为Job,即一个独立的Class,并且这个Class必须实现一个perform方法,Worker执行的时候会自动运行这个方法.
<?php
require './lib.php';
class SmsJob{
private $name;
private $mobile;
/**
* 预处理
*/
public function setUp(){
$this->name = $this->args['name'];
$this->mobile = $this->args['mobile'];
stdout('新注册用户信息:');
stdout('姓名:'.$this->name);
stdout('手机号:'.$this->mobile);
}
/**
* 具体任务
*/
public function perform(){
for($i=0;$i<10;$i++){
if($i === 0){
stdout('开始执行具体任务');
}
stdout('发送短信中--第'.($i+1).'秒');
if($i === 10){
stdout('任务执行完毕');
}
sleep(1);
}
}
/**
* 后续处理
*/
public function tearDown(){
stdout('任务执行完毕');
stdout();
}
}
插入队列
- 任务入队
使用
Resque::enqueue
函数将Job加入Queue,包含四个参数:
- queue: 队列名称
- jobClass: 任务Class
- jobArgs: 任务参数
- trackStatus: 为true时返回任务token,用于后续跟踪任务状态
<?php
require './lib.php';
require './SmsJob.php';
require './vendor/autoload.php';
$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
die("参数错误");
}
$connection = getDBConnection('127.0.0.1', 'homestead', 'secret', 'mq');
$sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
if(!mysqli_query($connection, $sql)){
die("写入用户信息失败,原因:".$connection->error);
}
$connection->close();
$token = Resque::enqueue('sms', SmsJob::class, array('name'=>$name, 'mobile'=>$mobile),true);
echo 'Job Token: '.$token.chr(10);
查询任务状态
根据Job Token来获取作业状态,状态有四个:
- Resque_Job_Status::STATUS_WAITING: 等待执行
- Resque_Job_Status::STATUS_RUNNING: 正在运行
- Resque_Job_Status::STATUS_FAILED: 执行失败
- Resque_Job_Status::STATUS_COMPLETE: 执行完毕
<?php
require_once './lib.php';
require_once './vendor/autoload.php';
$token = $argv[1];
if(empty($token)){
die("参数错误");
}
$obj = new Resque_Job_Status($token);
if(!$obj->isTracking()) {
die("该任务没有设定状态跟踪");
}
while(true) {
$status = $obj->get();
$statusLabels = ['', '等待执行', '正在运行', '执行失败', '执行完毕'];
stdout('任务['.$token.']的当前状态:'.$statusLabels[$status]);
if($status == 4){
break;
}else{
sleep(1);
}
}
启动Worker
从Queue中取出任务并处理,当队列中有Job时,实例化Job对应的Class对象并执行perform中方法.
- QUEUE: 要处理的队列,可以设定為 QUEUE=* 表示执行所有任务
- LOGLEVEL: 日志等级 0:不记录 1:记录简单日志 2:记录详细日志
- COUNT: 启动Worker的数量,默认是1
- INTERVAL: Worker检查Queue的间隔,默认是5秒
- PIDFILE: 如果是单个Woker,可以指定Pid文件
- PREFIX: 前缀
- REDIS_BACKEND: Redis的IP和端口,默认是127.0.0.1:6379
完整代码访问php-resque-mq
测试流程
- 执行主业务并将Job插入队列
php register.php 小毛 18702124521
- 查看任务状态
php status 26bf81f1943d1a940ffd68a287a7d16c
- 启动Worker
QUEUE=sms LOGLEVEL=2 php worker.php
- 测试结果
Resque Web
resque-web是用Ruby开发的Web管理界面,安装步骤:
- 安装Ruby及Ruby Dev
vagrant@homestead:~/code$ sudo apt install ruby ruby-dev -y
- 安装resque-web
vagrant@homestead:~/code$ sudo gem install resque-web -v 0.0.8
- 启动resque-web
vagrant@homestead:~/code$ sudo resque-web -p 666
[2018-09-21 10:24:31 +0000] Starting 'resque-web'...
[2018-09-21 10:24:31 +0000] trying port 666...
- 截图示例