Listen for Connection Events

    The actual API for these listeners is language dependent, but the following examples show a few of the more common use cases. See the API documentation for the client library you are using for more specific instructions.

    Connection events may include the connection being closed, disconnected, or reconnected. Reconnecting involves a disconnect followed by a connect but, depending on the library implementation, it may also include multiple disconnects while the client tries to find a server or while the server is being rebooted.

    {% tabs %} {% tab title="Go" %}

    {% endtab %}

    {% tab title="Java" %}

    1. class MyConnectionListener implements ConnectionListener {
    2. public void connectionEvent(Connection natsConnection, Events event) {
    3. System.out.println("Connection event - "+event);
    4. }
    5. }
    6. public class SetConnectionListener {
    7. public static void main(String[] args) {
    8. try {
    9. Options options = new Options.Builder().
    10. server("nats://demo.nats.io:4222").
    11. connectionListener(new MyConnectionListener()). // Set the listener
    12. build();
    13. Connection nc = Nats.connect(options);
    14. // Do something with the connection
    15. nc.close();
    16. } catch (Exception e) {
    17. e.printStackTrace();
    18. }
    19. }
    20. }

    {% endtab %}

    {% tab title="JavaScript" %}

    1. const nc = await connect({ servers: ["demo.nats.io"] });
    2. nc.closed().then(() => {
    3. t.log("the connection closed!");
    4. });
    5. (async () => {
    6. for await (const s of nc.status()) {
    7. switch (s.type) {
    8. case Status.Disconnect:
    9. t.log(`client disconnected - ${s.data}`);
    10. break;
    11. case Status.LDM:
    12. t.log("client has been requested to reconnect");
    13. break;
    14. case Status.Update:
    15. t.log(`client received a cluster update - ${s.data}`);
    16. break;
    17. case Status.Reconnect:
    18. t.log(`client reconnected - ${s.data}`);
    19. break;
    20. case Status.Error:
    21. t.log("client got a permissions error");
    22. break;
    23. case DebugEvents.Reconnecting:
    24. t.log("client is attempting to reconnect");
    25. break;
    26. case DebugEvents.StaleConnection:
    27. t.log("client has a stale connection");
    28. break;
    29. default:
    30. t.log(`got an unknown status ${s.type}`);
    31. }
    32. }
    33. })().then();

    {% endtab %}

    {% tab title="Python" %}

    1. # The asyncio NATS client can be configured with a number of event callbacks
    2. async def disconnected_cb():
    3. print("Got disconnected!")
    4. async def reconnected_cb():
    5. # See who we are connected to on reconnect.
    6. print("Got reconnected to {url}".format(url=nc.connected_url.netloc))
    7. async def error_cb(e):
    8. print("There was an error: {}".format(e))
    9. async def closed_cb():
    10. print("Connection is closed")
    11. # Setup callbacks to be notified on disconnects and reconnects
    12. options["disconnected_cb"] = disconnected_cb
    13. options["reconnected_cb"] = reconnected_cb
    14. # Setup callbacks to be notified when there is an error
    15. # or connection is closed.
    16. options["error_cb"] = error_cb
    17. options["closed_cb"] = closed_cb
    18. await nc.connect(**options)

    {% endtab %}

    {% tab title="Ruby" %}

    1. # There is not a single listener for connection events in the Ruby NATS Client.
    2. # Instead, you can set individual event handlers using:
    3. NATS.on_disconnect do
    4. end
    5. NATS.on_reconnect do
    6. end
    7. NATS.on_close do
    8. end
    9. NATS.on_error do
    10. end

    {% endtab %}

    {% tab title="C" %}

    1. static void
    2. disconnectedCB(natsConnection *conn, void *closure)
    3. {
    4. // Do something
    5. printf("Connection disconnected\n");
    6. }
    7. static void
    8. reconnectedCB(natsConnection *conn, void *closure)
    9. {
    10. printf("Connection reconnected\n");
    11. }
    12. static void
    13. closedCB(natsConnection *conn, void *closure)
    14. {
    15. // Do something
    16. printf("Connection closed\n");
    17. }
    18. (...)
    19. natsConnection *conn = NULL;
    20. natsOptions *opts = NULL;
    21. natsStatus s = NATS_OK;
    22. s = natsOptions_Create(&opts);
    23. if (s == NATS_OK)
    24. s = natsOptions_SetDisconnectedCB(opts, disconnectedCB, NULL);
    25. if (s == NATS_OK)
    26. s = natsOptions_SetReconnectedCB(opts, reconnectedCB, NULL);
    27. if (s == NATS_OK)
    28. s = natsOptions_SetClosedCB(opts, closedCB, NULL);
    29. if (s == NATS_OK)
    30. s = natsConnection_Connect(&conn, opts);
    31. (...)
    32. // Destroy objects that were created
    33. natsConnection_Destroy(conn);
    34. natsOptions_Destroy(opts);

    {% endtab %} {% endtabs %}

    When working with a cluster, servers may be added or changed. Some of the clients allow you to listen for this notification:

    {% tabs %} {% tab title="Go" %}

    {% endtab %}

    {% tab title="Java" %}

    1. class ServersAddedListener implements ConnectionListener {
    2. public void connectionEvent(Connection nc, Events event) {
    3. if (event == Events.DISCOVERED_SERVERS) {
    4. for (String server : nc.getServers()) {
    5. System.out.println("Known server: "+server);
    6. }
    7. }
    8. }
    9. }
    10. public class ListenForNewServers {
    11. public static void main(String[] args) {
    12. try {
    13. Options options = new Options.Builder().
    14. server("nats://demo.nats.io:4222").
    15. connectionListener(new ServersAddedListener()). // Set the listener
    16. build();
    17. Connection nc = Nats.connect(options);
    18. // Do something with the connection
    19. nc.close();
    20. } catch (Exception e) {
    21. e.printStackTrace();
    22. }
    23. }
    24. }

    {% endtab %}

    {% tab title="JavaScript" %}

    1. const nc = await connect({ servers: ["demo.nats.io:4222"] });
    2. (async () => {
    3. for await (const s of nc.status()) {
    4. switch (s.type) {
    5. case Status.Update:
    6. t.log(`servers added - ${s.data.added}`);
    7. t.log(`servers deleted - ${s.data.deleted}`);
    8. break;
    9. default:
    10. }
    11. }
    12. })().then();

    {% endtab %}

    {% tab title="Python" %}

    1. # Asyncio NATS client does not support discovered servers handler right now

    {% endtab %}

    {% tab title="Ruby" %}

    1. # The Ruby NATS client does not support discovered servers handler right now

    {% endtab %}

    {% tab title="C" %}

    1. static void
    2. discoveredServersCB(natsConnection *conn, void *closure)
    3. {
    4. natsStatus s = NATS_OK;
    5. char **servers = NULL;
    6. int count = 0;
    7. s = natsConnection_GetDiscoveredServers(conn, &servers, &count);
    8. if (s == NATS_OK)
    9. {
    10. int i;
    11. // Do something...
    12. for (i=0; i<count; i++)
    13. printf("Discovered server: %s\n", servers[i]);
    14. // Free allocated memory
    15. for (i=0; i<count; i++)
    16. free(servers[i]);
    17. free(servers);
    18. }
    19. }
    20. (...)
    21. natsConnection *conn = NULL;
    22. natsOptions *opts = NULL;
    23. natsStatus s = NATS_OK;
    24. s = natsOptions_Create(&opts);
    25. if (s == NATS_OK)
    26. s = natsOptions_SetDiscoveredServersCB(opts, discoveredServersCB, NULL);
    27. if (s == NATS_OK)
    28. s = natsConnection_Connect(&conn, opts);
    29. // Destroy objects that were created
    30. natsConnection_Destroy(conn);
    31. natsOptions_Destroy(opts);

    {% endtab %} {% endtabs %}

    Listen for Errors

    The client library may separate server-to-client errors from events. Many server events are not handled by application code and result in the connection being closed. Listening for the errors can be very useful for debugging problems.

    {% tabs %} {% tab title="Go" %}

    {% endtab %}

    {% tab title="Java" %}

    1. class MyErrorListener implements ErrorListener {
    2. public void errorOccurred(Connection conn, String error)
    3. {
    4. System.out.println("The server notified the client with: "+error);
    5. }
    6. public void exceptionOccurred(Connection conn, Exception exp) {
    7. System.out.println("The connection handled an exception: "+exp.getLocalizedMessage());
    8. }
    9. public void slowConsumerDetected(Connection conn, Consumer consumer) {
    10. System.out.println("A slow consumer was detected.");
    11. }
    12. }
    13. public class SetErrorListener {
    14. public static void main(String[] args) {
    15. try {
    16. Options options = new Options.Builder().
    17. server("nats://demo.nats.io:4222").
    18. errorListener(new MyErrorListener()). // Set the listener
    19. build();
    20. Connection nc = Nats.connect(options);
    21. // Do something with the connection
    22. nc.close();
    23. } catch (Exception e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. }

    {% endtab %}

    {% tab title="JavaScript" %}

    1. const nc = await connect({ servers: ["demo.nats.io"] });
    2. // if the client gets closed with an error you can trap that
    3. // condition in the closed handler like this:
    4. nc.closed().then((err) => {
    5. if (err) {
    6. t.log(`the connection closed with an error ${err.message}`);
    7. } else {
    8. t.log(`the connection closed.`);
    9. }
    10. });
    11. // if you have a status listener, it will also get notified
    12. (async () => {
    13. for await (const s of nc.status()) {
    14. switch (s.type) {
    15. case Status.Error:
    16. // typically if you get this the nats connection will close
    17. t.log("client got an async error from the server");
    18. break;
    19. default:
    20. t.log(`got an unknown status ${s.type}`);
    21. }
    22. }
    23. })().then();

    {% endtab %}

    {% tab title="Python" %}

    1. nc = NATS()
    2. async def error_cb(e):
    3. print("Error: ", e)
    4. await nc.connect(
    5. servers=["nats://demo.nats.io:4222"],
    6. reconnect_time_wait=10,
    7. error_cb=error_cb,
    8. )
    9. # Do something with the connection.

    {% endtab %}

    {% tab title="Ruby" %}

    1. require 'nats/client'
    2. NATS.start(servers:["nats://demo.nats.io:4222"]) do |nc|
    3. nc.on_error do |e|
    4. puts "Error: #{e}"
    5. end
    6. nc.close
    7. end

    {% endtab %}

    {% tab title="C" %}

    1. static void
    2. errorCB(natsConnection *conn, natsSubscription *sub, natsStatus s, void *closure)
    3. {
    4. // Do something
    5. printf("Error: %d - %s\n", s, natsStatus_GetText(s));
    6. }
    7. (...)
    8. natsConnection *conn = NULL;
    9. natsOptions *opts = NULL;
    10. natsStatus s = NATS_OK;
    11. s = natsOptions_Create(&opts);
    12. if (s == NATS_OK)
    13. s = natsOptions_SetErrorHandler(opts, errorCB, NULL);
    14. if (s == NATS_OK)
    15. s = natsConnection_Connect(&conn, opts);
    16. (...)
    17. // Destroy objects that were created
    18. natsConnection_Destroy(conn);
    19. natsOptions_Destroy(opts);

    {% endtab %} {% endtabs %}