Asynchronous Subscriptions

    Note: For a given subscription, messages are dispatched serially, one message at a time. If your application does not care about processing order and would prefer the messages to be dispatched concurrently, it is the application's responsibility to move them to an internal queue to be picked up by worker threads or goroutines.

    The following example subscribes to the subject and handles the incoming messages:

    {% tabs %} {% tab title="Go" %}

    {% endtab %}

    {% tab title="Java" %}

    1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
    2. // Use a latch to wait for a message to arrive
    3. CountDownLatch latch = new CountDownLatch(1);
    4. // Create a dispatcher and inline message handler
    5. Dispatcher d = nc.createDispatcher((msg) -> {
    6. String str = new String(msg.getData(), StandardCharsets.UTF_8);
    7. System.out.println(str);
    8. latch.countDown();
    9. });
    10. // Subscribe
    11. d.subscribe("updates");
    12. // Wait for a message to come in
    13. latch.await();
    14. // Close the connection
    15. nc.close();

    {% endtab %}

    {% tab title="JavaScript" %}

    {% endtab %}

    {% tab title="Python" %}

    1. nc = NATS()
    2. future = asyncio.Future()
    3. async def cb(msg):
    4. nonlocal future
    5. future.set_result(msg)
    6. await nc.subscribe("updates", cb=cb)
    7. await nc.publish("updates", b'All is Well')
    8. await nc.flush()
    9. # Wait for message to come in
    10. msg = await asyncio.wait_for(future, 1)

    {% endtab %}

    {% tab title="Ruby" %}

    {% endtab %}

    {% tab title="C" %}

    1. static void
    2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
    3. {
    4. printf("Received msg: %s - %.*s\n",
    5. natsMsg_GetSubject(msg),
    6. natsMsg_GetDataLength(msg),
    7. natsMsg_GetData(msg));
    8. // Need to destroy the message!
    9. natsMsg_Destroy(msg);
    10. }
    11. (...)
    12. natsConnection *conn = NULL;
    13. natsSubscription *sub = NULL;
    14. natsStatus s;
    15. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
    16. if (s == NATS_OK)
    17. {
    18. // Creates an asynchronous subscription on subject "foo".
    19. // When a message is sent on subject "foo", the callback
    20. // onMsg() will be invoked by the client library.
    21. // You can pass a closure as the last argument.
    22. s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
    23. }
    24. (...)
    25. // Destroy objects that were created
    26. natsSubscription_Destroy(sub);
    27. natsConnection_Destroy(conn);

    {% endtab %} {% endtabs %}