Lean OCaml NATS clients built around a small protocol core and an Async runtime.
The project was inspired by romanchechyotkin/nats.ocaml, but this is a fresh implementation with an Async-first runtime, HPUB support, reconnect handling, and a smaller public surface.
nats-client
CONNECT, PUB, HPUB, SUB, UNSUB, PING, PONGINFO, MSG, HMSG, +OK, and -ERRnats-client-async
AsyncPING/PONGWith opam:
opam install nats-client nats-client-async
From source:
dune build @install
dune install
If you are developing against this repository locally, use proto to install the toolchain from .prototools, then run dune and opam directly from that shell environment.
Asyncprotocol=1 / headers=trueCONNECT fieldsHMSG parsing and header deliveryYojson.Safe.tPING handling and client keepalive PINGconnect None is usedCalling Nats_client_async.connect None returns a disabled client.
In disabled mode:
publish is a no-oppublish_json is a no-oppublish_result returns `Droppedsubscribe, unsubscribe, and request return an errorThis is useful when a caller wants to keep its main transaction path identical with or without NATS.
val Nats_client.Protocol.encode_connect :
?connect:Nats_client.Protocol.connect -> unit -> string
val Nats_client.Protocol.encode_pub :
subject:string -> ?reply_to:string -> string -> string
val Nats_client.Protocol.encode_hpub :
subject:string -> ?reply_to:string -> headers:Nats_client.Headers.t -> string -> string
val Nats_client.Protocol.parse_server_line :
string -> Nats_client.Protocol.parsed_line
The protocol layer also exposes:
Nats_client.Headers for ordered, repeatable headersNats_client.Sid for subscription idsNats_client.Protocol.message for received messagesval Nats_client_async.connect :
?connect:Nats_client.Protocol.connect ->
?ping_interval:Time_ns.Span.t ->
?ping_timeout:Time_ns.Span.t ->
?reconnect_initial:Time_ns.Span.t ->
?reconnect_max:Time_ns.Span.t ->
Uri.t option ->
client Deferred.t
val Nats_client_async.publish :
client -> subject:string -> ?reply_to:string -> ?headers:Nats_client.Headers.t -> string -> unit
val Nats_client_async.publish_json :
client -> subject:string -> ?reply_to:string -> ?headers:Nats_client.Headers.t -> Yojson.Safe.t -> unit
val Nats_client_async.subscribe :
client -> subject:string -> ?queue_group:string -> ?sid:Nats_client.Sid.t -> unit -> subscription Or_error.t Deferred.t
val Nats_client_async.request :
client -> subject:string -> ?headers:Nats_client.Headers.t -> ?timeout:Time_ns.Span.t -> string -> string Or_error.t Deferred.t
publish and publish_json are fire-and-forget. If the client is unavailable, they are silently dropped.
Start NATS locally:
docker run --rm --name nats-server -p 4222:4222 nats:latest
Then run code like this:
open Core
open Async
let main () =
let uri = Uri.of_string "nats://127.0.0.1:4222" in
Nats_client_async.connect (Some uri)
>>= fun client ->
Monitor.protect
~finally:(fun () -> Nats_client_async.close client)
(fun () ->
Nats_client_async.subscribe client ~subject:"greet.*" ()
>>= function
| Error error -> Error.raise error
| Ok subscription ->
List.iter [ "greet.sue"; "greet.bob"; "greet.pam" ] ~f:(fun subject ->
Nats_client_async.publish client ~subject "hello");
Pipe.read subscription.messages
>>= function
| `Eof -> Deferred.unit
| `Ok message ->
printf "'%s' received on %s\n"
message.Nats_client.Protocol.payload
message.subject;
Deferred.unit)
let () = Thread_safe.block_on_async_exn main
There is also a runnable version in examples/natsbyexample/publish_subscribe.ml.
Start a local NATS server with Docker:
docker run --rm --name nats-server -p 4222:4222 nats:latest
In another shell, build or run the examples from this repo. All examples default to nats://127.0.0.1:4222, so you do not need to set NATS_URL unless you want a different server.
Build the example executables with:
dune build examples/protocol_demo.exe examples/publish_json.exe examples/request_reply.exe
dune build examples/natsbyexample/publish_subscribe.exe
dune build examples/natsbyexample/request_reply.exe
dune build examples/natsbyexample/json_for_message_payloads.exe
Run them against the Docker NATS server:
dune exec ./examples/protocol_demo.exe
dune exec ./examples/publish_json.exe
dune exec ./examples/request_reply.exe
dune exec ./examples/natsbyexample/publish_subscribe.exe
dune exec ./examples/natsbyexample/request_reply.exe
dune exec ./examples/natsbyexample/json_for_message_payloads.exe
The examples cover:
natsbyexample directory with publish/subscribe, request/reply, and JSON payload examplesTo run against another server, set NATS_URL, for example:
NATS_URL=nats://demo.nats.io:4222 dune exec ./examples/publish_json.exe
The intended release path is PR-based:
.changes/ with a patch, minor, or major header.release-pr.yml workflow aggregates those fragments into CHANGES.md and opens or updates a release: vX.Y.Z PR.vX.Y.Z tag from the merge commit.publish.yml for that exact tag. This is required because tags created by a GitHub Actions workflow with GITHUB_TOKEN do not trigger downstream push workflows.publish.yml workflow checks out the tag, creates the GitHub release with dune-release publish, and submits the package to opam-repository with dune-release opam submit.OPAM_PUBLISH_GH_TOKEN secret configured for the opam-repository submission step.opam-repository PR is merged, users can install the packages with opam install.MIT. See LICENSE.