概述
业务实现过程中,即便没有高并发与大流量,业务的解耦与异步化也是需要考虑实现的,此时MQ就显得很重要,中小型业务开发中,RabbitMQ就显得过重,这种业务下需要的就是一个轻量级的MQ,此时用Redis就刚刚好.
流程
基于Redis的轻量级的MQ用到了Redis的三个特性:
- List存储类型
- BRLPOP或者BLPOP操作List
- 发布/订阅模式
大体流程是:主业务完成操作后写入消息到MQ,此时并发布消息到指定Channel,而之前已经订阅了该Channel的Worker收到消息后就去MQ获取并消费消息,如下图:
程序实现
主业务
主业务主要功能是接收两个参数:姓名和手机号,将这两个参数作为用户信息写入用户表,写入成功后插入一条消息到消息队列register_users,并且发送消息OK到频道register_success,主要代码如下:
<?php
require './lib.php';
$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
die("参数错误");
}
$connection = getDBConnection('127.0.0.1', 'homestead', 'secret', 'mq');
// 开启事务
mysqli_begin_transaction($connection);
$sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
if(!mysqli_query($connection, $sql)){
die("写入用户信息失败,原因:".$connection->error);
}
$redis = getRedis();
// 添加消息
$result = $redis->lpush('register_users', json_encode(array('name'=>$name, 'mobile'=>$mobile), JSON_UNESCAPED_UNICODE));
if($result === false){
mysqli_rollback($connection);
die("添加消息队列失败");
}
// 发布消息
$redis->publish('register_success', 'ok');
// 所有操作完成后提交事务
mysqli_commit($connection);
$connection->close();
$redis->close();
Worker
Worker的主要功能是订阅频道register_success,如果收到消息且消息内容是OK就从队列register_users获取并消费消息,主要代码如下:
<?php
require './lib.php';
$redis = getRedis();
$redis->subscribe(['register_success'], function ($instance, $channelName, $message){
if($channelName == "register_success" && $message = "ok"){
$redis = getRedis();
$arr = $redis->brPop(['register_users'], 20);
if(count($arr)){
$userInfo = json_decode($arr[1], true);
stdout("新注册用户信息:");
stdout("姓名:".$userInfo['name']);
stdout("手机号:".$userInfo['mobile']);
stdout();
}
}
});
完整代码访问php-redis-mq
运行结果