以下说明和命令中所使用 topic 的名称结构如下:

    Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.

    pulsar-admin

    你可以使用create-partitioned-topic命令创建partitioned topic,并指定topic的名字;使用-p--partitions标志指定分区数。

    Here’s an example:

    1. $ bin/pulsar-admin topics create-partitioned-topic \
    2. persistent://my-tenant/my-namespace/my-topic \
    3. --partitions 4

    Note

    If there already exists a non partitioned topic with suffix ‘-partition-‘ followed by numeric value like ‘xyz-topic-partition-10’, then you can not create partitioned topic with name ‘xyz-topic’ as the partitions of the partitioned topic could override the existing non partitioned topic. You have to delete that non partitioned topic first then create the partitioned topic.

    REST API

    Java

    1. String topicName = "persistent://my-tenant/my-namespace/my-topic";
    2. int numPartitions = 4;
    3. admin.persistentTopics().createPartitionedTopic(topicName, numPartitions);

    Create missed partitions

    pulsar-admin

    You can create missed partitions using the command and specifying the topic name as an argument.

    Here’s an example:

    1. $ bin/pulsar-admin topics create-missed-partitions \
    2. persistent://my-tenant/my-namespace/my-topic \

    REST API

    Java

    1. admin.persistentTopics().createMissedPartitions(topicName);

    Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:

    pulsar-admin

    You can see the number of partitions in a partitioned topic using the get-partitioned-topic-metadata subcommand. Here’s an example:

    1. $ pulsar-admin topics get-partitioned-topic-metadata \
    2. persistent://my-tenant/my-namespace/my-topic
    3. {
    4. "partitions": 4
    5. }

    REST API

    GET /admin/v2/persistent/:tenant/:namespace/:topic/partitions

    Java

    更新

    You can update the number of partitions on an existing partitioned topic if the topic is non-global. To update, the new number of partitions must be greater than the existing number.

    Decrementing the number of partitions would deleting the topic, which is not supported in Pulsar.

    已创建的分区生产者和消费者,将自动查找新创建的分区。

    pulsar-admin

    Partitioned topics can be updated using the update-partitioned-topic command.

    1. $ pulsar-admin topics update-partitioned-topic \
    2. persistent://my-tenant/my-namespace/my-topic \
    3. --partitions 8

    REST API

    Java

    1. admin.persistentTopics().updatePartitionedTopic(persistentTopic, numPartitions);

    pulsar-admin

    Partitioned topics can be deleted using the delete-partitioned-topic command, specifying the topic by name:

    1. $ bin/pulsar-admin topics delete-partitioned-topic \
    2. persistent://my-tenant/my-namespace/my-topic

    REST API

    DELETE /admin/v2/persistent/:topic/:namespace/:destination/partitions

    Java

    1. admin.persistentTopics().delete(persistentTopic);

    获取资源列表

    It provides a list of persistent topics existing under a given namespace.

    pulsar-admin

    1. $ pulsar-admin topics list tenant/namespace
    2. persistent://tenant/namespace/topic1
    3. persistent://tenant/namespace/topic2

    REST API

    Java

    It shows current statistics of a given partitioned topic. Here’s an example payload:

    1. "msgRateIn": 4641.528542257553,
    2. "msgThroughputIn": 44663039.74947473,
    3. "msgRateOut": 0,
    4. "msgThroughputOut": 0,
    5. "averageMsgSize": 1232439.816728665,
    6. "storageSize": 135532389160,
    7. "publishers": [
    8. {
    9. "msgRateIn": 57.855383881403576,
    10. "msgThroughputIn": 558994.7078932219,
    11. "averageMsgSize": 613135,
    12. "producerId": 0,
    13. "producerName": null,
    14. "address": null,
    15. "connectedSince": null
    16. }
    17. ],
    18. "subscriptions": {
    19. "my-topic_subscription": {
    20. "msgRateOut": 0,
    21. "msgThroughputOut": 0,
    22. "msgBacklog": 116632,
    23. "type": null,
    24. "msgRateExpired": 36.98245516804671,
    25. "consumers": []
    26. },
    27. "replication": {}
    28. }

    The following stats are available:

    pulsar-admin

    The stats for the partitioned topic and its connected producers and consumers can be fetched by using the partitioned-stats command, specifying the topic by name:

    1. $ pulsar-admin topics partitioned-stats \
    2. persistent://test-tenant/namespace/topic \
    3. --per-partition

    REST API

    GET /admin/v2/persistent/:tenant/:namespace/:topic/partitioned-stats

    Java

    1. admin.topics().getPartitionedStats(persistentTopic, true /* per partition */, false /* is precise backlog */);

    Internal stats

    获取 topic 的详细统计信息。

    1. {
    2. "numberOfEntries": 3233,
    3. "totalSize": 331482,
    4. "currentLedgerEntries": 3233,
    5. "currentLedgerSize": 331482,
    6. "lastLedgerCreatedTimestamp": "2016-06-29 03:00:23.825",
    7. "lastLedgerCreationFailureTimestamp": null,
    8. "waitingCursorsCount": 1,
    9. "pendingAddEntriesCount": 0,
    10. "lastConfirmedEntry": "324711539:3232",
    11. "state": "LedgerOpened",
    12. "ledgers": [
    13. {
    14. "ledgerId": 324711539,
    15. "entries": 0,
    16. "size": 0
    17. }
    18. ],
    19. "cursors": {
    20. "my-subscription": {
    21. "markDeletePosition": "324711539:3133",
    22. "readPosition": "324711539:3233",
    23. "waitingReadOp": true,
    24. "pendingReadOps": 0,
    25. "messagesConsumedCounter": 20449501,
    26. "cursorLedger": 324702104,
    27. "cursorLedgerLastEntry": 21,
    28. "individuallyDeletedMessages": "[(324711539:3134‥324711539:3136], (324711539:3137‥324711539:3140], ]",
    29. "lastLedgerSwitchTimestamp": "2016-06-29 01:30:19.313",
    30. "state": "Open"
    31. }
    32. }
    33. }

    pulsar-admin

    The internal stats for the partitioned topic can be fetched by using the stats-internal command, specifying the topic by name:

    1. $ pulsar-admin topics stats-internal \

    REST API

    Java