Message retention and expiry

因此,在一个没有创建任何订阅的主题上不会保留任何消息(默认情况下)。

(注意,不再被存储的消息不需要立即删除,事实上可能仍然可以被访问,直到下一个 ledger 滚动创建。 因为客户端无法预测 ledger 什么时候发生滚动,所以依赖滚动发生在合适的时间是不明智的。)

在 Pulsar 中,你有两种方式在命名空间的级别去修改这种行为:

  • 你可以通过设置消息保留策略持久化存储不在 backlog 内的消息(因为他们已经在每个现有的订阅上被确认,或者并没有被订阅)。
  • 可以通过设置 (TTL),设置消息在指定的时间内不被确认的话,自动确认。

你可以通过 Pulsar admin 接口 在命名空间(租户、集群或集群) 级别管理消息保留策略和TTL。

By default, when a Pulsar message arrives at a broker, the message is stored until it has been acknowledged on all subscriptions, at which point it is marked for deletion. You can override this behavior and retain messages that have already been acknowledged on all subscriptions by setting a retention policy for all topics in a given namespace. Retention is based on both a size limit and a time limit.

Retention policies are useful when you use the Reader interface. The Reader interface does not use acknowledgements, and messages do not exist within backlogs. It is required to configure retention for Reader-only use cases.

When you set a retention policy on topics in a namespace, you must set both a size limit and a time limit. You can refer to the following table to set retention policies in pulsar-admin and Java.

The retention settings apply to all messages on topics that do not have any subscriptions, or to messages that have been acknowledged by all subscriptions. The retention policy settings do not affect unacknowledged messages on topics with subscriptions. The unacknowledged messages are controlled by the backlog quota.

When a retention limit on a topic is exceeded, the oldest message is marked for deletion until the set of retained messages falls within the specified limits again.

You can set message retention at instance level with the following two parameters: defaultRetentionTimeInMinutes and defaultRetentionSizeInMB. Both parameters are set to 0 by default.

For more information of the two parameters, refer to the broker.conf configuration file.

设置保留策略

You can set a retention policy for a namespace by specifying the namespace, a size limit and a time limit in pulsar-admin, REST API and Java.

pulsar-admin

REST API

Java

You can use the set-retention subcommand and specify a namespace, a size limit using the -s/--size flag, and a time limit using the -t/--time flag.

  • When the size of messages reaches 10 GB on a topic within 3 hours, the acknowledged messages will not be retained.
  • After 3 hours, even if the message size is less than 10 GB, the acknowledged messages will not be retained.

In the following example, the time is not limited and the size limit is set to 1 TB. The size limit determines the retention.

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 1T \ --time -1

In the following example, the size is not limited and the time limit is set to 3 hours. The time limit determines the retention.

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time 3h

To achieve infinite retention, set both values to -1.

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size -1 \ --time -1

To disable the retention policy, set both values to 0.

  1. $ pulsar-admin namespaces set-retention my-tenant/my-ns \ --size 0 \ --time 0

  1. int retentionTime = 10; // 10 minutesint retentionSize = 500; // 500 megabytesRetentionPolicies policies = new RetentionPolicies(retentionTime, retentionSize);admin.namespaces().setRetention(namespace, policies);

获取保留策略

You can fetch the retention policy for a namespace by specifying the namespace. The output will be a JSON object with two keys: retentionTimeInMinutes and retentionSizeInMB.

pulsar-admin

使用 get-retention子命令并指定命名空间。

示例
  1. {
  2. "retentionTimeInMinutes": 10,
  3. "retentionSizeInMB": 500
  4. }

REST API

Java

Backlogs are sets of unacknowledged messages for a topic that have been stored by bookies. Pulsar stores all unacknowledged messages in backlogs until they are processed and acknowledged.

You can control the allowable size of backlogs, at the namespace level, using backlog quotas. Setting a backlog quota involves setting:

待办:如何对每个主题或每个 backlog 进行扩展?

  • an allowable size threshold for each topic in the namespace

可以使用以下保留策略:

