This shows you the differences between two versions of the page.
Both sides previous revision Previous revision Next revision | Previous revision | ||
learn:activemq [2017/03/21 07:50] soup |
learn:activemq [2019/06/25 11:05] (current) soup |
||
---|---|---|---|
Line 1: | Line 1: | ||
=====active MQ===== | =====active MQ===== | ||
+ | |||
+ | ActiveMQ。Apache的项目。 | ||
+ | |||
+ | ==== 学习网址 ==== | ||
+ | * 学习网址:<code>http://activemq.apache.org/ </code> | ||
+ | * 是Jave编写;依赖运行环境 JRE 。 | ||
+ | |||
+ | ==== 安装 ==== | ||
+ | * yum install activemq | ||
+ | * pip install stomp.py | ||
+ | * PHP需要扩展库 Stomp 。 | ||
+ | * yum install -y jre | ||
+ | * pecl install Stomp | ||
+ | * centos: yum install activemq | ||
+ | * OSX: brew install apache-activemq | ||
+ | |||
+ | ==== 配置 ==== | ||
+ | * 开启schduler,启用延时投递:conf/activemq.xml文件中的<broker> 中加入如下属性: | ||
+ | <code xml> | ||
+ | <broker ...... schedulerSupport="true"> | ||
+ | </code> | ||
+ | * 配置访问用户名密码:conf/activemq.xml文件中的<broker> 中加入: | ||
+ | <code xml> | ||
+ | <!-- 添加访问ActiveMQ的账号密码 --> | ||
+ | <plugins> | ||
+ | <simpleAuthenticationPlugin> | ||
+ | <users> | ||
+ | <authenticationUser username="guoyi" password="guoyi" groups="users,admins"/> | ||
+ | </users> | ||
+ | </simpleAuthenticationPlugin> | ||
+ | </plugins> | ||
+ | </code> | ||
+ | * 修改管理页面登录密码: | ||
+ | conf/jetty-realm.properties 中配置账号密码 | ||
+ | |||
+ | |||
+ | ===== 启动 ===== | ||
+ | * foreground run: activemq console | ||
+ | * background run: activemq start | stop | ||
+ | |||
+ | web管理页面:http://localhost:8161/admin | ||
+ | |||
==== php 访问 ==== | ==== php 访问 ==== | ||
Line 8: | Line 50: | ||
maybe you should add "extension=stomp.so" to php.ini | maybe you should add "extension=stomp.so" to php.ini | ||
+ | |||
+ | ==== 延时 ==== | ||
+ | 文档: | ||
+ | <code php>http://activemq.apache.org/delay-and-schedule-message-delivery.html | ||
+ | </code> | ||
+ | 示例: | ||
+ | <code php> | ||
+ | try { | ||
+ | $url = 'tcp://'.$host.":".$port; | ||
+ | $stomp = new Stomp($url, $user, $password); | ||
+ | $header['AMQ_SCHEDULED_DELAY'] = 10000; | ||
+ | $header['AMQ_SCHEDULED_PERIOD'] = 3000; | ||
+ | $header['AMQ_SCHEDULED_REPEAT'] = 5; | ||
+ | |||
+ | $stomp->send($evt->tar, $evt->msg, $header); | ||
+ | |||
+ | QhLog::INF( 'triger event '.$evt->msg.' to '.$evt->tar); | ||
+ | |||
+ | } catch(StompException $e) { | ||
+ | QhLog::ERR( $e->getMessage()); | ||
+ | } | ||
+ | </code> | ||
==== php代码发送消息示例 ==== | ==== php代码发送消息示例 ==== | ||
- | <code> | + | <code php> |
$user = "guoyi"; | $user = "guoyi"; | ||
Line 41: | Line 105: | ||
==== php代码处理消息示例 ==== | ==== php代码处理消息示例 ==== | ||
- | <code> | + | <code php> |
class QhMsgRoutine | class QhMsgRoutine | ||
Line 122: | Line 186: | ||
+ | |||
+ | ==== python代码收发消息示例 ==== | ||
+ | <code php> | ||
+ | # -*-coding:utf-8-*- | ||
+ | import stomp | ||
+ | import time,json | ||
+ | |||
+ | tts_mq_server = [('test.jzyq.ltd',61613)] | ||
+ | queue_syn_name = '/queue/tts/syn_test' | ||
+ | queue_rst_name = '/queue/tts/rst_test' | ||
+ | |||
+ | listener_name = 'TtsListener' | ||
+ | |||
+ | class RstJobListener(object): | ||
+ | def on_message(self, headers, message): | ||
+ | print('headers: %s' % headers) | ||
+ | print('message: %s' % message) | ||
+ | msg = json.loads(message) | ||
+ | print('audio is: ' + msg['audio']) | ||
+ | data = msg['data'] | ||
+ | #update status | ||
+ | if data['cb']: | ||
+ | #call call back | ||
+ | print('callback is: ' + data['cb']) | ||
+ | pass | ||
+ | |||
+ | |||
+ | def get_msg(idx): | ||
+ | txt = 'hello (%d)' % idx | ||
+ | msg = {'id':'123456789','text':txt,'speaker':1,'data': {'cb':'https://www.baidu.com/'} } | ||
+ | ret = json.dumps(msg) | ||
+ | return ret | ||
+ | |||
+ | |||
+ | print('api begin ') | ||
+ | conn = stomp.Connection10(tts_mq_server) | ||
+ | conn.set_listener(listener_name, RstJobListener()) | ||
+ | conn.start() | ||
+ | conn.connect(username='system', passcode='manager', wait=True) | ||
+ | conn.subscribe(queue_rst_name) | ||
+ | # time.sleep(1) # secs | ||
+ | |||
+ | for i in range(10): | ||
+ | msg = get_msg(i) | ||
+ | print('i am a api call:'+ msg) | ||
+ | conn.send(queue_syn_name, msg) | ||
+ | time.sleep(1) | ||
+ | |||
+ | print('api waiting... ') | ||
+ | |||
+ | while True: | ||
+ | time.sleep(100) | ||
+ | |||
+ | conn.disconnect() | ||
+ | |||
+ | |||
+ | </code> |