{"id":724,"date":"2022-05-28T18:15:13","date_gmt":"2022-05-28T10:15:13","guid":{"rendered":"https:\/\/usei.cn\/?p=724"},"modified":"2022-05-28T18:15:14","modified_gmt":"2022-05-28T10:15:14","slug":"rabbitmq-%e6%b6%88%e6%81%af%e9%98%9f%e5%88%97-php%e4%bd%bf%e7%94%a8","status":"publish","type":"post","link":"https:\/\/usei.cn\/index.php\/2022\/05\/28\/rabbitmq-%e6%b6%88%e6%81%af%e9%98%9f%e5%88%97-php%e4%bd%bf%e7%94%a8\/","title":{"rendered":"RabbitMQ \u6d88\u606f\u961f\u5217-php\u4f7f\u7528"},"content":{"rendered":"\n<p id=\"e738e3b7208e4fd012af81998d6d24e1\">tips: \u6253\u5f00\u5b9d\u5854\u548c\u963f\u91cc\u4e91\u4e0a\u7684\u7aef\u53e3\u653e\u884c<\/p>\n\n\n\n<p id=\"d0bc23573dbac9f6211bf1acdbd3384a\">\u7aef\u53e3<\/p>\n\n\n\n<ul class=\"wp-block-list\"><li>15672: <a href=\"https:\/\/www.rabbitmq.com\/management.html\" target=\"_blank\" rel=\"noreferrer noopener\">HTTP API<\/a> clients, <a href=\"https:\/\/www.rabbitmq.com\/management.html\" target=\"_blank\" rel=\"noreferrer noopener\">management UI<\/a> and <a href=\"https:\/\/www.rabbitmq.com\/management-cli.html\" target=\"_blank\" rel=\"noreferrer noopener\">rabbitmqadmin<\/a> (only if the <a href=\"https:\/\/www.rabbitmq.com\/management.html\" target=\"_blank\" rel=\"noreferrer noopener\">management plugin<\/a> is enabled)<\/li><li>61613, 61614: <a href=\"https:\/\/stomp.github.io\/stomp-specification-1.2.html\" target=\"_blank\" rel=\"noreferrer noopener\">STOMP clients<\/a> without and with TLS (only if the <a href=\"https:\/\/www.rabbitmq.com\/stomp.html\" target=\"_blank\" rel=\"noreferrer noopener\">STOMP plugin<\/a> is enabled)<\/li><li>1883, 8883: <a href=\"http:\/\/mqtt.org\/\" target=\"_blank\" rel=\"noreferrer noopener\">MQTT clients<\/a> without and with TLS, if the <a href=\"https:\/\/www.rabbitmq.com\/mqtt.html\" target=\"_blank\" rel=\"noreferrer noopener\">MQTT plugin<\/a> is enabled<\/li><li>15674: STOMP-over-WebSockets clients (only if the <a 
href=\"https:\/\/www.rabbitmq.com\/web-stomp.html\" target=\"_blank\" rel=\"noreferrer noopener\">Web STOMP plugin<\/a> is enabled)<\/li><li>15675: MQTT-over-WebSockets clients (only if the <a href=\"https:\/\/www.rabbitmq.com\/web-mqtt.html\" target=\"_blank\" rel=\"noreferrer noopener\">Web MQTT plugin<\/a> is enabled)<\/li><li>15692: Prometheus metrics (only if the <a href=\"https:\/\/www.rabbitmq.com\/prometheus.html\" target=\"_blank\" rel=\"noreferrer noopener\">Prometheus plugin<\/a> is enabled)<\/li><\/ul>\n\n\n\n<p id=\"1472f0b62a92b0edef892c3c866638ea\">\u7ba1\u7406UI&nbsp;http:\/\/{node-hostname}:15672\/<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">\u751f\u4ea7\u8005 Producer<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>composer require php-amqplib\/php-amqplib<\/code><\/pre>\n\n\n\n<pre class=\"wp-block-code\"><code>&lt;?php\r\n\/**\r\n * RabbitMQ \u6d88\u606f\u961f\u5217\u53d1\u9001\u64cd\u4f5c\u7c7b\r\n * Author        \u8303\u6587\u521a\r\n * Email         464884785@qq.com\r\n * Time          2020\/7\/15 \u4e0a\u53489:05\r\n * Version       1.0 \u7248\u672c\u53f7\r\n *\/\r\n\r\nnamespace app\\payment\\common;\r\nuse PhpAmqpLib\\Connection\\AMQPStreamConnection;\r\nuse PhpAmqpLib\\Message\\AMQPMessage;\r\nuse think\\facade\\Log;\r\n\r\nclass MQService\r\n{\r\n    private $host = '';\r\n    private $login_name = '';\r\n    private $password = '';\r\n    private $port ='';\r\n    private $exchange ='\/';\r\n    private $conn =null;\r\n    public function __construct($config){\r\n        $this->host = $config&#91;\"host\"];\r\n        $this->port = $config&#91;'port']??'5672';\r\n        $this->login_name = $config&#91;\"login_name\"];\r\n        $this->password = $config&#91;\"password\"];\r\n        $this->conn =  new AMQPStreamConnection($this->host, $this->port, $this->login_name, $this->password, '\/');\r\n    }\r\n    public function __destruct(){\r\n        if($this->conn)$this->conn->close();\r\n    }\r\n\r\n    public function 
send($queueName,$data){\r\n        $routingKey = ''; \/\/\u8def\u7531\u5173\u952e\u5b57(\u4e5f\u53ef\u4ee5\u7701\u7565)\r\n        Log::record('MQService \u53d1\u9001\u6d88\u606f\u5f00\u59cb');\r\n        Log::record($queueName);\r\n        Log::record(json_encode($data));\r\n        $channel = $this->conn->channel(); \/\/\u5728\u5df2\u8fde\u63a5\u57fa\u7840\u4e0a\u5efa\u7acb\u751f\u4ea7\u8005\u4e0emq\u4e4b\u95f4\u7684\u901a\u9053\r\n        $channel->exchange_declare($this->exchange, 'direct', false, true, false); \/\/\u58f0\u660e\u521d\u59cb\u5316\u4ea4\u6362\u673a\r\n        $channel->queue_declare($queueName, false, true, false, false); \/\/\u58f0\u660e\u521d\u59cb\u5316\u4e00\u6761\u961f\u5217\r\n        $channel->queue_bind($queueName, $this->exchange, $routingKey); \/\/\u5c06\u961f\u5217\u4e0e\u67d0\u4e2a\u4ea4\u6362\u673a\u8fdb\u884c\u7ed1\u5b9a\uff0c\u5e76\u4f7f\u7528\u8def\u7531\u5173\u952e\u5b57\r\n\r\n        $msgBody = json_encode($data);\r\n        $msg = new AMQPMessage($msgBody, &#91;'content_type' => 'text\/plain', 'delivery_mode' => 2]); \/\/\u751f\u6210\u6d88\u606f\r\n        $channel->basic_publish($msg, $this->exchange, $routingKey); \/\/\u63a8\u9001\u6d88\u606f\u5230\u67d0\u4e2a\u4ea4\u6362\u673a\r\n        $channel->close();\r\n        Log::record('MQService \u53d1\u9001\u6d88\u606f\u5b8c\u6210');\r\n    }\r\n}\r\n\r\n\/\/======\r\n$config = Config::get('mq_config');\/\/\u5546\u6237\u901a\u77e5\r\n$mq_service  =  new MQService($config);\r\n\r\n$queueName = 'split_maccount_1';\r\n$data= &#91;\r\n        'api'=>'notify.split_neworder',\r\n        'data'=>&#91;\r\n            'amount'=>'dsadasd',\r\n            'order_id' => '21212121212',\r\n        ]\r\n    ];\r\n$mq_service->send($queueName,$data);<\/code><\/pre>\n\n\n\n<h2 class=\"wp-block-heading\">\u6d88\u8d39\u8005 Consumer<\/h2>\n\n\n\n<pre class=\"wp-block-code\"><code>npm install stompjs<\/code><\/pre>\n\n\n\n<pre class=\"wp-block-code\"><code>import Stomp from 'stompjs'\r\nimport { 
find } from '@\/utils\/index'\r\n\r\nconst modulesFiles = require.context('.\/modules', false, \/\\.js$\/)\r\nconst modules = modulesFiles.keys().reduce((modules, modulePath) => {\r\n  const value = modulesFiles(modulePath)\r\n  const moduleName = modulePath.replace(\/^\\.\\\/(.*)\\.\\w+$\/, '$1')\r\n  modules&#91;moduleName] = value.default\r\n  return modules\r\n}, {})\r\n\r\n\/\/ RabbitMQ \u6d88\u606f\u63a8\u9001\r\nclass PushMessage {\r\n  constructor () {\r\n    this.state = 'stop' \/\/ \u542f\u52a8\u72b6\u6001\r\n    this.url = 'ws:\/\/39.104.144.247:15674\/ws'\r\n    this.login = 'loginname'\r\n    this.passcode = 'password'\r\n    this.subscribes = {} \/\/ \u5df2\u7ecf\u8ba2\u9605\u7684\u6d88\u606f\r\n    if (typeof WebSocket === 'undefined') {\r\n      alert('\u4e0d\u652f\u6301websocket')\r\n    }\r\n  }\r\n  \/**\r\n   * \u542f\u52a8MQ\u670d\u52a1\r\n   * @param {*} callback \u542f\u52a8\u6210\u529f\u540e\u56de\u8c03\u51fd\u6570\r\n   *\/\r\n  start (callback) {\r\n    console.info('\u542f\u52a8\u4e13\u9879\u76d1\u542c\u670d\u52a1')\r\n    if (!this.client) {\r\n      var ws = new WebSocket(this.url) \/\/ \u521d\u59cb\u5316 ws \u5bf9\u8c61\r\n      this.client = Stomp.over(ws) \/\/ \u83b7\u5f97Stomp client\u5bf9\u8c61\r\n    }\r\n    if (this.client.connected) { \/\/ \u5df2\u7ecf\u542f\u52a8\r\n      callback &amp;&amp; callback()\r\n    } else { \/\/ \u521d\u6b21\u542f\u52a8\r\n      this.client.connect(this.login, this.passcode, (x) => {\r\n        console.info('ZHUKE-MQ \u542f\u52a8\u6210\u529f')\r\n        this.state = 'running'\r\n        callback &amp;&amp; callback()\r\n      }, this.connect_error, '\/')\r\n    }\r\n  }\r\n  \/**\r\n   * \u5173\u95edMQ \u8fde\u63a5\r\n   *\/\r\n  stop () {\r\n    this.client.disconnect(() => {\r\n      this.state = 'stop'\r\n      console.info('ZHUKE-MQ STOP')\r\n    })\r\n  }\r\n  \/**\r\n   * \u8ba2\u9605\u6d88\u606f\r\n   * @param {*} query_name \u961f\u5217\u540d\u79f0\r\n   *\/\r\n  subscribe (query_name) 
{\r\n    console.info('\u8ba2\u9605\u5f00\u59cb\u3010' + query_name + '\u3011 \u72b6\u6001\uff1a' + this.state)\r\n    if (this.subscribes&#91;query_name] === undefined) { \/\/ \u6dfb\u52a0\u5230\u8ba2\u9605\u5217\u8868\r\n      this.subscribes&#91;query_name] = null\r\n    }\r\n    this.RefreshSubscribe()\r\n  }\r\n  \/**\r\n   * \u5237\u65b0\u8ba2\u9605\u961f\u5217\r\n   *\/\r\n  RefreshSubscribe () {\r\n    var sub = () => {\r\n      for (var key in this.subscribes) {\r\n        if (this.subscribes&#91;key] === null) {\r\n          console.info('\u5f00\u59cb\u6ce8\u518c\u8ba2\u9605\uff1a' + key)\r\n          this.subscribes&#91;key] = this.client.subscribe(key, this.receive)\r\n        }\r\n      }\r\n    }\r\n    \/\/ \u5df2\u7ecf\u542f\u52a8\u5219\u76f4\u63a5\u8ba2\u9605 \u4e0d\u542f\u52a8\u5219\u7b49\u5f85\u542f\u52a8\u540e\u8ba2\u9605\r\n    if (this.state === 'running') { sub() } else { this.start(sub) }\r\n  }\r\n  \/**\r\n   * \u53d6\u6d88\u8ba2\u9605\u67d0\u6761\u961f\u5217\u6d88\u606f\r\n   * @param {*} query_name \u53d6\u6d88\u8ba2\u9605\u7684\u961f\u5217\u540d\u79f0\r\n   *\/\r\n  unsubscribe (query_name, callback) {\r\n    if (this.subscribes&#91;query_name]) {\r\n      this.subscribes&#91;query_name].unsubscribe()\r\n    }\r\n    callback &amp;&amp; callback()\r\n  }\r\n  \/**\r\n   * \u63a5\u6536\u670d\u52a1\u5668\u53d1\u9001\u7684\u6d88\u606f\r\n   * @param {*} data \u6570\u636e\u5bf9\u8c61\r\n   *\/\r\n  receive (data) {\r\n    console.info('ZHUKE-MQ receive')\r\n    console.info(data)\r\n    try {\r\n      var json = JSON.parse(data.body)\r\n      var cb = find(modules, json.path)\r\n      if (cb) {\r\n        cb(json.data)\r\n      } else {\r\n        console.error(json.path + '\u672a\u6ce8\u518c\u65b9\u6cd5')\r\n      }\r\n    } catch (err) {\r\n      console.info('PushMessage data json format error')\r\n      console.log(err)\r\n    }\r\n  }\r\n  connect_error (err) {\r\n    console.log('ZHUKE-MQ \u94fe\u63a5\u9519\u8bef')\r\n    
console.log(err)\r\n  }\r\n}\r\nexport default PushMessage\r\n<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>tips: \u6253\u5f00\u5b9d\u5854\u548c\u963f\u91cc\u4e91\u4e0a\u7684\u7aef\u53e3\u653e\u884c \u7aef\u53e3 15672: HTTP API clients, manage [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[67],"tags":[],"class_list":["post-724","post","type-post","status-publish","format-standard","hentry","category-rabbitmq"],"_links":{"self":[{"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/posts\/724","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/comments?post=724"}],"version-history":[{"count":1,"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/posts\/724\/revisions"}],"predecessor-version":[{"id":725,"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/posts\/724\/revisions\/725"}],"wp:attachment":[{"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/media?parent=724"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/categories?post=724"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/usei.cn\/index.php\/wp-json\/wp\/v2\/tags?post=724"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}