ZeroMQ subscribe with xsub

I had a scenario where I wanted multiple publisher to broadcast information to a single subscribe. This subscriber would make a best effort to dedupe the information and present a "realtime" view of what is happening across multiple nodes.

Reliability wasn't the primary concern, back chatter is not wanted, and publishers do not care if anyone is listening. Functionally, this is a validation that the publisher is online and functioning as expected.

Traditional PUB SUB would require the subscribe to connect to the publisher and listen for events. In this scenario, I need many publishers to connect to a single subscribe, enter XSUB.

XSUB is commonly paired with XPUB to provide a many pubs to many subs proxy configuration. XSUB is an extended subscriber socket and allow a subscriber to bind to an address. PUB natively supports connecting to a remote address. Armed with these two, it's possible to build a multi-pub to single-sub topology.

The publisher will connect to the remote subscribe and begin broadcasting messages:

package main

import (  
    "log"
    "time"

    zmq "github.com/pebbe/zmq4"
)

func main() {  
    pub, _ := zmq.NewSocket(zmq.PUB)
    err := pub.Connect("tcp://127.0.0.1:9002")
    if err != nil {
        log.Fatal(err)
    }
    log.Println("zmq publisher connected to 127.0.0.1:9002")
    defer pub.Close()

    for {
        log.Println("sending message")
        pub.SendMessage("test", "testing")
        pub.SendMessage("greet", "hello world")
        time.Sleep(time.Second)
    }
}

The subscribe will create an XSUB socket and bind to and address. The main sticking point is that XSUB does not support setting socket options for subscribing to message filters. Based on the code in the xsub source, we can simply send a message that subscribes to the appropriate channel.

package main

import (  
    "log"

    zmq "github.com/pebbe/zmq4"
)

func main() {  
    // create an XSUB socket and bind it to a port
    xsub, _ := zmq.NewSocket(zmq.XSUB)
    err := xsub.Bind("tcp://*:9002")
    if err != nil {
        log.Fatal(err)
    }
    log.Println("zmq xsub listening on *:9002")
    defer xsub.Close()

    // subscribe the XSUB socket to all messages
    _, err = xsub.SendBytes([]byte{0x01}, 0)
    if err != nil {
        log.Fatal(err)
    }

    // process incoming messages
    for {
        msg, e := xsub.RecvMessageBytes(0)
        if e != nil {
            log.Println(e)
            continue
        }

        msgType := string(msg[0])
        msgBody := string(msg[1])

        switch msgType {
        case "test":
            {
                log.Println("all", "test", msgBody)
            }
        case "greet":
            {
                log.Println("all", "greet", msgBody)
            }
        }
    }
}

In the above code we want to subscribe to all channels, so we simply short circuit this by a sending a single message with a 0x01 payload.

_, err = xsub.SendBytes([]byte{0x01}, 0)  
if err != nil {  
    log.Fatal(err)
}

That's all there is to it!

comments powered by Disqus