php-nsq pub例子:sub例子:Nsq 类方法:Message 类方法与属性:Tips :Changes NSQ 的 PHP 客户端

程序名称:php-nsq pub例子:sub例子:Nsq 类方法:Message 类方法与属性:Tips :Changes

授权协议: 未知

操作系统: 跨平台

开发语言: C/C++

php-nsq pub例子:sub例子:Nsq 类方法:Message 类方法与属性:Tips :Changes 介绍

php-nsq

php-nsq 是nsq的php客户端,采用c扩展编写,性能和稳定性。

请提前安装libevent

Dependencies: libevent  (apt-get install libevent-dev ,yum install libevent-devel)

1. sudo phpize
2. ./configure 
3. make  
4. make install

add in your php.ini:

extension = nsq.so;

pub例子:

$nsqdAddr = array(
    "127.0.0.1:4150",
    "127.0.0.1:4154"
);

$nsq = new Nsq();
$isTrue = $nsq->connectNsqd($nsqdAddr);

for($i = 0; $i < 10000; $i++){
    $nsq->publish("test", "nihao");
}
$nsq->closeNsqdConnection();

// Deferred publish 
//function : deferredPublish(string topic,string message, int millisecond); 
//millisecond default : [0 < millisecond < 3600000]

$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for($i = 0; $i < 20; $i++){
    $deferred->deferredPublish("test", "message daly", 3000); 
}
$deferred->closeNsqdConnection();

sub例子:

<?php

//sub

$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new Nsq();
$config = array(
    "topic" => "test",
    "channel" => "struggle",
    "rdy" => 2,                //optional , default 1
    "connect_num" => 1,        //optional , default 1   
    "retry_delay_time" => 5000,  //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
    "auto_finish" => true, //default true
);

$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev){

    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;


});

Nsq 类方法:

  • connectNsqd($nsqdAddrArr)
    pub的时候连接nsq,你也可以利用此函数做健康检查

  • closeNsqdConnection()
    关闭nsq的连接

  • publish($topic,$msg)
    消息发送

  • deferredPublish($topic,$msg,$msec)
    延迟消息发送

  • subscribe($nsq_lookupd,$config,$callback)
    消息订阅

Message 类方法与属性:

  • timestamp
    消息时间戳

  • attempts
    消息的重试次数,(从1开始)

  • message_id
    消息id

  • payload
    消息内容

  • finish($bev,$msg->message_id)
    主动的 ack消息方法

  • touch($bev,$msg->message_id)
    如果你消息执行太长,可以利用次函数告知nsq 你还活着,一般用于执行频率比较规律的场景。

Tips :

1.如果callback内需要外部变量,可以采用以下use的写法:

$nsq->subscribe($nsq_lookupd, $config, function($msg,$bev) use ($you_variable){

    echo $msg->payload;
    echo $msg->attempts;
    echo $msg->message_id;
    echo $msg->timestamp;


});

2.消息重试,只要抛异常就可以,切记不要陷入死循环,超过自己觉得可以的次数 要return:

subscribe($nsq_lookupd, $config, function($msg){ 
    try{
        echo $msg->payload . " " . "attempts:".$msg->attempts."\n";
        //do something
    }catch(Exception $e){

        if($msg->attempts < 3){
            //the message will be retried after you configure retry_delay_time 
            throw new Exception(""); 
        }else{
            echo $e->getMessage();
            return;
        }
    }

});

3.如果你想增加 客户端的心跳时间与消息的超时时间 :

 第一步 在nsqd启动时要加入相关参数,这个参数是最大的限制,比如--max-heartbeat-interval=1m30s 心跳时间最大不能超过1分30秒:

      nsqd --lookupd-tcp-address=127.0.0.1:4160 --max-heartbeat-interval=1m30s --msg-timeout=10m30s

第二步  因为第一步是指定最大时间,所以还需要第二步在客户端指定所需要的值 具体请看 example目录中的identify开头的文件例子。

