tips: 打开宝塔和阿里云上的端口放行
端口
- 15672: HTTP API clients, management UI and rabbitmqadmin (only if the management plugin is enabled)
- 61613, 61614: STOMP clients without and with TLS (only if the STOMP plugin is enabled)
- 1883, 8883: (MQTT clients without and with TLS, if the MQTT plugin is enabled
- 15674: STOMP-over-WebSockets clients (only if the Web STOMP plugin is enabled)
- 15675: MQTT-over-WebSockets clients (only if the Web MQTT plugin is enabled)
- 15692: Prometheus metrics (only if the Prometheus plugin is enabled)
管理UI http://{node-hostname}:15672/
生产者 Producer
composer require php-amqplib/php-amqplib
<?php
/**
* RabbitMQ 消息队列发送操作类
* Author 范文刚
* Email 464884785@qq.com
* Time 2020/7/15 上午9:05
* Version 1.0 版本号
*/
namespace app\payment\common;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\facade\Log;
class MQService
{
private $host = '';
private $login_name = '';
private $password = '';
private $port ='';
private $exchange ='/';
private $conn =null;
public function __construct($config){
$this->host = $config["host"];
$this->port = $config['port']??'5672';
$this->login_name = $config["login_name"];
$this->password = $config["password"];
$this->conn = new AMQPStreamConnection($this->host, $this->port, $this->login_name, $this->password, '/');
}
public function __destruct(){
if($this->conn)$this->conn->close();
}
public function send($queueName,$data){
$routingKey = ''; //路由关键字(也可以省略)
Log::record('MQService 发送消息开始');
Log::record($queueName);
Log::record(json_encode($data));
$channel = $this->conn->channel(); //在已连接基础上建立生产者与mq之间的通道
$channel->exchange_declare($this->exchange, 'direct', false, true, false); //声明初始化交换机
$channel->queue_declare($queueName, false, true, false, false); //声明初始化一条队列
$channel->queue_bind($queueName, $this->exchange, $routingKey); //将队列与某个交换机进行绑定,并使用路由关键字
$msgBody = json_encode($data);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
$channel->basic_publish($msg, $this->exchange, $routingKey); //推送消息到某个交换机
$channel->close();
Log::record('MQService 发送消息完成');
}
}
//======
$config = Config::get('mq_config');//商户通知
$mq_service = new MQService($config);
$queueName = 'split_maccount_1';
$data= [
'api'=>'notify.split_neworder',
'data'=>[
'amount'=>'dsadasd',
'order_id' => '21212121212',
]
];
$mq_service->send($queueName,$data);
消费者 Consumer
npm install stompjs
import Stomp from 'stompjs'
import { find } from '@/utils/index'
const modulesFiles = require.context('./modules', false, /\.js$/)
const modules = modulesFiles.keys().reduce((modules, modulePath) => {
const value = modulesFiles(modulePath)
const moduleName = modulePath.replace(/^\.\/(.*)\.\w+$/, '$1')
modules[moduleName] = value.default
return modules
}, {})
// Rabiit MQ 消息推送
class PushMessage {
constructor () {
this.state = 'stop' // 启动状态
this.url = 'ws://39.104.144.247:15674/ws'
this.login = 'loginname'
this.passcode = 'password'
this.subscribes = {} // 已经订阅的消息
if (typeof WebSocket === 'undefined') {
alert('不支持websocket')
}
}
/**
* 启动MQ服务
* @param {*} callback 启动成功后回调函数
*/
start (callback) {
console.info('启动专项监听服务')
if (!this.client) {
var ws = new WebSocket(this.url) // 初始化 ws 对象
this.client = Stomp.over(ws) // 获得Stomp client对象
}
if (this.client.connected) { // 已经启动
callback && callback()
} else { // 初次启动
this.client.connect(this.login, this.passcode, (x) => {
console.info('ZHUKE-MQ 启动成功')
this.state = 'running'
callback && callback()
}, this.connect_error, '/')
}
}
/**
* 关闭MQ 连接
*/
stop () {
this.client.disconnect(() => {
this.state = 'stop'
console.info('ZHUKE-MQ STOP')
})
}
/**
* 订阅消息
* @param {*} query_name 队列名称
*/
subscribe (query_name) {
console.info('订阅开始【' + query_name + '】 状态:' + this.state)
if (this.subscribes[query_name] === undefined) { // 添加到订阅列表
this.subscribes[query_name] = null
}
this.RefreshSubscribe()
}
/**
* 刷新订阅队列
*/
RefreshSubscribe () {
var sub = () => {
for (var key in this.subscribes) {
if (this.subscribes[key] === null) {
console.info('开始注册订阅:' + key)
this.subscribes[key] = this.client.subscribe(key, this.receive)
}
}
}
// 已经启动则直接订阅 不启动则等待启动后订阅
if (this.state === 'running') { sub() } else { this.start(sub) }
}
/**
* 取消订阅某条队列消息
* @param {*} query_name 取消订阅的队列名称
*/
unsubscribe (query_name, callback) {
if (this.subscribes[query_name]) {
this.subscribes[query_name].unsubscribe()
}
callback && callback()
}
/**
* 接收服务器发送的消息
* @param {*} data 数据对象
*/
receive (data) {
console.info('ZHUKE-MQ receive')
console.info(data)
try {
var json = JSON.parse(data.body)
var cb = find(modules, json.path)
if (cb) {
cb(json.data)
} else {
console.error(json.path + '未注册方法')
}
} catch (err) {
console.info('PushMessage data json format error')
console.log(err)
}
}
connect_error (err) {
console.log('ZHUKE-MQ 链接错误')
console.log(err)
}
}
export default PushMessage