=====active MQ=====
ActiveMQ 是 Apache 基金会的开源消息中间件项目。
==== 学习网址 ====
* 学习网址:http://activemq.apache.org/
* 使用 Java 编写;运行依赖 JRE 环境。
==== 安装 ====
* yum install activemq
* pip install stomp.py
* PHP需要扩展库 Stomp 。
* yum install -y jre
* pecl install Stomp
* centos: yum install activemq
* OSX: brew install apache-activemq
==== 配置 ====
* 开启 scheduler,启用延时投递:在 conf/activemq.xml 文件的 broker 节点中加入如下属性:schedulerSupport="true"
* 配置访问用户名密码:在 conf/activemq.xml 文件的 broker 节点中加入 plugins 配置(simpleAuthenticationPlugin,其中用 authenticationUser 指定用户名和密码)
* 修改管理页面登录密码:
conf/jetty-realm.properties 中配置账号密码
===== 启动 =====
* foreground run: activemq console
* background run: activemq start | stop
web管理页面:http://localhost:8161/admin
==== php 访问 ====
Install the [PHP Stomp client](http://www.php.net/manual/en/book.stomp.php) library.
pecl install stomp
You may also need to add "extension=stomp.so" to php.ini.
==== 延时 ====
文档:
http://activemq.apache.org/delay-and-schedule-message-delivery.html
示例:
// Scheduled delivery: the AMQ_SCHEDULED_* headers are interpreted by the
// broker's scheduler (see the ActiveMQ delay-and-schedule page linked above).
// $host, $port, $user, $password and $evt must already be defined by the caller.
try {
    $url = "tcp://{$host}:{$port}";
    $stomp = new Stomp($url, $user, $password);

    $header['AMQ_SCHEDULED_DELAY'] = 10000;  // wait before the first delivery (ms)
    $header['AMQ_SCHEDULED_PERIOD'] = 3000;  // pause between repeated deliveries (ms)
    $header['AMQ_SCHEDULED_REPEAT'] = 5;     // number of repeats

    $stomp->send($evt->tar, $evt->msg, $header);
    QhLog::INF('triger event ' . $evt->msg . ' to ' . $evt->tar);
} catch (StompException $e) {
    // Connection or send failure: log it and carry on.
    QhLog::ERR($e->getMessage());
}
==== php代码发送消息示例 ====
// Broker credentials and STOMP endpoint used by this sample.
$user = "guoyi";
$password = "guoyi";
$host = "123.56.201.72";
$port = 61613;
$destination = '/queue/qhfm';

// Payload: an envelope (id/type/time) wrapping the type-specific parameters.
$param = array(
    'phone' => '13422288866',
    'code'  => '3761',
);
$msg = array(
    'id'    => '123',
    'type'  => 'sms',
    'time'  => (string) time(),
    'param' => $param,
);

$body = json_encode($msg);
echo 'send: ' . $body . PHP_EOL;

// Connect and publish; any STOMP failure is printed instead of aborting.
try {
    $url = "tcp://{$host}:{$port}";
    $stomp = new Stomp($url, $user, $password);
    $stomp->send($destination, $body);
} catch (StompException $e) {
    echo $e->getMessage();
}
==== php代码处理消息示例 ====
/**
 * Consumer for the /queue/qhfm STOMP queue.
 *
 * Connects and subscribes on construction, then getMsg() hands out decoded
 * messages one at a time and processMsg() dispatches them by their "type".
 */
class QhMsgRoutine
{
    /** @var Stomp|null Live connection; stays null when connecting failed. */
    private $stomp = null;

    /** @var int Not read or written by any method of this class yet. */
    private $count = 0;

    /**
     * Connect to the broker, subscribe to the queue and set the read timeout.
     *
     * A StompException is echoed rather than rethrown, so after a failure the
     * object exists but has no usable connection (getMsg() will then throw).
     *
     * @param int $timeout Value passed to Stomp::setReadTimeout().
     */
    function __construct($timeout = 60)
    {
        $user = "guoyi";
        $password = "guoyi";
        $host = "123.56.201.72";
        $port = 61613;
        $destination = '/queue/qhfm';
        try {
            $url = 'tcp://'.$host.":".$port;
            // Keep the connection on the instance: getMsg() needs it later.
            // (Storing it in a local variable lost it when the constructor returned.)
            $this->stomp = new Stomp($url, $user, $password);
            $this->stomp->subscribe($destination);
            $this->stomp->setReadTimeout($timeout);
            // var_dump($this->stomp->getReadTimeout());
        } catch(StompException $e) {
            // Do not leave a half-initialised connection behind.
            $this->stomp = null;
            echo $e->getMessage();
        }
    }

    /**
     * Block until a MESSAGE frame arrives and return its JSON-decoded body.
     *
     * @return mixed Result of json_decode() on the frame body (an object for
     *               a JSON object), or null when a StompException is caught.
     * @throws RuntimeException When the constructor failed to connect.
     */
    function getMsg()
    {
        if ($this->stomp === null) {
            throw new RuntimeException('QhMsgRoutine: not connected to the broker');
        }
        try {
            echo "Waiting for messages...\n";
            while(true) {
                $frame = $this->stomp->readFrame();
                if( !$frame ) {
                    // Nothing arrived before the read timeout; poll again.
                    echo 'waiting for msg...';
                    continue;
                }
                if( $frame->command == "MESSAGE" ) {
                    // Acknowledge only real messages; other frame types
                    // are not something the broker expects an ACK for.
                    $this->stomp->ack($frame);
                    echo "Receive message: ".$frame->body.PHP_EOL;
                    $msg = json_decode($frame->body);
                    var_dump($msg);
                    return $msg;
                }
                echo "Unexpected frame.\n";
                var_dump($frame);
            }
        } catch(StompException $e) {
            echo $e->getMessage();
        }
        return null;
    }

    /**
     * Dispatch one decoded message according to its "type" field.
     *
     * Accepts what getMsg() returns: a decoded object, an associative array,
     * or null (no message). Anything without a type falls into the default
     * branch.
     *
     * @param mixed $msg Decoded message or null.
     */
    function processMsg($msg)
    {
        // json_decode() without the assoc flag yields an object, so
        // $msg['type'] would be a fatal error; read the field either way.
        if (is_object($msg)) {
            $type = isset($msg->type) ? $msg->type : null;
        } elseif (is_array($msg)) {
            $type = isset($msg['type']) ? $msg['type'] : null;
        } else {
            $type = null;
        }

        switch ($type) {
            case 'sms':
                # code...
                break;
            case 'im':
                # code...
                break;
            default:
                # code...
                break;
        }
    }
}
// Consume forever: wait for the next message, then dispatch it.
$processer = new QhMsgRoutine(60);
for (;;) {
    $incoming = $processer->getMsg();
    $processer->processMsg($incoming);
}
==== python代码收发消息示例 ====
# -*-coding:utf-8-*-
import stomp
import time,json
# Broker endpoint list handed to stomp.Connection10: (host, port) pairs.
tts_mq_server = [('test.jzyq.ltd', 61613)]

# Queue this sample publishes requests to.
queue_syn_name = '/queue/tts/syn_test'

# Queue this sample subscribes to for results.
queue_rst_name = '/queue/tts/rst_test'

# Name under which the listener object is registered on the connection.
listener_name = 'TtsListener'
class RstJobListener(object):
    """Listener that prints each result message received from the broker."""

    def on_message(self, headers, message):
        """Print a received message and, when present, its callback URL.

        Args:
            headers: Frame headers as delivered by the STOMP library.
            message: Frame body; must be a JSON document with an 'audio'
                string. An optional 'data' object may carry a 'cb' URL.
        """
        print('headers: %s' % headers)
        print('message: %s' % message)
        msg = json.loads(message)
        print('audio is: ' + msg['audio'])
        # 'data' and 'cb' are optional: the truthiness test below already
        # treats a missing callback as "nothing to call", so a missing key
        # must not raise KeyError inside the listener.
        data = msg.get('data') or {}
        # update status
        callback = data.get('cb')
        if callback:
            # call call back
            print('callback is: ' + callback)
def get_msg(idx, msg_id='123456789', speaker=1, cb='https://www.baidu.com/'):
    """Build the JSON text of one synthesis request.

    Args:
        idx: Integer placed into the request text as 'hello (<idx>)'.
        msg_id: Value of the 'id' field; defaults to the sample id.
        speaker: Value of the 'speaker' field.
        cb: Callback URL stored under data.cb.

    Returns:
        The request serialized with json.dumps.
    """
    txt = 'hello (%d)' % idx
    msg = {
        'id': msg_id,
        'text': txt,
        'speaker': speaker,
        'data': {'cb': cb},
    }
    return json.dumps(msg)
# Driver: connect, subscribe to the result queue, publish ten requests,
# then idle so the listener can keep printing results until interrupted.
print('api begin ')
conn = stomp.Connection10(tts_mq_server)
conn.set_listener(listener_name, RstJobListener())
# NOTE(review): newer stomp.py releases reportedly no longer provide start();
# the guard keeps the sample usable either way - confirm against the
# installed version.
if hasattr(conn, 'start'):
    conn.start()
conn.connect(username='system', passcode='manager', wait=True)
try:
    conn.subscribe(queue_rst_name)
    # time.sleep(1) # secs
    for i in range(10):
        msg = get_msg(i)
        print('i am a api call:' + msg)
        conn.send(queue_syn_name, msg)
        time.sleep(1)
    print('api waiting... ')
    # Keep the main thread alive; results arrive via the listener.
    while True:
        time.sleep(100)
except KeyboardInterrupt:
    # Ctrl-C is the normal way to stop this sample.
    pass
finally:
    # Previously placed after the infinite loop and therefore unreachable.
    conn.disconnect()