majordomo2
1 Introduction
2 Demonstration
2.1 Simple Example
2.2 Sorting Task Results
2.3 Pre- and Post-Processing
2.4 Filtering Task Results
2.5 Restarting
2.6 Parallelization
2.7 Unwrapping
2.8 Flattening
2.9 Convenience Functions
3 API
start-majordomo
stop-majordomo
majordomo.id
majordomo-id
majordomo?
task-status/c
task++
task?
task.id
task-id
task.status
task-status
task.data
task-data
set-task-id
set-task-status
set-task-data
current-task
keepalive
success
failure
update-data
3.1 Task Data
3.2 Task Results
4 Running tasks
add-task
task-return-value
get-task-data
5 Parallel Processing
6 Notes

majordomo2

David K. Storrs

 (require majordomo2) package: majordomo2

1 Introduction

majordomo2 is a task manager. It obsoletes the original majordomo package.

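Major features include automatic restarting of tasks that fail or time out, keepalives for long-running tasks, state that is carried across restarts, parallel execution, filtering, sorting, and pre- and post-processing of results, flattening of nested tasks, and a custodian per instance so that task resources can be cleaned up together.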

2 Demonstration

The provided functions and data types are defined in the API section, but here are examples of practical usage.

2.1 Simple Example

> (define (build-email-text msg names)
    (for/list ([name names])
      (format "Dear ~a: ~a" name msg)))
> (define names '("fred" "barney" "betty" "wilma" "bam-bam"))
> (define jarvis (start-majordomo))
> (define result-channel
    (add-task jarvis build-email-text "hi there" names))
> (channel? result-channel)

#t

> (define result  (sync result-channel))
> (task? result)

#t

> (pretty-print (task.data result))

'("Dear fred: hi there"

  "Dear barney: hi there"

  "Dear betty: hi there"

  "Dear wilma: hi there"

  "Dear bam-bam: hi there")

; stop-majordomo shuts down all tasks that were added to that instance by
; shutting down the instance's custodian, which in turn shuts down the
; custodians of all its tasks.
> (stop-majordomo jarvis)

2.2 Sorting Task Results

> (define jarvis (start-majordomo))
> (pretty-print
   ; Do the same thing as we did in the prior section, but get the results back
   ; sorted.
   (task.data (sync (add-task jarvis build-email-text "hi there" names
                              #:sort-op string<?))))

'("Dear bam-bam: hi there"

  "Dear barney: hi there"

  "Dear betty: hi there"

  "Dear fred: hi there"

  "Dear wilma: hi there")

> (define (mock-network-function . ip-addrs)
    ; In real life, this would (e.g.) connect to the specified IP address,
    ; send a message, and return whether or not it succeeded.  For purposes of
    ; this demonstration we'll have it return an arbitrary status.
    ; 
    ; Note that tasks include a unique ID that is being returned.  This is
    ; autogenerated if not provided.
    (for/list ([addr ip-addrs]
               [status (in-cycle '(success failure timeout))])
      (list (task.id (current-task)) addr status)))
> (pretty-print
   ; This shows the baseline return value from mock-network-function before we do
   ; any sorting.
   (task.data (sync (add-task jarvis mock-network-function
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))

'((task-11660 "4.4.4.4" success)

  (task-11660 "8.8.8.8" failure)

  (task-11660 "172.67.188.90" timeout)

  (task-11660 "104.21.48.235" success))

> (pretty-print
   ; Now we'll sort the results based on their status.  NB: #:sort-cache-keys? is
   ; unnecessary here since the key is cheap to calculate, but it's included for
   ; the sake of demonstration.
   (task.data (sync (add-task jarvis mock-network-function
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"
                              #:sort-op symbol<?
                              #:sort-key last
                              #:sort-cache-keys? #t))))

'((task-11661 "8.8.8.8" failure)

  (task-11661 "4.4.4.4" success)

  (task-11661 "104.21.48.235" success)

  (task-11661 "172.67.188.90" timeout))

> (stop-majordomo jarvis)

2.3 Pre- and Post-Processing

> (define jarvis (start-majordomo))
> (pretty-print
   ; This is a contrived and overly fancy example, but it demonstrates the
   ; functionality.  We'll generate the strings, append some text to the end after
   ; they've been generated, sort the strings by their length, and then put it all
   ; into titlecase.
   (task.data (sync (add-task jarvis build-email-text "hi there" names
                              #:sort-op <
                              #:sort-key string-length
                              #:pre  (curry map (curryr string-append ", my friend."))
                              #:post (curry map string-titlecase)))))

'("Dear Fred: Hi There, My Friend."

  "Dear Betty: Hi There, My Friend."

  "Dear Wilma: Hi There, My Friend."

  "Dear Barney: Hi There, My Friend."

  "Dear Bam-Bam: Hi There, My Friend.")

> (stop-majordomo jarvis)

2.4 Filtering Task Results

If you’re going to use pre/post processing and/or sorting, it’s a good idea to ensure that every element in your data set is appropriate for those steps. To make this easy, you can use #:filter to remove bad results before preprocessing, sorting, and postprocessing happen.

> (define jarvis (start-majordomo))
> (define (simple)
    (success '(1 2 3 4 5 6 #f 'error)))
; Filtering happens before pre-processing, so the odd numbers, #f, and the
; error marker are already gone by the time #:pre checks the list.
> (define result (sync (add-task jarvis
                                 simple
                                 #:filter  (and/c number? even?)
                                 #:pre     (λ (lst)
                                             (if (memf odd? lst)
                                                 (raise 'failed)
                                                 lst))
                                 #:sort-op >)))
> (task.data result)

'(6 4 2)

> (stop-majordomo jarvis)

2.5 Restarting

When a task fails, either because it raised an error or because it timed out, it is restarted if it has retries left. By default a task gets 3 retries (a total of 4 attempts to run it), but you can use the #:retries keyword to specify how many you want. The argument must be a natural number or +inf.0.

> (define jarvis (start-majordomo))
> (define (failing-func)
    (displayln "inside failing-func")
    (raise-arguments-error 'failing-func "I don't feel so good"))
> (pretty-print
   ; If a function raises an error then the result will contain the value raised.
   ; By default it will restart 3 times, but in this example we don't want it to
   ; restart at all so we will specify 0 retries.
   (format "Data after failure was: ~a"
           (task.data (sync (add-task jarvis failing-func #:retries 0)))))

inside failing-func

"Data after failure was: #(struct:exn:fail:contract failing-func: I don't feel so good #<continuation-mark-set>)"

> (define (func-times-out . args)
    ; Sometimes an action will time out without explicitly raising an error.  If
    ; it has retries left, it will be restarted with all of its original
    ; arguments.  We can use the 'data' value in (current-task) to carry state
    ; across the restart.
    (define state (task.data (current-task)))
    (match state
      [(? hash-empty?)
       (displayln "Initial start.  Hello!")
       (update-data (hash-set* state
                               'remaining-args args
                               'results '()))
       (displayln "Intentional timeout to demonstrate restart.")
       (sync never-evt)]
      [(hash-table ('remaining-args (list current remaining ...)))
       (displayln (format "Marking ~a as processed..." current))
       (update-data (hash-set* state
                               'remaining-args remaining
                               'results (cons current
                                              (hash-ref state 'results '()))))
       (displayln "Intentional timeout to demonstrate restart.")
       (sync never-evt)])
    (displayln "Goodbye."))
> (let ([result (sync (add-task jarvis func-times-out 1 2 3 4 5 6
                                #:keepalive 0.1 ; max time to complete/keepalive
                                #:post (λ (h)
                                         (hash-set h
                                                   'results (reverse (hash-ref h 'results '()))))))])
    ; Be sure to use struct* in your match pattern instead of struct.  The
    ; task struct contains private fields that are not documented
    ; or provided. Use the struct-plus-plus reflection API if you
    ; really need to dig out the full field list.
    (match-define (struct* task ([status status] [data data])) result)
    (displayln (format "Final status was: ~v" status))
    (displayln (format "Final data was: ~v" data)))

Initial start.  Hello!

Intentional timeout to demonstrate restart.

Marking 1 as processed...

Intentional timeout to demonstrate restart.

Marking 2 as processed...

Intentional timeout to demonstrate restart.

Marking 3 as processed...

Intentional timeout to demonstrate restart.

Final status was: 'timeout

Final data was: '#hash((remaining-args . (4 5 6)) (results . (1 2 3)))

> (define (long-running-task)
    ; If a task is going to take a long time, it can periodically send a keepalive
    ; so that the manager knows not to restart it.  You can use update-data,
    ; success, failure, or keepalive for that.
    (for ([i 10])
      (sleep 0.1)
      (keepalive))
    (success 'finished))
> (let ([result (sync (add-task jarvis long-running-task #:retries 0 #:keepalive 0.25))])
    ; In this case we are saying that the task has failed if the manager doesn't
    ; hear from it after 0.25 seconds.
    (displayln (format "Final status of long-running task was: ~v" (task.status result)))
    (displayln (format "Final data of long-running task was: ~v" (task.data result))))

Final status of long-running task was: 'success

Final data of long-running task was: 'finished

> (let ([result (sync (add-task jarvis long-running-task #:retries 0 #:keepalive 0.05))])
    ; Here the 0.05-second keepalive window is shorter than each 0.1-second
    ; sleep, so the task times out.
    (displayln (format "Final status of long-running task was: ~v" (task.status result)))
    (displayln (format "Final data of long-running task was: ~v" (task.data result))))

Final status of long-running task was: 'timeout

Final data of long-running task was: '#hash()

> (stop-majordomo jarvis)

2.6 Parallelization

> (define jarvis (start-majordomo))
> (pretty-print
   ; Run the task in series to start with in order to show the baseline data.  (We
   ; already did this in a previous section but it's useful to repeat it here.)
   ; Note that the task ID is the same throughout.
   (task.data (sync (add-task jarvis mock-network-function
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))

'((task-12082 "4.4.4.4" success)

  (task-12082 "8.8.8.8" failure)

  (task-12082 "172.67.188.90" timeout)

  (task-12082 "104.21.48.235" success))

> (pretty-print
   ; Parallelize the task such that each argument in the list is farmed out to a
   ; separate sub task and the results are compiled back together. NB: Because of
   ; how mock-network-function is defined, this results in an extra layer of
   ; listing, as we're getting multiple lists each containing the results of
   ; processing a single argument instead of one list containing the results of
   ; processing each argument in turn.  See below for how to handle this.  Note
   ; that the task ID is different for each entry.
   (task.data (sync (add-task jarvis mock-network-function
                              #:parallel? #t
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))

'(((task-12083 "4.4.4.4" success))

  ((task-12084 "8.8.8.8" success))

  ((task-12085 "172.67.188.90" success))

  ((task-12086 "104.21.48.235" success)))

> (pretty-print
   ; Same as above, but we'll append the sublists together in order to get back to
   ; the original results.  Note that once again the task ID is different for each
   ; entry.
   (task.data (sync (add-task jarvis mock-network-function
                              #:parallel? #t
                              #:post (curry apply append)
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))

'((task-12088 "4.4.4.4" success)

  (task-12089 "8.8.8.8" success)

  (task-12090 "172.67.188.90" success)

  (task-12091 "104.21.48.235" success))

> (stop-majordomo jarvis)

2.7 Unwrapping

Often you’ll have a function that wants individual arguments, but you have the arguments as a list because you generated them using map or got them from a database using query-rows, etc. You can save yourself an unnecessary apply by using the #:unwrap? argument.

> (define jarvis (start-majordomo))
> (define args (for/list ([i 5]) i))
; This fails because we need to pass 0 1 2 3 4 separately instead of as a list
> (pretty-print
   (task.data (sync (add-task jarvis + args))))

(exn:fail:contract

 "+: contract violation\n  expected: number?\n  given: '(0 1 2 3 4)"

 #<continuation-mark-set>)

; This works because we are telling the task to unwrap the list into separate args
> (task.data (sync (add-task jarvis + args #:unwrap? #t)))

10

> (stop-majordomo jarvis)

2.8 Flattening

Sometimes, usually when running in parallel, you’ll have a task where the data field ends up containing one or more tasks, each of which might also be nested, and what you actually want is the data from those subtasks. You could dig that out on the far end or you could have the system do it for you.
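The following is a minimal sketch of what flattening does, written in plain Racket so that it runs without majordomo2. It assumes that flattening replaces each subtask with its data, recursively, while the top-level task keeps its own id and status; mini-task, lift-data, and flatten-nested are hypothetical names, not part of the package.

```racket
#lang racket
;; Hypothetical stand-in for a task: just id, status, and data.
(struct mini-task (id status data) #:transparent)

;; Replace every task in `v` with its data, recursing so that tasks nested
;; at any depth (including inside lists) are lifted out.
(define (lift-data v)
  (cond
    [(mini-task? v) (lift-data (mini-task-data v))]
    [(list? v) (map lift-data v)]
    [else v]))

;; The top-level task keeps its own id and status; the ids and statuses of
;; the subtasks are discarded.
(define (flatten-nested t)
  (struct-copy mini-task t [data (lift-data (mini-task-data t))]))

(define top
  (mini-task 'top 'success
             (list (mini-task 'a 'success 0)
                   (mini-task 'b 'failure 1)
                   (mini-task 'c 'success 2)
                   (mini-task 'd 'success (mini-task 'e 'success 3)))))

(mini-task-data (flatten-nested top)) ; => '(0 1 2 3)
```

Note that subtask b's 'failure status does not survive the flattening, which is the same trade-off the package makes.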

> (define (make-task x)
    (task++ #:data x))
> (let* ([jarvis          (start-majordomo)]
         [without-flatten (sync (add-task jarvis make-task 0 1 2 #:parallel? #t))]
         [nested          (sync (add-task jarvis make-task (for/list ([i 3]) (make-task i))
                                          #:unwrap? #t #:parallel? #t))]
         [with-flatten    (sync (add-task jarvis make-task 0 1 2 (make-task (make-task 3))
                                          #:parallel? #t
                                          #:flatten-nested-tasks? #t))])
    (printf "data from without-flatten: ~v\n" (task.data without-flatten))
    (printf "data from nested tasks in without-flatten: ~v\n"
            (map task.data (task.data without-flatten)))
    (printf "data via explicit call to flatten-nested-tasks: ~v\n"
            (task.data (flatten-nested-tasks without-flatten)))
    ; Data from nested tasks gets lifted regardless of nesting depth.
    (printf "data via #:flatten-nested-tasks?: ~v\n" (task.data with-flatten))
    (stop-majordomo jarvis))

data from without-flatten: '(#<task> #<task> #<task>)

data from nested tasks in without-flatten: '(0 1 2)

data via explicit call to flatten-nested-tasks: '(0 1 2)

data via #:flatten-nested-tasks?: '(0 1 2 3)

Note that flattening loses the id and status fields of the subtasks.

2.9 Convenience Functions

Some operations are common enough that it’s worth having a short form to avoid boilerplate.

; A task that simply returns a specified value.  It creates its own majordomo
; instance.
> (task.data (sync (task-return-value 'hello)))

'hello

; Add a task, sync for its result, and fetch its data in a single call
> (get-task-data (start-majordomo) add1 7)

8

3 API

Start and stop a majordomo instance and all tasks it manages. An instance contains a custodian which manages resources created by any tasks given to that instance. These resources will be cleaned up when stop-majordomo is called.
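As a rough illustration of custodian-based cleanup, here is a plain-Racket sketch. It assumes task threads are created while the instance's custodian is current; instance-custodian and start-worker are hypothetical names, not majordomo2's internals.

```racket
#lang racket
;; Hypothetical "instance" custodian. Threads created while it is the
;; current custodian are managed by it.
(define instance-custodian (make-custodian))

(define (start-worker thunk)
  (parameterize ([current-custodian instance-custodian])
    (thread thunk)))

;; Two workers that would otherwise never finish on their own.
(define w1 (start-worker (λ () (sync never-evt))))
(define w2 (start-worker (λ () (sleep 1000))))

;; Shutting down the custodian kills every thread it manages.
(custodian-shutdown-all instance-custodian)

(list (thread-dead? w1) (thread-dead? w2)) ; => '(#t #t)
```

Grouping tasks under one custodian is what makes it cheap to shut down a whole group at once without disturbing threads owned by other custodians.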

procedure

(majordomo.id instance) → any/c

  instance : majordomo?
(majordomo-id instance) → any/c
  instance : majordomo?
Retrieve the unique ID for this particular instance. It’s possible to have as many majordomo instances as desired. You might have multiple ones in order to group tasks together so that it’s easy to shut down a specific group without disturbing others.

procedure

(majordomo? val) → boolean?

  val : any/c
Predicate for the majordomo struct.

procedure

(task-status/c val) → boolean?

  val : (or/c 'success 'failure 'unspecified 'timeout)
Contract for legal task status values.

task constructor

(task++ [#:id id #:status status #:data data]) → task?

  id : symbol? = (gensym "task-")
  status : task-status/c = 'unspecified
  data : any/c = (hash)
A task struct encapsulates the details of a task. It has a unique ID as an aid in debugging and organization, a status to identify what the outcome of the task was, and the data generated by the execution of the task.

procedure

(task? the-task) → boolean?

  the-task : any/c
Predicate for identifying tasks.

procedure

(task.id the-task) → symbol?

  the-task : task?
(task-id the-task) → symbol?
  the-task : task?
Accessors for the id field of a task struct.

procedure

(task.status the-task) → symbol?

  the-task : task?
(task-status the-task) → symbol?
  the-task : task?
Accessors for the status field of a task struct.

procedure

(task.data the-task) → any/c

  the-task : task?
(task-data the-task) → any/c
  the-task : task?
Accessors for the data field of a task struct. See Task Data for how the data field is used.

procedure

(set-task-id the-task val) → task?

  the-task : task?
  val : symbol?
Functional setter for the id field.

procedure

(set-task-status the-task val) → task?

  the-task : task?
  val : task-status/c
Functional setter for the status field.

procedure

(set-task-data the-task val) → task?

  the-task : task?
  val : any/c
Functional setter for the data field. Prefer update-data over calling this directly, since update-data also updates the current-task parameter and sends a keepalive to the manager thread to delay restarting.

parameter

(current-task) → task?

(current-task t) → void?
  t : task?
 = #f
A parameter that tracks the currently-running task. It is updated when you call the update-data, success, and failure functions.

procedure

(keepalive) → void?

Notify the manager that the task is still running. This resets the timer and prevents the manager from restarting the task.

procedure

(success [arg]) → void?

  arg : any/c = the-unsupplied-arg
If arg is supplied (that is, it is not the-unsupplied-arg), set the data field of current-task to arg.

Set the status field of current-task to 'success. Tell the manager that the worker has completed. This will cause the manager to send the value of current-task to the customer.

procedure

(failure [arg]) → void?

  arg : any/c = the-unsupplied-arg
If arg is supplied (that is, it is not the-unsupplied-arg), set the data field of current-task to arg.

Set the status field of current-task to 'failure. Tell the manager that the worker has completed. This will cause the manager to send the value of current-task to the customer.

procedure

(update-data val) → void?

  val : any/c
Set the data field of current-task to val. Send a keepalive to the manager.

3.1 Task Data

The current-task parameter holds a task structure that can be used to carry state across restarts, such as which arguments have already been processed; this matters because the task is restarted with the same arguments every time. The data field is also useful for returning a value from the action. See Task Results for details.

There are three functions that an action can call to manipulate the contents of the current-task struct: update-data, success, and failure.

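If none of these functions is ever called, the data field is set as follows: if the action returns normally, the data field holds its return value; if the action raises, the data field holds the raised value; and if the action times out on its final attempt, the data field keeps its initial value, an empty hash by default.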

3.2 Task Results

add-task returns a channel. When the task finishes, the value of the current-task parameter is placed onto the channel. The customer may retrieve the struct (via sync, channel-get, etc.) and examine the status and data fields in order to determine how the task completed and what the final result was.
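The hand-off can be sketched with Racket's own channels. In this hypothetical sketch, run-in-background stands in for add-task and the outcome is a (status . data) pair instead of a task struct; it shows only the shape of the protocol, not the package's implementation.

```racket
#lang racket
;; Hypothetical stand-in for add-task: runs `action` in a worker thread and
;; returns a channel on which the outcome will be placed.
(define (run-in-background action . args)
  (define result-ch (make-channel))
  (thread
   (λ ()
     (define outcome
       ;; Any raised value (other than a break) becomes the data of a failure.
       (with-handlers ([(λ (e) (not (exn:break? e)))
                        (λ (e) (cons 'failure e))])
         (cons 'success (apply action args))))
     ;; Blocks until a customer syncs on the channel.
     (channel-put result-ch outcome)))
  result-ch)

;; The customer blocks in sync until the worker puts the outcome.
(sync (run-in-background + 1 2 3))        ; => '(success . 6)
(car (sync (run-in-background car '())))  ; => 'failure
```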

4 Running tasks

Tasks are created inside, and managed by, a majordomo instance.

procedure

(add-task jarvis 
  action 
  arg ... 
  [#:keepalive keepalive-time 
  #:retries retries 
  #:parallel? parallel? 
  #:unwrap? unwrap? 
  #:flatten-nested-tasks? flatten-nested-tasks? 
  #:filter filter-func 
  #:pre pre 
  #:sort-op sort-op 
  #:sort-key sort-key 
  #:sort-cache-keys? cache-keys? 
  #:post post]) 
  → channel?
  jarvis : majordomo?
  action : (unconstrained-domain-> any/c)
  arg : any/c
  keepalive-time : (and/c real? (not/c negative?)) = 5
  retries : (or/c natural-number/c +inf.0) = 3
  parallel? : boolean? = #f
  unwrap? : boolean? = #f
  flatten-nested-tasks? : boolean? = #f
  filter-func : (or/c #f procedure?) = #f
  pre : procedure? = identity
  sort-op : (or/c #f (-> any/c any/c any/c)) = #f
  sort-key : (-> any/c any/c) = identity
  cache-keys? : boolean? = #f
  post : procedure? = identity
Add a task to a majordomo instance.

Two threads are created: a worker and a manager. The worker does (apply action args). (See Parallel Processing for an exception.) The manager waits on the worker using sync/timeout, with keepalive-time as the timeout. If the worker times out, the worker thread is killed and, if retries remain, a new thread is started using the same arguments as the original and the most recent value of current-task, thereby allowing state to be carried over from one attempt to the next. The keepalive timer resets whenever the worker calls keepalive, update-data, success, or failure.
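The worker/manager loop described above can be approximated in plain Racket. This is a hypothetical sketch, not majordomo2's code: run-with-restarts and slow-action are invented names, the action receives its state and a report procedure explicitly instead of using a current-task parameter, and raised exceptions are not caught, so a failing action simply shows up as a timeout.

```racket
#lang racket
;; Assumption: the action is called as (action state report arg ...), where
;; `report` sends a keepalive together with updated state to the manager.
(define (run-with-restarts action args
                           #:keepalive keepalive-time
                           #:retries retries)
  (let attempt ([retries-left retries] [state (hash)])
    (define msg-ch (make-channel)) ; fresh channel per attempt
    ;; Worker: run the action with the latest state, then report completion.
    (define worker
      (thread
       (λ ()
         (define report (λ (s) (channel-put msg-ch (list 'keepalive s))))
         (channel-put msg-ch (list 'done (apply action state report args))))))
    ;; Manager: every message received starts a new sync/timeout window.
    (let wait ([state state])
      (match (sync/timeout keepalive-time msg-ch)
        [(list 'keepalive new-state) (wait new-state)]
        [(list 'done result) (values 'success result)]
        [#f ; nothing heard within keepalive-time
         (kill-thread worker)
         (if (zero? retries-left)
             (values 'timeout state)
             ;; Restart with the same args and the most recent state.
             (attempt (sub1 retries-left) state))]))))

;; Hypothetical action: records one new argument per attempt, then hangs so
;; that the manager times out and restarts it.
(define (slow-action state report . args)
  (define done (hash-ref state 'done '()))
  (define todo (filter (λ (a) (not (member a done))) args))
  (cond
    [(null? todo) (reverse done)]
    [else
     (report (hash-set state 'done (cons (car todo) done)))
     (sync never-evt)]))

;; Barring scheduling delays, three attempts time out and the fourth finds
;; nothing left to do, so this returns 'success and '(1 2 3).
(run-with-restarts slow-action '(1 2 3) #:keepalive 0.2 #:retries 5)
```

As in the package, #:retries counts restarts rather than attempts, so the action can start at most retries + 1 times.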

Arguments are as follows:

#:keepalive The duration (keepalive-time) within which the worker must either terminate or notify the manager that it’s still running.

#:retries The number of times that the manager should restart the worker before giving up. (Note: This counts retries, not maximum attempts. The maximum number of times your action will be started is retries + 1, with the +1 being the initial attempt.)

#:parallel? Whether the action should be run in parallel. See Parallel Processing.

#:unwrap? Whether the arguments should be used as-is or one layer of listing should be removed. Saves you the trouble of currying in an apply. See Unwrapping.

#:flatten-nested-tasks? If your action procedure produces a task that contains tasks you can have the system automatically lift the subtasks out and combine their data into the top-level task so that you don’t have to dig it out on the far end. You will lose the id and status fields of the subtasks if you do this. See Flattening.

#:filter Filter the data after flattening (if any) and before passing it to the preprocessing function. See Filtering Task Results.

#:pre Pre-processes the results of the action. The default preprocessor is identity. This happens after flattening and filtering, and before sorting.

#:sort-op, #:sort-key, #:sort-cache-keys? Whether and how to sort the results of the action. They are passed to sort as, respectively, the less-than? argument, the #:key argument, and the #:cache-keys? argument. Sorting happens after preprocessing and before postprocessing.

#:post Postprocesses the results of the action immediately before returning them. The default is identity.

If the results of your action are not a list, either leave #:sort-op as #f so that no sort is attempted (which would fail), or use #:pre list to make the result a list before sorting is applied.

Consolidating the information above, the pipeline is:

flatten > filter > preprocess > sort > postprocess

Each step in the pipeline is optional.
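The pipeline amounts to composing ordinary list functions. The sketch below assumes the action's result has already been flattened and is a list; run-pipeline is a hypothetical helper that borrows add-task's keyword names for readability, and it is not how the package is implemented.

```racket
#lang racket
;; Hypothetical helper: each stage is optional and defaults to a no-op.
(define (run-pipeline data
                      #:filter [filter-func #f]
                      #:pre [pre identity]
                      #:sort-op [sort-op #f]
                      #:sort-key [sort-key identity]
                      #:sort-cache-keys? [cache-keys? #f]
                      #:post [post identity])
  (let* ([data (if filter-func (filter filter-func data) data)] ; 1. filter
         [data (pre data)]                                       ; 2. preprocess
         [data (if sort-op                                       ; 3. sort
                   (sort data sort-op #:key sort-key #:cache-keys? cache-keys?)
                   data)])
    (post data)))                                                ; 4. postprocess

;; Data and filter similar to the Filtering Task Results example.
(run-pipeline '(1 2 3 4 5 6 #f error)
              #:filter (λ (x) (and (number? x) (even? x)))
              #:sort-op >)
; => '(6 4 2)

;; Sort by a key, then transform the sorted list.
(run-pipeline '("bb" "a" "ccc")
              #:sort-op <
              #:sort-key string-length
              #:post (λ (lst) (map string-upcase lst)))
; => '("A" "BB" "CCC")
```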

procedure

(task-return-value val) → channel?

  val : any/c
This provides a way to return a specific value using an interface equivalent to the normal add-task system. It returns a channel which syncs to a task with a data field containing the specified value. A convenience function equivalent to:

(add-task (start-majordomo) identity val)

procedure

(get-task-data jarvis 
  action 
  arg ... 
  [#:keepalive keepalive-time 
  #:retries retries 
  #:parallel? parallel? 
  #:unwrap? unwrap? 
  #:flatten-nested-tasks? flatten-nested-tasks? 
  #:filter filter-func 
  #:pre pre 
  #:sort-op sort-op 
  #:sort-key sort-key 
  #:sort-cache-keys? cache-keys? 
  #:post post]) 
  → any/c
  jarvis : majordomo?
  action : (unconstrained-domain-> any/c)
  arg : any/c
  keepalive-time : (and/c real? (not/c negative?)) = 5
  retries : (or/c natural-number/c +inf.0) = 3
  parallel? : boolean? = #f
  unwrap? : boolean? = #f
  flatten-nested-tasks? : boolean? = #f
  filter-func : (or/c #f procedure?) = #f
  pre : procedure? = identity
  sort-op : (or/c #f (-> any/c any/c any/c)) = #f
  sort-key : (-> any/c any/c) = identity
  cache-keys? : boolean? = #f
  post : procedure? = identity
A convenience function that fetches the data for an action. It is equivalent to:

(task.data (sync (add-task jarvis action args keyword-args)))

The arguments and keywords all have the same meaning as add-task.

5 Parallel Processing

As shown in the Parallelization demonstration, tasks can be parallelized simply by passing #:parallel? #t. In this case add-task calls itself recursively, creating one subtask per argument and passing each subtask one of the arguments. The main task then waits for the subtasks to complete, aggregates their results into a list, and treats that list as normal by running it through some or all of flattening, filtering, preprocessing, sorting, and postprocessing. The subtasks are run with the same #:keepalive and #:retries values as the main task, but everything else uses the defaults, meaning that all filtering, preprocessing, sorting, and postprocessing happen in the main task.
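The fan-out can be sketched with one thread and one channel per argument. run-parallel and tag-all are hypothetical names, and bare threads stand in for the subtasks (so there are no keepalives or retries here); the point is to show where the extra layer of listing in the Parallelization demonstration comes from.

```racket
#lang racket
;; Hypothetical fan-out: each argument goes to its own thread, and results
;; are collected back in argument order.
(define (run-parallel action args)
  (define channels
    (for/list ([arg args])
      (define ch (make-channel))
      (thread (λ () (channel-put ch (action arg)))) ; each "subtask" gets ONE arg
      ch))
  (map sync channels)) ; wait for every subtask, preserving order

;; Like mock-network-function, this returns a list per call.
(define (tag-all . xs) (map (λ (x) (list 'seen x)) xs))

(run-parallel tag-all '(a b c))
; => '(((seen a)) ((seen b)) ((seen c)))   one extra layer of listing

(apply append (run-parallel tag-all '(a b c)))
; => '((seen a) (seen b) (seen c))
```

The final apply append is the same fix the demonstration applies with #:post (curry apply append).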

Caveats:

6 Notes

The task structure is defined via the struct-plus-plus module, giving it a keyword constructor, dotted accessors, reflection data, etc. As stated above, not all accessors are exported, so if you need to use match on a task, use struct* instead of struct.
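The difference between struct and struct* patterns can be seen with an ordinary struct. demo-task is a hypothetical stand-in that has an extra field a caller would not know about; it is not the package's task struct.

```racket
#lang racket
;; Hypothetical struct with a field callers are not told about.
(struct demo-task (id status data hidden))

(define t (demo-task 'task-1 'success '(1 2 3) 'internal))

;; struct* matches fields by name; unlisted fields (id, hidden) are ignored.
(match t
  [(struct* demo-task ([status 'success] [data d])) d])
; => '(1 2 3)

;; A plain struct pattern needs one sub-pattern per field, in order, so it
;; breaks as soon as the struct has fields you don't know about:
;;   (match t [(struct demo-task (_ status data _)) data])
```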