Draining Messages Before Disconnect

    Drain gives clients that use queue subscriptions a way to bring down applications without losing any messages. A client can bring up a new queue member, then drain and shut down the old queue member, all without losing messages sent to the old client. Without drain, messages can be lost due to delivery timing, for example when a message is already in flight to a member that is shutting down.

    Client libraries can provide drain on a connection, on a subscription, or both.

    For a connection the process is essentially:

    1. Drain all subscriptions
    2. Stop new messages from being published
    3. Flush any remaining published messages
    4. Close

    The API for drain can generally be used instead of close.

    As an example of draining a connection:

    {% tabs %} {% tab title="Go" %}

    {% endtab %}

    {% tab title="Java" %}

    1. // Use a latch to wait for a message to arrive
    2. CountDownLatch latch = new CountDownLatch(1);
    3. // Create a dispatcher and inline message handler
    4. Dispatcher d = nc.createDispatcher((msg) -> {
    5. String str = new String(msg.getData(), StandardCharsets.UTF_8);
    6. System.out.println(str);
    7. latch.countDown();
    8. });
    9. // Subscribe
    10. d.subscribe("updates");
    11. // Wait for a message to come in
    12. latch.await();
    13. // Drain the connection, which will close it
    14. CompletableFuture<Boolean> drained = nc.drain(Duration.ofSeconds(10));
    15. // Wait for the drain to complete
    16. drained.get();

    {% endtab %}

    {% tab title="JavaScript" %}

    const nc = await connect({ servers: "demo.nats.io" });
    const sub = nc.subscribe(createInbox(), () => {});
    nc.publish(sub.getSubject());
    // Drain the connection, which will close it when done.
    await nc.drain();

    {% endtab %}

    {% tab title="Python" %}

    import asyncio
    from nats.aio.client import Client as NATS


    async def example(loop):
        """Serve requests as a queue worker, then drain and close the connection.

        NOTE(review): written against the asyncio-nats 0.x API
        (``connect(..., loop=...)``); newer nats-py releases differ -- confirm
        against the client version in use.
        """
        nc = NATS()
        await nc.connect("nats://127.0.0.1:4222", loop=loop)

        async def handler(msg):
            print("[Received] ", msg)
            await nc.publish(msg.reply, b'I can help')

            # Can check whether client is in draining state
            if nc.is_draining:
                print("Connection is draining")

        await nc.subscribe("help", "workers", cb=handler)
        await nc.flush()

        # Create 10 request coroutines; they run concurrently under gather().
        requests = [nc.request("help", b'help!', timeout=1) for _ in range(10)]

        # Wait for all the responses
        responses = await asyncio.gather(*requests)

        # Gracefully close the connection: drain subscriptions, flush pending
        # publishes, then close.
        await nc.drain()
        print("Received {} responses".format(len(responses)))

    {% endtab %}

    {% tab title="Ruby" %}

    {% endtab %}

    {% tab title="C" %}

    1. static void
    2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
    3. {
    4. printf("Received msg: %s - %.*s\n",
    5. natsMsg_GetSubject(msg),
    6. natsMsg_GetDataLength(msg),
    7. natsMsg_GetData(msg));
    8. // Add some delay while processing
    9. nats_Sleep(200);
    10. // Need to destroy the message!
    11. natsMsg_Destroy(msg);
    12. }
    13. static void
    14. closeHandler(natsConnection *conn, void *closure)
    15. {
    16. cond_variable cv = (cond_variable) closure;
    17. notify_cond_variable(cv);
    18. }
    19. (...)
    20. natsConnection *conn = NULL;
    21. natsOptions *opts = NULL;
    22. natsSubscription *sub = NULL;
    23. natsStatus s = NATS_OK;
    24. cond_variable cv = new_cond_variable(); // some fictuous way to notify between threads.
    25. s = natsOptions_Create(&opts);
    26. if (s == NATS_OK)
    27. // Setup a close handler and pass a reference to our condition variable.
    28. s = natsOptions_SetClosedCB(opts, closeHandler, (void*) cv);
    29. if (s == NATS_OK)
    30. s = natsConnection_Connect(&conn, opts);
    31. // Subscribe
    32. if (s == NATS_OK)
    33. s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
    34. // Publish a message
    35. if (s == NATS_OK)
    36. s = natsConnection_PublishString(conn, "foo", "hello");
    37. // Drain the connection, which will close it when done.
    38. if (s == NATS_OK)
    39. s = natsConnection_Drain(conn);
    40. // Wait for the connection to be closed
    41. if (s == NATS_OK)
    42. cond_variable_wait(cv);
    43. (...)
    44. // Destroy objects that were created
    45. natsSubscription_Destroy(sub);
    46. natsConnection_Destroy(conn);
    47. natsOptions_Destroy(opts);

    {% endtab %} {% endtabs %}

    The mechanics of drain for a subscription are simpler:

    1. Unsubscribe
    2. Process all cached or inflight messages
    3. Clean up

    The API for drain can generally be used instead of unsubscribe:

    {% tabs %} {% tab title="Go" %}

    1. nc, err := nats.Connect("demo.nats.io")
    2. if err != nil {
    3. log.Fatal(err)
    4. }
    5. defer nc.Close()
    6. done := sync.WaitGroup{}
    7. done.Add(1)
    8. count := 0
    9. errCh := make(chan error, 1)
    10. msgAfterDrain := "not this one"
    11. // Just to not collide using the demo server with other users.
    12. subject := nats.NewInbox()
    13. // This callback will process each message slowly
    14. sub, err := nc.Subscribe(subject, func(m *nats.Msg) {
    15. if string(m.Data) == msgAfterDrain {
    16. errCh <- fmt.Errorf("Should not have received this message")
    17. return
    18. }
    19. time.Sleep(100 * time.Millisecond)
    20. count++
    21. if count == 2 {
    22. done.Done()
    23. }
    24. })
    25. // Send 2 messages
    26. for i := 0; i < 2; i++ {
    27. nc.Publish(subject, []byte("hello"))
    28. }
    29. // Call Drain on the subscription. It unsubscribes but
    30. // wait for all pending messages to be processed.
    31. if err := sub.Drain(); err != nil {
    32. log.Fatal(err)
    33. }
    34. // Send one more message, this message should not be received
    35. nc.Publish(subject, []byte(msgAfterDrain))
    36. done.Wait()
    37. // Now check that the 3rd message was not received
    38. case e := <-errCh:
    39. log.Fatal(e)
    40. case <-time.After(200 * time.Millisecond):
    41. // OK!
    42. }

    {% endtab %}

    {% tab title="Java" %}

    1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
    2. // Use a latch to wait for a message to arrive
    3. CountDownLatch latch = new CountDownLatch(1);
    4. // Create a dispatcher and inline message handler
    5. Dispatcher d = nc.createDispatcher((msg) -> {
    6. String str = new String(msg.getData(), StandardCharsets.UTF_8);
    7. System.out.println(str);
    8. latch.countDown();
    9. });
    10. // Subscribe
    11. d.subscribe("updates");
    12. // Wait for a message to come in
    13. latch.await();
    14. // Messages that have arrived will be processed
    15. CompletableFuture<Boolean> drained = d.drain(Duration.ofSeconds(10));
    16. // Wait for the drain to complete
    17. drained.get();
    18. // Close the connection
    19. nc.close();

    {% endtab %}

    {% tab title="JavaScript" %}

    {% endtab %}

    {% tab title="Python" %}

    import asyncio
    from nats.aio.client import Client as NATS


    async def example(loop):
        """Subscribe as a queue worker, then drain only that subscription.

        NOTE(review): written against the asyncio-nats 0.x API, where
        ``subscribe`` returns a sid and ``drain(sid)`` drains just that
        subscription; newer nats-py releases differ -- confirm against the
        client version in use.
        """
        nc = NATS()
        await nc.connect("nats://127.0.0.1:4222", loop=loop)

        async def handler(msg):
            print("[Received] ", msg)
            await nc.publish(msg.reply, b'I can help')

            # Can check whether client is in draining state
            if nc.is_draining:
                print("Connection is draining")

        sid = await nc.subscribe("help", "workers", cb=handler)
        await nc.flush()

        # Gracefully unsubscribe the subscription
        await nc.drain(sid)

    {% endtab %}

    {% tab title="Ruby" %}

    # There is currently no API to drain a single subscription; the whole connection can be drained though via NATS.drain

    {% endtab %}

    {% tab title="C" %}

    1. natsConnection *conn = NULL;
    2. natsSubscription *sub = NULL;
    3. natsStatus s = NATS_OK;
    4. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
    5. // Subscribe
    6. if (s == NATS_OK)
    7. s = natsConnection_Subscribe(&sub, conn, "foo", onMsg, NULL);
    8. // Publish 2 messages
    9. if (s == NATS_OK)
    10. {
    11. int i;
    12. for (i=0; (s == NATS_OK) && (i<2); i++)
    13. {
    14. s = natsConnection_PublishString(conn, "foo", "hello");
    15. }
    16. }
    17. // Call Drain on the subscription. It unsubscribes but
    18. // wait for all pending messages to be processed.
    19. if (s == NATS_OK)
    20. s = natsSubscription_Drain(sub);
    21. (...)
    22. // Destroy objects that were created
    23. natsConnection_Destroy(conn);

    {% endtab %} {% endtabs %}

    Because draining can involve flushing messages to the server and waiting for asynchronous message processing to finish, the drain timeout should generally be longer than the timeout for a simple request/reply or similar operation.