Zero  MQ:   Distributed Messaging
1 Zero  MQ Examples
1.1 Hello World in Zero  MQ
1.2 Weather Reporting in Zero  MQ
2 Zero  MQ Functions
zmq-socket?
zmq-socket
zmq-close
zmq-closed?
zmq-list-endpoints
zmq-get-option
zmq-set-option
zmq-list-options
zmq-connect
zmq-bind
zmq-disconnect
zmq-unbind
zmq-subscribe
zmq-unsubscribe
zmq-send
zmq-send*
zmq-recv
zmq-recv-string
zmq-recv*
3 Zero  MQ Unsafe Functions
zmq-connect
zmq-bind
7.0

ZeroMQ: Distributed Messaging

Ryan Culpepper <[email protected]>

This library provides bindings to the ZeroMQ (or “0MQ”, or “ZMQ”) distributed messaging library.

 (require zeromq) package: zeromq-r-lib

This package is distributed under the GNU Lesser General Public License (LGPL). As a client of this library you must also comply with the libzmq license.

1 ZeroMQ Examples

This section contains examples of using this library adapted from the 0MQ Guide.

1.1 Hello World in ZeroMQ

This example is adapted from Ask and Ye Shall Receive, which illustrates REP-REQ communication.

Here is the “hello world” server:

