Wildcard Subscriptions

    For example, you can subscribe using and then act based on the actual subject.

    Go

    Java

    1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
    2. // Use a latch to wait for 2 messages to arrive
    3. CountDownLatch latch = new CountDownLatch(2);
    4. // Create a dispatcher and inline message handler
    5. Dispatcher d = nc.createDispatcher((msg) -> {
    6. String subject = msg.getSubject();
    7. String str = new String(msg.getData(), StandardCharsets.UTF_8);
    8. System.out.println(subject + ": " + str);
    9. latch.countDown();
    10. });
    11. // Subscribe
    12. d.subscribe("time.*.east");
    13. // Wait for messages to come in
    14. latch.await();
    15. // Close the connection
    16. nc.close();

    JavaScript

    1. let nc = NATS.connect({
    2. url: "nats://demo.nats.io:4222"});
    3. nc.subscribe('time.us.*', (msg, reply, subject) => {
    4. // converting timezones correctly in node requires a library
    5. // this doesn't take into account *many* things.
    6. let time = "";
    7. switch (subject) {
    8. case 'time.us.east':
    9. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
    10. break;
    11. case 'time.us.central':
    12. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
    13. break;
    14. case 'time.us.mountain':
    15. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
    16. break;
    17. case 'time.us.west':
    18. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
    19. break;
    20. default:
    21. time = "I don't know what you are talking about Willis";
    22. }
    23. t.log(subject, time);
    24. });

    Python

    1. nc = NATS()
    2. await nc.connect(servers=["nats://demo.nats.io:4222"])
    3. # Use queue to wait for 2 messages to arrive
    4. queue = asyncio.Queue()
    5. async def cb(msg):
    6. await queue.put_nowait(msg)
    7. await nc.subscribe("time.*.east", cb=cb)
    8. # Send 2 messages and wait for them to come in
    9. await nc.publish("time.A.east", b'A')
    10. await nc.publish("time.B.east", b'B')
    11. msg_A = await queue.get()
    12. msg_B = await queue.get()
    13. print("Msg A:", msg_A)
    14. print("Msg B:", msg_B)

    Ruby

    1. require 'nats/client'
    2. require 'fiber'
    3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
    4. Fiber.new do
    5. f = Fiber.current
    6. nc.subscribe("time.*.east") do |msg, reply|
    7. f.resume Time.now
    8. end
    9. nc.publish("time.A.east", "A")
    10. nc.publish("time.B.east", "B")
    11. # Use the response
    12. msg_A = Fiber.yield
    13. puts "Msg A: #{msg_A}"
    14. msg_B = Fiber.yield
    15. puts "Msg B: #{msg_B}"
    16. end.resume
    17. end

    TypeScript

    1. await nc.subscribe('time.us.*', (err, msg) => {
    2. // converting timezones correctly in node requires a library
    3. // this doesn't take into account *many* things.
    4. let time = "";
    5. switch (msg.subject) {
    6. case 'time.us.east':
    7. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
    8. break;
    9. case 'time.us.central':
    10. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
    11. break;
    12. case 'time.us.mountain':
    13. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
    14. break;
    15. case 'time.us.west':
    16. break;
    17. time = "I don't know what you are talking about Willis";
    18. }
    19. console.log(msg.subject, time);
    20. });

    or do something similar with >:

    Go

    1. nc, err := nats.Connect("demo.nats.io")
    2. if err != nil {
    3. log.Fatal(err)
    4. }
    5. defer nc.Close()
    6. // Use a WaitGroup to wait for 4 messages to arrive
    7. wg := sync.WaitGroup{}
    8. wg.Add(4)
    9. // Subscribe
    10. if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
    11. log.Printf("%s: %s", m.Subject, m.Data)
    12. wg.Done()
    13. }); err != nil {
    14. log.Fatal(err)
    15. }
    16. // Wait for the 4 messages to come in
    17. wg.Wait()
    18. // Close the connection
    19. nc.Close()

    Java

    1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
    2. // Use a latch to wait for 4 messages to arrive
    3. CountDownLatch latch = new CountDownLatch(4);
    4. // Create a dispatcher and inline message handler
    5. Dispatcher d = nc.createDispatcher((msg) -> {
    6. String subject = msg.getSubject();
    7. String str = new String(msg.getData(), StandardCharsets.UTF_8);
    8. System.out.println(subject + ": " + str);
    9. latch.countDown();
    10. });
    11. // Subscribe
    12. d.subscribe("time.>");
    13. // Wait for messages to come in
    14. latch.await();
    15. // Close the connection
    16. nc.close();

    JavaScript

    1. let nc = NATS.connect({
    2. url: "nats://demo.nats.io:4222"});
    3. nc.subscribe('time.>', (msg, reply, subject) => {
    4. // converting timezones correctly in node requires a library
    5. // this doesn't take into account *many* things.
    6. let time = "";
    7. switch (subject) {
    8. case 'time.us.east':
    9. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
    10. break;
    11. case 'time.us.central':
    12. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
    13. break;
    14. case 'time.us.mountain':
    15. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
    16. break;
    17. case 'time.us.west':
    18. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
    19. break;
    20. default:
    21. time = "I don't know what you are talking about Willis";
    22. }
    23. t.log(subject, time);
    24. });

    Python

    1. nc = NATS()
    2. await nc.connect(servers=["nats://demo.nats.io:4222"])
    3. # Use queue to wait for 4 messages to arrive
    4. queue = asyncio.Queue()
    5. async def cb(msg):
    6. await queue.put(msg)
    7. await nc.subscribe("time.>", cb=cb)
    8. # Send 2 messages and wait for them to come in
    9. await nc.publish("time.A.east", b'A')
    10. await nc.publish("time.B.east", b'B')
    11. await nc.publish("time.C.west", b'C')
    12. await nc.publish("time.D.west", b'D')
    13. for i in range(0, 4):
    14. msg = await queue.get()
    15. print("Msg:", msg)
    16. await nc.close()

    Ruby

    1. require 'nats/client'
    2. require 'fiber'
    3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
    4. Fiber.new do
    5. f = Fiber.current
    6. f.resume Time.now.to_f
    7. end
    8. nc.publish("time.A.east", "A")
    9. nc.publish("time.B.east", "B")
    10. nc.publish("time.C.west", "C")
    11. nc.publish("time.D.west", "D")
    12. # Use the response
    13. 4.times do
    14. msg = Fiber.yield
    15. puts "Msg: #{msg}"
    16. end
    17. end.resume
    18. end

    TypeScript

    1. static void
    2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
    3. {
    4. printf("Received msg: %s - %.*s\n",
    5. natsMsg_GetSubject(msg),
    6. natsMsg_GetDataLength(msg),
    7. natsMsg_GetData(msg));
    8. // Need to destroy the message!
    9. natsMsg_Destroy(msg);
    10. }
    11. (...)
    12. natsConnection *conn = NULL;
    13. natsSubscription *sub = NULL;
    14. natsStatus s;
    15. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
    16. if (s == NATS_OK)
    17. s = natsConnection_Subscribe(&sub, conn, "time.>", onMsg, NULL);
    18. (...)
    19. // Destroy objects that were created
    20. natsSubscription_Destroy(sub);
    21. natsConnection_Destroy(conn);

    The following example can be used to test these two subscribers. The * subscriber should receive at most 2 messages, while the > subscriber receives 4. More importantly the time.*.east subscriber won’t receive on time.us.east.atlanta because that won’t match.

    Go

    1. nc, err := nats.Connect("demo.nats.io")
    2. if err != nil {
    3. log.Fatal(err)
    4. }
    5. defer nc.Close()
    6. zoneID, err := time.LoadLocation("America/New_York")
    7. if err != nil {
    8. log.Fatal(err)
    9. }
    10. now := time.Now()
    11. zoneDateTime := now.In(zoneID)
    12. formatted := zoneDateTime.String()
    13. nc.Publish("time.us.east", []byte(formatted))
    14. nc.Publish("time.us.east.atlanta", []byte(formatted))
    15. zoneID, err = time.LoadLocation("Europe/Warsaw")
    16. if err != nil {
    17. log.Fatal(err)
    18. }
    19. zoneDateTime = now.In(zoneID)
    20. formatted = zoneDateTime.String()
    21. nc.Publish("time.eu.east", []byte(formatted))
    22. nc.Publish("time.eu.east.warsaw", []byte(formatted))

    Java

    1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
    2. ZoneId zoneId = ZoneId.of("America/New_York");
    3. ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
    4. String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
    5. nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
    6. nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));
    7. zoneId = ZoneId.of("Europe/Warsaw");
    8. zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
    9. formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
    10. nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
    11. nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));
    12. nc.flush(Duration.ZERO);
    13. nc.close();

    JavaScript

    1. nc.publish('time.us.east');
    2. nc.publish('time.us.central');
    3. nc.publish('time.us.mountain');
    4. nc.publish('time.us.west');

    Python

    1. nc = NATS()
    2. await nc.connect(servers=["nats://demo.nats.io:4222"])
    3. await nc.publish("time.us.east", b'...')
    4. await nc.publish("time.us.east.atlanta", b'...')
    5. await nc.publish("time.eu.east", b'...')
    6. await nc.publish("time.eu.east.warsaw", b'...')
    7. await nc.close()

    Ruby

    TypeScript

    1. nc.publish('time.us.east');
    2. nc.publish('time.us.central');
    3. nc.publish('time.us.mountain');