RabbitMQ 消息队列-php使用

tips: 打开宝塔和阿里云上的端口放行

端口

管理UI http://{node-hostname}:15672/

生产者 Producer

composer require php-amqplib/php-amqplib
<?php
/**
 * RabbitMQ 消息队列发送操作类
 * Author        范文刚
 * Email         464884785@qq.com
 * Time          2020/7/15 上午9:05
 * Version       1.0 版本号
 */

namespace app\payment\common;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Log;

class MQService
{
    private $host = '';
    private $login_name = '';
    private $password = '';
    private $port ='';
    private $exchange ='/';
    private $conn =null;
    public function __construct($config){
        $this->host = $config["host"];
        $this->port = $config['port']??'5672';
        $this->login_name = $config["login_name"];
        $this->password = $config["password"];
        $this->conn =  new AMQPStreamConnection($this->host, $this->port, $this->login_name, $this->password, '/');
    }
    public function __destruct(){
        if($this->conn)$this->conn->close();
    }

    public function send($queueName,$data){
        $routingKey = ''; //路由关键字(也可以省略)
        Log::record('MQService 发送消息开始');
        Log::record($queueName);
        Log::record(json_encode($data));
        $channel = $this->conn->channel(); //在已连接基础上建立生产者与mq之间的通道
        $channel->exchange_declare($this->exchange, 'direct', false, true, false); //声明初始化交换机
        $channel->queue_declare($queueName, false, true, false, false); //声明初始化一条队列
        $channel->queue_bind($queueName, $this->exchange, $routingKey); //将队列与某个交换机进行绑定,并使用路由关键字

        $msgBody = json_encode($data);
        $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
        $channel->basic_publish($msg, $this->exchange, $routingKey); //推送消息到某个交换机
        $channel->close();
        Log::record('MQService 发送消息完成');
    }
}

//======
$config = Config::get('mq_config');//商户通知
$mq_service  =  new MQService($config);

$queueName = 'split_maccount_1';
$data= [
        'api'=>'notify.split_neworder',
        'data'=>[
            'amount'=>'dsadasd',
            'order_id' => '21212121212',
        ]
    ];
$mq_service->send($queueName,$data);

消费者 Consumer

npm install stompjs
import Stomp from 'stompjs'
import { find } from '@/utils/index'

const modulesFiles = require.context('./modules', false, /\.js$/)
const modules = modulesFiles.keys().reduce((modules, modulePath) => {
  const value = modulesFiles(modulePath)
  const moduleName = modulePath.replace(/^\.\/(.*)\.\w+$/, '$1')
  modules[moduleName] = value.default
  return modules
}, {})

// Rabiit MQ 消息推送
class PushMessage {
  constructor () {
    this.state = 'stop' // 启动状态
    this.url = 'ws://39.104.144.247:15674/ws'
    this.login = 'loginname'
    this.passcode = 'password'
    this.subscribes = {} // 已经订阅的消息
    if (typeof WebSocket === 'undefined') {
      alert('不支持websocket')
    }
  }
  /**
   * 启动MQ服务
   * @param {*} callback 启动成功后回调函数
   */
  start (callback) {
    console.info('启动专项监听服务')
    if (!this.client) {
      var ws = new WebSocket(this.url) // 初始化 ws 对象
      this.client = Stomp.over(ws) // 获得Stomp client对象
    }
    if (this.client.connected) { // 已经启动
      callback && callback()
    } else { // 初次启动
      this.client.connect(this.login, this.passcode, (x) => {
        console.info('ZHUKE-MQ 启动成功')
        this.state = 'running'
        callback && callback()
      }, this.connect_error, '/')
    }
  }
  /**
   * 关闭MQ 连接
   */
  stop () {
    this.client.disconnect(() => {
      this.state = 'stop'
      console.info('ZHUKE-MQ STOP')
    })
  }
  /**
   * 订阅消息
   * @param {*} query_name 队列名称
   */
  subscribe (query_name) {
    console.info('订阅开始【' + query_name + '】 状态:' + this.state)
    if (this.subscribes[query_name] === undefined) { // 添加到订阅列表
      this.subscribes[query_name] = null
    }
    this.RefreshSubscribe()
  }
  /**
   * 刷新订阅队列
   */
  RefreshSubscribe () {
    var sub = () => {
      for (var key in this.subscribes) {
        if (this.subscribes[key] === null) {
          console.info('开始注册订阅:' + key)
          this.subscribes[key] = this.client.subscribe(key, this.receive)
        }
      }
    }
    // 已经启动则直接订阅 不启动则等待启动后订阅
    if (this.state === 'running') { sub() } else { this.start(sub) }
  }
  /**
   * 取消订阅某条队列消息
   * @param {*} query_name 取消订阅的队列名称
   */
  unsubscribe (query_name, callback) {
    if (this.subscribes[query_name]) {
      this.subscribes[query_name].unsubscribe()
    }
    callback && callback()
  }
  /**
   * 接收服务器发送的消息
   * @param {*} data 数据对象
   */
  receive (data) {
    console.info('ZHUKE-MQ receive')
    console.info(data)
    try {
      var json = JSON.parse(data.body)
      var cb = find(modules, json.path)
      if (cb) {
        cb(json.data)
      } else {
        console.error(json.path + '未注册方法')
      }
    } catch (err) {
      console.info('PushMessage data json format error')
      console.log(err)
    }
  }
  connect_error (err) {
    console.log('ZHUKE-MQ 链接错误')
    console.log(err)
  }
}
export default PushMessage
上一篇
下一篇