Reliable ZeroMQ Pub Sub

After Friday's ZeroMQ Pub Sub post, Jérôme Petazzoni taught me a bit more about ZeroMQ.

@technoweenie Once a PUB/SUB socket is connected, it IS reliable. Use socket identity to be sure not to lose any message on reconnects.

Wow, so even ZeroMQ PUB sockets queue messages for subscribers, buffering them in memory. You can set the ZMQ_HWM option (ZMQ::HWM in Ruby) to cap how many messages get buffered, and the ZMQ_SWAP option to set the size, in bytes, of an on-disk swap for messages that cross the high water mark. (Both are 0MQ 2.x options; 0MQ 3.x removed ZMQ_SWAP and split ZMQ_HWM into ZMQ_SNDHWM and ZMQ_RCVHWM.)
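
To picture what those two limits do, here is a toy model of one subscriber's queue. It is not 0MQ's implementation, and ToySubscriberQueue with its hwm/swap_bytes parameters is a hypothetical name: up to hwm messages sit in memory, up to swap_bytes more bytes spill into a "swap" (a plain array standing in for the on-disk file), and anything past that is dropped, which is what a 0MQ 2.x PUB socket does for a subscriber that has hit its limits.

```ruby
# Toy model of a per-subscriber PUB queue with a high water mark and swap.
# Not 0MQ's code: the "swap" is an in-memory array standing in for a file.
class ToySubscriberQueue
  attr_reader :memory, :swap, :dropped

  def initialize(hwm:, swap_bytes: 0)
    @hwm = hwm                # max messages held in memory
    @swap_bytes = swap_bytes  # max bytes spilled past the high water mark
    @memory = []
    @swap = []
    @swap_used = 0
    @dropped = 0
  end

  # Publisher side: queue a message for a slow or absent subscriber.
  def push(msg)
    if @memory.size < @hwm
      @memory << msg
    elsif @swap_used + msg.bytesize <= @swap_bytes
      @swap << msg
      @swap_used += msg.bytesize
    else
      @dropped += 1           # over both limits: the message is lost
    end
    self
  end

  # Subscriber side: take the oldest message, refilling memory from swap
  # so delivery stays in publish order.
  def shift
    msg = @memory.shift
    unless @swap.empty?
      spilled = @swap.shift
      @swap_used -= spilled.bytesize
      @memory << spilled
    end
    msg
  end
end

q = ToySubscriberQueue.new(hwm: 2, swap_bytes: 12)
%w[ping-1 ping-2 ping-3 ping-4 ping-5].each { |m| q.push(m) }
q.dropped  # => 1 (ping-5 fit in neither memory nor swap)
```

Raising either limit trades memory or disk space for fewer dropped messages while a subscriber is away.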

Armed with this bit of knowledge, I updated the publisher script to set an identity of channel-username:

```ruby
require 'zmq'

context = ZMQ::Context.new
chan    = ARGV[0]
user    = ARGV[1]
pub     = context.socket ZMQ::PUB
# Identity is "channel-username", e.g. "ruby-rick".
pub.setsockopt ZMQ::IDENTITY, "#{chan}-#{user}"

pub.bind 'tcp://*:5555'
```
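
Jérôme's tip is easier to see with a model. This is a toy sketch, not 0MQ's code, and ToyPublisher and its methods are hypothetical. One detail worth knowing: in 0MQ 2.x the bound socket finds a queue by the identity of the peer that reconnects, and the 0MQ guide's durable-subscriber example sets ZMQ_IDENTITY on the SUB socket before it connects. The model keeps one queue per peer; a peer with an identity keeps its queue across disconnects, while an anonymous peer's queue is discarded.

```ruby
# Toy model of 0MQ 2.x durable subscriptions (not 0MQ's implementation).
class ToyPublisher
  def initialize
    @queues  = {}  # peer key => messages not yet delivered
    @durable = {}  # peer key => true if the peer gave an identity
  end

  # Returns the key this connection is tracked under. Anonymous peers get a
  # fresh key every time, so they can never find an old queue again.
  def connect(identity = nil)
    key = identity || Object.new
    @durable[key] = !identity.nil?
    @queues[key] ||= []
    key
  end

  # Durable peers keep their queue; anonymous peers lose it.
  def disconnect(key)
    return if @durable[key]
    @queues.delete(key)
    @durable.delete(key)
  end

  # Every known queue, including those of disconnected durable peers, gets a copy.
  def publish(msg)
    @queues.each_value { |queue| queue << msg }
  end

  # Hands over (and clears) everything queued for a peer.
  def drain(key)
    pending = @queues.fetch(key, [])
    @queues[key] = [] if @queues.key?(key)
    pending
  end
end
```

Disconnecting the peer "abc", publishing, and reconnecting as "abc" makes drain return the missed message; an anonymous peer that reconnects starts with an empty queue.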

To really highlight reliable pub/sub, I wrote a custom publisher script that just pings every second.

```ruby
require 'zmq'

context = ZMQ::Context.new
pub = context.socket ZMQ::PUB
pub.setsockopt ZMQ::IDENTITY, 'ping-pinger'
pub.bind 'tcp://*:5555'

# Publish "ping pinger 1", "ping pinger 2", ... once per second.
i = 0
loop do
  pub.send "ping pinger #{i += 1}"
  sleep 1
end
```

Updating the subscriber should've been just as simple, but the while statement didn't allow for good error handling:

```ruby
while line = sub.recv
  chan, user, msg = line.split ' ', 3
  puts "##{chan} [#{user}]: #{msg}"
end
```

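Any interruption, such as a Ctrl-C, that arrived while a message was being handled would lose that message. Instead, I moved the handling into a method that retries the line it already received: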

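```ruby
# Handle one message; returns false once the script should quit.
def process(line = nil)
  line ||= @socket.recv
  chan, user, msg = line.split ' ', 3
  puts "##{chan} [#{user}]: #{msg}"
  true
rescue SignalException
  # Interrupted (e.g. Ctrl-C) mid-message: finish the line we already
  # received, then tell the caller to shut down.
  process(line) if line
  false
end
```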
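
To check that retry logic without a running publisher, here is a standalone variant that takes the socket and the output as arguments. process_message, FakeSocket and FlakyOutput are hypothetical stand-ins; the body mirrors the process method above. The fake output raises Interrupt on its first puts, as if Ctrl-C landed mid-message.

```ruby
# Same logic as process, with the socket and output injected.
# socket: anything with #recv; out: anything with #puts.
def process_message(socket, out, line = nil)
  line ||= socket.recv
  chan, user, msg = line.split(' ', 3)
  out.puts "##{chan} [#{user}]: #{msg}"
  true
rescue SignalException
  # Interrupted after the line was read: handle it once more, then report
  # false. If recv itself was interrupted, line is nil and nothing is retried.
  process_message(socket, out, line) if line
  false
end

# A fake socket that hands out queued lines.
FakeSocket = Struct.new(:lines) do
  def recv
    lines.shift
  end
end

# An output whose first puts is interrupted.
class FlakyOutput
  attr_reader :printed

  def initialize
    @printed = []
    @interrupt_next = true
  end

  def puts(text)
    if @interrupt_next
      @interrupt_next = false
      raise Interrupt
    end
    @printed << text
  end
end

socket = FakeSocket.new(['ping pinger 21', 'ping pinger 22'])
out = FlakyOutput.new
process_message(socket, out)  # => false, yet "#ping [pinger]: 21" is printed
process_message(socket, out)  # => true
out.printed                   # => ["#ping [pinger]: 21", "#ping [pinger]: 22"]
```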

This way a signal such as Ctrl-C can't cut a received message short: the line is processed again before process returns false to signal that it's time to quit. Here's what the loop looks like now:

```ruby
subscriber = Subscriber.new ARGV[0]
subscriber.connect ZMQ::Context.new, 'tcp://127.0.0.1:5555'
subscriber.subscribe_to 'rubyonrails', 'ruby-lang', 'ping'

loop do
  unless subscriber.process
    subscriber.close
    puts "Quitting..."
    exit
  end
end
```
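
The post doesn't show the Subscriber class, but subscribe_to presumably hands each channel name to ZMQ::SUBSCRIBE. A 0MQ SUB filter is a plain prefix match on the message, so a subscription to ping would also accept messages from a hypothetical pingpong channel. This toy model of the filter, with a hypothetical delivered? helper, shows the effect and one fix: subscribe to the channel name plus the space that separates it from the user.

```ruby
# Toy model of 0MQ SUB prefix filtering (not 0MQ's code): a message gets
# through if it starts with any subscribed prefix; "" matches everything.
def delivered?(subscriptions, message)
  subscriptions.any? { |prefix| message.start_with?(prefix) }
end

channels = %w[rubyonrails ruby-lang ping]

# Bare channel names also match longer channel names.
loose = channels
# Adding the separator space pins the match to the whole channel name.
strict = channels.map { |c| "#{c} " }

delivered?(loose,  'pingpong bob hi')  # => true
delivered?(strict, 'pingpong bob hi')  # => false
delivered?(strict, 'ping pinger 1')    # => true
```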

This is what the console output looks like:

```
#ping [pinger]: 21
#ping [pinger]: 22
^CQuitting...
ruby-1.9.2-p180 ~p/zcollab/pubsub git:(master) ✗$ ruby sub.rb abc
#ping [pinger]: 23
#ping [pinger]: 24
```

I still run into rare cases where the Interrupt is raised inside the @socket.recv call itself. In that case line is still nil, so there is nothing to retry and process just returns false. For a more advanced script, you could also trap signals to control how your script exits.
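
One way to follow that advice, sketched with a hypothetical GracefulLoop helper: the INT handler installed with Signal.trap only sets a flag, so Ctrl-C no longer raises inside recv or in the middle of printing, and the loop checks the flag between messages. Depending on how the ZeroMQ binding's blocking recv reacts to signals, the exit may only happen once the next message arrives.

```ruby
# Run a block repeatedly until a signal arrives. The handler only flips a
# flag, so the signal never interrupts the work inside the block.
class GracefulLoop
  def initialize(signal = 'INT')
    @signal = signal
    @stop = false
    # Signal.trap returns the previous handler so it can be restored later.
    @previous = Signal.trap(signal) { @stop = true }
  end

  def stop_requested?
    @stop
  end

  # Yields until the signal has been seen, then restores the old handler.
  def run
    yield until @stop
  ensure
    Signal.trap(@signal, @previous)
  end
end

# Usage (hypothetical, with the post's Subscriber):
#   GracefulLoop.new.run { subscriber.process }
#   subscriber.close
```

With this in place the block finishes the message it is working on, and the script closes the socket on its own terms instead of unwinding from an exception.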
