Lean OCaml NATS clients built around a small protocol core and an Async runtime.
The project was inspired by romanchechyotkin/nats.ocaml, but this is a fresh implementation with an Async-first runtime, HPUB support, reconnect handling, and a smaller public surface.
Packages:

- nats-client
  - Client commands: CONNECT, PUB, HPUB, SUB, UNSUB, PING, PONG
  - Server messages: INFO, MSG, HMSG, +OK, and -ERR
- nats-client-async
  - Async runtime
  - PING/PONG keepalive handling

With opam:
opam install nats-client nats-client-async
From source:
dune build @install
dune install
If you are developing against this repository locally, use proto to install the toolchain from .prototools, then run dune and opam directly from that shell environment.
Supported features:

- Async runtime
- protocol=1 / headers=true CONNECT fields
- HMSG parsing and header delivery
- JSON publishing with Yojson.Safe.t payloads
- PING handling and client keepalive PING
- Disabled mode when connect None is used

Calling Nats_client_async.connect None returns a disabled client.
In disabled mode:
- publish is a no-op
- publish_json is a no-op
- publish_result returns `Dropped
- subscribe, unsubscribe, and request return an error

This is useful when a caller wants to keep its main transaction path identical with or without NATS.
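The disabled-mode behaviour listed above can be modelled with a small stand-alone sketch. It uses only the OCaml standard library and does not reproduce the library's internals: the `client` type, the `Connected` payload, and the `` `Sent `` outcome are hypothetical names chosen for illustration. Only `` `Dropped `` comes from the description above.

```ocaml
(* Hypothetical model of a client that may be disabled. These types are
   illustrative only and are not the library's definitions. *)
type client =
  | Disabled
  | Connected of Buffer.t (* stand-in for a live connection's output *)

(* `Sent is an assumed name for the non-dropped outcome. *)
let publish_result client ~subject payload =
  match client with
  | Disabled -> `Dropped
  | Connected out ->
    Buffer.add_string out (Printf.sprintf "%s:%s\n" subject payload);
    `Sent

(* Fire-and-forget publish: the outcome is ignored, so a disabled
   client makes this a no-op. *)
let publish client ~subject payload =
  ignore (publish_result client ~subject payload)

(* Operations that need a server report an error when disabled. *)
let subscribe client ~subject =
  match client with
  | Disabled -> Error "client is disabled"
  | Connected _ -> Ok subject
```

With this shape, calling code can invoke `publish` unconditionally and only inspect `publish_result` where it cares whether a message was dropped.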
val Nats_client.Protocol.encode_connect :
  ?connect:Nats_client.Protocol.connect -> unit -> string

val Nats_client.Protocol.encode_pub :
  subject:string -> ?reply_to:string -> string -> string

val Nats_client.Protocol.encode_hpub :
  subject:string -> ?reply_to:string -> headers:Nats_client.Headers.t -> string -> string

val Nats_client.Protocol.parse_server_line :
  string -> Nats_client.Protocol.parsed_line
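For readers unfamiliar with the wire format that encoders like these produce, here is a minimal stand-alone sketch of PUB and HPUB framing as described in the NATS client protocol. It is not the library's implementation: `frame_pub`, `frame_hpub`, and `render_headers` are hypothetical names, and headers are modelled as a plain association list rather than `Nats_client.Headers.t`.

```ocaml
(* Optional reply-to subject, rendered with a leading space when present. *)
let reply_part = function
  | None -> ""
  | Some reply_to -> " " ^ reply_to

(* PUB <subject> [reply-to] <#bytes>\r\n<payload>\r\n *)
let frame_pub ~subject ?reply_to payload =
  Printf.sprintf "PUB %s%s %d\r\n%s\r\n"
    subject (reply_part reply_to) (String.length payload) payload

(* Header block: a NATS/1.0 version line, one "Key: Value" line per
   header, then an empty line that terminates the block. *)
let render_headers headers =
  let lines =
    List.map (fun (key, value) -> Printf.sprintf "%s: %s\r\n" key value) headers
  in
  "NATS/1.0\r\n" ^ String.concat "" lines ^ "\r\n"

(* HPUB <subject> [reply-to] <#header bytes> <#total bytes>\r\n
   <headers><payload>\r\n
   The total counts the header block plus the payload. *)
let frame_hpub ~subject ?reply_to ~headers payload =
  let block = render_headers headers in
  let header_len = String.length block in
  Printf.sprintf "HPUB %s%s %d %d\r\n%s%s\r\n"
    subject (reply_part reply_to)
    header_len (header_len + String.length payload)
    block payload
```

Byte counts are computed from the rendered strings, so the header length includes the version line and the terminating empty line.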
The protocol layer also exposes:
- Nats_client.Headers for ordered, repeatable headers
- Nats_client.Sid for subscription ids
- Nats_client.Protocol.message for received messages

val Nats_client_async.connect :
  ?connect:Nats_client.Protocol.connect ->
  ?ping_interval:Time_ns.Span.t ->
  ?ping_timeout:Time_ns.Span.t ->
  ?reconnect_initial:Time_ns.Span.t ->
  ?reconnect_max:Time_ns.Span.t ->
  Uri.t option ->
  client Deferred.t

val Nats_client_async.publish :
  client -> subject:string -> ?reply_to:string -> ?headers:Nats_client.Headers.t -> string -> unit

val Nats_client_async.publish_json :
  client -> subject:string -> ?reply_to:string -> ?headers:Nats_client.Headers.t -> Yojson.Safe.t -> unit

val Nats_client_async.subscribe :
  client -> subject:string -> ?queue_group:string -> ?sid:Nats_client.Sid.t -> unit -> subscription Or_error.t Deferred.t

val Nats_client_async.request :
  client -> subject:string -> ?headers:Nats_client.Headers.t -> ?timeout:Time_ns.Span.t -> string -> string Or_error.t Deferred.t
publish and publish_json are fire-and-forget: they return unit and report no delivery status. If the client is unavailable, the messages are silently dropped.
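The `?reconnect_initial` and `?reconnect_max` parameters of `connect` suggest a reconnect delay that starts small and is capped. One common way to use such a pair is capped exponential backoff, sketched below. Whether the library doubles the delay, adds jitter, or does something else is not stated here, so treat the doubling as an assumption. `reconnect_delay` is a hypothetical helper, and delays are plain float seconds rather than `Time_ns.Span.t`.

```ocaml
(* Delay in seconds before reconnect attempt [attempt] (0-based):
   initial * 2^attempt, never exceeding max_delay.
   The doubling policy is an assumption made for illustration. *)
let reconnect_delay ~initial ~max_delay attempt =
  let rec grow delay remaining =
    if remaining <= 0 || delay >= max_delay
    then delay
    else grow (delay *. 2.0) (remaining - 1)
  in
  Float.min max_delay (grow initial attempt)
```

For example, with `~initial:0.5 ~max_delay:8.0` the successive delays are 0.5, 1, 2, 4, and 8 seconds, and every later attempt stays at 8 seconds.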
Start NATS locally:
docker run --rm --name nats-server -p 4222:4222 nats:latest
Then run code like this:
open Core
open Async

let main () =
  let uri = Uri.of_string "nats://127.0.0.1:4222" in
  Nats_client_async.connect (Some uri)
  >>= fun client ->
  (* Close the client even if the body raises. *)
  Monitor.protect
    ~finally:(fun () -> Nats_client_async.close client)
    (fun () ->
      (* Subscribe before publishing so the messages are not missed. *)
      Nats_client_async.subscribe client ~subject:"greet.*" ()
      >>= function
      | Error error -> Error.raise error
      | Ok subscription ->
        List.iter [ "greet.sue"; "greet.bob"; "greet.pam" ] ~f:(fun subject ->
          Nats_client_async.publish client ~subject "hello");
        (* Read and print the first delivered message only. *)
        Pipe.read subscription.messages
        >>= function
        | `Eof -> Deferred.unit
        | `Ok message ->
          printf "'%s' received on %s\n"
            message.Nats_client.Protocol.payload
            message.subject;
          Deferred.unit)

let () = Thread_safe.block_on_async_exn main
There is also a runnable version in examples/natsbyexample/publish_subscribe.ml.
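The `greet.*` subscription in the example relies on NATS subject wildcards, which the server applies when routing messages. As a rough illustration of the matching rules (`*` matches exactly one token, `>` matches one or more trailing tokens), here is a stand-alone sketch. `subject_matches` is a hypothetical helper and not part of either package.

```ocaml
(* Match a dot-separated subject against a pattern that may contain
   the NATS wildcards "*" (one token) and ">" (one or more trailing
   tokens, only meaningful as the last token of the pattern). *)
let subject_matches ~pattern subject =
  let rec go pattern_tokens subject_tokens =
    match pattern_tokens, subject_tokens with
    | [], [] -> true
    | [ ">" ], _ :: _ -> true
    | "*" :: rest_pattern, _ :: rest_subject -> go rest_pattern rest_subject
    | token :: rest_pattern, token' :: rest_subject ->
      String.equal token token' && go rest_pattern rest_subject
    | _, _ -> false
  in
  go (String.split_on_char '.' pattern) (String.split_on_char '.' subject)
```

Under these rules `greet.*` matches `greet.sue`, `greet.bob`, and `greet.pam`, but not `greet.sue.bob`.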
Start a local NATS server with Docker:
docker run --rm --name nats-server -p 4222:4222 nats:latest
In another shell, build or run the examples from this repo. All examples default to nats://127.0.0.1:4222, so you do not need to set NATS_URL unless you want a different server.
Build the example executables with:
dune build examples/protocol_demo.exe examples/publish_json.exe examples/request_reply.exe
dune build examples/natsbyexample/publish_subscribe.exe
dune build examples/natsbyexample/request_reply.exe
dune build examples/natsbyexample/json_for_message_payloads.exe
Run them against the Docker NATS server:
dune exec ./examples/protocol_demo.exe
dune exec ./examples/publish_json.exe
dune exec ./examples/request_reply.exe
dune exec ./examples/natsbyexample/publish_subscribe.exe
dune exec ./examples/natsbyexample/request_reply.exe
dune exec ./examples/natsbyexample/json_for_message_payloads.exe
The examples cover:
- a natsbyexample directory with publish/subscribe, request/reply, and JSON payload examples

To run against another server, set NATS_URL, for example:
NATS_URL=nats://demo.nats.io:4222 dune exec ./examples/publish_json.exe
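A program that follows the same convention could resolve its server URL roughly as sketched below. This is not taken from the examples' source: `resolve_url` and `default_url` are hypothetical names, and treating an empty `NATS_URL` as unset is an assumption.

```ocaml
(* Default used when NATS_URL is not set. *)
let default_url = "nats://127.0.0.1:4222"

(* Read NATS_URL, falling back to the local default.
   [getenv] is injectable so the lookup can be exercised without
   touching the real environment. *)
let resolve_url ?(getenv = Sys.getenv_opt) () =
  match getenv "NATS_URL" with
  | Some url when String.trim url <> "" -> String.trim url
  | Some _ | None -> default_url
```

The resulting string can then be passed to `Uri.of_string` and on to `Nats_client_async.connect`, as in the publish/subscribe example.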
The intended release path is PR-based:
1. Each change adds a fragment under .changes/ with a patch, minor, or major header.
2. The release-pr.yml workflow aggregates those fragments into CHANGES.md and opens or updates a release: vX.Y.Z PR.
3. Merging that PR produces the vX.Y.Z tag from the merge commit.
4. Run publish.yml for that exact tag. This is required because tags created by a GitHub Actions workflow with GITHUB_TOKEN do not trigger downstream push workflows.
5. The publish.yml workflow checks out the tag, creates the GitHub release with dune-release publish, and submits the package to opam-repository with dune-release opam submit.
6. The repository needs an OPAM_PUBLISH_GH_TOKEN secret configured for the opam-repository submission step.
7. Once the opam-repository PR is merged, users can install the packages with opam install.

MIT. See LICENSE.