策略触发的操作
producer_request_holdBroker 会持有生产者投递的消息,但并不会把投递的消息进行持久化存储
producer_exceptionBroker 会与客户端断开连接并抛出异常
consumer_backlog_evictionBroker 将开始丢弃backlog的消息

Backlog quotas are handled at the namespace level. They can be managed via:

You can set a size and/or time threshold and backlog retention policy for all of the topics in a by specifying the namespace, a size limit and/or a time limit in second, and a policy by name.

pulsar-admin

使用子命令,并使用 -l/--limit 参数指定命名空间大小限制 ,以及使用 <code>-p/--policy 参数指定保留策略。

示例
  1. $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \
  2. --limit 2G \
  3. --limitTime 36000 \
  4. --policy producer_request_hold

REST API

Java

  1. long sizeLimit = 2147483648L;
  2. BacklogQuota.RetentionPolicy policy = BacklogQuota.RetentionPolicy.producer_request_hold;
  3. BacklogQuota quota = new BacklogQuota(sizeLimit, policy);
  4. admin.namespaces().setBacklogQuota(namespace, quota);

获取 backlog 大小阈值和 backlog 保留策略

可以查看已对命名空间应用的大小阈值和 backlog 保留策略。

pulsar-admin

Use the subcommand and specify a namespace. 下面是一个示例:

  1. $ pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
  2. {
  3. "destination_storage": {
  4. "policy" : "producer_request_hold"
  5. }
  6. }

REST API

Java

  1. Map<BacklogQuota.BacklogQuotaType,BacklogQuota> quotas =
  2. admin.namespaces().getBacklogQuotas(namespace);

移除 backlog quotas

pulsar-admin

Use the subcommand and specify a namespace. 下面是一个示例:

  1. $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns

REST API

Java

  1. admin.namespaces().removeBacklogQuota(namespace);

pulsar-admin

使用 clear-backlog 子命令。

示例

By default, you will be prompted to ensure that you really want to clear the backlog for the namespace. You can override the prompt using the -f/--force flag.

默认情况下,Pulsar 会永久存储所有未确认的消息。 在大量消息未得到确认的情况下,可能会导致大量磁盘空间的使用。 如果需要考虑磁盘空间,可以设置生存时间(TTL),以确定未确认的消息将保留多长时间。

设置命名空间的TTL

pulsar-admin

使用 set-message-ttl 子命令并指定命名空间和TTL(以秒为单位,使用-ttl/--messageTTL参数指定)。

示例
  1. $ pulsar-admin namespaces set-message-ttl my-tenant/my-ns \
  2. --messageTTL 120 # TTL of 2 minutes

REST API

Java

  1. admin.namespaces().setNamespaceMessageTTL(namespace, ttlInSeconds);

获取命名空间的 TTL 配置

pulsar-admin

使用 子命令并指定命名空间。

示例
  1. $ pulsar-admin namespaces get-message-ttl my-tenant/my-ns
  2. 60

REST API

GET /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().getNamespaceMessageTTL(namespace)

pulsar-admin

Use the subcommand and specify a namespace.

示例
  1. $ pulsar-admin namespaces remove-message-ttl my-tenant/my-ns

REST API

DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL

Java

  1. admin.namespaces().removeNamespaceMessageTTL(namespace)

If you do not have any retention period and that you never have much of a backlog, the upper limit for retaining messages, which are acknowledged, equals to the Pulsar segment rollover period + entry log rollover period + (garbage collection interval * garbage collection ratios).

  • Segment rollover period: basically, the segment rollover period is how often a new segment is created. Once a new segment is created, the old segment will be deleted. By default, this happens either when you have written 50,000 entries (messages) or have waited 240 minutes. You can tune this in your broker.

  • Entry log rollover period: multiple ledgers in BookKeeper are interleaved into an entry log. In order for a ledger that has been deleted, the entry log must all be rolled over. The entry log rollover period is configurable, but is purely based on the entry log size. For details, see . Once the entry log is rolled over, the entry log can be garbage collected.