0%

php结合php-ampqlib做rabbitMQ队列

内容概述

php使用rabbitMQ做消息队列,这是一种常见的异步的工作方式,在很多的工作场景都能遇到,在阅读该文章之前,需要提前了解rabbitMQ的工作原理以及下面的几种工作模式,这样更有助于阅读这篇文章,以下代码是通过自行封装对amqplib的各种模式的使用,便于记忆,在此记录。

composer安装扩展

1
2
composer安装
composer require PHP-amqplib/PHP-amqplib

代码展示

1、加入队列

1
2
3
4
5
6
//RabbitMQ使用
public function test4(){
$client = Producer::getInstance();
$res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
dump($res);
}

2.生产者类库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
<?PHP
namespace services\rabbitmq;
use PHPAmqpLib\Connection\AMQPStreamConnection;
use PHPAmqpLib\Exception\AMQPIOException;
use PHPAmqpLib\Exchange\AMQPExchangeType;
use PHPAmqpLib\Message\AMQPMessage;
/**生产者类
* Class Producer
* @package services\rabbitmq
*/
class Producer
{
private $host;
private $port;
private $user;
private $pwd;
private $vhost;
private static $client;
private static $instance;
private function __construct($host,$port,$user,$pwd,$vhost)
{
if (empty($host) || empty($port) || empty($user) || empty($pwd) || empty($vhost)){
$info = config('config')['RabbitMQ'];
$this->host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
}else{
$this->host = $host;
$this->port = $port;
$this->user = $user;
$this->pwd = $pwd;
$this->vhost = $vhost;
}
self::$client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
}

/**
* - 获取实例
* @param $host
* @param $port
* @param $user
* @param $pwd
* @param $vhost
* @return Producer
*/
public static function getInstance($host = '',$port = '',$user = '',$pwd = '',$vhost = '') {
if (!(self::$instance instanceof self)) {
self::$instance = new self($host,$port,$user,$pwd,$vhost);
}
return self::$instance;
}


public function publishMsg($exchange,$QueueArr,$msg,$message_id,$route_key = '',$expiration = 3600 * 90){
$channel = self::$client->channel();
/**交换机声明
* $exchange 交换机名称
* AMQPExchangeType::DIRECT 路由模式
* passive: false
* durable: true 持久化 交换器将在服务器重启后继续存在
* auto_delete: false 一旦通道关闭,交换器将不会被删除
*/
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, [], null);
//绑定多个对列
foreach ($QueueArr as $key=>$value){
/**声明队列(设置队列的时间必须设置一次,如要修改需要删除这个队列)new AMQPTable(['x-message-ttl'=>10000])
* $value 队列名称
* passive false
* 持久durable true 队列将在服务器重启后继续存在
* 互斥exclusive false 队列可以通过其他渠道访问
* auto_delete false 通道关闭后,队列不会被删除
*/
$channel->queue_declare($value,false,true,false,false,false);
//队列和交换机绑定
$channel->queue_bind($value, $exchange,$route_key);
}
//发送消息
$message = new AMQPMessage($msg,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>$expiration * 1000,'message_id'=>$message_id));
$channel->basic_publish($message,$exchange,$route_key);
$channel->close();
self::$client->close();
return true;
}
}

3.消费者类库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?PHP
namespace services\rabbitmq;
use PHPAmqpLib\Connection\AMQPStreamConnection;
class Consumer
{
private $host;
private $port;
private $user;
private $pwd;
private $vhost;
private $client;
private $channel;
public function __construct()
{
$info = config('config')['RabbitMQ'];
$this->host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
$this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
$this->channel = $this->client->channel();
}
public function start(){
/**
*
* queue: queue_name // 被消费的队列名称
* consumer_tag: consumer_tag // 消费者客户端身份标识,用于区分多个客户端
* no_local: false // 这个功能属于AMQP的标准,但是RabbitMQ并没有做实现
* no_ack: true // 收到消息后,是否不需要回复确认即被认为被消费
* exclusive: false // 是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
* Nowait: false // 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
* callback: $callback // 回调逻辑处理函数
*
*/
$this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']);
register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function shutdown($channel, $connection){
$channel->close();
$connection->close();
save_log('close');
}
public function process_message($message){
echo $message->body."\n";
//手动发送ack
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
}

4、PHP中 register_shutdown_function 函数的基础介绍与用法详解

参考链接:https://www.jb51.net/article/129213.htm