Reliable ZeroMQ Pub Sub
After Friday's ZeroMQ Pub Sub post, Jérôme Petazzoni taught me a bit more about ZeroMQ.
@technoweenie Once a PUB/SUB socket is connected, it IS reliable. Use socket identity to be sure not to lose any message on reconnects.
Wow, so even ZeroMQ PUB sockets queue messages for subscribers. It looks
like they get buffered in memory. You can configure the ZMQ_HWM (high water mark)
option (ZMQ::HWM in Ruby) to limit how many messages will be buffered. You can
also set the ZMQ_SWAP option to set the size, in bytes, of an on-disk
swap for messages that cross the high water mark.
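To make the buffering concrete, here is a toy model in plain Ruby of one subscriber's queue on a ZeroMQ 2.x PUB socket. It makes no ZeroMQ calls, and `ToySubscriberQueue` is a hypothetical class, not the library's code. Messages go to memory up to the high water mark, then into swap until the swap budget runs out (counted in bytes here, as ZMQ_SWAP is), and after that they are dropped. The model assumes a positive HWM; in 2.x the default of 0 means no limit. In the real script the knobs would be set with `pub.setsockopt ZMQ::HWM, ...` and, assuming the binding exposes the swap option under the same naming, `pub.setsockopt ZMQ::SWAP, ...` before `bind`. Both options belong to the ZeroMQ 2.x API: 3.x split ZMQ_HWM into ZMQ_SNDHWM/ZMQ_RCVHWM and removed ZMQ_SWAP.

```ruby
# Toy model (not ZeroMQ's implementation) of one subscriber's queue on a
# ZeroMQ 2.x PUB socket: up to `hwm` messages are held in memory, further
# messages spill into a swap area capped at `swap_bytes`, and anything past
# that is dropped. Assumes hwm > 0.
class ToySubscriberQueue
  attr_reader :dropped

  def initialize(hwm, swap_bytes = 0)
    @hwm = hwm
    @swap_bytes = swap_bytes
    @memory = []
    @swap = []
    @swap_used = 0
    @dropped = 0
  end

  def push(msg)
    if @swap.empty? && @memory.size < @hwm
      @memory << msg                 # below the high water mark
    elsif @swap_used + msg.bytesize <= @swap_bytes
      @swap << msg                   # over the HWM: spill to "disk"
      @swap_used += msg.bytesize
    else
      @dropped += 1                  # memory and swap are both full
    end
  end

  # Deliver everything queued, oldest first, and reset the queue.
  def drain
    out = @memory + @swap
    @memory = []
    @swap = []
    @swap_used = 0
    out
  end
end

queue = ToySubscriberQueue.new(2, 12)
%w[ping-1 ping-2 ping-3 ping-4 ping-5].each { |m| queue.push(m) }
p queue.drain    # => ["ping-1", "ping-2", "ping-3", "ping-4"]
p queue.dropped  # => 1
```

With no swap configured, everything past the high water mark is simply dropped, which is why sizing the HWM matters for a subscriber that might be away for a while.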
Armed with this bit of knowledge, I updated the publisher
script to set an identity of channel-username:
require 'zmq'
context = ZMQ::Context.new
chan = ARGV[0]
user = ARGV[1]
pub = context.socket ZMQ::PUB
pub.setsockopt ZMQ::IDENTITY, "#{chan}-#{user}"
pub.bind 'tcp://*:5555'
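Here is a toy model in plain Ruby of what Jérôme's tip relies on. It makes no ZeroMQ calls, and `ToyPublisher` and its methods are hypothetical. As I understand ZeroMQ 2.x, the PUB socket keeps a separate queue per subscriber, and when the subscriber connects with an identity, that queue outlives a disconnect, so reconnecting under the same identity picks up where it left off. Note that the queue is named by the identity the subscriber sets before connecting, not by the publisher's own identity. The model ignores subscription filters and the high water mark.

```ruby
# Toy model (not ZeroMQ's implementation) of ZeroMQ 2.x durable
# subscribers: the publisher holds one queue per subscriber, and a queue
# named by a subscriber identity survives disconnects.
class ToyPublisher
  def initialize
    @queues = {} # subscriber key => messages it hasn't received yet
  end

  # Named subscribers reuse (or create) the queue for their identity;
  # anonymous ones (identity nil) always start with an empty queue.
  def connect(identity = nil)
    key = identity || Object.new
    @queues[key] ||= []
    key
  end

  # Anonymous queues are discarded on disconnect; named ones keep filling.
  def disconnect(key)
    @queues.delete(key) unless key.is_a?(String)
  end

  # Every queue gets a copy, whether or not its subscriber is attached.
  def publish(msg)
    @queues.each_value { |queue| queue << msg }
  end

  # Hand over everything waiting for this subscriber.
  def receive_all(key)
    pending = @queues[key]
    @queues[key] = []
    pending
  end
end

pub = ToyPublisher.new
named = pub.connect("abc")
anon = pub.connect
pub.publish("ping pinger 22")
pub.disconnect(named)
pub.disconnect(anon)
pub.publish("ping pinger 23") # nobody is attached right now
named = pub.connect("abc")
anon = pub.connect
p pub.receive_all(named) # => ["ping pinger 22", "ping pinger 23"]
p pub.receive_all(anon)  # => []
```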
To really highlight reliable pub/sub, I wrote a custom publisher script that just pings every second.
require 'zmq'
context = ZMQ::Context.new
pub = context.socket ZMQ::PUB
pub.setsockopt ZMQ::IDENTITY, 'ping-pinger'
pub.bind 'tcp://*:5555'
i = 0
loop do
  pub.send "ping pinger #{i += 1}"
  sleep 1
end
Updating the subscriber should've been just as simple, but the while
statement didn't allow for good error handling:
while msg = STDIN.gets
  msg.strip!
  pub.send "#{chan} #{user} #{msg}"
end
An interrupt arriving in the middle of the loop would lose the message being handled, so I moved the handling into a method instead:
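def process(line = nil)
  # Receive a new message unless we're retrying one we already have.
  line ||= @socket.recv
  chan, user, msg = line.split ' ', 3
  puts "##{chan} [#{user}]: #{msg}"
  true
rescue SignalException
  # Ctrl-C (Interrupt) landed mid-message: finish the message we already
  # received, then return false so the caller shuts down.
  process(line) if line
  false
end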
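The rescue path is hard to hit by hand, so here is a sketch that exercises the same `process` logic in plain Ruby. `FakeSocket`, `FlakyOutput` and `SubscriberSketch` are hypothetical stand-ins with no ZeroMQ involved. The only change to the method is that it writes to an injected `@out` instead of calling `puts` directly, so the sketch can simulate Ctrl-C arriving while a message is being printed.

```ruby
# Hypothetical stand-in for the ZeroMQ socket: hands out queued lines.
class FakeSocket
  def initialize(*lines)
    @lines = lines
  end

  def recv
    @lines.shift
  end
end

# Hypothetical stand-in for STDOUT whose first puts raises Interrupt,
# as if Ctrl-C landed while the message was being printed.
class FlakyOutput
  attr_reader :lines

  def initialize
    @lines = []
    @interrupted = false
  end

  def puts(str)
    unless @interrupted
      @interrupted = true
      raise Interrupt
    end
    @lines << str
  end
end

class SubscriberSketch
  def initialize(socket, out = $stdout)
    @socket = socket
    @out = out
  end

  # Same shape as the post's process method, with the output injected.
  def process(line = nil)
    line ||= @socket.recv
    chan, user, msg = line.split ' ', 3
    @out.puts "##{chan} [#{user}]: #{msg}"
    true
  rescue SignalException # Interrupt is a subclass of SignalException
    process(line) if line
    false
  end
end

out = FlakyOutput.new
sub = SubscriberSketch.new(FakeSocket.new("ping pinger 23"), out)
p sub.process # => false (tells the caller to quit)
p out.lines   # => ["#ping [pinger]: 23"] (the message was still printed)
```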
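This way, a SignalException (such as the Interrupt raised by Ctrl-C) no longer drops a message that has already been received: the rescue clause finishes processing it, then returns false to tell the caller to quit. Here's what the loop looks like now: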
subscriber = Subscriber.new ARGV[0]
subscriber.connect ZMQ::Context.new, 'tcp://127.0.0.1:5555'
subscriber.subscribe_to 'rubyonrails', 'ruby-lang', 'ping'
loop do
  unless subscriber.process
    subscriber.close
    puts "Quitting..."
    exit
  end
end
This is what the console output looks like:
#ping [pinger]: 21
#ping [pinger]: 22
^CQuitting...
ruby-1.9.2-p180 ~p/zcollab/pubsub git:(master) ✗$ ruby sub.rb abc
#ping [pinger]: 23
#ping [pinger]: 24
I still run into rare cases where the Interrupt is raised inside the
socket.recv call. At that point line is still nil, so process returns
false without retrying anything. For a more advanced script, you could also try
trapping signals to control how your script exits.
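A minimal sketch of the signal-trapping idea in plain Ruby. `GracefulLoop` is a hypothetical helper, not part of any library: `Signal.trap` turns INT (Ctrl-C) and TERM into a flag instead of an Interrupt raised at an arbitrary point such as inside recv, and the loop only checks that flag between messages. In the real subscriber the block would call `subscriber.process`, whose `rescue SignalException` would then no longer be needed. One hedge: depending on how the ZeroMQ binding's blocking recv handles signals, the flag may only be noticed after the next message arrives (with the pinger running, roughly a second later).

```ruby
# Hypothetical helper: trapped signals only set a flag, so the message in
# hand is always finished before the loop exits.
class GracefulLoop
  def initialize(signals = %w[INT TERM])
    @stop = false
    # The trap block runs with this instance as self, so it can set @stop.
    signals.each { |sig| Signal.trap(sig) { @stop = true } }
  end

  def stop?
    @stop
  end

  # Calls the block once per iteration until a trapped signal arrives.
  # Returns how many iterations completed.
  def run
    handled = 0
    until @stop
      yield
      handled += 1
    end
    handled
  end
end

# Usage in the subscriber (hypothetical wiring):
#   looper = GracefulLoop.new
#   looper.run { subscriber.process }
#   subscriber.close
#   puts "Quitting..."
```

Because the exit happens at a point the script chooses, cleanup such as closing the socket runs in one place after the loop rather than inside a rescue clause.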