Table of Contents

ActiveMQ

ActiveMQ 是 Apache 软件基金会的开源消息中间件项目。

学习网址

安装

配置

       <!-- schedulerSupport="true" turns on the broker's message scheduler; the AMQ_SCHEDULED_* headers used in the delayed-delivery example depend on it (see the ActiveMQ scheduling documentation). -->
       <broker ...... schedulerSupport="true">
 
<!-- Add the username/password used to access ActiveMQ -->
<plugins>
        <simpleAuthenticationPlugin>
                <users>
                        <authenticationUser username="guoyi" password="guoyi" groups="users,admins"/>
                </users>
        </simpleAuthenticationPlugin>
</plugins>

在 conf/jetty-realm.properties 中配置 Web 管理控制台的登录账号密码

启动

web管理页面:http://localhost:8161/admin

php 访问

Install the [PHP Stomp client](http://www.php.net/manual/en/book.stomp.php) library.

pecl install stomp

If the extension is not loaded automatically after installation, add `extension=stomp.so` to php.ini.

延时

文档:

http://activemq.apache.org/delay-and-schedule-message-delivery.html

示例:

		// Example: publish a delayed, repeating message over STOMP.
		// $host, $port, $user, $password, $evt and QhLog are defined outside this snippet.
		try {
			$url = 'tcp://'.$host.":".$port;
			$stomp = new Stomp($url, $user, $password);
			// Scheduling headers, per the ActiveMQ scheduling documentation linked above:
			// initial delay (ms), period between repeats (ms), number of repeats.
			$header['AMQ_SCHEDULED_DELAY'] = 10000;
			$header['AMQ_SCHEDULED_PERIOD'] = 3000;
			$header['AMQ_SCHEDULED_REPEAT'] = 5;
 
			// $evt->tar is used as the destination and $evt->msg as the message body.
			$stomp->send($evt->tar, $evt->msg, $header);
 
			QhLog::INF( 'triger event '.$evt->msg.' to '.$evt->tar);
 
		} catch(StompException $e) {
			// Connection/send failures are logged rather than propagated.
			QhLog::ERR( $e->getMessage());
		}

php代码发送消息示例

// Connection settings for the STOMP endpoint of the broker.
$user = "guoyi";
$password = "guoyi";
$host = "123.56.201.72";
$port = 61613;
$destination  = '/queue/qhfm';

// Payload carried inside the message.
$param = [
  'phone' => '13422288866',
  'code'  => '3761',
];

// Message envelope; 'time' is the current Unix timestamp as a string.
$msg = [
  'id'    => '123',
  'type'  => 'sms',
  'time'  => (string) time(),
  'param' => $param,
];

$body = json_encode($msg);
echo 'send: ', $body, PHP_EOL;

try {
  // Connect and publish the JSON body to the queue.
  $url = "tcp://{$host}:{$port}";
  $stomp = new Stomp($url, $user, $password);
  $stomp->send($destination, $body);
} catch (StompException $e) {
  // Report the failure on stdout, as the rest of the example does.
  echo $e->getMessage();
}

php代码处理消息示例

/**
 * Blocking STOMP consumer for the /queue/qhfm queue.
 *
 * The constructor connects and subscribes; getMsg() blocks until a MESSAGE
 * frame arrives and returns its JSON body decoded as an associative array;
 * processMsg() dispatches on the decoded message's 'type' field.
 */
class QhMsgRoutine
{
  /** @var Stomp|null Broker connection; stays null when connecting failed. */
  private $stomp;
  private $count = 0;

  /**
   * Connect to the broker and subscribe to the queue.
   *
   * Connection errors are echoed, not thrown; in that case the instance is
   * left without a connection and getMsg() will throw.
   *
   * @param int $timeout Value passed to Stomp::setReadTimeout().
   */
  function __construct($timeout = 60)
  {
    $user = "guoyi";
    $password = "guoyi";
    $host = "123.56.201.72";
    $port = 61613;
    $destination  = '/queue/qhfm';

    try {
      $url = 'tcp://'.$host.":".$port;
      // Keep the connection on the instance: getMsg() needs it after the
      // constructor returns (a local variable would be lost here).
      $this->stomp = new Stomp($url, $user, $password);
      $this->stomp->subscribe($destination);
      $this->stomp->setReadTimeout($timeout);

    } catch(StompException $e) {
      echo $e->getMessage();
    }
  }

  /**
   * Block until a MESSAGE frame is received and return its decoded body.
   *
   * Every received frame is acknowledged. Non-MESSAGE frames are dumped and
   * skipped; read timeouts simply continue the loop.
   *
   * @return array|null Decoded JSON body as an associative array; null when
   *                    the body is not valid JSON or a StompException occurred.
   * @throws RuntimeException When the constructor failed to connect.
   */
  function getMsg()
  {
    if ($this->stomp === null) {
      // Fail with a clear message instead of a fatal "member function on null".
      throw new RuntimeException('STOMP connection is not available');
    }

    try {
      echo "Waiting for messages...\n";
      while(true) {
        $frame = $this->stomp->readFrame();
        if( $frame ) {
          $this->stomp->ack($frame);
          if( $frame->command == "MESSAGE" ) {
            echo "Receive message: ".$frame->body.PHP_EOL;
            // Decode to an associative array: processMsg() reads $msg['type'].
            $msg = json_decode($frame->body, true);
            var_dump($msg);
            return $msg;
          } else {
            echo "Unexpected frame.\n";
            var_dump($frame);
          }
        }else{
          // readFrame() returned nothing within the read timeout.
          echo 'waiting for msg...';
        }
      }

    } catch(StompException $e) {
      echo $e->getMessage();
    }
    return null;
  }

  /**
   * Dispatch a decoded message on its 'type' field.
   *
   * @param array|null $msg Value returned by getMsg(); null or a message
   *                        without 'type' falls through to the default branch.
   */
  function processMsg($msg)
  {
    // getMsg() may return null (error / invalid JSON); avoid indexing it.
    $type = (is_array($msg) && isset($msg['type'])) ? $msg['type'] : null;

    switch ($type) {
      case 'sms':
        # code...
      break;
      case 'im':
        # code...
      break;

      default:
        # code...
      break;
    }
  }
}
 
 
// Create the consumer, then fetch and handle messages forever.
$processer = new QhMsgRoutine(60);

for (;;) {
  $incoming = $processer->getMsg();
  $processer->processMsg($incoming);
}

python代码收发消息示例

# -*-coding:utf-8-*-
import stomp
import time,json
 
# Broker endpoints as (host, port) pairs; passed to stomp.Connection10 below.
tts_mq_server = [('test.jzyq.ltd',61613)]
# Queue that this script sends its messages to (conn.send below).
queue_syn_name = '/queue/tts/syn_test'
# Queue that this script subscribes to (conn.subscribe below).
queue_rst_name = '/queue/tts/rst_test'
 
# Name under which the listener is registered with conn.set_listener.
listener_name = 'TtsListener'
 
class RstJobListener(object):
    """Listener that prints the messages received on the subscribed queue."""

    def on_message(self, headers, message):
        """Print a received frame and, when present, its callback URL.

        Args:
            headers: Frame headers as delivered by the STOMP client.
            message: Frame body; a JSON object that must contain ``audio``.
                ``data`` and ``data['cb']`` are treated as optional.

        Raises:
            ValueError: If ``message`` is not valid JSON.
            KeyError: If the decoded message has no ``audio`` key.
        """
        print('headers: %s' % headers)
        print('message: %s' % message)
        msg = json.loads(message)
        print('audio is: ' + msg['audio'])
        # A message without callback info must not raise inside the listener,
        # so a missing ``data`` or ``cb`` is treated as "no callback".
        data = msg.get('data') or {}
        # update status (placeholder, not implemented)
        callback = data.get('cb')
        if callback:
            # call the callback (placeholder: only printed for now)
            print('callback is: ' + callback)
 
 
def get_msg(idx):
    """Return the JSON-encoded test request for the given index.

    Args:
        idx: Integer interpolated into the ``text`` field as ``hello (<idx>)``.

    Returns:
        A JSON string with the fixed ``id``, ``speaker`` and callback data.
    """
    payload = {
        'id': '123456789',
        'text': 'hello (%d)' % idx,
        'speaker': 1,
        'data': {'cb': 'https://www.baidu.com/'},
    }
    return json.dumps(payload)
 
 
# Connect to the broker, register the listener and subscribe to the result
# queue before sending anything, so replies are not missed.
print('api begin ')
conn = stomp.Connection10(tts_mq_server)
conn.set_listener(listener_name, RstJobListener())
conn.start()
conn.connect(username='system', passcode='manager', wait=True)
conn.subscribe(queue_rst_name)
# time.sleep(1) # secs

try:
    # Send ten test messages, one per second.
    for i in range(10):
        msg = get_msg(i)
        print('i am a api call:'+ msg)
        conn.send(queue_syn_name,  msg)
        time.sleep(1)

    print('api waiting... ')

    # Keep the main thread alive so the listener can keep receiving messages.
    while True:
        time.sleep(100)
except KeyboardInterrupt:
    # Ctrl+C ends the wait; fall through to the cleanup below.
    pass
finally:
    # Previously unreachable after the infinite loop: always close the
    # connection when the script stops.
    conn.disconnect()