Jobsched: Running jobs with multiple Racket instances
| (require jobsched) | package: jobsched |
Jobsched is a job scheduler with a server-worker architecture on a single machine, where the server dispatches a number of jobs to the workers.
All workers are identical and are able to process any job assigned by the server. The workers are independent Racket programs, each with its own Racket instance. See Comparison with other Racket parallelism mechanisms for more information.
The server (a Racket program) is started.
N jobs are added to the job queue in the server. A job is specified as read-able data.
The server starts K workers (independent Racket programs). Each worker is spawned as a separate OS process and communicates with the server over a localhost TCP connection (one per worker).
Each worker signals to the server over TCP that it is ready to receive jobs.
As soon as a worker is ready, the server sends it one of the remaining jobs.
When a worker has finished a job, it sends the result to the server, and waits for another job.
When the server receives the result of a job, it processes this result and sends a new job (if any is remaining) to the worker.
When all N jobs are finished, the server finishes.
If your machine has K cores and sufficient memory, jobsched can run K workers in parallel and benefit from a speedup of up to a factor of K. Jobsched has been successfully used with more than 120 workers in parallel, with more than 100 000 fast-paced jobs to dispatch between them.
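The dispatch loop described above can be sketched with plain Racket threads and channels. This is only a simulation under stated assumptions: it is not jobsched's implementation, it uses threads instead of separate OS processes and channels instead of TCP, and the name simulate-dispatch is hypothetical. It assumes at least one worker.

```racket
#lang racket/base

;; Hypothetical simulation of the server/worker workflow.
;; data-list : jobs to dispatch; run : computes the result of one job;
;; n-workers : number of simulated workers (assumed >= 1).
(define (simulate-dispatch data-list run n-workers)
  (define to-server (make-channel))
  ;; A worker signals that it is ready, then repeatedly receives a job
  ;; and sends the result back to the server.
  (define (make-worker)
    (define to-worker (make-channel))
    (thread
     (λ ()
       (channel-put to-server (list 'ready to-worker))
       (let loop ()
         (define data (channel-get to-worker))
         (channel-put to-server (list 'result to-worker data (run data)))
         (loop)))))
  (define workers (for/list ([i n-workers]) (make-worker)))
  ;; The server hands a remaining job to whichever worker just reported
  ;; in, and finishes once the queue is empty and no job is active.
  (define results
    (let loop ([queue data-list] [n-active 0] [results '()])
      (cond
        [(and (null? queue) (zero? n-active)) (reverse results)]
        [else
         (define msg (channel-get to-server))
         (define to-worker (cadr msg))
         (define finished? (eq? (car msg) 'result))
         (define results* (if finished? (cons (cadddr msg) results) results))
         (define n-active* (if finished? (sub1 n-active) n-active))
         (cond
           [(null? queue) (loop queue n-active* results*)]
           [else
            (channel-put to-worker (car queue))
            (loop (cdr queue) (add1 n-active*) results*)])])))
  ;; Idle workers are simply discarded in this simulation.
  (for-each kill-thread workers)
  results)

;; Results arrive in completion order, which may differ from job order.
(simulate-dispatch '(1 2 3 4 5) (λ (x) (* x x)) 3)
```

With a single worker the results come back in job order; with several workers only the set of results is predictable, which is why a result-processing callback usually also needs the job data.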
1 Remote-Call: Remote function calls
| (require jobsched/remote-call) | package: jobsched |
The jobsched/remote-call module provides a higher-level interface where jobs are expressed as function calls.
"server-worker-remote-call.rkt"
#lang racket
(require jobsched)

;; Functions used by the server must be `provide`d.
(provide greet total)

(define (greet name #:greeting [greeting "Hello"])
  (format "~a, ~a!" greeting name))

(define (total prices #:tax-rate [tax-rate 0])
  (define subtotal (apply + prices))
  (exact->inexact (* subtotal (+ 1 tax-rate))))

;;=== Worker ===;;

;; The worker resolves and calls the functions automatically.
(module+ worker
  (start-remote-call-worker))

;;=== Server ===;;

(module+ main
  (start-simple-server
   #:worker-file (this-file)
   ;; These look like function calls, but they are captured and
   ;; sent to the workers for execution. Arguments are evaluated
   ;; here; the function itself is resolved on the worker.
   #:data-list
   (list (remote-call (greet "Alice"))
         (remote-call (greet "Bob" #:greeting "Hi"))
         (remote-call (total '(10 20 30)))
         (remote-call (total '(100 200) #:tax-rate 0.2)))
   #:process-result
   (λ (data result)
     (displayln result))
   #:n-workers 3))
racket -l- jobsched/examples/server-worker-remote-call
Each remote-call form captures three things:
The function’s module path (resolved at compile time via identifier-binding).
The function’s symbol name.
The evaluated arguments (positional and keyword).
The worker then uses dynamic-require to resolve the function and applies it with the captured arguments. This means the server and worker are guaranteed to call the same function.
Important: The arguments to the function are evaluated on the server before being serialized and sent to the worker. The worker receives only the function name and the serialized argument values, and calls the function on these values. In particular, procedures and closures are not serializable (via racket/fasl) and cannot be passed as arguments. Only the top-level function itself is resolved on the worker via dynamic-require.
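To get a feel for which argument values can make the trip, the following sketch round-trips values through racket/fasl directly. It only uses s-exp->fasl and fasl->s-exp from the standard library. The helper names fasl-round-trip and fasl-serializable? are hypothetical and not part of jobsched, and jobsched's own wire format may differ in detail.

```racket
#lang racket/base
(require racket/fasl)

;; Hypothetical helper: serialize a value to bytes and read it back,
;; as a stand-in for sending it from the server to a worker.
(define (fasl-round-trip v)
  (fasl->s-exp (s-exp->fasl v)))

;; Hypothetical helper: #t if racket/fasl accepts the value,
;; #f if s-exp->fasl raises an exception.
(define (fasl-serializable? v)
  (with-handlers ([exn:fail? (λ (e) #f)])
    (s-exp->fasl v)
    #t))

;; Plain data (numbers, strings, symbols, keywords, lists) survives.
(fasl-round-trip '(total (10 20 30) #:tax-rate 0.2))
(fasl-serializable? '("Bob" #:greeting "Hi"))

;; A closure is rejected, so it cannot be used as an argument.
(fasl-serializable? (λ (x) (* 2 x)))
```

A common workaround, when a worker needs behavior rather than data, is to pass a symbol or other plain value that the provided function interprets on the worker side.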
Remark: In the example above, the functions, the worker and the server are all defined in the same file. This is not a requirement, and they can be in three different files.
syntax
(remote-call (fun arg ...))
Extracts the module path and symbol for fun via identifier-binding.
Raises a compile-time error if fun is not provided by any module.
We recommend requiring define2 in the module where the functions are defined, so as to raise compile-time errors about the function’s signature.
At run time, the arguments are evaluated and stored in the job data, but the function is not called.
The function must be provided by its module, otherwise a compile-time error is raised.
All arguments must be serializable via racket/fasl. Procedures, closures, and other non-serializable values cannot be passed as arguments.
Renamed imports (e.g., (require (rename-in racket/list [first my-first]))) are not supported. The embedded module path refers to the original name. Use re-exported names instead.
planet modules are untested.
The data argument received by the #:process-result callback is the remote-call value. You can inspect it with the following accessors and predicate:
procedure
(remote-call? v) → boolean?
v : any/c
(remote-call-fun-sym rc) → symbol?
rc : remote-call?
(remote-call-pos-args rc) → list?
rc : remote-call?
(remote-call-kw-dict rc) → dict?
rc : remote-call?
(remote-call-mod-path rc) → module-path?
rc : remote-call?
procedure
start-remote-call-worker starts a worker that, for each received job, uses dynamic-require to load the function from the embedded module path, then calls it with the stored arguments.
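The resolve-then-apply step can be sketched with the standard dynamic-require and keyword-apply. This is an illustration under assumptions, not jobsched's code: call-by-name is a hypothetical helper, keyword arguments are represented here as an association list, and string-join from racket/string stands in for a user-provided function.

```racket
#lang racket/base

;; Hypothetical helper: look up fun-sym in the module mod-path, then call it
;; with positional arguments pos-args and keyword arguments kw-alist,
;; an association list of (keyword . value) pairs.
(define (call-by-name mod-path fun-sym pos-args kw-alist)
  (define fun (dynamic-require mod-path fun-sym))
  ;; keyword-apply requires the keywords to be sorted by keyword<?.
  (define sorted (sort kw-alist keyword<? #:key car))
  (keyword-apply fun (map car sorted) (map cdr sorted) pos-args))

;; Equivalent to:
;; (string-join '("a" "b" "c") ", " #:before-first "[" #:after-last "]")
(call-by-name 'racket/string 'string-join
              (list '("a" "b" "c") ", ")
              (list (cons '#:before-first "[")
                    (cons '#:after-last "]")))
```

Because only a module path, a symbol, and plain argument values are needed, everything the worker requires to make the call can be expressed as serializable data.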
2 Lower-level API
The following sections document the lower-level server/worker API for cases where remote-call does not provide enough control.
All definitions exported by the various modules below are also exported by jobsched.
3 Simple server / worker
procedure
(start-simple-worker run [#:silent? silent?]) → void?
run : (-> readable? readable?)
silent? : any/c = #f
Note that start-simple-worker can be used with start-simple-server, server-start and scheduler-start.
IMPORTANT: See the remarks for start-worker.
procedure
(start-simple-server  #:worker-file worker-file
                      #:data-list data-list
                      #:process-result process-result
                     [#:submod-name submod-name
                      #:n-workers n-workers]) → void?
worker-file : path-string?
data-list : (listof readable?)
process-result : (procedure-arity-includes/c 2)
submod-name : symbol? = 'worker
n-workers : integer? = (min (length data-list) (processor-count))
The workers will receive one element of data-list at a time, and return a result to be processed by process-result.
The server starts n-workers workers in separate OS processes. Refer to scheduler-start for close-workers?.
Note: Unlike scheduler-start, the simple server does not allow adding more jobs while it is running.
start-simple-server is essentially a combination of make-server, server-start and server-close.
racket -l- jobsched/examples/server-worker-simple
"server-worker-simple.rkt"
#lang racket
(require jobsched)

;=== Worker ===;

(define (worker-run data)
  (match data
    [(list x y)
     (* x y)]))

(module+ worker
  (start-simple-worker worker-run))

;=== Server ===;

(module+ main
  (define (process-result data result)
    (printf "~a × ~a = ~a\n" (first data) (second data) result))

  (define data-list
    (for*/list ([x 5] [y 5])
      (list x y)))

  (start-simple-server
   #:worker-file (this-file)
   #:data-list data-list
   #:process-result process-result))
4 Server
procedure
(make-server  #:worker-file worker-file
             [#:submod-name submod-name
              #:n-workers n-workers]
              #:errortrace? errortrace?
              #:worker-args worker-args) → scheduler?
worker-file : path-string?
submod-name : symbol? = 'worker
n-workers : (or/c #f integer?) = #f
errortrace? : any/c
worker-args : (listof (or/c #f string? path?))
procedure
(server-start  sched
               #:data-list data-list
               #:process-result process-result
              [#:n-workers n-workers
               #:close-workers? close-workers?]) → void?
sched : scheduler?
data-list : (listof readable?)
process-result : (procedure-arity-includes/c 2)
n-workers : integer? = (min (length data-list) (processor-count))
close-workers? : any/c = #false
procedure
(server-close sched) → void?
sched : scheduler?
5 Job
| (require jobsched/job) | package: jobsched |
struct
(struct job (index cost data start-ms stop-ms)
  #:transparent)
index : nonnegative-integer?
cost : number?
data : readable?
start-ms : nonnegative-integer?
stop-ms : nonnegative-integer?
The cost field is used by the priority queue of the server.
The data field contains readable? (in the sense of racket/fasl) information that is sent to the worker for processing.
The fields start-ms and stop-ms are set automatically by the server when a job is sent to a worker and when the result is received.
6 Server
| (require jobsched/server) | package: jobsched |
procedure
(scheduler? v) → boolean?
v : any/c
procedure
(make-scheduler make-worker-command) → scheduler?
make-worker-command : (-> nonnegative-integer? list?)
See also make-racket-cmd.
procedure
(scheduler-add-job!  sched
                     #:data data
                    [#:cost cost]) → void?
sched : scheduler?
data : readable?
cost : number? = 0
The data is sent to the worker, which receives it on its input port; it is then accessible via job-data.
The cost is used for ordering the job in the priority queue, which is ordered by minimum cost.
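As a small illustration of minimum-cost ordering, the sketch below sorts pending jobs by cost using only the standard library. The pending struct and dispatch-order are hypothetical names introduced here, and this is not how the server's priority queue is implemented. How jobsched orders jobs of equal cost is not specified in this documentation, so the stable order shown for ties is an assumption of the sketch only.

```racket
#lang racket/base

;; Hypothetical stand-in for a queued job: a cost and some data.
(struct pending (cost data) #:transparent)

;; Returns the data of the given jobs in the order a minimum-cost
;; priority queue would release them (lowest cost first).
(define (dispatch-order jobs)
  (map pending-data (sort jobs < #:key pending-cost)))

(dispatch-order (list (pending 5 'slow)
                      (pending 0 'urgent)
                      (pending 2 'normal)))
;; → '(urgent normal slow)
```

Since the default cost is 0, jobs added without #:cost are released before any job with a positive cost.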
procedure
(scheduler-n-queued-jobs sched) → exact-nonnegative-integer?
sched : scheduler?
procedure
(scheduler-n-active-jobs sched) → exact-nonnegative-integer?
sched : scheduler?
procedure
(scheduler-start  sched
                 [n-workers
                  #:before-start before-start
                  #:after-stop after-stop
                  #:close-workers? close-workers?]) → void?
sched : scheduler?
n-workers : (or/c #f exact-nonnegative-integer?) = #f
before-start : (-> scheduler? job? any) = void
after-stop : (-> scheduler? job? readable? any) = void
close-workers? : any/c = #t
If close-workers? is not #f, then all worker instances are closed when returning from the function call, and the workers normally terminate gracefully. If close-workers? is #f, workers are not closed when returning from the call, so they can be re-used for a subsequent call to scheduler-start without starting the workers (and the corresponding Racket instances) again. If n-workers is #f, the number of running worker instances is not changed, and previously running instances are re-used. If there are already more running worker instances than n-workers, some workers are killed to match n-workers.
The callback before-start is called before a job is sent to a worker. The callback after-stop is called when a job is finished and the result is received from the worker. Both callbacks can be used to add new jobs to the queue, using scheduler-add-job!.
See an example of using scheduler-start in examples/server-worker.
procedure
(processor-count) → nonnegative-integer?
7 Worker
| (require jobsched/worker) | package: jobsched |
procedure
(start-worker run-job [#:silent? silent?]) → void?
run-job : (-> job? readable?)
silent? : any/c = #f
See example at the top.
Communication between server and worker uses TCP on localhost. The worker’s stdout/stderr are redirected to the server’s stderr, so printf and displayln calls in the worker are visible for debugging and do not interfere with the protocol.
run-job must return a readable? value (in the sense of racket/fasl). Note that (void) is not readable.
8 Utilities
| (require jobsched/utils) | package: jobsched |
procedure
(make-racket-cmd  path-to-prog
                 [#:submod submod
                  #:errortrace? errortrace?]
                  args ...) → (listof path-string?)
path-to-prog : path-string?
submod : (or/c symbol? #f) = #f
errortrace? : any/c = #f
args : path-string?
syntax
9 Comparison with other Racket parallelism mechanisms
Futures can make use of many cores and can share memory, but are limited in the kind of operations they can handle without blocking. Futures are well suited for numerical applications where garbage collection can be greatly reduced, making it possible to make the most of the CPUs without requiring multiple Racket instances.
Places allow more free-form Racket computation, can make use of multiple cores and can share memory, but are still constrained by a single garbage collector, which prevents them from making full use of all the cores.
Distributed Places create separate Racket instances like jobsched but can also spawn workers on remote machines. When all workers are on the same machine, jobsched may be simpler to use.