This is an old revision of the document!
ActiveMQ。Apache的项目。
http://activemq.apache.org/
<broker ...... schedulerSupport="true">
<!-- Add the username/password used to access ActiveMQ --> <plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="guoyi" password="guoyi" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins>
conf/jetty-realm.properties 中配置账号密码
web管理页面:http://localhost:8161/admin
Install the [PHP Stomp client](http://www.php.net/manual/en/book.stomp.php) library.
pecl install stomp
You may also need to add "extension=stomp.so" to php.ini.
文档:
http://activemq.apache.org/delay-and-schedule-message-delivery.html
示例:
// Send a message through ActiveMQ's scheduler so that it is delivered after
// a delay and then repeated. Expects $host, $port, $user, $password and $evt
// (with ->tar as destination and ->msg as body) to be defined by the caller.
try {
    $url = 'tcp://' . $host . ':' . $port;
    $stomp = new Stomp($url, $user, $password);

    // Build the header set from scratch: appending to an uninitialised
    // $header would silently carry over entries left in scope by earlier code.
    $header = [
        'AMQ_SCHEDULED_DELAY'  => 10000, // ms to wait before the first delivery
        'AMQ_SCHEDULED_PERIOD' => 3000,  // ms between repeated deliveries
        'AMQ_SCHEDULED_REPEAT' => 5,     // number of times to repeat the delivery
    ];

    $stomp->send($evt->tar, $evt->msg, $header);
    QhLog::INF('trigger event ' . $evt->msg . ' to ' . $evt->tar);
} catch (StompException $e) {
    QhLog::ERR($e->getMessage());
}
// Publish a single JSON-encoded SMS request to the /queue/qhfm queue over STOMP.

// Broker credentials and endpoint.
$user = "guoyi";
$password = "guoyi";
$host = "123.56.201.72";
$port = 61613;
$destination = '/queue/qhfm';

// Payload carried inside the message envelope.
$param = [
    'phone' => '13422288866',
    'code'  => '3761',
];

// Envelope: id, type, creation time (as a string) and the payload.
$msg = [
    'id'    => '123',
    'type'  => 'sms',
    'time'  => strval(time()),
    'param' => $param,
];

$body = json_encode($msg);
echo "send: {$body}", PHP_EOL;

try {
    $url = "tcp://{$host}:{$port}";
    $stomp = new Stomp($url, $user, $password);
    $stomp->send($destination, $body);
} catch (StompException $e) {
    // Connection or send failure: print the broker/client error text.
    echo $e->getMessage();
}
/**
 * Blocking consumer for the /queue/qhfm queue over STOMP.
 *
 * The constructor connects and subscribes; getMsg() blocks until one message
 * has been received and decoded; processMsg() dispatches on the message type.
 */
class QhMsgRoutine
{
    /** Stomp connection, or null when connecting failed. */
    private $stomp = null;
    private $count = 0;

    /**
     * Connect to the broker and subscribe to the queue.
     *
     * Connection errors are printed, not thrown; the instance is then left
     * without a connection and getMsg() returns null.
     *
     * @param int $timeout Read timeout, in seconds, applied to the connection.
     */
    public function __construct($timeout = 60)
    {
        $user = "guoyi";
        $password = "guoyi";
        $host = "123.56.201.72";
        $port = 61613;
        $destination = '/queue/qhfm';

        try {
            $url = 'tcp://' . $host . ":" . $port;
            $stomp = new Stomp($url, $user, $password);
            $stomp->subscribe($destination);
            $stomp->setReadTimeout($timeout);
            // Keep the connection on the instance only once it is fully set
            // up, so getMsg() never sees a half-initialised connection.
            $this->stomp = $stomp;
        } catch (StompException $e) {
            echo $e->getMessage();
        }
    }

    /**
     * Block until the next well-formed message arrives.
     *
     * @return array|null Decoded message as an associative array, or null
     *                    when there is no usable connection (connect failed
     *                    or a StompException was raised while reading).
     */
    public function getMsg()
    {
        if ($this->stomp === null) {
            return null;
        }

        try {
            echo "Waiting for messages...\n";
            while (true) {
                $frame = $this->stomp->readFrame();
                if (!$frame) {
                    // Read timeout expired without a frame; keep polling.
                    echo 'waiting for msg...';
                    continue;
                }

                if ($frame->command !== "MESSAGE") {
                    // Only MESSAGE frames are acknowledged.
                    echo "Unexpected frame.\n";
                    var_dump($frame);
                    continue;
                }

                $this->stomp->ack($frame);
                echo "Receive message: " . $frame->body . PHP_EOL;

                // Decode to an associative array: processMsg() indexes the
                // message with ['type'], which fails on a stdClass object.
                $msg = json_decode($frame->body, true);
                if (!is_array($msg)) {
                    // Malformed body: skip it so that null keeps meaning
                    // "connection unusable" for the caller.
                    echo "Discarding message with invalid JSON body.\n";
                    continue;
                }

                var_dump($msg);
                return $msg;
            }
        } catch (StompException $e) {
            echo $e->getMessage();
        }

        return null;
    }

    /**
     * Dispatch a decoded message according to its 'type' field.
     *
     * @param array|null $msg Message returned by getMsg(); non-array values
     *                        are ignored.
     */
    public function processMsg($msg)
    {
        if (!is_array($msg)) {
            return;
        }

        $type = isset($msg['type']) ? $msg['type'] : null;
        switch ($type) {
            case 'sms':
                # code...
                break;
            case 'im':
                # code...
                break;
            default:
                # code...
                break;
        }
    }
}

$processer = new QhMsgRoutine(60);
while (true) {
    $msg = $processer->getMsg();
    if ($msg === null) {
        // getMsg() returns null only when the connection is unusable;
        // stop here instead of spinning in a busy loop.
        break;
    }
    $processer->processMsg($msg);
}
# -*-coding:utf-8-*-
"""Demo TTS client over STOMP.

Sends ten synthesis requests to the request queue and prints every result
message that arrives on the result queue until interrupted.
"""
import json
import time

import stomp

tts_mq_server = [('test.jzyq.ltd', 61613)]
queue_syn_name = '/queue/tts/syn_test'
queue_rst_name = '/queue/tts/rst_test'
listener_name = 'TtsListener'


class RstJobListener(object):
    """Listener that prints each message received on the result queue."""

    def on_message(self, headers, message):
        """Print the raw frame, then the decoded 'audio' and callback fields."""
        print('headers: %s' % headers)
        print('message: %s' % message)
        msg = json.loads(message)
        print('audio is: %s' % msg.get('audio'))
        # 'data' and its 'cb' entry are treated as optional so a result
        # without a callback does not raise KeyError inside the listener.
        data = msg.get('data') or {}
        callback = data.get('cb')
        # TODO: update the job status here.
        if callback:
            # TODO: invoke the callback URL.
            print('callback is: %s' % callback)


def get_msg(idx):
    """Return the JSON text of a synthesis request numbered ``idx``."""
    txt = 'hello (%d)' % idx
    msg = {
        'id': '123456789',
        'text': txt,
        'speaker': 1,
        'data': {'cb': 'https://www.baidu.com/'},
    }
    return json.dumps(msg)


print('api begin ')
conn = stomp.Connection10(tts_mq_server)
conn.set_listener(listener_name, RstJobListener())
conn.start()
conn.connect(username='system', passcode='manager', wait=True)
try:
    conn.subscribe(queue_rst_name)
    for i in range(10):
        msg = get_msg(i)
        print('i am a api call:' + msg)
        conn.send(queue_syn_name, msg)
        time.sleep(1)
    print('api waiting... ')
    # Keep the main thread alive so the listener can go on receiving results.
    while True:
        time.sleep(100)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the demo.
    pass
finally:
    # Previously unreachable after the infinite loop; now always runs.
    conn.disconnect()