Apache Kafka
1 Client
client?
make-client
disconnect-all
sasl-ctx-proc/c
1.1 Proxies
proxy?
make-http-proxy
1.2 Errors
exn:fail:kafka?
exn:fail:kafka:client?
exn:fail:kafka:server?
1.3 Topic Management
create-topics
delete-topics
CreateTopic
make-CreateTopic
CreatedTopics
CreatedTopic
DeletedTopics
DeletedTopic
1.4 Record Results
RecordResult
ProduceResponsePartition
1.5 Contracts
error-code/c
tags/c
2 Consumer
consumer?
make-consumer
consume-evt
consumer-commit
consumer-stop
2.1 Records
record?
record-partition-id
record-offset
record-key
record-value
record-headers
2.2 Limitations
2.2.1 Compression
2.2.2 Group Assignment
2.2.3 Error Detection
3 Producer
producer?
make-producer
produce
producer-flush
producer-stop
3.1 Limitations
3.1.1 Compression

Apache Kafka

Bogdan Popa <bogdan@defn.io>

This package provides a client for Apache Kafka versions 0.11 and up. It is a work in progress, so expect breaking changes.

1 Client

 (require kafka) package: kafka-lib

Clients transparently pool connections to brokers within a cluster. Connections are leased from the pool in order of fewest in-progress requests. Reconnections are handled transparently, and connection errors bubble up to the caller. Clients are thread-safe, but they may not be shared between consumers.

procedure

(client? v)  boolean?

  v : any/c
Returns #t when v is a client.

procedure

(make-client [#:id id    
  #:bootstrap-host host    
  #:bootstrap-port port    
  #:sasl-mechanism&ctx sasl-ctx    
  #:ssl-ctx ssl-ctx    
  #:proxy proxy])  client?
  id : non-empty-string? = "racket-kafka"
  host : string? = "127.0.0.1"
  port : (integer-in 0 65535) = 9092
  sasl-ctx : (or/c #f
                   (list/c 'plain string?)
                   (list/c symbol? sasl-ctx-proc/c))
   = #f
  ssl-ctx : (or/c #f ssl-client-context?) = #f
  proxy : (or/c #f proxy?) = #f
Connects to a Kafka cluster via the server at host and port and returns a client.

When a sasl-ctx is provided, it is used to authenticate the connection to the bootstrap host as well as any subsequent connections made to other nodes in the cluster.

When an ssl-ctx is provided, it is used to encrypt all connections.
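
For example, a sketch connecting to a local broker with the defaults, or to a remote bootstrap server (the client id and host name below are hypothetical):

  (require kafka)

  ; Connect using the defaults (127.0.0.1:9092).
  (define local-client (make-client))

  ; Connect to a remote bootstrap server instead.
  (define remote-client
    (make-client
     #:id "my-service"
     #:bootstrap-host "kafka.example.com"
     #:bootstrap-port 9092))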

procedure

(disconnect-all c)  void?

  c : client?
Closes all open connections owned by c.

value

sasl-ctx-proc/c : (-> string? (integer-in 0 65535) sasl-ctx?)

The contract for SASL context factories. The first argument is the host being authenticated against and the second is the port. See "example/amazon-msk-auth.rkt" for an example.
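
As a sketch, a factory that authenticates via SCRAM-SHA-256 using Racket’s sasl library might look like the following; the mechanism symbol and the credentials are assumptions, not part of this package’s API:

  (require kafka sasl/scram)

  ; The factory receives the host and port of the node being
  ; authenticated against; the credentials here are hypothetical.
  (define (make-ctx host port)
    (make-scram-client-ctx 'sha256 "my-user" "my-secret"))

  (define client
    ; The mechanism symbol is an assumption.
    (make-client #:sasl-mechanism&ctx (list 'SCRAM-SHA-256 make-ctx)))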

1.1 Proxies

Proxies act as intermediaries between clients and brokers.

procedure

(proxy? v)  boolean?

  v : any/c
Returns #t when v is a proxy.

procedure

(make-http-proxy host port)  proxy?

  host : string?
  port : (integer-in 1 65535)
Returns a proxy that connects to the given host and port using the HTTP/1.1 CONNECT protocol.
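
For example, a sketch that routes all broker connections through an internal HTTP proxy (the host names and ports are hypothetical):

  (define client
    (make-client
     #:bootstrap-host "kafka.internal"
     #:proxy (make-http-proxy "proxy.internal" 8080)))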

1.2 Errors

procedure

(exn:fail:kafka? v)  boolean?

  v : any/c

procedure

(exn:fail:kafka:client? v)  boolean?

  v : any/c

procedure

(exn:fail:kafka:server? v)  boolean?

  v : any/c
Predicates for the various kinds of errors that may be raised.

1.3 Topic Management

procedure

(create-topics c t ...+)  CreatedTopics?

  c : client?
  t : CreateTopic?
Creates the given topics on the broker if they don’t already exist.

When given a set of topics, some may succeed and some may fail. It’s up to the caller to inspect the error codes on the returned CreatedTopics.
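
For instance, a sketch that creates a topic and reports any per-topic errors (the client is assumed to exist, and the topic name is hypothetical):

  (define res
    (create-topics client (make-CreateTopic #:name "events" #:partitions 2)))
  (for ([t (in-list (CreatedTopics-topics res))])
    (unless (zero? (CreatedTopic-error-code t))
      (eprintf "~a: ~a~n"
               (CreatedTopic-name t)
               (CreatedTopic-error-message t))))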

procedure

(delete-topics c t ...+)  DeletedTopics?

  c : client?
  t : string?
Deletes the given set of topics if they exist.

struct

(struct CreateTopic (name partitions))

  name : string?
  partitions : exact-positive-integer?

procedure

(make-CreateTopic #:name name    
  #:partitions partitions    
  [#:replication-factor factor    
  #:assignments assignments    
  #:configs configs])  CreateTopic?
  name : string?
  partitions : exact-positive-integer?
  factor : (or/c -1 exact-positive-integer?) = -1
  assignments : (hash/c exact-nonnegative-integer? (listof exact-nonnegative-integer?))
   = (hasheqv)
  configs : (hash/c string? string?) = (hash)
Structs representing new topic configuration to be passed to create-topics.

struct

(struct CreatedTopics (topics))

  topics : (listof CreatedTopic?)

struct

(struct CreatedTopic (error-code error-message name))

  error-code : exact-nonnegative-integer?
  error-message : (or/c #f string?)
  name : string?
Structs representing the results of calling create-topics.

struct

(struct DeletedTopics (throttle-time-ms topics tags))

  throttle-time-ms : (or/c #f exact-nonnegative-integer?)
  topics : (listof DeletedTopic?)
  tags : (or/c #f tags/c)

struct

(struct DeletedTopic (error-code error-message name uuid tags))

  error-code : error-code/c
  error-message : (or/c #f string?)
  name : string?
  uuid : (or/c #f bytes?)
  tags : (or/c #f tags/c)
Structs representing the results of calling delete-topics.

1.4 Record Results

Record results represent the results of publishing individual records.

struct

(struct RecordResult (topic partition))

  topic : string?
  partition : ProduceResponsePartition?
Represents a record result.

struct

(struct ProduceResponsePartition (id error-code offset))

  id : exact-nonnegative-integer?
  error-code : exact-nonnegative-integer?
  offset : exact-nonnegative-integer?
Details about the partition a record was published to. If the error-code is non-zero, there was an error and the record was not published.
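
For example, a sketch that syncs a single produce result and checks it for errors (p is assumed to be a producer from kafka/producer, and the topic name is hypothetical):

  (define res (sync (produce p "events" #f #"payload")))
  (define part (RecordResult-partition res))
  (unless (zero? (ProduceResponsePartition-error-code part))
    (error 'produce "error code ~a on partition ~a"
           (ProduceResponsePartition-error-code part)
           (ProduceResponsePartition-id part)))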

1.5 Contracts

value

error-code/c : (integer-in -32768 32767)

The contract for error codes.

value

tags/c : hash?

The contract for tags.

2 Consumer

 (require kafka/consumer) package: kafka-lib

Consumers form consumer groups to subscribe to topics and retrieve records. As the name implies, consumer groups group consumers together so that topic partitions may be spread out across the members of the group.

Consumers are not thread-safe.

procedure

(consumer? v)  boolean?

  v : any/c
Returns #t when v is a consumer.

procedure

(make-consumer client 
  group-id 
  topic ...+ 
  [#:reset-strategy strategy 
  #:session-timeout-ms session-timeout-ms]) 
  consumer?
  client : client?
  group-id : string?
  topic : string?
  strategy : (or/c 'earliest 'latest) = 'earliest
  session-timeout-ms : exact-nonnegative-integer? = 30000
Creates a new consumer. The new consumer joins the consumer group named group-id and subscribes to the given set of topics. If there are any existing consumers in the joined group, this may trigger a group rebalance. Should the consumer be picked as the leader by the group coordinator, it handles assigning topics & partitions to all of the members of the group.

The #:reset-strategy argument controls what the consumer’s initial offsets for newly-assigned partitions are going to be. When this value is 'earliest, the consumer will receive records starting from the beginning of each partition. When this value is 'latest, it will receive records starting from the time that it subscribes to each topic.
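
For example, a sketch creating a consumer that joins a group and subscribes to two topics (the group and topic names are hypothetical):

  (require kafka kafka/consumer)

  (define consumer
    (make-consumer
     (make-client)
     "example-group"
     "orders" "payments"
     #:reset-strategy 'latest))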

procedure

(consume-evt c [timeout-ms])

  (evt/c
   (or/c
    (values 'rebalance (hash/c string? (hash/c integer? integer?)))
    (values 'records (vectorof record?))))
  c : consumer?
  timeout-ms : exact-nonnegative-integer? = 1000
Returns a synchronizable event that represents the result of consuming data from the topics c is subscribed to. The synchronization result is two values: a symbol representing the result type, and its associated data.

When a consumer leaves or joins the consumer group, the event will synchronize to a 'rebalance result. In that case, the consumer will automatically re-join the group and discard any un-committed offsets. The associated data is a hash from topic names to hashes of partition ids to offsets. When a rebalance happens, you must take care not to commit any old offsets (i.e. you must issue a new consume-evt before making any calls to consumer-commit).

When either the timeout passes or new records become available on the broker, the event will synchronize to a 'records result whose associated data will be a vector of records.

More result types may be added in the future.

The timeout-ms argument controls how long the server side may wait before returning a response. If no records arrive between the time this function is called and the time the timeout passes, an empty vector of records is returned. The broker may not respect the timeout value and may return immediately when there are no more records.
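
Putting the pieces together, a typical consumption loop looks something like the following sketch; the group and topic names are hypothetical:

  (define c
    (make-consumer (make-client) "example-group" "events"))
  (let loop ()
    (define-values (type data)
      (sync (consume-evt c)))
    (case type
      [(records)
       ; Process the batch, then commit its offsets.
       (for ([r (in-vector data)])
         (printf "~s => ~s~n" (record-key r) (record-value r)))
       (consumer-commit c)]
      [(rebalance)
       ; Un-committed offsets were discarded; just consume again.
       (void)])
    (loop))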

procedure

(consumer-commit c)  void?

  c : consumer?
Commits the topic-partition offsets consumed so far.

Call this function after you have successfully processed a batch of records received from consume-evt. If you forget to call this function, or if the consumer crashes in between calling consume-evt and calling this function, another consumer in the group will eventually receive that same batch again.

procedure

(consumer-stop c)  void?

  c : consumer?
Gracefully stops the consumer and removes it from its consumer group. The consumer may not be used after this point.

2.1 Records

Records represent individual key-value pairs on a topic.

procedure

(record? v)  boolean?

  v : any/c
Returns #t when v is a record.

procedure

(record-partition-id r)  exact-nonnegative-integer?

  r : record?
Returns the id of the partition the record belongs to.

procedure

(record-offset r)  exact-nonnegative-integer?

  r : record?
Returns the record’s partition offset.

procedure

(record-key r)  (or/c #f bytes?)

  r : record?
Returns the record’s key.

procedure

(record-value r)  bytes?

  r : record?
Returns the record’s value.

procedure

(record-headers r)  (hash/c string? bytes?)

  r : record?
Returns the record’s headers.
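
For example, a small helper that formats a record using the accessors above:

  (define (record->string r)
    (format "partition=~a offset=~a key=~s value=~s"
            (record-partition-id r)
            (record-offset r)
            (record-key r)
            (record-value r)))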

2.2 Limitations

Consumers have several limitations at the moment, some of which will be addressed in future versions.

2.2.1 Compression

At the moment, only 'gzip is supported when producing records, and 'gzip, 'lz4, 'snappy, and 'zstd are supported when consuming records. Fetching a batch of records compressed using any other method fails silently.

2.2.2 Group Assignment

Only brokers that implement client-side assignment are supported (Apache Kafka versions 0.11 and up). At the moment, only the range and round-robin group assignment strategies are implemented.

2.2.3 Error Detection

Batches retrieved from the broker contain a CRC code for error detection, but the library does not validate these at the moment.

3 Producer

 (require kafka/producer) package: kafka-lib

Producers publish data to one or more topics. They batch data internally by topic & partition, and they are thread-safe.

procedure

(producer? v)  boolean?

  v : any/c
Returns #t when v is a producer.

procedure

(make-producer c    
  [#:acks acks    
  #:compression compression    
  #:flush-interval interval    
  #:max-batch-bytes max-bytes    
  #:max-batch-size max-size])  producer?
  c : client?
  acks : (or/c 'none 'leader 'full) = 'leader
  compression : (or/c 'none 'gzip) = 'gzip
  interval : exact-positive-integer? = 60000
  max-bytes : exact-positive-integer? = (* 100 1024 1024)
  max-size : exact-positive-integer? = 1000
Returns a producer.

Data is batched internally by topic & partition. Within each batch, the data is compressed according to the #:compression method.

The producer automatically flushes its data every #:flush-interval milliseconds, whenever the total size of all its batches exceeds #:max-batch-bytes, or whenever the total number of records across all of its batches exceeds #:max-batch-size, whichever happens first.

During a flush, calling produce on a producer blocks until the flush completes.
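
For example, a sketch of a producer tuned for larger batches; the figures are illustrative, not recommendations:

  (require kafka kafka/producer)

  (define p
    (make-producer
     (make-client)
     #:flush-interval 5000                 ; flush at least every 5s...
     #:max-batch-bytes (* 16 1024 1024)    ; ...or at 16 MiB of data...
     #:max-batch-size 10000))              ; ...or at 10000 records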

procedure

(produce p    
  topic    
  key    
  value    
  [#:partition partition    
  #:headers headers])  evt?
  p : producer?
  topic : string?
  key : (or/c #f bytes?)
  value : (or/c #f bytes?)
  partition : exact-nonnegative-integer? = 0
  headers : (hash/c string? (or/c #f bytes?)) = (hash)
Returns a synchronizable event that is ready for synchronization after a new record has been written to the partition belonging to topic. The event’s synchronization result will be a record result.

Typically, you would call this function in a loop to produce a set of data, collect the result events, and then sync them to ensure the records have been written to the log.

Each result event may be synced at most once.
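
For instance, the following sketch produces a batch of records, flushes, and then syncs every result to confirm the writes (p is the producer from the earlier sketch, and the topic name is hypothetical):

  (define evts
    (for/list ([i (in-range 100)])
      (produce p "events"
               (string->bytes/utf-8 (number->string i)) ; key
               #"payload")))                             ; value
  (producer-flush p)
  (for-each sync evts)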

procedure

(producer-flush p)  void?

  p : producer?
Flushes any pending batches in p.

procedure

(producer-stop p)  void?

  p : producer?
Gracefully stops p after flushing any pending data to the broker. The producer may no longer be used after this is called.

3.1 Limitations

3.1.1 Compression

Kafka supports snappy, lz4, and zstd compression in addition to gzip, but this library only supports gzip at the moment.