User Tools

Site Tools


learn:activemq

This is an old revision of the document!


active MQ

ActiveMQ。Apache的项目。

安装

  • yum install activemq
  • pip install stomp.py

学习网址

  • 学习网址:
    http://activemq.apache.org/ 
  • 是Jave编写;依赖运行环境 JRE 。
  • PHP需要扩展库 Stomp 。
  • 开启schduler:activemq.xml文件中的<broker> 中加入如下属性:
       <broker ...... schedulerSupport="true">
 
  • 配置访问用户名密码:activemq.xml文件中的<broker> 中加入:
<!-- 添加访问ActiveMQ的账号密码 -->
<plugins>
        <simpleAuthenticationPlugin>
                <users>
                        <authenticationUser username="guoyi" password="guoyi" groups="users,admins"/>
                </users>
        </simpleAuthenticationPlugin>
</plugins>

安装

  • yum install -y jre
  • pecl install Stomp
  • centos: yum install activemq
  • OSX: brew install apache-activemq

启动

  • foreground run: activemq console
  • background run: activemq start | stop

web访问:http://localhost:8161/admin config:activemq myConfig.xml

php 访问

Install the [PHP Stomp client](http://www.php.net/manual/en/book.stomp.php) library.

pecl install stomp

maybe you should add “extension=stomp.so” to php.ini

延时

文档:

http://activemq.apache.org/delay-and-schedule-message-delivery.html

示例:

		try {
			$url = 'tcp://'.$host.":".$port;
			$stomp = new Stomp($url, $user, $password);
			$header['AMQ_SCHEDULED_DELAY'] = 10000;
			$header['AMQ_SCHEDULED_PERIOD'] = 3000;
			$header['AMQ_SCHEDULED_REPEAT'] = 5;
 
			$stomp->send($evt->tar, $evt->msg, $header);
 
			QhLog::INF( 'triger event '.$evt->msg.' to '.$evt->tar);
 
		} catch(StompException $e) {
			QhLog::ERR( $e->getMessage());
		}

php代码发送消息示例

$user = "guoyi";
$password = "guoyi";
$host = "123.56.201.72";
$port = 61613;
$destination  = '/queue/qhfm';
 
$param['phone'] = '13422288866';
$param['code'] = '3761';
 
$msg['id'] = '123';
$msg['type'] = 'sms';
$msg['time'] = strval(time());
$msg['param'] = $param;
 
$body = json_encode($msg);
echo 'send: '.$body.PHP_EOL ;
 
try {
  $url = 'tcp://'.$host.":".$port;
  $stomp = new Stomp($url, $user, $password);
 
  $stomp->send($destination, $body);
 
} catch(StompException $e) {
  echo $e->getMessage();
}

php代码处理消息示例

class QhMsgRoutine
{
  private $stomp;
  private $count = 0;
 
  function __construct($timeout = 60)
  {
    $user = "guoyi";
    $password = "guoyi";
    $host = "123.56.201.72";
    $port = 61613;
    $destination  = '/queue/qhfm';
 
    try {
      $url = 'tcp://'.$host.":".$port;
      $stomp = new Stomp($url, $user, $password);
      $stomp->subscribe($destination);
      $stomp->setReadTimeout($timeout);
  // var_dump($stomp->getReadTimeout()); 
 
    } catch(StompException $e) {
      echo $e->getMessage();
    }
  }
 
  function getMsg()
  {
    try {
      echo "Waiting for messages...\n";
      while(true) {
        $frame = $stomp->readFrame();
        if( $frame ) {
          $stomp->ack($frame);
          if( $frame->command == "MESSAGE" ) {
            echo "Receive message: ".$frame->body.PHP_EOL;
            $msg = json_decode($frame->body);
            var_dump($msg);
            return $msg;
          } else {
            echo "Unexpected frame.\n";
            var_dump($frame);
          }
        }else{
          echo 'waiting for msg...';
        }
      }
 
    } catch(StompException $e) {
      echo $e->getMessage();
    }
    return null;
  }
 
  function processMsg($msg)
  {
    switch ($msg['type']) {
      case 'sms':
        # code...
      break;
      case 'im':
        # code...
      break;
 
      default:
        # code...
      break;
    }
  }
}
 
 
$processer = new QhMsgRoutine(60);
 
while(true){
  $processer->processMsg($processer->getMsg());
}
learn/activemq.1561104319.txt.gz · Last modified: 2019/06/21 08:05 by soup