4.如果你想增强消费能力,可以加大rdy参数

5.你可以用supervisor管理,但是因为是多进程消费,你需要在supervisor job的配置文件 添加:

    stopasgroup=true
    killasgroup=true

Changes

  • 3.0

    • 修复因libevent 超过4096消息被截断问题

    • 增加identify指令功能,可以增加客户端心跳时间 与 消息超时时间

  • 2.4.0

    • 修复 pub bug

    • 修复 sub coredump

    • 修覆盖 touch bug

    • 增加等待,当刚初始化的topic没消息时

  • 2.3.1

    • pub支持域名

    • 修复 pub coredump

php-nsq pub例子:sub例子:Nsq 类方法:Message 类方法与属性:Tips :Changes 官网

https://github.com/yunnian/php-nsq

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


memcached-session-manager 将session存储到memchached实现方案时。他主要功能是修改tomcat的session存储机制,使之能够把session序列化存放到memcached中。
Tomcat Native 这个项目可以让 Tomcat 使用 Apache 的 apr 包来处理包括文件和网络IO操作,以提升性能。
EasyTomcat 是一个用来帮助简化 Tomcat 和MySQL 管理的系统,你可以启动、停止和配置 Tomcat和MySQL
riak-session-manager 是使用 Riak 来存储Tomcat session 信息的项目。 配置方法:
tomcat-redis-session-manager 是一个用来将 Tomcat 的 Session 数据存储在 Redis 库中的项目。
这是一款在 Oracle 的 JDeveloper 开发环境下管理Tomcat 的插件,如下图所示:
扩展Tomcat 6.x,使用redis存放session信息!是一个Eclipse项目,最好用EGit来Clone(因为里面有个中文文件名的说明文件).
dhcpcd 是一个兼容 RFC2131的DHCP客户端程序,支持DHCP的全部功能并且体积非常小,只有差不多 46k。
phpDHCPAdmin 是一个基于 Web 的动态主机配置协议(DHCP Daemon)的管理工具,可单独设置组、用户级别;PXE、多子网;空间租赁管理功能。可对数据进行可视化展示、分类。适合大规模的 dhcpd 环境管理。
JDHCP 项目的目的是为 Java 应用增加简单操作 DHCP 协议的方法,DHCP是动态主机配置协议的简称。使用这个API可以轻松的发送、接收和解析DHCP消息,可用于编写DHCP的客户端、服务器端应用。
DHCP服务器为客户端计算机分配IP地址,通常应用在企业网络中以减小配置成本,所有客户端的IP地址都保存在服务器端。
dhcp4java是一个用于操作DHCP信息包的纯Java类库。适用于DHCP服务器, DHCP客户端或DHCP转发。
dhcp-forwarder 是一个 DHCP 中继代理,它将在不同的子网广播域中转发 DHCP 广播信息。
不用看都知道是一个开源的 DHCP 服务器。 Open DHCP Server is a multi-subnet DHCP server. It supports both dynamic and
GAdmin-ProFTPD是一个基于GTK的可视化DHCP服务端管理工具。 更多的屏幕截图请看:http://mange.dynalias.org/linux/gadmin-dhcpd/screenshots/
Dual DHCP DNS Server 是一个提供 DHCP 和 DNS 服务的服务器软件,每一个功能都可以单独启用或者关闭。
Dhcpy6d 是一个开源的 DHCPv6 的服务器软件,相当于为 IPv6 客户端提供 DHCP 协议。
DHCP as a filesystem,要求 FUSE 的支持,使用 Go 语言开发。 安装: GOFUSE=github.com/hanwen/go-fuse
简易图床支持 HDFS 本地存储远端存储等。 Status Esay Graph bed Use HDFS Use Qiniu Use upyun Use Local
一个使用python开发的简单好用的 PXE (DHCP/TFTP/HTTP) 服务器,同时支持netboot、dhcp-