(define responder (zmq-socket 'rep))
(zmq-bind responder "tcp://*:5555")
(let loop ()
  (define msg (zmq-recv-string responder))
  (printf "Server received: ~s\n" msg)
  (zmq-send responder "World")
  (loop))

The first two lines of the server could also be combined into one, as follows:

(define responder (zmq-socket 'rep #:bind "tcp://*:5555"))

Here is the “hello world” client:

(define requester (zmq-socket 'req #:connect "tcp://localhost:5555"))
(for ([request-number (in-range 10)])
  (printf "Client sending Hello #~s\n" request-number)
  (zmq-send requester "Hello")
  (define response (zmq-recv-string requester))
  (printf "Client received ~s (#~s)\n" response request-number))
(zmq-close requester)

1.2 Weather Reporting in ZeroMQ

This example is adapted from Getting the Message Out, which illustrates PUB-SUB communication.

Here’s the weather update server:

(define publisher (zmq-socket 'pub #:bind "tcp://*:5556"))
(let loop ()
  (define zip (~r (random 100000) #:precision 5 #:pad-string "0"))
  (define temp (- (random 215) 80))
  (define rhumid (+ (random 50) 10))
  (zmq-send publisher (format "~a ~a ~a" zip temp rhumid))
  (loop))

Here is the weather client:

(define subscriber (zmq-socket 'sub #:connect "tcp://localhost:5556"))
(define myzip (~r (random 100000) #:precision 5 #:pad-string "0"))
(printf "Subscribing to ZIP code ~a only\n" myzip)
(zmq-subscribe subscriber (format "~a " myzip))
(define total-temp
  (for/sum ([update-number (in-range 100)])
    (define msg (zmq-recv-string subscriber))
    (define temp (let ([in (open-input-string msg)]) (read in) (read in)))
    (printf "Client got temperature update #~s: ~s\n" update-number temp)
    temp))
(printf "Average temperature for ZIP code ~s was ~s\n"
        myzip (~r (/ total-temp 100)))
(zmq-close subscriber)

2 ZeroMQ Functions

procedure

(zmq-socket? v)  boolean?

  v : any/c
Returns #t if v is a ZeroMQ socket, #f otherwise.

procedure

(zmq-socket type    
  [#:identity identity    
  #:bind bind-endpoints    
  #:connect connect-endpoints    
  #:subscribe subscriptions])  zmq-socket?
  type : 
(or/c 'pair 'pub 'sub 'req 'rep 'dealer 'router
      'pull 'push 'xpub 'xsub 'stream)
  identity : (or/c bytes? #f) = #f
  bind-endpoints : (or/c string? (listof string?)) = null
  connect-endpoints : (or/c string? (listof string?)) = null
  subscriptions : (or/c bytes? string? (listof (or/c bytes? string?)))
   = null
Creates a new ZeroMQ socket of the given socket type and initializes it with identity, subscriptions, bind-endpoints, and connect-endpoints (in that order).

See the zmq_socket documentation for brief descriptions of the different types of sockets, and see the 0MQ Guide for more detailed explanations.

Unlike libzmq, zmq-socket creates sockets with a short default “linger” period (ZMQ_LINGER), to avoid blocking the Racket VM when the underlying context is shut down. The linger period can be changed with zmq-set-option.

procedure

(zmq-close s)  void?

  s : zmq-socket?
Close the socket. Further operations on the socket will raise an error, except that zmq-close may be called on an already-closed socket.

procedure

(zmq-closed? s)  boolean?

  s : zmq-socket?
Returns #t if the socket is closed, #f otherwise.

procedure

(zmq-list-endpoints s mode)  (listof string?)

  s : zmq-socket?
  mode : (or/c 'bind 'connect)
List the endpoints the socket is bound or connected to (when mode is 'bind or 'connect, respectively).

procedure

(zmq-get-option s option)  (or/c exact-integer? bytes?)

  s : zmq-socket?
  option : symbol?

procedure

(zmq-set-option s option value)  void?

  s : zmq-socket?
  option : symbol?
  value : (or/c exact-integer? bytes?)
Gets or sets a socket option; see the API documentation for zmq_getsockopt and zmq_setsockopt, respectively. An option’s symbol is obtained from the name of the corresponding C constant by removing the ZMQ_ prefix and converting it to lower-case. For example, ZMQ_IPV6 becomes 'ipv6 and ZMQ_LAST_ENDPOINT becomes 'last_endpoint. Not all options are supported. See also zmq-list-options.

procedure

(zmq-list-options filter)  (listof symbol?)

  filter : (or/c 'get 'set)
Lists the options that this library supports for zmq-get-option or zmq-set-option when filter is 'get or 'set, respectively.

procedure

(zmq-connect s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?

procedure

(zmq-bind s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?
Connect or bind the socket s to the given endpoint(s).

See the transport documentation pages (tcp, pgm, ipc, inproc, vmci, udp) for more information about transports and their endpoint notations.

If endpoint refers to a filesystem path or network address, access is checked against (current-security-guard). This library cannot parse and check all endpoint formats supported by libzmq; if endpoint is not in a supported format, an exception is raised with the message “invalid endpoint or unsupported endpoint format.” Clients may skip the parsing and access control check by using zmq-unsafe-connect or zmq-unsafe-bind.

procedure

(zmq-disconnect s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?

procedure

(zmq-unbind s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?
Disconnect or unbind the socket s from the given endpoint(s).

Note that in some cases endpoint must be more specific than the argument to zmq-bind or zmq-connect. For example, see the section labeled “Unbinding wild-card address from a socket” in zmq_tcp.

procedure

(zmq-subscribe s topic ...)  void?

  s : zmq-socket?
  topic : (or/c bytes? string?)

procedure

(zmq-unsubscribe s topic ...)  void?

  s : zmq-socket?
  topic : (or/c bytes? string?)
Adds or removes topic from a SUB ('sub) socket’s subscription list. A SUB socket starts out with no subscriptions, and thus receives no messages.

A topic matches a message if topic is a prefix of the message. The empty topic accepts all messages.

procedure

(zmq-send s msg-frame ...+)  void?

  s : zmq-socket?
  msg-frame : (or/c bytes? string?)
Sends a message on socket s. The message has as many frames as msg-frame arguments, with at least one frame required.

procedure

(zmq-send* s msg)  void?

  s : zmq-socket?
  msg : (non-empty-listof (or/c bytes? string?))
Sends the message msg on socket s, where msg consists of a non-empty list of frames.

procedure

(zmq-recv s)  bytes?

  s : zmq-socket?

procedure

(zmq-recv-string s)  string?

  s : zmq-socket?
Receives a one-frame message from the socket s and returns the single frame as a byte string or character string, respectively.

If a multi-frame message is received from s, an error is raised. (The message is still consumed.)

procedure

(zmq-recv* s)  (listof bytes?)

  s : zmq-socket?
Receives a message from the socket s. The message is represented as a list of byte strings, one for each frame.

3 ZeroMQ Unsafe Functions

The functions provided by this module are unsafe.

 (require zeromq/unsafe) package: zeromq-r-lib

procedure

(zmq-connect s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?

procedure

(zmq-bind s endpoint ...)  void?

  s : zmq-socket?
  endpoint : string?
Like zmq-connect and zmq-bind, but do not attempt to parse the endpoint arguments and perform security guard checks.

These functions are unsafe, not in the sense that misuse is likely to cause memory corruption, but in the sense that they do not respect the security guard mechanism.