ZeroMQ: Distributed Messaging

Ryan Culpepper <ryanc@racket-lang.org>

This library provides bindings to the ZeroMQ (or “0MQ”, or “ZMQ”) distributed messaging library.

 (require zeromq) package: zeromq-r-lib

This package is distributed under the GNU Lesser General Public License (LGPL). As a client of this library you must also comply with the libzmq license.

1 ZeroMQ Examples

This section contains examples of using this library adapted from the 0MQ Guide.

1.1 Hello World in ZeroMQ

This example is adapted from Ask and Ye Shall Receive, which illustrates REQ-REP (request-reply) communication.

Here is the “hello world” server:

> (define responder-thread
    (thread
      (lambda ()
        (define responder (zmq-socket 'rep))
        (zmq-bind responder "tcp://*:5555")
        (let loop ()
          (define msg (zmq-recv-string responder))
          (printf "Server received: ~s\n" msg)
          (zmq-send responder "World")
          (loop)))))

The responder socket could have been created and bound to its address in one call, as follows:

(define responder (zmq-socket 'rep #:bind "tcp://*:5555"))

Here is the “hello world” client:

> (define requester (zmq-socket 'req #:connect "tcp://localhost:5555"))
> (for ([request-number (in-range 3)])
    (zmq-send requester "Hello")
    (define response (zmq-recv-string requester))
    (printf "Client received ~s (#~s)\n" response request-number))

Server received: "Hello"

Client received "World" (#0)

Server received: "Hello"

Client received "World" (#1)

Server received: "Hello"

Client received "World" (#2)

> (zmq-close requester)

1.2 Weather Reporting in ZeroMQ

This example is adapted from Getting the Message Out, which illustrates PUB-SUB communication.

Here’s the weather update server:

> (define (zip->string zip) (~r zip #:min-width 5 #:pad-string "0"))
> (define (random-zip) (random 100000))
> (define publisher-thread
    (thread
      (lambda ()
        (define publisher (zmq-socket 'pub #:bind "tcp://*:5556"))
        (let loop ()
          (define zip (zip->string (random-zip)))
          (define temp (- (random 215) 80))
          (define rhumid (+ (random 50) 10))
          (zmq-send publisher (format "~a ~a ~a" zip temp rhumid))
          (loop)))))

Here is the weather client:

> (define subscriber (zmq-socket 'sub #:connect "tcp://localhost:5556"))
> (define myzip (zip->string (random-zip)))
> (printf "Subscribing to ZIP code ~a only\n" myzip)

Subscribing to ZIP code 10001 only

> (zmq-subscribe subscriber myzip)
> (define total-temp
    (for/sum ([update-number (in-range 10)])
      (define msg (zmq-recv-string subscriber))
      (define temp (let ([in (open-input-string msg)]) (read in) (read in)))
      (printf "Client got temperature update #~s: ~s\n" update-number temp)
      temp))

Client got temperature update #0: 40

Client got temperature update #1: 53

Client got temperature update #2: 130

Client got temperature update #3: 22

Client got temperature update #4: -77

Client got temperature update #5: -4

Client got temperature update #6: 85

Client got temperature update #7: -2

Client got temperature update #8: 100

Client got temperature update #9: 96

> (printf "Average temperature for ZIP code ~s was ~s\n"
          myzip (~r (/ total-temp 10)))

Average temperature for ZIP code "10001" was "44.3"

> (zmq-close subscriber)

1.3 Divide and Conquer in ZeroMQ

This example is adapted from Divide and Conquer, which illustrates PUSH-PULL communication.

Here’s the ventilator:

; Task ventilator
; Binds PUSH socket to tcp://*:5557
; Sends batch of tasks to workers via that socket
> (define (ventilator go-sema)
    (define sender (zmq-socket 'push #:bind "tcp://*:5557"))
    (define sink (zmq-socket 'push #:connect "tcp://localhost:5558"))
    (semaphore-wait go-sema)
    (zmq-send sink "0") ; message 0 signals start of batch
    (define total-msec
      (for/fold ([total 0]) ([task-number (in-range 100)])
        (define workload (add1 (random 100)))
        (zmq-send sender (format "~s" workload))
        (+ total workload)))
    (printf "Total expected cost: ~s msec\n" total-msec)
    (zmq-close sender)
    (zmq-close sink))

Here are the workers:

; Task worker
; Connects PULL socket to tcp://localhost:5557
; Collects workloads from ventilator via that socket
; Connects PUSH socket to tcp://localhost:5558
; Sends results to sink via that socket
> (define (worker)
    (define receiver (zmq-socket 'pull #:connect "tcp://localhost:5557"))
    (define sender (zmq-socket 'push #:connect "tcp://localhost:5558"))
    (let loop ()
      (define s (zmq-recv-string receiver))
      (sleep (/ (read (open-input-string s)) 1000)) ; do the work
      (zmq-send sender "")
      (loop)))

Here is the sink:

; Task sink
; Binds PULL socket to tcp://*:5558
; Collects results from workers via that socket
> (define (sink)
    (define receiver (zmq-socket 'pull #:bind "tcp://*:5558"))
    (void (zmq-recv receiver)) ; start of batch
    (time (for ([task-number (in-range 100)])
            (void (zmq-recv receiver))))
    (zmq-close receiver))

Now we create a sink thread, a ventilator thread, and 10 worker threads. We give them a little time to connect to each other, then we start the task ventilator and wait for the sink to collect the results.

> (let ()
    (define go-sema (make-semaphore 0))
    (define sink-thread (thread sink))
    (define ventilator-thread (thread (lambda () (ventilator go-sema))))
    (define worker-threads (for/list ([i 10]) (thread worker)))
    ; Give the threads some time to connect...
    (begin (sleep 1) (semaphore-post go-sema))
    (void (sync sink-thread)))

Total expected cost: 5291 msec

cpu time: 292 real time: 623 gc time: 0

Note that to achieve the desired parallel speedup here, it’s important to give all of the worker threads time to connect their receiver sockets—the (sleep 1) is a blunt way of doing this. Otherwise, the first thread to connect might end up doing all of the work—an example of the “slow joiner” problem (see the end of Divide and Conquer for more details).

2 ZeroMQ API

procedure

(zmq-available?)  boolean?

Returns #t if the ZeroMQ library (libzmq) was loaded successfully, #f otherwise. If the result is #f, calling zmq-socket will raise an exception. See ZeroMQ Requirements.

Added in version 1.1 of package zeromq-r-lib.

procedure

(zmq-version)  
(or/c (list/c exact-nonnegative-integer?
              exact-nonnegative-integer?
              exact-nonnegative-integer?)
      #f)
Returns the version of the ZeroMQ library (libzmq) if it was loaded successfully, #f otherwise. The version is represented by a list of three integers—for example, '(4 3 2).

Added in version 1.1 of package zeromq-r-lib.
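
As a minimal sketch of how these two procedures might be combined, the following reports the loaded libzmq version before any socket is created. The helper name describe-libzmq is hypothetical and not part of this library.

```racket
#lang racket/base
(require zeromq)

;; Hypothetical helper (not provided by the library): describe the libzmq
;; installation, or report that it could not be loaded.
(define (describe-libzmq)
  (define v (zmq-version)) ; a list of three integers, or #f
  (cond
    [(and (zmq-available?) v)
     (format "libzmq ~a.~a.~a loaded" (car v) (cadr v) (caddr v))]
    [else "libzmq not available; see ZeroMQ Requirements"]))

(displayln (describe-libzmq))
```

Checking availability first lets a program fail with its own message instead of the exception raised by zmq-socket.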

2.1 Managing ZeroMQ Sockets

procedure

(zmq-socket type    
  [#:identity identity    
  #:bind bind-endpoints    
  #:connect connect-endpoints    
  #:subscribe subscriptions])  zmq-socket?
  type : 
(or/c 'pair 'pub 'sub 'req 'rep 'dealer 'router
      'pull 'push 'xpub 'xsub 'stream)
  identity : (or/c bytes? #f) = #f
  bind-endpoints : (or/c string? (listof string?)) = null
  connect-endpoints : (or/c string? (listof string?)) = null
  subscriptions : (or/c bytes? string? (listof (or/c bytes? string?)))
   = null
Creates a new ZeroMQ socket of the given socket type and initializes it with identity, subscriptions, bind-endpoints, and connect-endpoints (in that order).

See the zmq_socket documentation for brief descriptions of the different types of sockets, and see the 0MQ Guide for more detailed explanations.

A ZeroMQ socket acts as a synchronizable event (evt?) that is ready when zmq-recv-message would receive a message without blocking; the synchronization result is the received message (zmq-message?). If the socket is closed, it is never ready for synchronization; use zmq-closed-evt to detect closed sockets.

Unlike libzmq, zmq-socket creates sockets with a short default “linger” period (ZMQ_LINGER), to avoid blocking the Racket VM when the underlying context is shut down. The linger period can be changed with zmq-set-option.
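
Because a socket is a synchronizable event, it can be combined with Racket's standard event operations. The following is a sketch only; port 5559 and the 2-second timeout are arbitrary choices made for illustration.

```racket
#lang racket/base
(require racket/match zeromq)

;; Port 5559 is an arbitrary choice for this sketch.
(define receiver (zmq-socket 'pull #:bind "tcp://*:5559"))
(define sender (zmq-socket 'push #:connect "tcp://localhost:5559"))

(zmq-send sender "ping")

;; Wait up to 2 seconds for a message; sync/timeout returns #f on timeout,
;; otherwise the synchronization result is a zmq-message value.
(define result (sync/timeout 2 receiver))

(define frames
  (match result
    [(zmq-message fs) fs] ; a list of byte strings, one per frame
    [#f #f]))             ; nothing arrived in time

(printf "Received frames: ~s\n" frames)

(zmq-close sender)
(zmq-close receiver)
```

Using sync/timeout rather than a plain receive keeps the caller from blocking indefinitely when no peer is sending.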

procedure

(zmq-socket? v)  boolean?

  v : any/c
Returns #t if v is a ZeroMQ socket, #f otherwise.

procedure

(zmq-close s)  void?

  s : zmq-socket?
Closes the socket. Further operations on the socket will raise an error, except that zmq-close may be called on an already-closed socket.

procedure

(zmq-closed? s)  boolean?

  s : zmq-socket?
Returns #t if the socket is closed, #f otherwise.

procedure

(zmq-closed-evt s)  evt?

  s : zmq-socket?
Returns a synchronizable event (evt?) that is ready for synchronization when (zmq-closed? s) would return #t. The synchronization result is the event itself.
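
The closing procedures can be exercised together as follows. This is a sketch under the assumption that a 'pair socket can be created and closed without ever being bound or connected; the 1-second timeout is arbitrary.

```racket
#lang racket/base
(require zeromq)

(define s (zmq-socket 'pair))
(define closed-evt (zmq-closed-evt s))

;; While the socket is open, polling the event yields #f.
(define before (sync/timeout 0 closed-evt))

(zmq-close s)
(zmq-close s) ; closing an already-closed socket is allowed

;; After closing, the event becomes ready for synchronization.
(define after (sync/timeout 1 closed-evt))

(printf "closed? ~s, ready before close: ~s, ready after close: ~s\n"
        (zmq-closed? s) (and before #t) (and after #t))
```

A thread that waits on several sockets can include each socket's zmq-closed-evt in the same sync call to notice when one of them goes away.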

procedure

(zmq-list-endpoints s mode)  (listof string?)

  s : zmq-socket?
  mode : (or/c 'bind 'connect)
Lists the endpoints the socket is bound or connected to (when mode is 'bind or 'connect, respectively).

procedure

(zmq-get-option s option)  (or/c exact-integer? bytes?)

  s : zmq-socket?
  option : symbol?

procedure

(zmq-set-option s option value)  void?

  s : zmq-socket?
  option : symbol?
  value : (or/c exact-integer? bytes?)
Gets or sets a socket option; see the API documentation for zmq_getsockopt and zmq_setsockopt, respectively. An option’s symbol is obtained from the name of the corresponding C constant by removing the ZMQ_ prefix and converting it to lower-case. For example, ZMQ_IPV6 becomes 'ipv6 and ZMQ_LAST_ENDPOINT becomes 'last_endpoint. Not all options are supported. See also zmq-list-options.

procedure

(zmq-list-options filter)  (listof symbol?)

  filter : (or/c 'get 'set)
Lists the options that this library supports for zmq-get-option or zmq-set-option when filter is 'get or 'set, respectively.
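
As an illustration of the naming rule, ZMQ_LINGER corresponds to 'linger. The sketch below guards each call with zmq-list-options, since not every option is guaranteed to be supported; the value 100 (milliseconds) is an arbitrary example.

```racket
#lang racket/base
(require zeromq)

(define s (zmq-socket 'req))

;; Set the linger period to 100 ms (an arbitrary value), but only if this
;; library reports 'linger as settable.
(when (memq 'linger (zmq-list-options 'set))
  (zmq-set-option s 'linger 100))

;; Read the option back if it is also readable; otherwise record #f.
(define linger
  (and (memq 'linger (zmq-list-options 'get))
       (zmq-get-option s 'linger)))

(printf "linger: ~s\n" linger)
(zmq-close s)
```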

procedure

(zmq-connect s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?

procedure

(zmq-bind s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?
Connects or binds the socket s to the given endpoint(s).

See the transport documentation pages (tcp, pgm, ipc, inproc, vmci, udp) for more information about transports and their endpoint notations.

If endpoint refers to a filesystem path or network address, access is checked against (current-security-guard). This library cannot parse and check all endpoint formats supported by libzmq; if endpoint is not in a supported format, an exception is raised with the message “invalid endpoint or unsupported endpoint format.” Clients may skip the parsing and access control check by using zmq-unsafe-connect or zmq-unsafe-bind.

procedure

(zmq-disconnect s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?

procedure

(zmq-unbind s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?
Disconnects or unbinds the socket s from the given endpoint(s).

Note that in some cases endpoint must be more specific than the argument to zmq-bind or zmq-connect. For example, see the section labeled “Unbinding wild-card address from a socket” in zmq_tcp.
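
The following sketch connects a socket to two endpoints, lists them, and then disconnects from one. The port numbers are arbitrary, and no peer needs to be listening for the calls to succeed. The exact strings returned by zmq-list-endpoints are not specified here, so the sketch only prints them.

```racket
#lang racket/base
(require zeromq)

(define s (zmq-socket 'push))

;; Several endpoints may be given in one call.
(zmq-connect s "tcp://localhost:5557" "tcp://localhost:5567")
(define connected (zmq-list-endpoints s 'connect))
(printf "Connected to: ~s\n" connected)

;; Disconnect using the same endpoint string that was passed to zmq-connect.
(zmq-disconnect s "tcp://localhost:5567")
(printf "After disconnect: ~s\n" (zmq-list-endpoints s 'connect))

(zmq-close s)
```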

procedure

(zmq-subscribe s topic ...)  void?

  s : zmq-socket?
  topic : (or/c bytes? string?)

procedure

(zmq-unsubscribe s topic ...)  void?

  s : zmq-socket?
  topic : (or/c bytes? string?)
Adds or removes topic from a SUB ('sub) socket’s subscription list. A SUB socket starts out with no subscriptions, and thus receives no messages.

A topic matches a message if topic is a prefix of the message. The empty topic accepts all messages.
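
The prefix rule can be illustrated in plain Racket. The helper topic-matches? below is hypothetical: it mirrors the rule as stated above and is not how this library or libzmq filters messages.

```racket
#lang racket/base

;; Hypothetical helper, for illustration only: a topic matches a message
;; when the topic's bytes are a prefix of the message's bytes.
(define (topic-matches? topic msg)
  (define n (bytes-length topic))
  (and (<= n (bytes-length msg))
       (bytes=? topic (subbytes msg 0 n))))

(define update #"10001 40 35")

(topic-matches? #"10001" update) ; #t: the topic is a prefix
(topic-matches? #"1000" update)  ; #t: a shorter prefix also matches
(topic-matches? #"20002" update) ; #f: not a prefix
(topic-matches? #"" update)      ; #t: the empty topic accepts everything
```

One consequence for the weather example: a subscription to "1000" would receive updates for every ZIP code from 10000 through 10009, not for a single code.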

2.2 Sending and Receiving ZeroMQ Messages

A ZeroMQ message consists of one or more frames (represented by byte strings). The procedures in this library support sending and receiving only complete messages (as opposed to the frame-at-a-time operations in the libzmq C library).

procedure

(zmq-message? v)  boolean?

  v : any/c
Returns #t if v is a ZeroMQ message value, #f otherwise.

procedure & match pattern

(zmq-message frames)  zmq-message?

  frames : (or/c bytes? string? (listof (or/c bytes? string?)))
Returns a ZeroMQ message value consisting of the given frames. Strings are automatically coerced to bytestrings using string->bytes/utf-8, and if a single string or bytestring is given, it is converted to a singleton list.

When used as a match pattern, the frames subpattern is always matched against a list of bytestrings.

Examples:
> (define msg (zmq-message "hello world"))
> (match msg [(zmq-message frames) frames])

'(#"hello world")

In libzmq version 4.3.2, the draft (unstable) API has additional operations on messages to support the draft socket types; for example, a message used with a CLIENT or SERVER socket has a routing-id field. Support will be added to zmq-message when the corresponding draft APIs become stable.

Added in version 1.1 of package zeromq-r-lib.

procedure

(zmq-send-message s msg)  void?

  s : zmq-socket?
  msg : zmq-message?
Sends the message msg on socket s.

Added in version 1.1 of package zeromq-r-lib.

procedure

(zmq-recv-message s)  zmq-message?

  s : zmq-socket?
Receives a message from socket s.

Added in version 1.1 of package zeromq-r-lib.

procedure

(zmq-send s msg-frame ...+)  void?

  s : zmq-socket?
  msg-frame : (or/c bytes? string?)
Sends a message on socket s. The message has as many frames as msg-frame arguments, with at least one frame required.

procedure

(zmq-send* s msg)  void?

  s : zmq-socket?
  msg : (non-empty-listof (or/c bytes? string?))
Sends the message msg on socket s, where msg consists of a non-empty list of frames.

procedure

(zmq-recv s)  bytes?

  s : zmq-socket?

procedure

(zmq-recv-string s)  string?

  s : zmq-socket?
Receives a one-frame message from the socket s and returns the single frame as a byte string or character string, respectively.

If a multi-frame message is received from s, an error is raised. (The message is still consumed.)

procedure

(zmq-recv* s)  (listof bytes?)

  s : zmq-socket?
Receives a message from the socket s. The message is represented as a list of byte strings, one for each frame.
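
The sending procedures differ only in how the frames are supplied. The sketch below sends the same two-frame message three ways and reads each one back with zmq-recv*; port 5560 is an arbitrary choice.

```racket
#lang racket/base
(require zeromq)

;; Port 5560 is an arbitrary choice for this sketch.
(define receiver (zmq-socket 'pull #:bind "tcp://*:5560"))
(define sender (zmq-socket 'push #:connect "tcp://localhost:5560"))

;; Three ways to send the same two-frame message.
(zmq-send sender "header" #"body")                 ; frames as arguments
(zmq-send* sender (list "header" #"body"))         ; frames as a list
(zmq-send-message sender
                  (zmq-message (list "header" #"body"))) ; message value

;; Each message arrives as a list of byte strings, one per frame.
(define received
  (for/list ([i (in-range 3)])
    (zmq-recv* receiver)))

(for ([frames (in-list received)])
  (printf "Received: ~s\n" frames))

(zmq-close sender)
(zmq-close receiver)
```

Calling zmq-recv or zmq-recv-string here instead of zmq-recv* would raise an error, because each message has more than one frame.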

procedure

(zmq-proxy sock1    
  sock2    
  [#:capture capture    
  #:other-evt other-evt])  any
  sock1 : zmq-socket?
  sock2 : zmq-socket?
  capture : (-> zmq-socket? zmq-message? any) = void
  other-evt : evt? = never-evt
Runs a proxy connecting sock1 and sock2; a loop reads a message from either socket and sends it to the other. For each message received, the capture procedure is called on the receiving socket and the received message, and then the message is forwarded to the other socket.

This procedure returns only when the proxy is finished: either one of the sockets is closed, in which case (void) is returned, or other-evt becomes ready for synchronization, in which case other-evt’s synchronization result is returned. The procedure might also raise an exception if a send or receive fails, or if capture or other-evt raises an exception.

Added in version 1.1 of package zeromq-r-lib.
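
A common use of a proxy is a request-reply broker that sits between REQ clients and REP workers. The following is a sketch under several assumptions: the ROUTER/DEALER pairing, ports 5561 and 5562, the echo worker, and the use of a semaphore as other-evt are all choices made for illustration.

```racket
#lang racket/base
(require zeromq)

;; Clients connect to the frontend; workers connect to the backend.
(define frontend (zmq-socket 'router #:bind "tcp://*:5561"))
(define backend (zmq-socket 'dealer #:bind "tcp://*:5562"))

;; Posting to this semaphore makes it ready as an event and stops the proxy.
(define stop-sema (make-semaphore 0))

(define proxy-thread
  (thread
    (lambda ()
      (zmq-proxy frontend backend
                 #:capture (lambda (sock msg)
                             (printf "Proxy forwarding a message\n"))
                 #:other-evt stop-sema))))

;; A worker that echoes each request back with a prefix.
(define worker-thread
  (thread
    (lambda ()
      (define rep (zmq-socket 'rep #:connect "tcp://localhost:5562"))
      (let loop ()
        (zmq-send rep (string-append "echo: " (zmq-recv-string rep)))
        (loop)))))

(define client (zmq-socket 'req #:connect "tcp://localhost:5561"))
(zmq-send client "hi")
(define reply (zmq-recv-string client))
(printf "Client received ~s\n" reply)

;; Shut everything down.
(semaphore-post stop-sema)
(thread-wait proxy-thread)
(kill-thread worker-thread)
(zmq-close client)
(zmq-close frontend)
(zmq-close backend)
```

The capture procedure sees each message twice per round trip in this arrangement: once for the request arriving at the frontend and once for the reply arriving at the backend.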

3 ZeroMQ Unsafe Functions

The functions provided by this module are unsafe.

 (require zeromq/unsafe) package: zeromq-r-lib

procedure

(zmq-unsafe-connect s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?

procedure

(zmq-unsafe-bind s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?
Like zmq-connect and zmq-bind, but do not attempt to parse the endpoint arguments and perform security guard checks.

These functions are unsafe, not in the sense that misuse is likely to cause memory corruption, but in the sense that they do not respect the security guard mechanism.
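
A minimal sketch of the unsafe variant follows. The endpoint is an arbitrary example; it is handed to libzmq as given, without the parsing and security guard check performed by zmq-connect.

```racket
#lang racket/base
(require zeromq zeromq/unsafe)

(define s (zmq-socket 'req))

;; No endpoint parsing and no (current-security-guard) check happens here.
(zmq-unsafe-connect s "tcp://localhost:5555")

(zmq-close s)
```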

4 ZeroMQ Requirements

This library requires the libzmq foreign library to be installed in either the operating system’s default library search path or in Racket’s extended library search path (see get-lib-search-dirs).

On Linux, libzmq.so.5 is required. On Debian-based systems, it is available from the libzmq5 package. On RedHat-based systems, it is available from the zeromq package.

On Mac OS, libzmq.5.dylib is required.
  • With Homebrew: Run brew install zeromq. The library will be installed in /usr/local/lib, which is in the operating system’s default search path.

  • With MacPorts: Install the zmq port. The library will be installed in /opt/local/lib, which is not in the operating system’s default search path. Manually copy or link the library into one of the directories returned by (get-lib-search-dirs).

On Windows, libzmq.dll is required; it is automatically provided by the zeromq-win32-{i386,x86_64} package.
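
When libzmq is not found, it can help to print the directories Racket searches. The following sketch assumes only that get-lib-search-dirs is imported from setup/dirs.

```racket
#lang racket/base
(require setup/dirs zeromq)

;; Directories in Racket's extended library search path.
(printf "Racket library search directories:\n")
(for ([dir (in-list (get-lib-search-dirs))])
  (printf "  ~a\n" dir))

;; Whether libzmq was actually found and loaded.
(printf "libzmq loaded? ~s\n" (zmq-available?))
```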