A calculus of mediated exchange (cmx)

Eric Griffis

This package implements a calculus of mediated exchange for expressing synchronizable rendezvous events with any number of participants linked arbitrarily. The calculus models a discrete communication as a series of operations carried out by programmable forwarding constructs called mediators.

1 Exchanges

 (require cmx) package: cmx

An exchange is a process by which some number of threads transfer values through one or more mediators. An exchange is push-based when the sender initiates and pull-based when the receiver initiates.

1.1 Simple push-based exchanges

A simple exchange is a three-step process with no mediators, one sender, and one receiver:

  1. The initiator offers a base mediator.

  2. The acceptor counter-offers a final mediator through the base mediator.

  3. The sender puts some values into the final mediator as the receiver gets the values out.

This protocol encapsulates the subtleties of third-party exchange synchronization. The initiator establishes the link and offers a base set of behaviors. The acceptor expresses the behavior of the final synchronizing operation, typically as a chain of mediators including the base behavior and ending with the default behavior.
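
The three steps above can be sketched with plain Racket channels. This is only a toy model, not the cmx implementation: toy-mediator, toy-say, and toy-hear are hypothetical names, a mediator is reduced to a bare pair of channels with no handlers, and the operations block directly instead of returning synchronizable events.

```racket
#lang racket/base
;; Toy model (assumption): a mediator is just a control channel and a
;; data channel. Real cmx mediators also carry handler functions.
(struct toy-mediator (ctrl data))
(define (make-toy-mediator)
  (toy-mediator (make-channel) (make-channel)))

;; Initiating side of a push-based exchange (compare say*).
(define (toy-say m v)
  (define m0 (make-toy-mediator))
  (channel-put (toy-mediator-ctrl m) m0)            ; 1. offer base mediator m0 to m
  (define m* (channel-get (toy-mediator-ctrl m0)))  ; 2. accept final mediator m* from m0
  (channel-put (toy-mediator-data m*) v))           ; 3. put the value into m*

;; Passive side of a push-based exchange (compare hear).
(define (toy-hear m [make-m* values])
  (define m0 (channel-get (toy-mediator-ctrl m)))   ; 1. accept base mediator m0 from m
  (define m* (make-m* m0))                          ;    derive the final mediator
  (channel-put (toy-mediator-ctrl m0) m*)           ; 2. counter-offer m* through m0
  (channel-get (toy-mediator-data m*)))             ; 3. get the value out of m*

(define M (make-toy-mediator))
(define sender (thread (λ () (toy-say M 123))))
(define result (toy-hear M))
(thread-wait sender)
result ; 123
```

Because the acceptor chooses m*, it controls the data channel that finally carries the value, which is the point at which the exchange synchronizes in this model.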

procedure

(say m v ...)  evt?

  m : mediator?
  v : any/c

procedure

(say* m vs [m0])  evt?

  m : mediator?
  vs : (listof any/c)
  m0 : mediator? = (make-mediator)

Returns a synchronizable event that performs the initiating side of a simple push-based exchange. Offers m0 to m, accepts a final mediator m* from m0, and puts vs into m*. Becomes ready for synchronization when the receiver is ready to accept vs from the data channel of m*.

> (define M (make-mediator))
> (thread (λ () (sync (say M 123))))
> (sync (hear M))

123

procedure

(hear m [make-m*])  evt?

  m : mediator?
  make-m* : (-> mediator? mediator?) = values

Returns a synchronizable event that performs the passive side of a simple push-based exchange. Accepts a base mediator m0 from m, creates a final mediator m* by applying make-m* to m0, offers m* to m0, and gets values from m*. Becomes ready for synchronization when the sender is ready to provide values through the data channel of m*. The synchronization result is the provided values.

procedure

(ask m [m0])  evt?

  m : mediator?
  m0 : mediator? = (make-mediator)

Returns a synchronizable event that performs the initiating side of a simple pull-based exchange. Offers a base mediator m0 to m, accepts a final mediator m* from m0, and gets values from m*. Becomes ready for synchronization when the sender is ready to provide values through the data channel of the final mediator. The synchronization result is the provided values.

> (define M (make-mediator))
> (thread (λ () (sync (tell M 123))))
> (sync (ask M))

123

procedure

(tell m v ...)  evt?

  m : mediator?
  v : any/c

procedure

(tell* m vs)  evt?

  m : mediator?
  vs : (listof any/c)

Returns a synchronizable event that performs the passive side of a simple pull-based exchange. Accepts a base mediator m0 from m, creates a final mediator m* by applying make-m* to m0, offers m* to m0, and puts vs into m*. Becomes ready for synchronization when the receiver is ready to accept vs from the data channel of m*.

1.2 Forwarding exchanges

procedure

(forward m1 m2)  evt?

  m1 : mediator?
  m2 : mediator?

Returns a synchronizable event that accepts a base mediator from m1 and then offers it to m2. Becomes ready for synchronization when m2 accepts the base mediator, possibly before the exchange is completed.

> (define M1 (make-mediator))
> (define M2 (make-mediator))
> (begin
    (thread (λ () (sync (say M1 1))))
    (thread (λ () (sync (forward M1 M2))))
    (sync (hear M2)))

1

procedure

(filter m1    
  m2    
  [#:offer offer-hook    
  #:accept accept-hook    
  #:put put-hook    
  #:get get-hook])  evt?
  m1 : mediator?
  m2 : mediator?
  offer-hook : (or/c procedure? #f) = #f
  accept-hook : (or/c procedure? #f) = #f
  put-hook : (or/c procedure? #f) = #f
  get-hook : (or/c procedure? #f) = #f

Returns a synchronizable event that accepts a base mediator from m1, extends it with any non-#f hooks, and offers it to m2. Becomes ready for synchronization when m2 accepts the extended base mediator, possibly before the exchange is completed.

> (define M1 (make-mediator))
> (define M2 (make-mediator))
> (begin
    (thread (λ () (sync (say M1 4))))
    (thread (λ () (sync (filter M1 M2
                                #:put (curry * 2)
                                #:get (curry + 3)))))
    (sync (hear M2)))

11

procedure

(dispatch m ms default)  evt?

  m : mediator?
  ms : (hash/c any/c mediator?)
  default : mediator?
Returns a synchronizable event that forwards a set of values from m to an element of ms. If the first value is a key of ms, the remaining values are delivered to the keyed element. Otherwise, the remaining values are forwarded to default.

The dispatch event may finish before the values are delivered, but the sender and receiver are guaranteed to synchronize on each other.
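
The routing rule can be pictured with plain channels. The sketch below is a simplification under stated assumptions: toy-dispatch is a hypothetical name, it moves a non-empty list of values between ordinary Racket channels rather than forwarding mediators, and it blocks instead of returning an event.

```racket
#lang racket/base
;; Toy model (assumption): values travel as one non-empty list over plain
;; channels. The first element selects the route; the rest is delivered.
(define (toy-dispatch in table default)
  (define vs (channel-get in))
  (define target (hash-ref table (car vs) #f))
  (channel-put (or target default) (cdr vs)))

(define in (make-channel))
(define a (make-channel))
(define b (make-channel))
(define other (make-channel))
(define routes (hash 'a a 'b b))

;; Send one message through the dispatcher and read it from `out`.
(define (run-once msg out)
  (thread (λ () (channel-put in msg)))
  (thread (λ () (toy-dispatch in routes other)))
  (channel-get out))

(define delivered (run-once '(b 1 2) b))   ; 'b is a key, so b receives the rest
(define fallback (run-once '(z 9) other))  ; 'z is not a key, so default receives the rest
delivered ; '(1 2)
fallback  ; '(9)
```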

1.3 Multi-sender exchanges

procedure

(collect m N)  evt?

  m : mediator?
  N : exact-nonnegative-integer?
Returns a synchronizable event that becomes ready for synchronization after receiving N values from m. The synchronization result is a list of the N values.

> (define M (make-mediator))
> (for ([i 10]) (thread (λ () (sync (say M i)) (display i))))
> (sync (collect M 3))

876

'(8 7 6)
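
As a rough illustration of the counting behavior, the following sketch collects N values from a plain channel. It assumes nothing about how collect is implemented: toy-collect is a hypothetical name, and it blocks rather than returning an event.

```racket
#lang racket/base
;; Toy model (assumption): receive exactly n values, one after another,
;; from an ordinary channel and return them as a list.
(define (toy-collect ch n)
  (for/list ([i (in-range n)])
    (channel-get ch)))

(define ch (make-channel))
;; Ten senders compete; only three of them will be received.
(for ([i 10])
  (thread (λ () (channel-put ch i))))

(define collected (toy-collect ch 3))
(length collected) ; 3
```

Which three senders succeed depends on thread scheduling, so the contents of the list vary from run to run, as they do in the example above.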

procedure

(quorum m N)  evt?

  m : mediator?
  N : exact-nonnegative-integer?
Returns a synchronizable event that performs the passive side of a simple push-based exchange many times concurrently. Blocks until N senders are ready to provide values. Becomes ready for synchronization when the exchange is complete. The synchronization result is the provided values.

> (define M (make-mediator))
> (for ([i 10]) (thread (λ () (sync (say M i)))))
> (sync (quorum M 6))

'(6 5 4 3 2 1)

1.4 Multi-receiver exchanges

procedure

(broadcast m ms)  evt?

  m : mediator?
  ms : (listof mediator?)
Returns a synchronizable event that performs the initiating side of a simple push-based exchange many times concurrently. Blocks the sender on m until all ms have a receiver ready. Becomes ready for synchronization when the exchange is complete.

> (define M (make-mediator))
> (define M1 (make-mediator))
> (define M2 (make-mediator))
> (sync
   (async-void
    (thread (λ () (sync (say M 'X))))
    (thread (λ () (sync (broadcast M (list M1 M2)))))
    (thread (λ () (write (sync (hear M1)))))
    (thread (λ () (write (sync (hear M2)))))))

XX

procedure

(multicast m ms [default])  evt?

  m : mediator?
  ms : (hash/c any/c mediator?)
  default : (-> (listof (or/c mediator? #f)) list? list? mediator?)
   = (λ _ void-mediator)
Returns a synchronizable event that forwards an exchange on m to at least one of the ms or a mediator created by default. If the value being exchanged is a list and its first element is a list of keys of ms, the remaining elements are broadcast to the keyed mediators. Otherwise, the operation is restarted on a mediator created by applying default to a list of the hash lookup results, a list of the lookup keys, and a list of the remaining elements of the value being exchanged.

> (define M (make-mediator))
> (define Ms (for/hash ([i 10]) (values i (make-mediator))))
> (sync
   (async-void
    (thread (λ () (sync (say M '(1 3 4 7 8) 'X))))
    (thread (λ () (sync (multicast M Ms))))
    (thread (λ () (sync (hear (hash-ref Ms 1))) (write 1)))
    (thread (λ () (sync (hear (hash-ref Ms 3))) (write 3)))
    (thread (λ () (sync (hear (hash-ref Ms 4))) (write 4)))
    (thread (λ () (sync (hear (hash-ref Ms 7))) (write 7)))
    (thread (λ () (sync (hear (hash-ref Ms 8))) (write 8)))))

84137

2 Mediator

 (require cmx/mediator) package: cmx

A mediator is an extensible synchronization primitive modeled as a pair of channels and a set of handler functions implementing the four basic mediated operations: offer, accept, put, and get. If one thread offers a value to a mediator and another thread accepts from the same mediator, the value goes through the control channel of the mediator. Similarly, if one thread puts while another thread gets, the values go through the data channel of the mediator.

The behavior of a mediator under a given mediated operation is completely determined by its handler. The default behaviors are modeled as operations on its control channel or data channel, and may be extended or overridden. The full set of structure type bindings for the mediator type is provided. Mutators for the handler functions are also provided: bind-offer, bind-accept, bind-put, and bind-get.

A hook is a function that extends the behavior of an existing mediator. A set of functional extenders is provided: on-offer, on-accept, on-put, and on-get.
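
The channel-and-handler model described above might be sketched as follows. This is a toy under stated assumptions, not the package's source: every toy- name is hypothetical, the default handlers are built from channel-put-evt and bare channels, and values travel as a single list rather than as multiple values.

```racket
#lang racket/base
;; Toy model (assumption): four handlers closed over two private channels.
(struct toy-mediator (offer-handler accept-handler put-handler get-handler))

(define (make-toy-mediator)
  (define ctrl (make-channel))        ; control channel: offer / accept
  (define data (make-channel))        ; data channel: put / get
  (toy-mediator
   (λ vs (channel-put-evt ctrl vs))   ; default offer: provide vs on ctrl
   (λ () ctrl)                        ; default accept: a channel is itself an event
   (λ vs (channel-put-evt data vs))   ; default put: provide vs on data
   (λ () data)))                      ; default get

;; The mediated operations just call the corresponding handler.
(define (toy-offer m . vs) (apply (toy-mediator-offer-handler m) vs))
(define (toy-accept m) ((toy-mediator-accept-handler m)))
(define (toy-put m . vs) (apply (toy-mediator-put-handler m) vs))
(define (toy-get m) ((toy-mediator-get-handler m)))

;; Functional update in the spirit of bind-put: the new put-handler is
;; the result of applying f to the old one.
(define (toy-bind-put m f)
  (struct-copy toy-mediator m
               [put-handler (f (toy-mediator-put-handler m))]))

(define M
  (toy-bind-put (make-toy-mediator)
                (λ (next) (λ vs (apply next (map add1 vs))))))
(define putter (thread (λ () (sync (toy-put M 1 2 3)))))
(define got (sync (toy-get M)))
(thread-wait putter)
got ; '(2 3 4)
```

Keeping the channels private to the handlers means that overriding a handler can bypass the channel entirely, which is how a handler that ignores its arguments can make an operation complete with no partner.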

value

handler/c : contract?

A flat contract that accepts handler functions.

struct

(struct mediator (offer-handler
    accept-handler
    put-handler
    get-handler))
  offer-handler : handler/c
  accept-handler : handler/c
  put-handler : handler/c
  get-handler : handler/c
A structure type for mediators. The behavior of a mediator is determined by four handler functions.

These functions may be extended with hooks and overridden with bind-offer, bind-accept, bind-put, and bind-get.

procedure

(make-mediator)  mediator?

Creates and returns a new mediator with default handlers and no hooks.

value

void-mediator : mediator?

A mediator that does nothing and always returns #<void>.

procedure

(offer m v ...)  evt?

  m : mediator?
  v : any/c

procedure

(offer* m vs)  evt?

  m : mediator?
  vs : (listof any/c)
Applies the offer-handler of m to vs and returns a synchronizable event which by default provides vs through the control channel of m.

procedure

(accept m)  evt?

  m : mediator?
Returns a synchronizable event which by default blocks until values are provided through the control channel of m, applies the accept-handler of m to the provided values, and produces the results as its synchronization result.

procedure

(put m v ...)  evt?

  m : mediator?
  v : any/c

procedure

(put* m vs)  evt?

  m : mediator?
  vs : (listof any/c)
Applies the put-handler of m to vs and returns a synchronizable event which by default provides vs through the data channel of m.

procedure

(get m)  evt?

  m : mediator?
Returns a synchronizable event which by default blocks until values are provided through the data channel of m, applies the get-handler of m to the provided values, and produces the results as its synchronization result.

2.1 Handlers

procedure

(bind-offer m f)  mediator?

  m : mediator?
  f : (-> (unconstrained-domain-> evt?)
          (unconstrained-domain-> evt?))
Returns a copy of m with a new offer-handler created by applying f to the old offer-handler.

> (define M
    (bind-offer
     (make-mediator)
     (λ (next) (λ vs (apply next (map add1 vs))))))
> (thread (λ () (sync (offer M 1 2 3))))
> (sync (accept M))

2

3

4

> (define M (bind-offer (make-mediator) (λ _ (λ _ (pure 0)))))
> (sync (offer M 1 2 3)) ; no accept

0

procedure

(bind-accept m f)  mediator?

  m : mediator?
  f : (-> evt? evt?)
Returns a copy of m with a new accept-handler created by applying f to the old accept-handler.

> (define M
    (bind-accept
     (make-mediator)
     (λ (next) (λ () (fmap (λ vs (map add1 vs)) (next))))))
> (thread (λ () (sync (offer M 1 2 3))))
> (sync (accept M))

'(2 3 4)

> (define M (bind-accept (make-mediator) (λ _ (λ _ (pure 0)))))
> (sync (accept M)) ; no offer

0

procedure

(bind-put m f)  mediator?

  m : mediator?
  f : (-> (unconstrained-domain-> evt?)
          (unconstrained-domain-> evt?))
Returns a copy of m with a new put-handler created by applying f to the old put-handler.

> (define M
    (bind-put
     (make-mediator)
     (λ (next) (λ vs (apply next (map add1 vs))))))
> (thread (λ () (sync (put M 1 2 3))))
> (sync (get M))

2

3

4

> (define M (bind-put (make-mediator) (λ _ (λ _ (pure 0)))))
> (sync (put M 1 2 3)) ; no get

0

procedure

(bind-get m f)  mediator?

  m : mediator?
  f : (-> evt? evt?)
Returns a copy of m with a new get-handler created by applying f to the old get-handler.

> (define M
    (bind-get
     (make-mediator)
     (λ (next) (λ _ (fmap (λ vs (map add1 vs)) (next))))))
> (thread (λ () (sync (put M 1 2 3))))
> (sync (get M))

'(2 3 4)

> (define M (bind-get (make-mediator) (λ _ (λ _ (pure 0)))))
> (sync (get M)) ; no put

0

2.2 Hooks

procedure

(on-offer m f)  mediator?

  m : mediator?
  f : procedure?
Returns a copy of m extended to apply f to any values offered through m. When a mediator has more than one on-offer hook, the hooks are applied in reverse order.

> (define M (make-mediator))
> (set! M (on-offer M (curry cons 1)))
> (set! M (on-offer M (curry cons 2)))
> (thread (λ () (sync (offer M null))))
> (sync (accept M))

'(1 2)

procedure

(on-accept m f)  mediator?

  m : mediator?
  f : procedure?
Returns a copy of m extended to apply f to any values accepted through m. When a mediator has more than one on-accept hook, the hooks are applied in order.

> (define M (make-mediator))
> (set! M (on-accept M (curry cons 1)))
> (set! M (on-accept M (curry cons 2)))
> (thread (λ () (sync (offer M null))))
> (sync (accept M))

'(2 1)

procedure

(on-put m f)  mediator?

  m : mediator?
  f : procedure?
Returns a copy of m extended to apply f to any values put through m. When a mediator has more than one on-put hook, the hooks are applied in reverse order.

> (define M (make-mediator))
> (set! M (on-put M (curry cons 1)))
> (set! M (on-put M (curry cons 2)))
> (thread (λ () (sync (put M null))))
> (sync (get M))

'(1 2)

procedure

(on-get m f)  mediator?

  m : mediator?
  f : procedure?
Returns a copy of m extended to apply f to any values gotten through m. When a mediator has more than one on-get hook, the hooks are applied in order.

> (define M (make-mediator))
> (set! M (on-get M (curry cons 1)))
> (set! M (on-get M (curry cons 2)))
> (thread (λ () (sync (put M null))))
> (sync (get M))

'(2 1)
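
The difference between "reverse order" and "in order" can be reproduced with ordinary function wrapping. The sketch below is one plausible reading, not the package's implementation: add-put-hook and add-get-hook are hypothetical helpers, and a handler is reduced to a function from one value to one value.

```racket
#lang racket/base
;; Sending side (offer, put): a hook transforms the value before the
;; existing handler sees it, so the most recently added hook runs first.
(define (add-put-hook handler f)
  (λ (v) (handler (f v))))

;; Receiving side (accept, get): a hook transforms the result of the
;; existing handler, so the earliest added hook runs first.
(define (add-get-hook handler f)
  (λ (v) (f (handler v))))

(define (cons-1 xs) (cons 1 xs))
(define (cons-2 xs) (cons 2 xs))

;; Start from a handler that passes its value through unchanged, then
;; add cons-1 followed by cons-2, as in the examples above.
(define put-side (add-put-hook (add-put-hook values cons-1) cons-2))
(define get-side (add-get-hook (add-get-hook values cons-1) cons-2))

(put-side '()) ; '(1 2): cons-2 ran first, then cons-1
(get-side '()) ; '(2 1): cons-1 ran first, then cons-2
```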