Model Deep Dive

    Streams store data on disk, but we cannot store all data forever so we need ways to control their size automatically.

    There are 3 features that come into play when Streams decide how long they store data.

    The describes based on what criteria a set will evict messages from its storage:

    In all Retention Policies the basic limits apply as upper bounds, these are MaxMsgs for how many messages are kept in total, MaxBytes for how big the set can be in total and MaxAge for what is the oldest message that will be kept. These are the only limits in play with LimitsPolicy retention.

    One can then define additional ways a message may be removed from the Stream earlier than these limits. In WorkQueuePolicy the messages will be removed as soon as any Consumer received an Acknowledgement. In InterestPolicy messages will be removed as soon as there are no more Consumers.

    In both WorkQueuePolicy and InterestPolicy the age, size and count limits will still apply as upper bounds.

    A final control is the Maximum Size any single message may have. NATS have it’s own limit for maximum size (1 MiB by default), but you can say a Stream will only accept messages up to 1024 bytes using MaxMsgSize.

    The Discard Policy sets how messages are discarded when limits set by LimitsPolicy are reached. The DiscardOld option removes old messages making space for new, while DiscardNew refuses any new messages.

    The WorkQueuePolicy mode is a specialized mode where a message, once consumed and acknowledged, is discarded from the Stream. In this mode, there are a few limits on consumers. Inherently it’s about 1 message to one consumer, this means you cannot have overlapping consumers defined on the Stream - needs unique filter subjects.

    Message Deduplication

    JetStream support idempotent message writes by ignoring duplicate messages as indicated by the Nats-Msg-Id header.

    Here we set a Nats-Msg-Id:1 header which tells JetStream to ensure we do not have duplicates of this message - we only consult the message ID not the body.

    1. $ nats str info ORDERS
    2. ....
    3. State:
    4. Messages: 1
    5. Bytes: 67 B

    The default window to track duplicates in is 2 minutes, this can be set on the command line using --dupe-window when creating a stream, though we would caution against large windows.

    Acknowledgement Models

    Streams support acknowledging receiving a message, if you send a Request() to a subject covered by the configuration of the Stream the service will reply to you once it stored the message. If you just publish, it will not. A Stream can be set to disable Acknowledgements by setting NoAck to true in it’s configuration.

    Consumers have 3 acknowledgement modes:

    To understand how Consumers track messages we will start with a clean ORDERS Stream and DISPATCH Consumer.

    1. $ nats str info ORDERS
    2. ...
    3. Statistics:
    4. Messages: 0
    5. Bytes: 0 B
    6. FirstSeq: 0
    7. LastSeq: 0
    8. Active Consumers: 1

    The Set is entirely empty

    1. $ nats con info ORDERS DISPATCH
    2. ...
    3. State:
    4. Last Delivered Message: Consumer sequence: 1 Stream sequence: 1
    5. Acknowledgment floor: Consumer sequence: 0 Stream sequence: 0
    6. Pending Messages: 0
    7. Redelivered Messages: 0

    The Consumer has no messages outstanding and has never had any (Consumer sequence is 1).

    We publish one message to the Stream and see that the Stream received it:

    1. $ nats pub ORDERS.processed "order 4"
    2. Published 7 bytes to ORDERS.processed
    3. $ nats str info ORDERS
    4. ...
    5. Statistics:
    6. Messages: 1
    7. Bytes: 53 B
    8. FirstSeq: 1
    9. LastSeq: 1
    10. Active Consumers: 1

    As the Consumer is pull-based, we can fetch the message, ack it, and check the Consumer state:

    1. $ nats con next ORDERS DISPATCH
    2. --- received on ORDERS.processed
    3. order 4
    4. Acknowledged message
    5. $ nats con info ORDERS DISPATCH
    6. ...
    7. State:
    8. Last Delivered Message: Consumer sequence: 2 Stream sequence: 2
    9. Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
    10. Pending Messages: 0
    11. Redelivered Messages: 0

    The message got delivered and acknowledged - Acknowledgement floor is 1 and 1, the sequence of the Consumer is 2 which means its had only the one message through and got acked. Since it was acked, nothing is pending or redelivering.

    We’ll publish another message, fetch it but not Ack it this time and see the status:

    1. $ nats pub ORDERS.processed "order 5"
    2. Published 7 bytes to ORDERS.processed
    3. --- received on ORDERS.processed
    4. order 5
    5. $ nats con info ORDERS DISPATCH
    6. State:
    7. Last Delivered Message: Consumer sequence: 3 Stream sequence: 3
    8. Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
    9. Pending Messages: 1
    10. Redelivered Messages: 0

    Now we can see the Consumer has processed 2 messages (obs sequence is 3, next message will be 3) but the Ack floor is still 1 - thus 1 message is pending acknowledgement. Indeed this is confirmed in the Pending messages.

    1. $ nats con next ORDERS DISPATCH --no-ack
    2. --- received on ORDERS.processed
    3. order 5
    4. $ nats con info ORDERS DISPATCH
    5. State:
    6. Last Delivered Message: Consumer sequence: 4 Stream sequence: 3
    7. Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
    8. Pending Messages: 1
    9. Redelivered Messages: 1

    The Consumer sequence increases - each delivery attempt increases the sequence - and our redelivered count also goes up.

    Finally, if I then fetch it again and ack it this time:

    1. $ nats con next ORDERS DISPATCH
    2. --- received on ORDERS.processed
    3. order 5
    4. Acknowledged message
    5. $ nats con info ORDERS DISPATCH
    6. State:
    7. Last Delivered Message: Consumer sequence: 5 Stream sequence: 3
    8. Acknowledgment floor: Consumer sequence: 1 Stream sequence: 1
    9. Pending Messages: 0
    10. Redelivered Messages: 0

    Having now Acked the message there are no more pending.

    Additionally, there are a few types of acknowledgements:

    So far all of the examples were the AckAck type of acknowledgement, by replying to the Ack with the body as indicated in Bytes you can pick what mode of acknowledgement you want.

    All of these acknowledgement modes, except AckNext, support double acknowledgement - if you set a reply subject when acknowledging the server will in turn acknowledge having received your ACK.

    The +NXT acknowledgement can have a few formats: +NXT 10 requests 10 messages and +NXT {"no_wait": true} which is the same data that can be sent in a Pull Request.

    JetStream supports Exactly Once delivery by combining Message Deduplication and double acks.

    On the publishing side you can avoid duplicate message ingestion using the Message Deduplication feature.

    Consumers can be 100% sure a message was correctly processed by requesting the server Acknowledge having received your acknowledgement by setting a reply subject on the Ack. If you receive this response you will never receive that message again.

    Consumer Starting Position

    When setting up a Consumer you can decide where to start, the system supports the following for the DeliverPolicy:

    Regardless of what mode you set, this is only the starting point. Once started it will always give you what you have not seen or acknowledged. So this is merely how it picks the very first message.

    Let’s look at each of these, first we make a new Stream ORDERS and add 100 messages to it.

    Now create a DeliverAll pull-based Consumer:

    Now create a DeliverLast pull-based Consumer:

    1. $ nats con add ORDERS LAST --pull --filter ORDERS.processed --ack none --replay instant --deliver last
    2. $ nats con next ORDERS LAST
    3. --- received on ORDERS.processed
    4. order 100
    5. Acknowledged message

    Now create a MsgSetSeq pull-based Consumer:

    1. $ nats con add ORDERS TEN --pull --filter ORDERS.processed --ack none --replay instant --deliver 10
    2. $ nats con next ORDERS TEN
    3. --- received on ORDERS.processed
    4. order 10
    5. Acknowledged message

    And finally a time-based Consumer. Let’s add some messages a minute apart:

    1. $ nats str purge ORDERS
    2. $ for i in 1 2 3
    3. do
    4. nats pub ORDERS.processed "order ${i}"
    5. sleep 60
    6. done

    Then create a Consumer that starts 2 minutes ago:

    1. $ nats con add ORDERS 2MIN --pull --filter ORDERS.processed --ack none --replay instant --deliver 2m
    2. $ nats con next ORDERS 2MIN
    3. --- received on ORDERS.processed
    4. order 2
    5. Acknowledged message

    Ephemeral Consumers

    So far, all the Consumers you have seen were Durable, meaning they exist even after you disconnect from JetStream. In our Orders scenario, though the MONITOR a Consumer could very well be a short-lived thing there just while an operator is debugging the system, there is no need to remember the last seen position if all you are doing is wanting to observe the real-time state.

    In this case, we can make an Ephemeral Consumer by first subscribing to the delivery subject, then creating a durable and giving it no durable name. An Ephemeral Consumer exists as long as any subscription is active on its delivery subject. It is automatically be removed, after a short grace period to handle restarts, when there are no subscribers.

    Ephemeral Consumers can only be push-based.

    Terminal 1:

    1. $ nats sub my.monitor
    1. $ nats con add ORDERS --filter '' --ack none --target 'my.monitor' --deliver last --replay instant --ephemeral

    The --ephemeral switch tells the system to make an Ephemeral Consumer.

    Typically what you want is if a new Consumer is made the selected messages are delivered to you as quickly as possible. You might want to replay messages at the rate they arrived though, meaning if messages first arrived 1 minute apart and you make a new Consumer it will get the messages a minute apart.

    This is useful in load testing scenarios etc. This is called the ReplayPolicy and have values of ReplayInstant and ReplayOriginal.

    You can only set ReplayPolicy on push-based Consumers.

    1. $ nats con add ORDERS REPLAY --target out.original --filter ORDERS.processed --ack none --deliver all --sample 100 --replay original
    2. ...
    3. Replay Policy: original
    4. ...

    Now let’s publish messages into the Set 10 seconds apart:

    1. $ for i in 1 2 3 <15:15:35
    2. do
    3. nats pub ORDERS.processed "order ${i}"
    4. sleep 10
    5. done
    6. Published [ORDERS.processed] : 'order 1'
    7. Published [ORDERS.processed] : 'order 3'

    And when we consume them they will come to us 10 seconds apart:

    Stream Templates

    When you have many similar streams it can be helpful to auto-create them, let’s say you have a service by client and they are on subjects CLIENT.*, you can construct a template that will auto-generate streams for any matching traffic.

    1. $ nats str template add CLIENTS --subjects "CLIENT.*" --ack --max-msgs=-1 --max-bytes=-1 --max-age=1y --storage file --retention limits --max-msg-size 2048 --max-streams 1024 --discard old
    2. Stream Template CLIENTS was created
    3. Information for Stream Template CLIENTS
    4. Configuration:
    5. Subjects: CLIENT.*
    6. Acknowledgements: true
    7. Retention: File - Limits
    8. Replicas: 1
    9. Maximum Messages: -1
    10. Maximum Bytes: -1
    11. Maximum Age: 8760h0m0s
    12. Maximum Message Size: 2048
    13. Maximum Consumers: -1
    14. Maximum Streams: 1024
    15. Managed Streams:
    16. No Streams have been defined by this template

    You can see no streams currently exist, let’s publish some data:

    1. $ nats pub CLIENT.acme hello

    And we’ll have 1 new Stream:

    1. $ nats str ls
    2. Streams:
    3. CLIENTS_acme

    When the template is deleted all the streams it created will be deleted too.

    Ack Sampling

    In the earlier sections we saw that samples are being sent to a monitoring system. Let’s look at that in depth; how the monitoring system works and what it contains.

    As messages pass through a Consumer you’d be interested in knowing how many are being redelivered and how many times but also how long it takes for messages to be acknowledged.

    Consumers can sample Ack’ed messages for you and publish samples so your monitoring system can observe the health of a Consumer. We will add support for this to .

    You can configure a Consumer for sampling bypassing the --sample 80 option to nats consumer add, this tells the system to sample 80% of Acknowledgements.

    When viewing info of a Consumer you can tell if it’s sampled or not:

    1. $ nats con info ORDERS NEW
    2. ...
    3. Sampling Rate: 100
    4. ...

    Consuming

    Samples are published to $JS.EVENT.METRIC.CONSUMER_ACK.<stream>.<consumer> in JSON format containing api.ConsumerAckMetric. Use the nats con events command to view samples:

    1. $ nats con events ORDERS NEW
    2. Listening for Advisories on $JS.EVENT.ADVISORY.*.ORDERS.NEW
    3. Listening for Metrics on $JS.EVENT.METRIC.*.ORDERS.NEW
    4. 15:08:31] [Ph0fsiOKRg1TS0c2k0mMz2] Acknowledgement Sample
    5. Consumer: ORDERS > NEW
    6. Stream Sequence: 40
    7. Consumer Sequence: 161
    8. Deliveries: 1
    9. Delay: 1.009ms
    1. $ nats con events ORDERS NEW --json
    2. {
    3. "stream": "ORDERS",
    4. "consumer": "NEW",
    5. "consumer_seq": 155,
    6. "stream_seq": 143,
    7. "ack_time": 5387000,
    8. "delivered": 1
    9. }
    10. {
    11. "stream": "ORDERS",
    12. "consumer": "NEW",
    13. "consumer_seq": 156,
    14. "stream_seq": 144,
    15. "ack_time": 5807800,
    16. "delivered": 1
    17. }

    JetStream file storage is very efficient, storing as little extra information about the message as possible.

    NOTE: This might change once clustering is supported.

    We do store some message data with each message, namely:

    • Message headers
    • The subject it was received on
    • The time it was received
    • The message payload
    • A hash of the message
    • The message sequence
    • A few other bits like the length of the subject and the length of headers

    Without any headers the size is:

    1. length of the message record (4bytes) + seq(8) + ts(8) + subj_len(2) + subj + msg + hash(8)

    A 5 byte hello message without headers will take 39 bytes.

    With headers: