User Tools

Site Tools


learn:activemq

Differences

This shows you the differences between two versions of the page.

Link to this comparison view

Next revision
Previous revision
learn:activemq [2017/02/22 04:08]
soup created
learn:activemq [2019/06/25 11:05] (current)
soup
Line 1: Line 1:
 =====active MQ===== =====active MQ=====
 +
 + ​ActiveMQ。Apache的项目。
 +
 +==== 学习网址 ====
 +  * 学习网址:<​code>​http://​activemq.apache.org/​ </​code>​
 +  * 是Java编写;依赖运行环境 JRE 。
 +
 +==== 安装 ====
 +  * yum install activemq ​
 +  * pip install stomp.py
 +  * PHP需要扩展库 Stomp 。
 +  * yum install -y jre
 +  * pecl install Stomp
 +  * centos: yum install activemq
 +  * OSX:  brew install apache-activemq
 +
 +==== 配置 ====
 +  * 开启scheduler,启用延时投递:conf/activemq.xml文件中的<broker> 中加入如下属性:
 +       <​code xml>
 +       <​broker ...... schedulerSupport="​true">​
 +       </​code>​
 +  * 配置访问用户名密码:conf/​activemq.xml文件中的<​broker>​ 中加入:
 +<code xml>
 +<!-- 添加访问ActiveMQ的账号密码 -->
 +<​plugins>​
 +        <​simpleAuthenticationPlugin>​
 +                <​users>​
 +                        <​authenticationUser username="​guoyi"​ password="​guoyi"​ groups="​users,​admins"/>​
 +                </​users>​
 +        </​simpleAuthenticationPlugin>​
 +</​plugins>​
 +</​code>​
 +  * 修改管理页面登录密码:
 +       ​conf/​jetty-realm.properties 中配置账号密码
 +
 +
 +==== 启动 ====
 +  * foreground run: activemq console
 +  * background run: activemq start | stop
 +
 +web管理页面:http://​localhost:​8161/​admin
 +
 +
 +==== php 访问 ====
 +Install the [[http://www.php.net/manual/en/book.stomp.php|PHP Stomp client]] library.
 +
 +pecl install stomp
 +
 +You may also need to add "extension=stomp.so" to php.ini.
 +
 +
 +==== 延时 ====
 +文档:
 +<code php>​http://​activemq.apache.org/​delay-and-schedule-message-delivery.html
 +</​code>​
 +示例:
 +<code php>
 + try {
 + $url = '​tcp://'​.$host.":"​.$port;​
 + $stomp = new Stomp($url, $user, $password);
 + $header['​AMQ_SCHEDULED_DELAY'​] = 10000;
 + $header['​AMQ_SCHEDULED_PERIOD'​] = 3000;
 + $header['​AMQ_SCHEDULED_REPEAT'​] = 5;
 +
 + $stomp->​send($evt->​tar,​ $evt->​msg,​ $header);
 +
 + QhLog::​INF( '​triger event '​.$evt->​msg.'​ to '​.$evt->​tar);​
 +
 + } catch(StompException $e) {
 + QhLog::​ERR( $e->​getMessage());​
 + }
 +</​code>​
 +==== php代码发送消息示例 ====
 +<code php>
 +
 +$user = "​guoyi";​
 +$password = "​guoyi";​
 +$host = "​123.56.201.72";​
 +$port = 61613;
 +$destination ​ = '/​queue/​qhfm';​
 +
 +$param['​phone'​] = '​13422288866';​
 +$param['​code'​] = '​3761';​
 +
 +$msg['​id'​] = '​123';​
 +$msg['​type'​] = '​sms';​
 +$msg['​time'​] = strval(time());​
 +$msg['​param'​] = $param;
 +
 +$body = json_encode($msg);​
 +echo 'send: '​.$body.PHP_EOL ;
 +
 +try {
 +  $url = '​tcp://'​.$host.":"​.$port;​
 +  $stomp = new Stomp($url, $user, $password);
 +  ​
 +  $stomp->​send($destination,​ $body);
 +
 +} catch(StompException $e) {
 +  echo $e->​getMessage();​
 +}
 +</​code>​
 +
 +
 +==== php代码处理消息示例 ====
 +<code php>
 +
 +class QhMsgRoutine
 +{
 +  private $stomp;
 +  private $count = 0;
 +
 +  function __construct($timeout = 60)
 +  {
 +    $user = "​guoyi";​
 +    $password = "​guoyi";​
 +    $host = "​123.56.201.72";​
 +    $port = 61613;
 +    $destination ​ = '/​queue/​qhfm';​
 +
 +    try {
 +      $url = '​tcp://'​.$host.":"​.$port;​
 +      $stomp = new Stomp($url, $user, $password);
 +      $stomp->​subscribe($destination);​
 +      $stomp->​setReadTimeout($timeout);​
 +  // var_dump($stomp->​getReadTimeout()); ​
 +
 +    } catch(StompException $e) {
 +      echo $e->​getMessage();​
 +    }
 +  }
 +
 +  function getMsg()
 +  {
 +    try {
 +      echo "​Waiting for messages...\n";​
 +      while(true) {
 +        $frame = $stomp->​readFrame();​
 +        if( $frame ) {
 +          $stomp->​ack($frame);​
 +          if( $frame->​command == "​MESSAGE"​ ) {
 +            echo "​Receive message: "​.$frame->​body.PHP_EOL;​
 +            $msg = json_decode($frame->​body);​
 +            var_dump($msg);​
 +            return $msg;
 +          } else {
 +            echo "​Unexpected frame.\n";​
 +            var_dump($frame);​
 +          }
 +        }else{
 +          echo '​waiting for msg...';​
 +        }
 +      }
 +
 +    } catch(StompException $e) {
 +      echo $e->​getMessage();​
 +    }
 +    return null;
 +  }
 +
 +  function processMsg($msg)
 +  {
 +    switch ($msg['​type'​]) {
 +      case '​sms':​
 +        # code...
 +      break;
 +      case '​im':​
 +        # code...
 +      break;
 +      ​
 +      default:
 +        # code...
 +      break;
 +    }
 +  }
 +}
 +
 +
 +$processer = new QhMsgRoutine(60);​
 +
 +while(true){
 +  $processer->​processMsg($processer->​getMsg());​
 +}
 +</​code>​
 +
 +
 +
 +==== python代码收发消息示例 ====
 +<code python>
 +# -*-coding:​utf-8-*-
 +import stomp
 +import time,json
 +
 +tts_mq_server = [('​test.jzyq.ltd',​61613)]
 +queue_syn_name = '/​queue/​tts/​syn_test'​
 +queue_rst_name = '/​queue/​tts/​rst_test'​
 +
 +listener_name = '​TtsListener'​
 + 
 +class RstJobListener(object):​
 +    def on_message(self,​ headers, message):
 +        print('​headers:​ %s' % headers)
 +        print('​message:​ %s' % message)
 +        msg = json.loads(message)
 +        print('​audio is: ' + msg['​audio'​])
 +        data = msg['​data'​]
 +        #update status
 +        if data['​cb'​]:​
 +            #call call back
 +            print('​callback is: ' + data['​cb'​])
 +            pass
 +
 + 
 +def get_msg(idx):​
 +    txt = 'hello (%d)' % idx
 +    msg = {'​id':'​123456789','​text':​txt,'​speaker':​1,'​data':​ {'​cb':'​https://​www.baidu.com/'​} }
 +    ret =  json.dumps(msg)
 +    return ret
 + 
 +
 +print('​api begin ')
 +conn = stomp.Connection10(tts_mq_server)
 +conn.set_listener(listener_name,​ RstJobListener())
 +conn.start()
 +conn.connect(username='​system',​ passcode='​manager',​ wait=True)
 +conn.subscribe(queue_rst_name)
 +# time.sleep(1) # secs
 +
 +for i in range(10):
 +    msg = get_msg(i)
 +    print('​i am a api call:'​+ msg)
 +    conn.send(queue_syn_name, ​ msg)
 +    time.sleep(1) ​
 +
 +print('​api waiting... ')
 +
 +while True:
 +    time.sleep(100)
 +
 +conn.disconnect()
 +
 +
 +</​code>​
learn/activemq.1487736524.txt.gz · Last modified: 2017/02/22 04:08 by soup