Jobsched: Running jobs with multiple Racket instances
1 Remote-Call: Remote function calls
remote-call
remote-call?
remote-call-fun-sym
remote-call-pos-args
remote-call-kw-dict
remote-call-mod-path
start-remote-call-worker
2 Lower-level API
3 Simple server / worker
start-simple-worker
start-simple-server
4 Server
make-server
server-start
server-close
5 Job
job
6 Server
scheduler?
worker?
make-scheduler
scheduler-add-job!
scheduler-n-queued-jobs
scheduler-n-active-jobs
scheduler-start
processor-count
7 Worker
start-worker
8 Utilities
make-racket-cmd
this-file
9 Comparison with other Racket parallelism mechanisms

Jobsched: Running jobs with multiple Racket instances

Laurent Orseau

 (require jobsched) package: jobsched

Jobsched is a job scheduler with a server-worker architecture on a single machine, where the server dispatches a number of jobs to the workers.

All workers are identical and are able to process any job assigned by the server. The workers are independent Racket programs, each with its own Racket instance. See Comparison with other Racket parallelism mechanisms for more information.

The workflow is as follows:
  1. The server (a Racket program) is started.

  2. N jobs are added to the job queue in the server. A job is specified as read-able data.

  3. The server starts K workers (independent Racket programs). Each worker is spawned as a separate OS process and communicates with the server over a localhost TCP connection (one per worker).

  4. Each worker signals to the server over TCP that it is ready to receive jobs.

  5. As soon as a worker is ready, the server sends it one of the remaining jobs.

  6. When a worker has finished a job, it sends the result to the server, and waits for another job.

  7. When the server receives the result of a job, it processes this result and sends a new job (if any is remaining) to the worker.

  8. When all N jobs are finished, the server finishes.

If your machine has K cores and sufficient memory, jobsched can run K workers in parallel and benefit from a speedup of a factor K. Jobsched has been successfully used with more than 120 workers in parallel, with more than 100 000 fast-paced jobs to dispatch between them.

1 Remote-Call: Remote function calls

The bindings in this section are also exported by jobsched.

The jobsched/remote-call module provides a higher-level interface where jobs are expressed as function calls.

"server-worker-remote-call.rkt"

#lang racket
(require jobsched)
 
;; Functions used by the server must be `provide`d.
(provide greet total)
 
(define (greet name #:greeting [greeting "Hello"])
  (format "~a, ~a!" greeting name))
 
(define (total prices #:tax-rate [tax-rate 0])
  (define subtotal (apply + prices))
  (exact->inexact (* subtotal (+ 1 tax-rate))))
 
;;=== Worker ===;;
 
;; The worker resolves and calls the functions automatically.
(module+ worker (start-remote-call-worker))
 
;;=== Server ===;;
 
(module+ main
  (start-simple-server
   #:worker-file (this-file)
   ;; These look like function calls, but they are captured and
   ;; sent to the workers for execution. Arguments are evaluated
   ;; here; the function itself is resolved on the worker.
   #:data-list (list (remote-call (greet "Alice"))
                     (remote-call (greet "Bob" #:greeting "Hi"))
                     (remote-call (total '(10 20 30)))
                     (remote-call (total '(100 200) #:tax-rate 0.2)))
   #:process-result (λ (data result) (displayln result))
   #:n-workers 3))
 

Try it with:
racket -l- jobsched/examples/server-worker-remote-call

The remote-call macro captures:
  • The function’s module path (resolved at compile time via identifier-binding).

  • The function’s symbol name.

  • The evaluated arguments (positional and keyword).

The worker then uses dynamic-require to resolve the function and applies it with the captured arguments. This means the server and worker are guaranteed to call the same function—eliminating the risk of module path mismatches.

Important: The arguments to the function are evaluated on the server before being serialized and sent to the worker. The worker receives only the function name and the serialized argument values, and calls the function on these values. In particular, procedures and closures are not serializable (via racket/fasl) and cannot be passed as arguments. Only the top-level function itself is resolved on the worker via dynamic-require.
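
To make the evaluation order concrete, here is a small sketch (assuming the example module above is available as jobsched/examples/server-worker-remote-call, from which greet can be required):

```racket
#lang racket
(require jobsched
         ;; The example module above; it provides `greet`.
         jobsched/examples/server-worker-remote-call)

;; `(string-upcase "alice")` is evaluated here, on the server side;
;; the job stores only the resulting value, not the expression.
(define rc (remote-call (greet (string-upcase "alice"))))

(remote-call-fun-sym rc)   ; the symbol 'greet
(remote-call-pos-args rc)  ; the already-computed value: '("ALICE")
```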

Remark: In the example above, the functions, the worker and the server are all defined in the same file. This is not a requirement, and they can be in three different files.

syntax

(remote-call (fun arg ...))

Creates a remote-call job for the function call (fun arg ...).

At compile time, the macro:
  • Extracts the module path and symbol for fun via identifier-binding.

  • Raises a compile-time error if fun is not provided by any module.

We recommend requiring define2 in the module where the functions are defined, so that mistakes in the function’s signature raise compile-time errors.

At run time, the arguments are evaluated and stored in the job data, but the function is not called—it will be called on the worker.

Caveats:
  • The function must be provided by its module, otherwise a compile-time error is raised.

  • All arguments must be serializable via racket/fasl. Procedures, closures, and other non-serializable values cannot be passed as arguments.

  • Renamed imports (e.g., (require (rename-in racket/list [first my-first]))) are not supported. The embedded module path refers to the original name. Use re-exported names instead.

  • planet modules are untested.

The data argument received by the #:process-result callback is the remote-call value. You can inspect it with the following accessors and predicate:

procedure

(remote-call? v)  boolean?

  v : any/c
(remote-call-fun-sym rc)  symbol?
  rc : remote-call?
(remote-call-pos-args rc)  list?
  rc : remote-call?
(remote-call-kw-dict rc)  dict?
  rc : remote-call?
(remote-call-mod-path rc)  module-path?
  rc : remote-call?
remote-call? checks whether a value is a remote-call. The accessors return, respectively, the function symbol name, the list of positional arguments, the association list of keyword arguments, and the resolved module path.
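
For example, a #:process-result callback for start-simple-server could use these accessors to label each result with the call that produced it (a sketch; the output format is our own):

```racket
#lang racket
(require jobsched)

;; Sketch of a #:process-result callback: `data` is the remote-call
;; value that produced `result`.
(define (show-result data result)
  (printf "~a ~a -> ~a\n"
          (remote-call-fun-sym data)   ; e.g. 'greet
          (remote-call-pos-args data)  ; e.g. '("Alice")
          result))
```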

procedure

(start-remote-call-worker)  void?

Starts a worker that processes remote-call jobs. No arguments are needed—the module path is embedded in each job by the remote-call macro.

For each received job, the worker uses dynamic-require to load the function from the embedded module path, then calls it with the stored arguments.

2 Lower-level API

The following sections document the lower-level server/worker API for cases where remote-call does not provide enough control—for example, when the worker needs to perform custom setup, or when job data is not a function call.

All definitions exported by the various modules below are also exported by jobsched.

3 Simple server / worker

procedure

(start-simple-worker run [#:silent? silent?])  void?

  run : (-> readable? readable?)
  silent? : any/c = #f
Like start-worker except that run accepts the data of the job rather than the job. This masks the job object.

Note that start-simple-worker can be used with start-simple-server, server-start and scheduler-start.

IMPORTANT: See the remarks for start-worker.

procedure

(start-simple-server #:worker-file worker-file    
  #:data-list data-list    
  #:process-result process-result    
  [#:submod-name submod-name    
  #:n-workers n-workers])  void?
  worker-file : path-string?
  data-list : (listof readable?)
  process-result : (procedure-arity-includes/c 2)
  submod-name : symbol? = 'worker
  n-workers : integer?
   = (min (length data-list) (processor-count))
Creates and starts a server, like scheduler-start, but hiding the scheduler and the job objects. The worker is assumed to be in the Racket file worker-file in the submodule submod-name. See start-simple-worker.

The workers will receive one element of data-list at a time, and return a result to be processed by process-result.

The server starts n-workers workers in separate OS processes. Refer to scheduler-start for close-workers?.

Note: By contrast to scheduler-start, the simple server does not allow adding more jobs while it is running.

start-simple-server is essentially a combination of make-server, server-start and server-close.

Try the following example with:
racket -l- jobsched/examples/server-worker-simple

"server-worker-simple.rkt"

#lang racket
(require jobsched)
 
;=== Worker ===;
 
(define (worker-run data)
  (match data
    [(list x y)
     (* x y)]))
 
(module+ worker
  (start-simple-worker worker-run))
 
;=== Server ===;
 
(module+ main
 
  (define (process-result data result)
    (printf "~a × ~a = ~a\n" (first data) (second data) result))
 
  (define data-list
    (for*/list ([x 5] [y 5]) (list x y)))
 
  (start-simple-server
   #:worker-file (this-file)
   #:data-list data-list
   #:process-result process-result))
 

4 Server

procedure

(make-server #:worker-file worker-file    
  [#:submod-name submod-name    
  #:n-workers n-workers]    
  #:errortrace? errortrace?    
  #:worker-args worker-args)  scheduler?
  worker-file : path-string?
  submod-name : symbol? = 'worker
  n-workers : (or/c #f integer?) = #f
  errortrace? : any/c
  worker-args : (listof (or/c #f string? path?))
Returns a new scheduler. See start-simple-server. If n-workers is not #f, then the workers are started asynchronously. See make-racket-cmd for the arguments errortrace? and worker-args.

procedure

(server-start sched    
  #:data-list data-list    
  #:process-result process-result    
  [#:n-workers n-workers    
  #:close-workers? close-workers?])  void?
  sched : scheduler?
  data-list : (listof readable?)
  process-result : (procedure-arity-includes/c 2)
  n-workers : integer?
   = (min (length data-list) (processor-count))
  close-workers? : any/c = #false
Starts an existing scheduler (likely made with make-server). See start-simple-server. If close-workers? is #f, then the workers are not closed when all jobs are done, so that they can be re-used for subsequent calls to server-start.
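
For instance, keeping #:close-workers? at #f lets the same Racket instances serve several batches of jobs (a sketch; "worker.rkt" is a hypothetical file whose worker submodule calls start-simple-worker):

```racket
#lang racket
(require jobsched)

;; Sketch: reuse the same workers across two batches of jobs.
(define sched
  (make-server #:worker-file "worker.rkt"  ; hypothetical worker file
               #:n-workers 4               ; start workers asynchronously
               #:errortrace? #f
               #:worker-args '()))

;; First batch; keep the workers alive afterwards.
(server-start sched
              #:data-list (for/list ([i 10]) (list i i))
              #:process-result (λ (data result) (displayln result))
              #:close-workers? #f)

;; Second batch re-uses the same Racket instances.
(server-start sched
              #:data-list (for/list ([i 10]) (list i (* 2 i)))
              #:process-result (λ (data result) (displayln result))
              #:close-workers? #f)

;; Finally ask the workers to exit and wait for them.
(server-close sched)
```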

procedure

(server-close sched)  void?

  sched : scheduler?
Asks all workers of sched to exit gracefully, and blocks until all workers have exited.

5 Job

The bindings in this section are also exported by jobsched.

struct

(struct job (index cost data start-ms stop-ms)
    #:transparent)
  index : nonnegative-integer?
  cost : number?
  data : readable?
  start-ms : nonnegative-integer?
  stop-ms : nonnegative-integer?
This structure is usually used only for accessing information by a worker or by the server.

The cost field is used by the priority queue of the server.

The data field contains readable? (in the sense of racket/fasl) information that is sent to the worker for processing.

The fields start-ms and stop-ms are set automatically by the server when a job is sent to a worker and when the result is received.
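
Since the timestamps are filled in by the server, an #:after-stop callback (see scheduler-start) can report per-job wall-clock time. A sketch, assuming stop-ms is already set when the callback runs:

```racket
#lang racket
(require jobsched)

;; Sketch: report how long each job took, using the timestamps
;; recorded by the server on the job structure.
(define (report-duration sched jb result)
  (printf "job ~a took ~a ms\n"
          (job-index jb)
          (- (job-stop-ms jb) (job-start-ms jb))))
;; Pass it as #:after-stop to scheduler-start.
```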

6 Server

The bindings in this section are also exported by jobsched.

procedure

(scheduler? v)  boolean?

  v : any/c

procedure

(worker? v)  boolean?

  v : any/c

procedure

(make-scheduler make-worker-command)  scheduler?

  make-worker-command : (-> nonnegative-integer? list?)
Returns a scheduler which will use make-worker-command to start the workers’ Racket processes.

See also make-racket-cmd.
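
A typical make-worker-command ignores the worker index and builds the same command line for every worker, for example with make-racket-cmd and the worker-submodule convention used in the examples above (a sketch):

```racket
#lang racket
(require jobsched)

;; Sketch: a scheduler whose workers run the `worker` submodule
;; of the enclosing file.
(define sched
  (make-scheduler
   (λ (worker-index)  ; the index is ignored here
     (make-racket-cmd (this-file) #:submod 'worker))))
```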

procedure

(scheduler-add-job! sched    
  #:data data    
  [#:cost cost])  void?
  sched : scheduler?
  data : readable?
  cost : number? = 0
Adds a job to the scheduler’s queue. A job can be added either before starting the scheduler, or during, using the #:before-start and #:after-stop arguments of scheduler-start.

The data will be sent to the worker, which receives it on its input port; it is accessible via job-data.

The cost is used for ordering the job in the priority queue, which is ordered by minimum cost.

procedure

(scheduler-n-queued-jobs sched)  nonnegative-integer?

  sched : scheduler?
Returns the number of jobs pending in the queue.

procedure

(scheduler-n-active-jobs sched)  nonnegative-integer?

  sched : scheduler?
Returns the number of jobs that have started and not yet finished.

procedure

(scheduler-start sched    
  [n-workers    
  #:before-start before-start    
  #:after-stop after-stop    
  #:close-workers? close-workers?])  void?
  sched : scheduler?
  n-workers : (or/c #f exact-nonnegative-integer?) = #f
  before-start : (-> scheduler? job? any) = void
  after-stop : (-> scheduler? job? readable? any) = void
  close-workers? : any/c = #t
Starts a scheduler, making sure that n-workers Racket worker instances are running on the same machine.

If close-workers? is not #f, then all worker instances are closed when returning from the function call, and the workers normally terminate gracefully. If close-workers? is #f, workers are not closed when returning from the call, so they can be re-used for a subsequent call to scheduler-start without starting the workers (and the corresponding racket instances) again. If n-workers is #f, the number of running worker instances is not changed, and previous running instances are re-used. If there are already more running worker instances than n-workers, some workers are killed to match n-workers.

The callback before-start is called before a job is sent to a worker. The callback after-stop is called when a job is finished and the result is received from the worker. Both callbacks can be used to add new jobs to the queue, using scheduler-add-job!.
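
For example, an #:after-stop callback can enqueue follow-up work depending on a result (a sketch; needs-more-work? is a hypothetical predicate):

```racket
#lang racket
(require jobsched)

(define (needs-more-work? result) #f)  ; hypothetical predicate

;; Sketch: enqueue a follow-up job whenever a result warrants it.
(define (my-after-stop sched jb result)
  (printf "job ~a -> ~a\n" (job-index jb) result)
  (when (needs-more-work? result)
    (scheduler-add-job! sched
                        #:data (list 'refine result)
                        #:cost 1)))  ; lower cost = served earlier
;; (scheduler-start sched 4 #:after-stop my-after-stop)
```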

See an example of using scheduler-start in examples/server-worker.

procedure

(processor-count)  nonnegative-integer?

Re-exported from racket/future.

7 Worker

The bindings in this section are also exported by jobsched.

procedure

(start-worker run-job [#:silent? silent?])  void?

  run-job : (-> job? readable?)
  silent? : any/c = #f
Starts a worker which waits for jobs. Each time a job is received, the run-job procedure is called. The data of the job can be retrieved with (job-data job). If silent? is not #f, all output of run-job to its output port is suppressed—the error port remains untouched.

See example at the top.

Communication between server and worker uses TCP on localhost. The worker’s stdout/stderr are redirected to the server’s stderr, so printf and displayln calls in the worker are visible for debugging and do not interfere with the protocol.

run-job must return a readable? value (in the sense of racket/fasl); note that (void) is not readable.
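
A side-effecting run-job can simply return a symbol instead of (void). A sketch (the log call stands in for real work):

```racket
#lang racket
(require jobsched)

;; Sketch: a run-job that works by side effect should still return
;; a readable value, such as a symbol, since (void) cannot be sent back.
(define (my-run-job jb)
  (log-info "processing ~a" (job-data jb))  ; stand-in for real work
  'done)                                    ; readable, unlike (void)
;; (start-worker my-run-job)
```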

8 Utilities

The bindings in this section are also exported by jobsched.

procedure

(make-racket-cmd path-to-prog 
  [#:submod submod 
  #:errortrace? errortrace?] 
  args ...) 
  (listof path-string?)
  path-to-prog : path-string?
  submod : (or/c symbol? #f) = #f
  errortrace? : any/c = #f
  args : path-string?
Creates a command line to call the Racket program path-to-prog. If submod is specified, the corresponding submodule is called instead. (For example I like to use a worker submodule.) By default, the main submodule is used if available, or the main function if available. The additional command-line arguments args are passed to the program, which may choose to parse them. Note that path? arguments are turned automatically into strings by Racket’s primitives.
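
For instance (a sketch; "worker.rkt" and the "--verbose" flag are hypothetical):

```racket
#lang racket
(require jobsched)

;; Sketch: build the command line for running the `worker` submodule
;; of "worker.rkt" with errortrace enabled and one extra argument.
(make-racket-cmd "worker.rkt"
                 #:submod 'worker
                 #:errortrace? #t
                 "--verbose")  ; forwarded to the worker program
```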

syntax

(this-file)

’Returns’ the path-string of the enclosing file, or #f if there is no enclosing file.

9 Comparison with other Racket parallelism mechanisms

Futures can make use of many cores and can share memory, but are limited in the kinds of operations they can handle without blocking. Futures are well suited for numerical applications where garbage collection can be greatly reduced, making it possible to get the most out of the CPUs without requiring multiple Racket instances.

Places allow more free-form Racket computation, can make use of multiple cores, and can share memory, but are still constrained by a single garbage collector, which prevents making full use of all the cores.

Distributed Places create separate Racket instances like jobsched but can also spawn workers on remote machines. When all workers are on the same machine, jobsched may be simpler to use.