以下说明和命令中所使用 topic 的名称结构如下:

    以列表的形式列出指定命名空间下所有持久 topic。

    pulsar-admin

    可以使用 命令获取 topic 列表。

    1. $ pulsar-admin persistent list \
    2. my-tenant/my-namespace

    REST API

    GET /admin/v2/persistent/:tenant/:namespace

    Java

    1. String namespace = "my-tenant/my-namespace";
    2. admin.persistentTopics().getList(namespace);

    授权

    授权给客户端用户,允许其在指定的 topic 上执行某些操作。

    pulsar-admin

    可以使用 grant-permission 命令授权。

    1. $ pulsar-admin persistent grant-permission \
    2. --actions produce,consume --role application1 \
    3. persistent://test-tenant/ns1/tp1 \

    REST API

    POST /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. String role = "test-role";
    3. Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce, AuthAction.consume);
    4. admin.persistentTopics().grantPermission(topic, role, actions);

    获取权限

    可以使用 命令获取权限。

    pulsar-admin

    1. $ pulsar-admin persistent permissions \
    2. persistent://test-tenant/ns1/tp1 \
    3. {
    4. "application1": [
    5. "consume",
    6. "produce"
    7. ]
    8. }

    REST API

    GET /admin/v2/persistent/:tenant/:namespace/:topic/permissions

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.persistentTopics().getPermissions(topic);

    取消权限

    取消已经授予客户端用户的权限。

    pulsar-admin

    可以使用 revoke-permission 命令取消权限。

    1. $ pulsar-admin persistent revoke-permission \
    2. --role application1 \
    3. persistent://test-tenant/ns1/tp1 \
    4. {
    5. "application1": [
    6. "consume",
    7. "produce"
    8. ]
    9. }

    REST API

    DELETE /admin/v2/persistent/:tenant/:namespace/:topic/permissions/:role

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. String role = "test-role";
    3. admin.persistentTopics().revokePermissions(topic, role);

    删除 topic

    It deletes a topic. The topic cannot be deleted if there’s any active subscription or producers connected to it.

    pulsar-admin

    可以使用 delete 命令删除 topic。

    1. $ pulsar-admin persistent delete \
    2. persistent://test-tenant/ns1/tp1 \

    REST API

    DELETE /admin/v2/persistent/:tenant/:namespace/:topic

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.persistentTopics().delete(topic);

    卸载 topic

    可以卸载 topic。

    pulsar-admin

    可以使用 unload 命令卸载 topic。

    1. $ pulsar-admin persistent unload \
    2. persistent://test-tenant/ns1/tp1 \

    REST API

    PUT /admin/v2/persistent/:tenant/:namespace/:topic/unload

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.persistentTopics().unload(topic);

    获取非持久化 topic 的统计数据。

    • msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second

    • msgThroughputIn: Same as above, but in bytes per second instead of messages per second

    • msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second

    • msgThroughputOut: Same as above, but in bytes per second instead of messages per second

    • averageMsgSize: The average size in bytes of messages published within the last interval

    • storageSize: The sum of the ledgers’ storage size for this topic. Space used to store the messages for the topic

    • publishers: The list of all local publishers into the topic. There can be zero or thousands

      • msgRateIn: Total rate of messages published by this publisher in messages per second

      • msgThroughputIn: Total throughput of the messages published by this publisher in bytes per second

      • averageMsgSize: Average message size in bytes from this publisher within the last interval

      • producerId: Internal identifier for this producer on this topic

      • producerName: Internal identifier for this producer, generated by the client library

      • address: IP address and source port for the connection of this producer

      • connectedSince: Timestamp this producer was created or last reconnected

    • subscriptions: The list of all local subscriptions to the topic

      • my-subscription: The name of this subscription (client defined)

        • msgRateOut: Total rate of messages delivered on this subscription (msg/s)

        • msgThroughputOut: Total throughput delivered on this subscription (bytes/s)

        • msgBacklog: Number of messages in the subscription backlog

        • type: This subscription type

        • lastExpireTimestamp: The last message expire execution timestamp

        • lastConsumedFlowTimestamp: The last flow command received timestamp

        • lastConsumedTimestamp: The latest timestamp of all the consumed timestamp of the consumers

        • lastAckedTimestamp: The latest timestamp of all the acked timestamp of the consumers

        • consumers: The list of connected consumers for this subscription

          • msgRateOut: Total rate of messages delivered to the consumer (msg/s)

          • msgThroughputOut: Total throughput delivered to the consumer (bytes/s)

          • consumerName: Internal identifier for this consumer, generated by the client library

          • availablePermits: The number of messages this consumer has space for in the client library’s listen queue. 值为0意味着客户端类库的队列已经满了,receive()不再被调用。 非零值意味着 consumer 可以接收消息。

          • unackedMessages: Number of unacknowledged messages for the consumer

          • blockedConsumerOnUnackedMsgs: Flag to verify if the consumer is blocked due to reaching threshold of unacked messages

          • lastConsumedTimestamp: The timestamp of the consumer last consume a message

          • lastAckedTimestamp: The timestamp of the consumer last ack a message

    • replication: This section gives the stats for cross-colo replication of this topic

      • msgRateIn: Total rate of messages received from the remote cluster (msg/s)

      • msgThroughputIn: Total throughput received from the remote cluster (bytes/s)

      • msgRateOut: Total rate of messages delivered to the replication-subscriber (msg/s)

      • msgThroughputOut: Total through delivered to the replication-subscriber (bytes/s)

      • msgRateExpired: Total rate of messages expired (msg/s)

      • replicationBacklog: Number of messages pending to be replicated to remote cluster

      • connected: Whether the outbound replicator is connected

      • replicationDelayInSeconds: How long the oldest message has been waiting to be sent through the connection, if connected is true

      • inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker

      • inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. 如果没有连接到本地发布者,一分钟后连接将自动关闭。

      • outboundConnection: Address of outbound replication connection

      • outboundConnectedSince: Timestamp of establishing outbound connection

    pulsar-admin

    可以使用 命令获取 topic 的统计信息。

    1. $ pulsar-admin persistent stats \

    REST API

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.persistentTopics().getStats(topic);

    获取内部统计信息

    获取 topic 的详细统计信息。

    • entriesAddedCounter: Messages published since this broker loaded this topic

    • numberOfEntries: Total number of messages being tracked

    • totalSize: Total storage size in bytes of all messages

    • currentLedgerEntries: Count of messages written to the ledger currently open for writing

    • currentLedgerSize: Size in bytes of messages written to ledger currently open for writing

    • lastLedgerCreatedTimestamp: time when last ledger was created

    • lastLedgerCreationFailureTimestamp: time when last ledger was failed

    • waitingCursorsCount: How many cursors are “caught up” and waiting for a new message to be published

    • pendingAddEntriesCount: How many messages have (asynchronous) write requests we are waiting on completion

    • lastConfirmedEntry: The ledgerid:entryid of the last message successfully written. 如果 entryid 为 -1,则 ledger 已经允许写入或正在开放写入权限,但还没有写入 entry。

    • ledgers: The ordered list of all ledgers for this topic holding its messages

      • ledgerId: Id of this ledger

      • entries: Total number of entries belong to this ledger

      • size: Size of messages written to this ledger (in bytes)

      • offloaded: Whether this ledger is offloaded

    • cursors: The list of all cursors on this topic. There will be one for every subscription you saw in the topic stats.

      • markDeletePosition: All of messages before the markDeletePosition are acknowledged by the subscriber.

      • readPosition: The latest position of subscriber for reading message

      • waitingReadOp: This is true when the subscription has read the latest message published to the topic and is waiting on new messages to be published.

      • pendingReadOps: The counter for how many outstanding read requests to the BookKeepers we have in progress

      • messagesConsumedCounter: Number of messages this cursor has acked since this broker loaded this topic

      • cursorLedger: The ledger being used to persistently store the current markDeletePosition

      • cursorLedgerLastEntry: The last entryid used to persistently store the current markDeletePosition

      • individuallyDeletedMessages: If Acks are being done out of order, shows the ranges of messages Acked between the markDeletePosition and the read-position

      • lastLedgerSwitchTimestamp: The last time the cursor ledger was rolled over

      • state: The state of the cursor ledger: Open means we have a cursor ledger for saving updates of the markDeletePosition.

    1. {
    2. "numberOfEntries": 3233,
    3. "totalSize": 331482,
    4. "currentLedgerEntries": 3233,
    5. "currentLedgerSize": 331482,
    6. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
    7. "lastLedgerCreationFailureTimestamp": null,
    8. "waitingCursorsCount": 1,
    9. "pendingAddEntriesCount": 0,
    10. "lastConfirmedEntry": "324711539:3232",
    11. "state": "LedgerOpened",
    12. "ledgers": [
    13. {
    14. "ledgerId": 324711539,
    15. "entries": 0,
    16. "size": 0
    17. }
    18. ],
    19. "cursors": {
    20. "my-subscription": {
    21. "markDeletePosition": "324711539:3133",
    22. "readPosition": "324711539:3233",
    23. "waitingReadOp": true,
    24. "pendingReadOps": 0,
    25. "messagesConsumedCounter": 20449501,
    26. "cursorLedger": 324702104,
    27. "cursorLedgerLastEntry": 21,
    28. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
    29. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
    30. "state": "Open"
    31. }
    32. }
    33. }

    pulsar-admin

    可以使用 命令获取 topic 中数据的统计信息。

    1. $ pulsar-admin persistent stats-internal \
    2. persistent://test-tenant/ns1/tp1 \

    REST API

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.persistentTopics().getInternalStats(topic);

    查看消息

    查看指定 topic 中某个订阅的 N 条消息。

    pulsar-admin

    1. $ pulsar-admin persistent peek-messages \
    2. --count 10 --subscription my-subscription \
    3. persistent://test-tenant/ns1/tp1 \
    4. Message ID: 315674752:0
    5. Properties: { "X-Pulsar-publish-time" : "2015-07-13 17:40:28.451" }
    6. msg-payload

    REST API

    GET /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/position/:messagePosition

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. String subName = "my-subscription";
    3. admin.persistentTopics().peekMessages(topic, subName, numMessages);

    Get message by ID

    It fetches the message with given ledger id and entry id.

    pulsar-admin

    1. $ ./bin/pulsar-admin topics get-message-by-id \
    2. persistent://public/default/my-topic \
    3. -l 10 -e 0

    REST API

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. long ledgerId = 10;
    3. long entryId = 10;
    4. admin.persistentTopics().getMessageById(topic, ledgerId, entryId);

    跳过消息

    某个订阅跳过指定topic的N条消息。

    pulsar-admin

    1. $ pulsar-admin persistent skip \
    2. --count 10 --subscription my-subscription \
    3. persistent://test-tenant/ns1/tp1 \

    REST API

    POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/skip/:numMessages

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. String subName = "my-subscription";
    3. int numMessages = 1;
    4. admin.persistentTopics().skipMessages(topic, subName, numMessages);

    跳过所有消息

    某个订阅跳过指定topic的所有消息

    pulsar-admin

    1. $ pulsar-admin persistent skip-all \
    2. --subscription my-subscription \
    3. persistent://test-tenant/ns1/tp1 \

    REST API

    更多信息

    Java

    重置订阅的cursor位置回到X分钟之前被记录的位置。 实际上通过计算时间和X分钟之前的cursor位置,来重置回到那个位置。

    pulsar-admin

    1. $ pulsar-admin persistent reset-cursor \
    2. --subscription my-subscription --time 10 \
    3. persistent://test-tenant/ns1/tp1 \

    REST API

    POST /admin/v2/persistent/:tenant/:namespace/:topic/subscription/:subName/resetcursor/:timestamp

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. String subName = "my-subscription";
    3. long timestamp = 2342343L;
    4. admin.persistentTopics().skipAllMessages(topic, subName, timestamp);

    查询topic

    定位正在服务于指定topic的broker

    pulsar-admin

    1. $ pulsar-admin persistent lookup \
    2. persistent://test-tenant/ns1/tp1 \
    3. "pulsar://broker1.org.com:4480"

    REST API

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.lookup().lookupDestination(topic);

    获取bundle

    给出包含指定topic的bundle范围。

    pulsar-admin

    1. $ pulsar-admin persistent bundle-range \
    2. persistent://test-tenant/ns1/tp1 \
    3. "0x00000000_0xffffffff"

    REST API

    GET /lookup/v2/topic/:topic_domain/:tenant/:namespace/:topic/bundle

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.lookup().getBundleRange(topic);

    获取订阅

    给出了指定topic的所有订阅的名称。

    pulsar-admin

    1. $ pulsar-admin persistent subscriptions \
    2. persistent://test-tenant/ns1/tp1 \
    3. my-subscription

    REST API

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. admin.persistentTopics().getSubscriptions(topic);

    取消订阅

    当不再处理更多消息时,可以取消订阅

    pulsar-admin

    1. $ pulsar-admin persistent unsubscribe \
    2. --subscription my-subscription \
    3. persistent://test-tenant/ns1/tp1 \

    REST API

    DELETE /admin/v2/namespaces/:tenant/:namespace/:topic/subscription/:subscription

    Java

    1. String topic = "persistent://my-tenant/my-namespace/my-topic";
    2. String subscriptionName = "my-subscription";
    3. admin.persistentTopics().deleteSubscription(topic, subscriptionName);

    最后一条消息Id

    给出提交到持久topic的最后一条消息ID,将在2.3.0中提供此特性。

    1. pulsar-admin topics last-message-id topic-name

    REST API

    {% endpoint Get /admin/v2/persistent/:tenant/:namespace/:topic/lastMessageId %}

    Java

    1. admin.persistentTopics().getLastMessage(topic);