This shows you the differences between two versions of the page.
Next revision | Previous revision | ||
learn:activemq [2017/02/22 04:08] soup created |
learn:activemq [2019/06/25 11:05] (current) soup |
||
---|---|---|---|
Line 1: | Line 1: | ||
=====active MQ===== | =====active MQ===== | ||
+ | |||
+ | ActiveMQ。Apache的项目。 | ||
+ | |||
+ | ==== 学习网址 ==== | ||
+ | * 学习网址:<code>http://activemq.apache.org/ </code> | ||
+ | * 是Java编写;依赖运行环境 JRE 。 | ||
+ | |||
+ | ==== 安装 ==== | ||
+ | * yum install activemq | ||
+ | * pip install stomp.py | ||
+ | * PHP需要扩展库 Stomp 。 | ||
+ | * yum install -y jre | ||
+ | * pecl install Stomp | ||
+ | * centos: yum install activemq | ||
+ | * OSX: brew install apache-activemq | ||
+ | |||
+ | ==== 配置 ==== | ||
+ | * 开启scheduler,启用延时投递:conf/activemq.xml文件中的<broker> 中加入如下属性: | ||
+ | <code xml> | ||
+ | <broker ...... schedulerSupport="true"> | ||
+ | </code> | ||
+ | * 配置访问用户名密码:conf/activemq.xml文件中的<broker> 中加入: | ||
+ | <code xml> | ||
+ | <!-- 添加访问ActiveMQ的账号密码 --> | ||
+ | <plugins> | ||
+ | <simpleAuthenticationPlugin> | ||
+ | <users> | ||
+ | <authenticationUser username="guoyi" password="guoyi" groups="users,admins"/> | ||
+ | </users> | ||
+ | </simpleAuthenticationPlugin> | ||
+ | </plugins> | ||
+ | </code> | ||
+ | * 修改管理页面登录密码: | ||
+ | conf/jetty-realm.properties 中配置账号密码 | ||
+ | |||
+ | |||
+ | ==== 启动 ==== | ||
+ | * foreground run: activemq console | ||
+ | * background run: activemq start | stop | ||
+ | |||
+ | web管理页面:http://localhost:8161/admin | ||
+ | |||
+ | |||
+ | ==== php 访问 ==== | ||
+ | Install the [[http://www.php.net/manual/en/book.stomp.php|PHP Stomp client]] library. | ||
+ | |||
+ | pecl install stomp | ||
+ | |||
+ | You may also need to add "extension=stomp.so" to php.ini. | ||
+ | |||
+ | |||
+ | ==== 延时 ==== | ||
+ | 文档: | ||
+ | <code php>http://activemq.apache.org/delay-and-schedule-message-delivery.html | ||
+ | </code> | ||
+ | 示例: | ||
+ | <code php> | ||
+ | try { | ||
+ | $url = 'tcp://'.$host.":".$port; | ||
+ | $stomp = new Stomp($url, $user, $password); | ||
+ | $header['AMQ_SCHEDULED_DELAY'] = 10000; | ||
+ | $header['AMQ_SCHEDULED_PERIOD'] = 3000; | ||
+ | $header['AMQ_SCHEDULED_REPEAT'] = 5; | ||
+ | |||
+ | $stomp->send($evt->tar, $evt->msg, $header); | ||
+ | |||
+ | QhLog::INF( 'triger event '.$evt->msg.' to '.$evt->tar); | ||
+ | |||
+ | } catch(StompException $e) { | ||
+ | QhLog::ERR( $e->getMessage()); | ||
+ | } | ||
+ | </code> | ||
+ | ==== php代码发送消息示例 ==== | ||
+ | <code php> | ||
+ | |||
+ | $user = "guoyi"; | ||
+ | $password = "guoyi"; | ||
+ | $host = "123.56.201.72"; | ||
+ | $port = 61613; | ||
+ | $destination = '/queue/qhfm'; | ||
+ | |||
+ | $param['phone'] = '13422288866'; | ||
+ | $param['code'] = '3761'; | ||
+ | |||
+ | $msg['id'] = '123'; | ||
+ | $msg['type'] = 'sms'; | ||
+ | $msg['time'] = strval(time()); | ||
+ | $msg['param'] = $param; | ||
+ | |||
+ | $body = json_encode($msg); | ||
+ | echo 'send: '.$body.PHP_EOL ; | ||
+ | |||
+ | try { | ||
+ | $url = 'tcp://'.$host.":".$port; | ||
+ | $stomp = new Stomp($url, $user, $password); | ||
+ | | ||
+ | $stomp->send($destination, $body); | ||
+ | |||
+ | } catch(StompException $e) { | ||
+ | echo $e->getMessage(); | ||
+ | } | ||
+ | </code> | ||
+ | |||
+ | |||
+ | ==== php代码处理消息示例 ==== | ||
+ | <code php> | ||
+ | |||
+ | class QhMsgRoutine | ||
+ | { | ||
+ | private $stomp; | ||
+ | private $count = 0; | ||
+ | |||
+ | function __construct($timeout = 60) | ||
+ | { | ||
+ | $user = "guoyi"; | ||
+ | $password = "guoyi"; | ||
+ | $host = "123.56.201.72"; | ||
+ | $port = 61613; | ||
+ | $destination = '/queue/qhfm'; | ||
+ | |||
+ | try { | ||
+ | $url = 'tcp://'.$host.":".$port; | ||
+ | $stomp = new Stomp($url, $user, $password); | ||
+ | $stomp->subscribe($destination); | ||
+ | $stomp->setReadTimeout($timeout); | ||
+ | // var_dump($stomp->getReadTimeout()); | ||
+ | |||
+ | } catch(StompException $e) { | ||
+ | echo $e->getMessage(); | ||
+ | } | ||
+ | } | ||
+ | |||
+ | function getMsg() | ||
+ | { | ||
+ | try { | ||
+ | echo "Waiting for messages...\n"; | ||
+ | while(true) { | ||
+ | $frame = $stomp->readFrame(); | ||
+ | if( $frame ) { | ||
+ | $stomp->ack($frame); | ||
+ | if( $frame->command == "MESSAGE" ) { | ||
+ | echo "Receive message: ".$frame->body.PHP_EOL; | ||
+ | $msg = json_decode($frame->body); | ||
+ | var_dump($msg); | ||
+ | return $msg; | ||
+ | } else { | ||
+ | echo "Unexpected frame.\n"; | ||
+ | var_dump($frame); | ||
+ | } | ||
+ | }else{ | ||
+ | echo 'waiting for msg...'; | ||
+ | } | ||
+ | } | ||
+ | |||
+ | } catch(StompException $e) { | ||
+ | echo $e->getMessage(); | ||
+ | } | ||
+ | return null; | ||
+ | } | ||
+ | |||
+ | function processMsg($msg) | ||
+ | { | ||
+ | switch ($msg['type']) { | ||
+ | case 'sms': | ||
+ | # code... | ||
+ | break; | ||
+ | case 'im': | ||
+ | # code... | ||
+ | break; | ||
+ | | ||
+ | default: | ||
+ | # code... | ||
+ | break; | ||
+ | } | ||
+ | } | ||
+ | } | ||
+ | |||
+ | |||
+ | $processer = new QhMsgRoutine(60); | ||
+ | |||
+ | while(true){ | ||
+ | $processer->processMsg($processer->getMsg()); | ||
+ | } | ||
+ | </code> | ||
+ | |||
+ | |||
+ | |||
+ | ==== python代码收发消息示例 ==== | ||
+ | <code python> | ||
+ | # -*-coding:utf-8-*- | ||
+ | import stomp | ||
+ | import time,json | ||
+ | |||
+ | tts_mq_server = [('test.jzyq.ltd',61613)] | ||
+ | queue_syn_name = '/queue/tts/syn_test' | ||
+ | queue_rst_name = '/queue/tts/rst_test' | ||
+ | |||
+ | listener_name = 'TtsListener' | ||
+ | |||
+ | class RstJobListener(object): | ||
+ | def on_message(self, headers, message): | ||
+ | print('headers: %s' % headers) | ||
+ | print('message: %s' % message) | ||
+ | msg = json.loads(message) | ||
+ | print('audio is: ' + msg['audio']) | ||
+ | data = msg['data'] | ||
+ | #update status | ||
+ | if data['cb']: | ||
+ | #call call back | ||
+ | print('callback is: ' + data['cb']) | ||
+ | pass | ||
+ | |||
+ | |||
+ | def get_msg(idx): | ||
+ | txt = 'hello (%d)' % idx | ||
+ | msg = {'id':'123456789','text':txt,'speaker':1,'data': {'cb':'https://www.baidu.com/'} } | ||
+ | ret = json.dumps(msg) | ||
+ | return ret | ||
+ | |||
+ | |||
+ | print('api begin ') | ||
+ | conn = stomp.Connection10(tts_mq_server) | ||
+ | conn.set_listener(listener_name, RstJobListener()) | ||
+ | conn.start() | ||
+ | conn.connect(username='system', passcode='manager', wait=True) | ||
+ | conn.subscribe(queue_rst_name) | ||
+ | # time.sleep(1) # secs | ||
+ | |||
+ | for i in range(10): | ||
+ | msg = get_msg(i) | ||
+ | print('i am a api call:'+ msg) | ||
+ | conn.send(queue_syn_name, msg) | ||
+ | time.sleep(1) | ||
+ | |||
+ | print('api waiting... ') | ||
+ | |||
+ | while True: | ||
+ | time.sleep(100) | ||
+ | |||
+ | conn.disconnect() | ||
+ | |||
+ | |||
+ | </code> |