Consumers

    The NEW and DISPATCH Consumers are pull-based, meaning the services consuming data from them have to ask the system for the next available message. This means you can easily scale your services up by adding more workers, and the messages will be spread across the workers based on their availability.
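
To make the scaling claim concrete, here is a minimal Python sketch of several workers sharing one pull-based Consumer. It is a toy simulation, not JetStream code: the function name `simulate_pull`, the worker names and the per-message processing times are all assumptions made for illustration.

```python
import heapq
from collections import deque


def simulate_pull(messages, worker_speeds):
    """Simulate workers pulling from one shared pull-based Consumer.

    worker_speeds maps a (hypothetical) worker name to the time it spends
    on one message. Returns a dict of worker name -> messages it handled.
    """
    pending = deque(messages)
    handled = {name: [] for name in worker_speeds}
    # Heap of (time the worker becomes free, name); all workers start idle.
    free_at = [(0.0, name) for name in sorted(worker_speeds)]
    heapq.heapify(free_at)
    while pending:
        now, name = heapq.heappop(free_at)        # next worker to become free
        handled[name].append(pending.popleft())   # it asks for the next message
        heapq.heappush(free_at, (now + worker_speeds[name], name))
    return handled


orders = [f"order {i}" for i in range(1, 10)]
result = simulate_pull(orders, {"fast": 1.0, "slow": 2.0})
```

In this run the faster worker ends up with twice as many messages as the slower one, simply because it asks for the next message more often. No message is handed to more than one worker.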

    Pull-based Consumers are created the same way as push-based Consumers; you just don’t specify a delivery target.

    Output

        No Consumers defined

    We have no Consumers, so let’s add the NEW one.

    I supply the --sample option on the CLI because it is not prompted for at present; everything else is prompted. The help in the CLI explains each option:

        nats con add --sample 100

    Output

        ? Select a Stream ORDERS
        ? Consumer name NEW
        ? Delivery target
        ? Start policy (all, last, 1h, msg sequence) all
        ? Filter Stream by subject (blank for all) ORDERS.received
        ? Maximum Allowed Deliveries 20
        Information for Consumer ORDERS > NEW
        Configuration:
          Durable Name: NEW
          Pull Mode: true
          Subject: ORDERS.received
          Deliver All: true
          Deliver Last: false
          Ack Policy: explicit
          Ack Wait: 30s
          Replay Policy: instant
          Maximum Deliveries: 20
          Sampling Rate: 100
        State:
          Last Delivered Message: Consumer sequence: 1 Stream sequence: 1
          Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
          Pending Messages: 0
          Redelivered Messages: 0

    This is a pull-based Consumer (empty Delivery Target). It delivers messages starting from the first available message and requires explicit acknowledgement of each and every message.

    It only receives messages that originally entered the Stream on ORDERS.received. Remember that the Stream subscribes to ORDERS.*; the filter lets us select a subset of messages from the Stream.
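
The relationship between the Stream's wildcard subject and the Consumer's filter can be sketched in a few lines of Python. This is an illustration of NATS-style subject matching (`*` for exactly one token, `>` for one or more trailing tokens), not the server's implementation. The function name `subject_matches` and the sample subjects `ORDERS.completed` and `SHIPPING.received` are hypothetical.

```python
def subject_matches(pattern: str, subject: str) -> bool:
    """Return True if a NATS-style subject pattern matches a concrete subject.

    '*' matches exactly one token; '>' matches one or more trailing tokens.
    """
    p_tokens = pattern.split(".")
    s_tokens = subject.split(".")
    for i, p in enumerate(p_tokens):
        if p == ">":
            # '>' must be the last pattern token and needs at least one token left.
            return i == len(p_tokens) - 1 and len(s_tokens) > i
        if i >= len(s_tokens):
            return False
        if p != "*" and p != s_tokens[i]:
            return False
    return len(p_tokens) == len(s_tokens)


stream_subjects = "ORDERS.*"          # what the Stream listens on
consumer_filter = "ORDERS.received"   # what the NEW Consumer selects

incoming = ["ORDERS.received", "ORDERS.processed",
            "ORDERS.completed", "SHIPPING.received"]

in_stream = [s for s in incoming if subject_matches(stream_subjects, s)]
for_consumer = [s for s in in_stream if subject_matches(consumer_filter, s)]
```

Here `in_stream` holds the three `ORDERS.` subjects, while `for_consumer` holds only `ORDERS.received`.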

    A Maximum Deliveries limit of 20 is set. This means that if a message is not acknowledged it will be redelivered, but only up to this maximum number of total deliveries.
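
As a rough model of that limit, the sketch below counts total deliveries of a single message until it is acknowledged or the cap is hit. It is a simplification: the name `deliver_with_limit` and the `handler` callback are hypothetical, and the waiting period between redeliveries (the Ack Wait shown in the output above) is not modelled.

```python
def deliver_with_limit(handler, max_deliver=20):
    """Deliver one message until handler acks it or max_deliver is reached.

    handler(attempt) returns True to acknowledge the message and False to
    leave it unacknowledged. Returns (deliveries_made, acknowledged).
    """
    for attempt in range(1, max_deliver + 1):
        if handler(attempt):
            return attempt, True
    return max_deliver, False


# A worker that only succeeds on its third attempt.
flaky = deliver_with_limit(lambda attempt: attempt >= 3)

# A worker that never acknowledges: delivery stops at the limit.
broken = deliver_with_limit(lambda attempt: False)
```

The first case stops after three deliveries with an ack; the second stops after twenty deliveries without one.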

    Again, this can all be done in a single CLI call. Let’s make the DISPATCH Consumer:

        nats con add ORDERS DISPATCH --filter ORDERS.processed --ack explicit --pull --deliver all --sample 100 --max-deliver 20

    Additionally, one can store the configuration in a JSON file. The format is the same as the output of $ nats con info ORDERS DISPATCH -j | jq .config:

        nats con add ORDERS MONITOR --config monitor.json

    Creating Push-Based Consumers

      Output

          ? Select a Stream ORDERS
          ? Delivery target monitor.ORDERS
          ? Start policy (all, last, 1h, msg sequence) last
          ? Acknowledgement policy none
          ? Replay policy instant
          ? Filter Stream by subject (blank for all)
          ? Maximum Allowed Deliveries -1
          Information for Consumer ORDERS > MONITOR
          Configuration:
            Durable Name: MONITOR
            Delivery Subject: monitor.ORDERS
            Deliver All: false
            Deliver Last: true
            Ack Policy: none
            Replay Policy: instant
          State:
            Last Delivered Message: Consumer sequence: 1 Stream sequence: 3
            Acknowledgment floor: Consumer sequence: 0 Stream sequence: 2
            Pending Messages: 0
            Redelivered Messages: 0

      Again you can do this with a single non-interactive command:

      Additionally, one can store the configuration in a JSON file. The format is the same as the output of $ nats con info ORDERS MONITOR -j | jq .config:

          nats con add ORDERS --config monitor.json

      You can get a quick list of all the Consumers for a specific Stream:

          nats con ls ORDERS

      Output

          Consumers for Stream ORDERS:
            DISPATCH
            MONITOR
            NEW

      Querying

      All details for a Consumer can be queried. Let’s first look at a pull-based Consumer:

          $ nats con info ORDERS DISPATCH
          Information for Consumer ORDERS > DISPATCH
          Configuration:
            Pull Mode: true
            Subject: ORDERS.processed
            Deliver All: true
            Deliver Last: false
            Ack Wait: 30s
            Replay Policy: instant
            Sampling Rate: 100
          State:
            Last Delivered Message: Consumer sequence: 1 Stream sequence: 1
            Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
            Pending Messages: 0
            Redelivered Messages: 0

      More details about the State section will be shown later when discussing the ack models in depth.

      Pull-based Consumers require you to explicitly ask for messages and ack them. Typically you would do this with the client library’s Request() feature, but the nats utility has a helper.

      First, we ensure we have some messages:

          nats pub ORDERS.processed "order 1"
          nats pub ORDERS.processed "order 2"
          nats pub ORDERS.processed "order 3"

      We can now read them using nats:

          nats con next ORDERS DISPATCH

      Output

          --- received on ORDERS.processed
          order 1
          Acknowledged message

      Output

          --- received on ORDERS.processed
          order 2
          Acknowledged message

      You can prevent ACKs by supplying --no-ack.

      To do this from code you’d send a Request() to $JS.API.CONSUMER.MSG.NEXT.ORDERS.DISPATCH:

          nats req '$JS.API.CONSUMER.MSG.NEXT.ORDERS.DISPATCH' ''

      Output

          Published [$JS.API.CONSUMER.MSG.NEXT.ORDERS.DISPATCH] : ''
          Received [ORDERS.processed] : 'order 3'

      Here nats req cannot ack, but in your code you’d respond to the received message with a nil payload as an Ack to JetStream.
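
The request-then-ack flow described above can be sketched without a real client library. In the Python below, `request` and `publish` are hypothetical callables standing in for whatever your client library provides, the message is modelled as a plain dict, and the reply subject `demo.ack.reply` is a made-up placeholder. Only the request subject pattern and the empty-payload ack come from the text.

```python
def next_subject(stream, consumer):
    """Build the request subject used to pull the next message."""
    return f"$JS.API.CONSUMER.MSG.NEXT.{stream}.{consumer}"


def pull_and_ack(request, publish, stream, consumer):
    """Pull one message, then ack it by replying with an empty payload."""
    msg = request(next_subject(stream, consumer), b"")
    publish(msg["reply"], b"")  # empty payload sent to the reply subject = ack
    return msg["subject"], msg["data"]


# Stand-ins so the sketch runs without a server (assumptions, not a real API).
sent = []


def fake_request(subject, payload):
    sent.append(("request", subject, payload))
    return {"subject": "ORDERS.processed", "data": b"order 3",
            "reply": "demo.ack.reply"}


def fake_publish(subject, payload):
    sent.append(("publish", subject, payload))


received = pull_and_ack(fake_request, fake_publish, "ORDERS", "DISPATCH")
```

With a real client you would pass its request and publish functions instead of the fakes. The shape of the flow stays the same: one request to fetch, one empty reply to acknowledge.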

      Consuming Push-Based Consumers

      Push-based Consumers publish messages to a subject, and anyone who subscribes to that subject will get them. They support different Acknowledgement models, covered later, but here on the MONITOR Consumer we have no Acknowledgement.

          nats con info ORDERS MONITOR

      Output extract

          ...
          Delivery Subject: monitor.ORDERS
          ...

      The Consumer is publishing to that subject, so let’s listen there:

          nats sub monitor.ORDERS

      Output

          Listening on [monitor.ORDERS]
          [#3] Received on [ORDERS.processed]: 'order 3'

      Note that the subject of the received message is reported as ORDERS.processed. This helps you distinguish what you’re seeing when a Stream covers a wildcard or a multiple-subject subject space.
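
As a loose illustration of push delivery, the Python sketch below models a delivery subject with several subscribers. `ToyBus` and its methods are hypothetical names for an in-memory stand-in, not a NATS API. The point it shows is that every subscriber of the delivery subject gets the message, and that the message keeps the subject it was originally stored under.

```python
from collections import defaultdict


class ToyBus:
    """Very small in-memory stand-in for subject-based publish/subscribe."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, subject, callback):
        self._subscribers[subject].append(callback)

    def deliver(self, delivery_subject, original_subject, data):
        # Every subscriber of the delivery subject gets a copy; the message
        # still carries the subject it was originally stored under.
        for callback in self._subscribers[delivery_subject]:
            callback(original_subject, data)


bus = ToyBus()
seen_a, seen_b = [], []
bus.subscribe("monitor.ORDERS", lambda subj, data: seen_a.append((subj, data)))
bus.subscribe("monitor.ORDERS", lambda subj, data: seen_b.append((subj, data)))

# The Consumer pushes a stored message to its delivery subject.
bus.deliver("monitor.ORDERS", "ORDERS.processed", "order 3")
```

Both subscriber lists end up holding the same entry, tagged with ORDERS.processed rather than monitor.ORDERS.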

      This Consumer needs no ack, so any new message into the ORDERS system will show up here in real-time.