使用luarocks安装这些库:lzmq, penlight(pl.lapp是penlight中的模块)
1、如不需要查看发送或接收消息的时间,可以不使用lzmq.timer,
将函数timeStamp()及其使用的地方删除即可
2、为简化验证,也可不使用命令行工具pl.lapp,将lapp部分删除,并将address用固定地址替换掉
publisher.lua
--publisher.lua
local zmq = require("lzmq")
local timer = require("lzmq.timer")
local lapp = require("pl.lapp")
local zassert = zmq.assert

-- Command-line options: -a/--address is the endpoint the PUB socket binds to.
-- (The help text inside the spec is runtime data for lapp and is kept as-is.)
local args = lapp[[
-a,--address (default "tcp://127.0.0.1:10000") Config module to load.
]]
local address = args.address

-- Create the ZeroMQ context, then a PUB socket bound to `address` in a single
-- call. zassert is handed the socket and the error value so that a failed
-- creation is reported immediately (presumably by raising -- see lzmq docs).
local context = zmq.context()
local zmq_pub, err = context:socket(zmq.PUB, { bind = address })
zassert(zmq_pub, err)

print("Create publisher server: ", address)
--- Build a timestamp string for the current time.
-- Format: "%Y-%m-%d %H:%M:%S.mmm", where mmm is the millisecond part
-- zero-padded to three digits (5 ms is rendered ".005", not ".5").
-- @treturn string the formatted timestamp
function timeStamp()
  -- timer is lzmq.timer; absolute_time() is treated as milliseconds since the
  -- epoch. Floor it so the value is integral for os.date and "%03d".
  local ms = math.floor(timer.absolute_time())
  local s = math.floor(ms / 1000) -- whole seconds
  local date = os.date("%Y-%m-%d %H:%M:%S.", s) -- seconds to date text
  -- Remaining milliseconds within the second, always three digits wide.
  return date .. string.format("%03d", ms % 1000)
end
-- Publish loop: read one line at a time from stdin and send every non-empty
-- line as a two-frame message: the topic frame "101" followed by the text
-- with "\r\n" appended.
while true do
  io.write("Publisher> ") -- prompt keyword: "Publisher> "
  io.flush()
  local cmd = io.read("*line") -- read message from cmd line
  if cmd == nil then
    -- io.read returns nil at end of input (Ctrl-D, closed pipe). Stop here
    -- instead of re-printing the prompt forever in a busy loop.
    break
  end
  if #cmd > 0 then
    -- zmq.SNDMORE marks this frame as being followed by further frames of the
    -- same message.
    local ret, err = zmq_pub:send("101", zmq.SNDMORE)
    if ret then
      -- Send the payload frame only when the topic frame was accepted, so a
      -- failure of the first send is reported rather than silently ignored.
      ret, err = zmq_pub:send(cmd .. "\r\n")
    end
    if ret then
      print(timeStamp(), "[SEND]:", cmd)
    else
      print("[ERROR]:", err)
    end
  end
end
subscriber.lua
-- subscriber.lua
local zmq = require("lzmq")
local timer = require("lzmq.timer")
local lapp = require("pl.lapp")
local zassert = zmq.assert
local zpoller = require("lzmq.poller")
-- Command-line options: -a/--address is the endpoint the SUB socket connects to.
-- (The help text inside the spec is runtime data for lapp and is kept as-is.)
local args = lapp[[
-a,--address (default "tcp://127.0.0.1:10000") Config module to load.
]]
local address = args.address

-- Create the ZeroMQ context and a SUB socket in one call: the option table
-- carries the socket type, the "101" subscription and the address to connect to.
local context = zmq.context()
local zmq_sub, err = context:socket{
  zmq.SUB,
  subscribe = "101",
  connect = address,
}
zassert(zmq_sub, err)

print("[Subscriber]: ", "Create subscriber with address : ", address)
--- Build a timestamp string for the current time.
-- Format: "%Y-%m-%d %H:%M:%S.mmm", where mmm is the millisecond part
-- zero-padded to three digits (5 ms is rendered ".005", not ".5").
-- @treturn string the formatted timestamp
function timeStamp()
  -- timer is lzmq.timer; absolute_time() is treated as milliseconds since the
  -- epoch. Floor it so the value is integral for os.date and "%03d".
  local ms = math.floor(timer.absolute_time())
  local s = math.floor(ms / 1000) -- whole seconds
  local date = os.date("%Y-%m-%d %H:%M:%S.", s) -- seconds to date text
  -- Remaining milliseconds within the second, always three digits wide.
  return date .. string.format("%03d", ms % 1000)
end
-- A poller lets a single thread service several sockets without blocking
-- inside send() or recv():
--   * register zmq.POLLIN for sockets that receive messages
--   * register zmq.POLLOUT for sockets that send messages
local poller = zpoller.new(2)
-- Original author's note: for now only an inline closure works here; writing
-- the handler as a separately defined function produced an error when tried.
poller:add(zmq_sub, zmq.POLLIN, function()
  io.write("Receiver> ")
  io.flush()
  -- Guard the result before concatenating: a nil from recv() would raise
  -- "attempt to concatenate a nil value" inside the callback.
  -- NOTE(review): assumes lzmq reports failure as nil plus an error value
  -- -- confirm against the lzmq socket documentation.
  local msg, err = zmq_sub:recv()
  if msg then
    print(timeStamp() .. "\t[REV:]\t" .. msg)
  else
    print("[ERROR]:", err)
  end
end)
poller:start()