Pulsar configuration
BookKeeper is a replicated log storage system that Pulsar uses for durable storage of all messages.
Broker
Pulsar brokers are responsible for handling messages sent from producers, dispatching messages to consumers, replicating data between clusters, and more.
Name | Description | Default |
---|---|---|
advertisedListeners | Specify multiple advertised listeners for the broker.<br><br>The format is <code><listener_name>:pulsar://<host>:<port></code>.<br><br>If there are multiple listeners, separate them with commas.<br><br><strong>Note</strong>: do not use this configuration with <code>advertisedAddress</code> and <code>brokerServicePort</code>. If the value of this configuration is empty, the broker uses <code>advertisedAddress</code> and <code>brokerServicePort</code>. | / |
internalListenerName | Specify the internal listener name for the broker.<br><br><strong>Note</strong>: the listener name must be contained in <code>advertisedListeners</code>.<br><br>If the value of this configuration is empty, the broker uses the first listener as the internal listener. | / |
authenticateOriginalAuthData | If this flag is set to <code>true</code>, the broker authenticates the original Auth data; else it just accepts the originalPrincipal and authorizes it (if required). | false |
enablePersistentTopics | Whether persistent topics are enabled on the broker | true |
enableNonPersistentTopics | Whether non-persistent topics are enabled on the broker | true |
functionsWorkerEnabled | Whether the Pulsar Functions worker service is enabled in the broker | false |
exposePublisherStats | Whether to enable topic level metrics. | true |
statsUpdateFrequencyInSecs | | 60 |
statsUpdateInitialDelayInSecs | | 60 |
zookeeperServers | ZooKeeper quorum connection string | |
zooKeeperCacheExpirySeconds | ZooKeeper cache expiry time in seconds | 300 |
configurationStoreServers | Configuration store connection string (as a comma-separated list) | |
brokerServicePort | Broker data port | 6650 |
brokerServicePortTls | Broker data port for TLS | 6651 |
webServicePort | Port to use to serve HTTP requests | 8080 |
webServicePortTls | Port to use to serve HTTPS requests | |
webSocketServiceEnabled | Enable the WebSocket API service in the broker | false |
webSocketNumIoThreads | | |
webSocketConnectionsPerBroker | The number of connections per broker in the Pulsar client used in the WebSocket proxy. | 8 |
webSocketSessionIdleTimeoutMillis | Time in milliseconds after which an idle WebSocket session times out. | 300000 |
webSocketMaxTextFrameSize | The maximum size of a text message during parsing in the WebSocket proxy. | 1048576 |
exposeTopicLevelMetricsInPrometheus | Whether to enable topic level metrics. | true |
exposeConsumerLevelMetricsInPrometheus | Whether to enable consumer level metrics. | false |
jvmGCMetricsLoggerClassName | Class name of the pluggable JVM GC metrics logger that can log GC-specific metrics. | N/A |
bindAddress | Hostname or IP address the service binds on. | 0.0.0.0 |
advertisedAddress | Hostname or IP address the service advertises to the outside world. If not set, the value of <code>InetAddress.getLocalHost().getHostName()</code> is used. | |
clusterName | Name of the cluster to which this broker belongs | |
brokerDeduplicationEnabled | Sets the default behavior for message deduplication in the broker. If enabled, the broker rejects messages that were already stored in the topic. This setting can be overridden on a per-namespace basis. | false |
brokerDeduplicationMaxNumberOfProducers | The maximum number of producers for which information is stored for deduplication purposes. | 10000 |
brokerDeduplicationEntriesInterval | Number of entries after which a deduplication information snapshot is taken. A larger interval leads to fewer snapshots being taken, though this also lengthens the topic recovery time (the time required for entries published after the snapshot to be replayed). | 1000 |
brokerDeduplicationProducerInactivityTimeoutMinutes | The time of inactivity (in minutes) after which the broker discards deduplication information related to a disconnected producer. | 360 |
dispatchThrottlingRatePerReplicatorInMsg | The default messages-per-second dispatch throttling limit for every replicator in replication. The value <code>0</code> disables replication message dispatch throttling. | 0 |
dispatchThrottlingRatePerReplicatorInByte | The default bytes-per-second dispatch throttling limit for every replicator in replication. The value <code>0</code> disables replication message-byte dispatch throttling. | 0 |
zooKeeperSessionTimeoutMillis | ZooKeeper session timeout in milliseconds | 30000 |
brokerShutdownTimeoutMs | Time to wait for the broker to shut down gracefully. Once this time elapses, the process is killed directly. | 60000 |
skipBrokerShutdownOnOOM | | |
backlogQuotaCheckEnabled | | true |
backlogQuotaCheckIntervalInSeconds | How often to check for topics that have reached the quota | 60 |
backlogQuotaDefaultLimitGB | The default per-topic backlog quota limit. A value less than 0 means no limitation. By default, it is -1. | -1 |
backlogQuotaDefaultRetentionPolicy | The default backlog quota retention policy. By default, it is <code>producer_request_hold</code>. <li>'producer_request_hold': holds the producer's send request until the resource becomes available (or holding times out)</li> <li>'producer_exception': throws `javax.jms.ResourceAllocationException` to the producer</li> <li>'consumer_backlog_eviction': evicts the oldest message from the slowest consumer's backlog</li> | producer_request_hold |
allowAutoTopicCreation | | true |
allowAutoTopicCreationType | The type of topic that is allowed to be automatically created (partitioned/non-partitioned). | non-partitioned |
allowAutoSubscriptionCreation | Enable subscription auto creation when a new consumer connects. | true |
defaultNumPartitions | The number of partitions for a partitioned topic that is automatically created, if <code>allowAutoTopicCreationType</code> is partitioned. | 1 |
brokerDeleteInactiveTopicsEnabled | Enable the deletion of inactive topics. If topics are not consumed for a while, these inactive topics might be cleaned up. Deleting inactive topics is enabled by default. The default period is 1 minute. | true |
brokerDeleteInactiveTopicsFrequencySeconds | How often to check for inactive topics | 60 |
brokerDeleteInactiveTopicsMode | Set the mode for deleting inactive topics. <li><code>delete_when_no_subscriptions</code>: delete the topic that has no subscriptions or active producers.</li> <li><code>delete_when_subscriptions_caught_up</code>: delete the topic whose subscriptions have no backlogs and which has no active producers or consumers.</li> | <code>delete_when_no_subscriptions</code> |
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds | Set the maximum duration for inactive topics. If it is not specified, the <code>brokerDeleteInactiveTopicsFrequencySeconds</code> parameter is adopted. | N/A |
forceDeleteTenantAllowed | Enable you to delete a tenant forcefully. | false |
forceDeleteNamespaceAllowed | Enable you to delete a namespace forcefully. | false |
messageExpiryCheckIntervalInMinutes | The frequency of proactively checking and purging expired messages. | 5 |
brokerServiceCompactionMonitorIntervalInSeconds | Interval between checks to determine whether topics with compaction policies need compaction. | 60 |
brokerServiceCompactionThresholdInBytes | If the estimated backlog size is greater than this threshold, compaction is triggered. Setting this threshold to 0 disables the compaction check. | N/A |
delayedDeliveryEnabled | Whether to enable delayed delivery for messages. If disabled, messages are delivered immediately and there is no tracking overhead. | true |
delayedDeliveryTickTimeMillis | Control the tick time for retrying on delayed delivery, which affects the accuracy of the delivery time compared to the scheduled time. By default, it is 1 second. | 1000 |
activeConsumerFailoverDelayTimeMillis | How long to delay rewinding the cursor and dispatching messages when the active consumer changes. | 1000 |
clientLibraryVersionCheckEnabled | Enable the check for the minimum allowed client library version. | false |
clientLibraryVersionCheckAllowUnversioned | Allow client libraries with no version information. | true |
statusFilePath | Path for the file used to determine the rotation status for the broker when responding to service discovery health checks. | |
preferLaterVersions | If true (and ModularLoadManagerImpl is being used), the load manager attempts to use only brokers running the latest software version (to minimize impact to bundles). | false |
maxNumPartitionsPerPartitionedTopic | Max number of partitions per partitioned topic. Use 0 or a negative number to disable the check. | 0 |
tlsEnabled | Deprecated - Use | |
subscriptionExpirationTimeMinutes | Setting this configuration to a value greater than 0 deletes inactive subscriptions automatically. Setting this configuration to 0 does not delete inactive subscriptions automatically. Since this configuration takes effect on all topics, if there is even one topic whose subscriptions should not be deleted automatically, you need to set it to 0. Instead, you can set a subscription expiration time for each namespace using the . | 0 |
maxConcurrentLookupRequest | Max number of concurrent lookup requests that the broker allows, to throttle heavy incoming lookup traffic. | 50000 |
maxConcurrentTopicLoadRequest | Max number of concurrent topic loading requests that the broker allows, to control the number of ZooKeeper operations. | 5000 |
authenticationEnabled | Enable authentication | false |
authenticationProviders | Authentication provider name list, which is a comma-separated list of class names | |
authenticationRefreshCheckSeconds | Interval of time for checking for expired authentication credentials | 60 |
authorizationEnabled | Enforce authorization | false |
superUserRoles | Role names that are treated as "super-user", meaning they are able to do all admin operations and publish/consume from all topics | |
brokerClientAuthenticationPlugin | Authentication settings of the broker itself. Used when the broker connects to other brokers, either in the same or other clusters | |
brokerClientAuthenticationParameters | | |
athenzDomainNames | Supported Athenz provider domain names (comma separated) for authentication | |
exposePreciseBacklogInPrometheus | Enable exposing precise backlog stats. Set to false to use the published counter and consumed counter for the calculation; this is more efficient but may be inaccurate. | false |
schemaRegistryStorageClassName | The schema storage implementation used by this broker. | org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory |
isSchemaValidationEnforced | Enforce schema validation in the following case: if a producer without a schema attempts to produce to a topic with a schema, the producer fails to connect. PLEASE be careful when using this, since non-Java clients don't support schemas. If this setting is enabled, non-Java clients fail to produce. | false |
offloadersDirectory | The directory for all the offloader implementations. | ./offloaders |
bookkeeperMetadataServiceUri | Metadata service URI that BookKeeper uses for loading the corresponding metadata driver and resolving its metadata service location. This value can be fetched using the bookkeeper shell whatisinstanceid command in a BookKeeper cluster. For example: zk+hierarchical://localhost:2181/ledgers. The metadata service URI list can also be semicolon-separated values like below: ;zk2:2181;zk3:2181/ledgers | |
bookkeeperClientAuthenticationPlugin | Authentication plugin to use when connecting to bookies | |
bookkeeperClientAuthenticationParametersName | BookKeeper auth plugin implementation-specific parameters name and values | |
bookkeeperClientAuthenticationParameters | | |
bookkeeperClientNumWorkerThreads | Number of BookKeeper client worker threads. Default is Runtime.getRuntime().availableProcessors() | |
bookkeeperClientTimeoutInSeconds | Timeout for BK add / read operations | 30 |
bookkeeperClientSpeculativeReadTimeoutInMillis | Speculative reads are initiated if a read request does not complete within a certain time. A value of 0 disables speculative reads. | 0 |
bookkeeperNumberOfChannelsPerBookie | Number of channels per bookie | 16 |
bookkeeperClientHealthCheckEnabled | Enable bookie health checks. Bookies that have more than the configured number of failures within the interval are quarantined for some time. During this period, new ledgers are not created on these bookies. | true |
bookkeeperClientHealthCheckIntervalSeconds | | 60 |
bookkeeperClientHealthCheckErrorThresholdPerInterval | | 5 |
bookkeeperClientHealthCheckQuarantineTimeInSeconds | | 1800 |
bookkeeperClientRackawarePolicyEnabled | Enable the rack-aware bookie selection policy. BK chooses bookies from different racks when forming a new bookie ensemble. | true |
bookkeeperClientRegionawarePolicyEnabled | Enable the region-aware bookie selection policy. BK chooses bookies from different regions and racks when forming a new bookie ensemble. If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored. | false |
bookkeeperClientMinNumRacksPerWriteQuorum | Minimum number of racks per write quorum. The BK rack-aware bookie selection policy tries to get bookies from at least 'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a write quorum. | 2 |
bookkeeperClientEnforceMinNumRacksPerWriteQuorum | Enforces the rack-aware bookie selection policy to pick bookies from 'bookkeeperClientMinNumRacksPerWriteQuorum' racks for a write quorum. If BK cannot find such bookies, it throws BKNotEnoughBookiesException instead of picking a random one. | false |
bookkeeperClientReorderReadSequenceEnabled | Enable/disable reordering of the read sequence when reading entries. | false |
bookkeeperClientIsolationGroups | Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie outside the specified groups is not used by the broker. | |
bookkeeperClientSecondaryIsolationGroups | Enable a bookie secondary-isolation group if bookkeeperClientIsolationGroups does not have enough bookies available. | |
bookkeeperClientMinAvailableBookiesInIsolationGroups | Minimum number of bookies that should be available as part of bookkeeperClientIsolationGroups; otherwise the broker includes bookkeeperClientSecondaryIsolationGroups bookies in the isolated list. | |
bookkeeperClientGetBookieInfoIntervalSeconds | Set the interval to periodically check bookie info | 86400 |
bookkeeperClientGetBookieInfoRetryIntervalSeconds | Set the interval to retry a failed bookie info lookup | 60 |
bookkeeperEnableStickyReads | Enable/disable having read operations for a ledger be sticky to a single bookie. If this flag is enabled, the client uses one single bookie (by preference) to read all entries for a ledger. | true |
managedLedgerDefaultEnsembleSize | Number of bookies to use when creating a ledger | 2 |
managedLedgerDefaultWriteQuorum | Number of copies to store for each message | 2 |
managedLedgerDefaultAckQuorum | Number of guaranteed copies (acks to wait for before a write is considered complete) | 2 |
managedLedgerCacheSizeMB | Amount of memory to use for caching data payloads in the managed ledger. This memory is allocated from JVM direct memory and is shared across all the topics running in the same broker. By default, it uses 1/5 of the available direct memory. | |
managedLedgerCacheCopyEntries | Whether to make a copy of the entry payloads when inserting into the cache. | false |
managedLedgerCacheEvictionWatermark | Threshold to which the cache level is brought down when eviction is triggered. | 0.9 |
managedLedgerCacheEvictionFrequency | Configure the cache eviction frequency for the managed ledger cache (evictions/sec). | 100.0 |
managedLedgerCacheEvictionTimeThresholdMillis | All entries that have stayed in the cache for more than the configured time are evicted. | 1000 |
managedLedgerCursorBackloggedThreshold | Configure the threshold (in number of entries) from which a cursor is considered 'backlogged' and thus should be set as inactive. | 1000 |
managedLedgerDefaultMarkDeleteRateLimit | Rate limit on the amount of writes per second generated by consumers acking messages. | 1.0 |
managedLedgerMaxEntriesPerLedger | The max number of entries to append to a ledger before triggering a rollover. A ledger rollover is triggered after the min rollover time has passed and one of the following conditions is true: <li>The max rollover time has been reached</li> <li>The max entries have been written to the ledger</li> <li>The max ledger size has been written to the ledger</li> | 50000 |
managedLedgerMinLedgerRolloverTimeMinutes | Minimum time between ledger rollovers for a topic | 10 |
managedLedgerMaxLedgerRolloverTimeMinutes | Maximum time before forcing a ledger rollover for a topic | 240 |
managedLedgerCursorMaxEntriesPerLedger | Max number of entries to append to a cursor ledger | 50000 |
managedLedgerCursorRolloverTimeInSeconds | Max time before triggering a rollover on a cursor ledger | 14400 |
managedLedgerMaxUnackedRangesToPersist | Max number of "acknowledgment holes" that are going to be persistently stored. When acknowledging out of order, a consumer leaves holes that are supposed to be quickly filled by acking all the messages. The information of which messages are acknowledged is persisted by compressing it into "ranges" of acknowledged messages. After the max number of ranges is reached, the information is only tracked in memory and messages are redelivered in case of crashes. | 1000 |
autoSkipNonRecoverableData | Skip reading non-recoverable/unreadable data ledgers in a managed ledger's list. It helps when data ledgers get corrupted in BookKeeper and the managed cursor is stuck at that ledger. | false |
loadBalancerEnabled | Enable the load balancer | true |
loadBalancerPlacementStrategy | Strategy to assign a new bundle | weightedRandomSelection |
loadBalancerReportUpdateThresholdPercentage | Percentage of change needed to trigger a load report update | 10 |
loadBalancerReportUpdateMaxIntervalMinutes | Maximum interval between load report updates | 15 |
loadBalancerHostUsageCheckIntervalMinutes | Frequency of collecting host usage for the load report | 1 |
loadBalancerSheddingIntervalMinutes | Load shedding interval. The broker periodically checks whether some traffic should be offloaded from over-loaded brokers to under-loaded brokers. | 30 |
loadBalancerSheddingGracePeriodMinutes | Prevent the same topics from being shed and moved to another broker more than once within this timeframe. | 30 |
loadBalancerBrokerMaxTopics | Usage threshold to allocate the max number of topics to a broker | 50000 |
loadBalancerBrokerUnderloadedThresholdPercentage | Usage threshold to determine a broker as under-loaded | 1 |
loadBalancerBrokerOverloadedThresholdPercentage | Usage threshold to determine a broker as over-loaded | 85 |
loadBalancerResourceQuotaUpdateIntervalMinutes | Interval to update the namespace bundle resource quota | 15 |
loadBalancerBrokerComfortLoadLevelPercentage | Usage threshold to determine that a broker has just the right level of load | 65 |
loadBalancerAutoBundleSplitEnabled | Enable/disable namespace bundle auto split | false |
loadBalancerNamespaceBundleMaxTopics | Maximum number of topics in a bundle; otherwise bundle split is triggered | 1000 |
loadBalancerNamespaceBundleMaxSessions | Maximum number of sessions (producers + consumers) in a bundle; otherwise bundle split is triggered | 1000 |
loadBalancerNamespaceBundleMaxMsgRate | Maximum msgRate (in + out) in a bundle; otherwise bundle split is triggered | 1000 |
loadBalancerNamespaceBundleMaxBandwidthMbytes | Maximum bandwidth (in + out) in a bundle; otherwise bundle split is triggered | 100 |
loadBalancerNamespaceMaximumBundles | Maximum number of bundles in a namespace | 128 |
replicationMetricsEnabled | Enable replication metrics | true |
replicationConnectionsPerBroker | Max number of connections to open for each broker in a remote cluster. More connections host-to-host lead to better throughput over high-latency links. | 16 |
replicationProducerQueueSize | Replicator producer queue size | 1000 |
replicatorPrefix | Replicator prefix used for the replicator producer name and cursor name | pulsar.repl |
replicationTlsEnabled | Enable TLS when talking with other clusters to replicate messages | false |
brokerServicePurgeInactiveFrequencyInSeconds | Deprecated. Use brokerDeleteInactiveTopicsFrequencySeconds. | 60 |
transactionCoordinatorEnabled | Whether to enable the transaction coordinator in the broker. | true |
transactionMetadataStoreProviderClassName | | org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider |
defaultRetentionTimeInMinutes | Default message retention time | 0 |
defaultRetentionSizeInMB | Default retention size | 0 |
keepAliveIntervalSeconds | How often to check whether the connections are still alive | 30 |
bootstrapNamespaces | The bootstrap name. | N/A |
loadManagerClassName | Name of the load manager to use | org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl |
supportedNamespaceBundleSplitAlgorithms | Supported algorithm names for namespace bundle split | [range_equally_divide,topic_count_equally_divide] |
defaultNamespaceBundleSplitAlgorithm | Default algorithm name for namespace bundle split | range_equally_divide |
managedLedgerOffloadDriver | Driver to use to offload old data to long-term storage (possible values: S3, aws-s3, google-cloud-storage). The offloader implementations are loaded from offloadersDirectory=./offloaders. When using google-cloud-storage, make sure both Google Cloud Storage and the Google Cloud Storage JSON API are enabled for the project (check from Developers Console -> Api&auth -> APIs). | |
managedLedgerOffloadMaxThreads | Maximum number of thread pool threads for ledger offloading | 2 |
managedLedgerOffloadPrefetchRounds | The maximum prefetch rounds for ledger reading for offloading. | 1 |
managedLedgerUnackedRangesOpenCacheSetEnabled | Use Open Range-Set to cache unacknowledged messages | true |
managedLedgerOffloadDeletionLagMs | Delay between a ledger being successfully offloaded to long-term storage and the ledger being deleted from BookKeeper | 14400000 |
managedLedgerOffloadAutoTriggerSizeThresholdBytes | The number of bytes before triggering automatic offload to long-term storage | -1 (disabled) |
s3ManagedLedgerOffloadRegion | For Amazon S3 ledger offload, the AWS region | |
s3ManagedLedgerOffloadBucket | For Amazon S3 ledger offload, the bucket to place offloaded ledgers into | |
s3ManagedLedgerOffloadServiceEndpoint | For Amazon S3 ledger offload, an alternative endpoint to connect to (useful for testing) | |
s3ManagedLedgerOffloadMaxBlockSizeInBytes | For Amazon S3 ledger offload, the max block size in bytes. (64MB by default, 5MB minimum) | 67108864 |
s3ManagedLedgerOffloadReadBufferSizeInBytes | For Amazon S3 ledger offload, the read buffer size in bytes (1MB by default) | 1048576 |
gcsManagedLedgerOffloadRegion | For Google Cloud Storage ledger offload, the region where the offload bucket is located. Go to this page for more details: https://cloud.google.com/storage/docs/bucket-locations. | N/A |
gcsManagedLedgerOffloadBucket | For Google Cloud Storage ledger offload, the bucket to place offloaded ledgers into. | N/A |
gcsManagedLedgerOffloadMaxBlockSizeInBytes | For Google Cloud Storage ledger offload, the maximum block size in bytes. (64MB by default, 5MB minimum) | 67108864 |
gcsManagedLedgerOffloadReadBufferSizeInBytes | For Google Cloud Storage ledger offload, the read buffer size in bytes. (1MB by default) | 1048576 |
gcsManagedLedgerOffloadServiceAccountKeyFile | For Google Cloud Storage, the path to the JSON file containing service account credentials. For more details, see the "Service Accounts" section of . | N/A |
fileSystemProfilePath | For File System Storage, the file system profile path. | ../conf/filesystem_offload_core_site.xml |
fileSystemURI | For File System Storage, the file system URI. | N/A |
s3ManagedLedgerOffloadRole | For Amazon S3 ledger offload, provide a role to assume before writing to S3 | |
s3ManagedLedgerOffloadRoleSessionName | For Amazon S3 ledger offload, provide a role session name when using a role | pulsar-s3-offload |
acknowledgmentAtBatchIndexLevelEnabled | Enable or disable batch index acknowledgement. | false |
enableReplicatedSubscriptions | Whether to enable tracking of replicated subscriptions state across clusters. | true |
replicatedSubscriptionsSnapshotFrequencyMillis | The frequency of snapshots for replicated subscriptions tracking. | 1000 |
replicatedSubscriptionsSnapshotTimeoutSeconds | The timeout for building a consistent snapshot for tracking replicated subscriptions state. | 30 |
replicatedSubscriptionsSnapshotMaxCachedPerSubscription | The maximum number of snapshots to be cached per subscription. | 10 |
maxMessagePublishBufferSizeInMB | The maximum memory size for a broker to handle messages that are sent by producers. If the processing message size exceeds this value, the broker stops reading data from the connection. The processing messages are the messages that have been sent to the broker but for which the broker has not yet sent a response to the client; usually they are waiting to be written to bookies. This buffer is shared across all the topics running in the same broker. The value -1 disables the memory limitation. By default, it is 50% of direct memory. | N/A |
messagePublishBufferCheckIntervalInMillis | Interval between checks to see if the message publish buffer size exceeds the maximum. Use 0 or a negative number to disable the max publish buffer limiting. | 100 |
retentionCheckIntervalInSeconds | Interval between checks to see if consumed ledgers need to be trimmed. Use 0 or a negative number to disable the check. | 120 |
maxMessageSize | Set the maximum size of a message. | 5242880 |
preciseTopicPublishRateLimiterEnable | Enable precise topic publish rate limiting. | false |
lazyCursorRecovery | Whether to recover cursors lazily when trying to recover a managed ledger backing a persistent topic. It can improve write availability of topics. The caveat is that when the recovered ledger is ready to write, it is not certain whether all old consumers' last mark delete positions (ack positions) can be recovered, so users can make the trade-off or add custom logic in the application to checkpoint consumer state. | false |
haProxyProtocolEnabled | Enable or disable the HAProxy protocol. | false |
maxTopicsPerNamespace | The maximum number of persistent topics that can be created in the namespace. When the number of topics reaches this threshold, the broker rejects requests to create new topics, including topics auto-created by producers or consumers, until the number of connected consumers decreases. The default value 0 disables the check. | 0 |
subscriptionTypesEnabled | Enable all subscription types, which are exclusive, shared, failover, and key_shared. | Exclusive, Shared, Failover, Key_Shared |
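For instance, several of the options above can be combined in conf/broker.conf. The following is a minimal sketch, assuming a cluster named my-cluster, a three-node ZooKeeper ensemble, and a broker host broker1.example.com (all placeholder names, not defaults):
clusterName=my-cluster
zookeeperServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
configurationStoreServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
advertisedAddress=broker1.example.com
brokerServicePort=6650
webServicePort=8080
allowAutoTopicCreationType=non-partitioned
brokerDeduplicationEnabled=false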
Client
You can use the pulsar-client CLI tool to publish messages to and consume messages from Pulsar topics. You can use this tool in place of a client library.
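For example, assuming a topic named my-topic and a subscription named my-subscription (both placeholder names), you can publish to and consume from it as follows; -n 0 keeps the consumer running until you stop it:
bin/pulsar-client produce my-topic --messages "hello-pulsar"
bin/pulsar-client consume my-topic -s my-subscription -n 0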
Name | Description | Default |
---|---|---|
webServiceUrl | The web URL for the cluster. | |
brokerServiceUrl | The Pulsar protocol URL for the cluster. | pulsar://localhost:6650/ |
authPlugin | The authentication plugin. | |
authParams | The authentication parameters for the cluster, as a comma-separated string. | |
useTls | Whether to enforce the TLS authentication in the cluster. | false |
tlsAllowInsecureConnection | Allow TLS connections to servers whose certificate cannot be verified to have been signed by a trusted certificate authority. | false |
tlsEnableHostnameVerification | Whether the server hostname must match the common name of the certificate that is used by the server. | false |
tlsTrustCertsFilePath | ||
useKeyStoreTls | Enable TLS with KeyStore type configuration in the broker. | false |
tlsTrustStoreType | TLS TrustStore type configuration. | JKS |
tlsTrustStore | TLS TrustStore path. | |
tlsTrustStorePassword | TLS TrustStore password. | |
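As a quick illustration, these options are typically set in conf/client.conf. A minimal sketch for a TLS-enabled cluster, with placeholder host name and certificate path, might look like:
webServiceUrl=https://pulsar.example.com:8443/
brokerServiceUrl=pulsar+ssl://pulsar.example.com:6651/
useTls=true
tlsAllowInsecureConnection=false
tlsEnableHostnameVerification=true
tlsTrustCertsFilePath=/path/to/ca.cert.pem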
Log4j
Name | Default |
---|---|
pulsar.root.logger | WARN,CONSOLE |
pulsar.log.dir | logs |
pulsar.log.file | pulsar.log |
log4j.rootLogger | ${pulsar.root.logger} |
log4j.appender.CONSOLE | org.apache.log4j.ConsoleAppender |
log4j.appender.CONSOLE.Threshold | DEBUG |
log4j.appender.CONSOLE.layout | org.apache.log4j.PatternLayout |
log4j.appender.CONSOLE.layout.ConversionPattern | %d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n |
log4j.appender.ROLLINGFILE | org.apache.log4j.DailyRollingFileAppender |
log4j.appender.ROLLINGFILE.Threshold | DEBUG |
log4j.appender.ROLLINGFILE.File | ${pulsar.log.dir}/${pulsar.log.file} |
log4j.appender.ROLLINGFILE.layout | org.apache.log4j.PatternLayout |
log4j.appender.ROLLINGFILE.layout.ConversionPattern | %d{ISO8601} - %-5p [%t:%C{1}@%L] - %m%n |
log4j.appender.TRACEFILE | org.apache.log4j.FileAppender |
log4j.appender.TRACEFILE.Threshold | TRACE |
log4j.appender.TRACEFILE.File | pulsar-trace.log |
log4j.appender.TRACEFILE.layout | org.apache.log4j.PatternLayout |
log4j.appender.TRACEFILE.layout.ConversionPattern | %d{ISO8601} - %-5p [%t:%C{1}@%L][%x] - %m%n |
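For example, to send INFO-level logs to the daily rolling file appender defined above instead of the console, you could override the first three properties; the log directory below is a placeholder:
pulsar.root.logger=INFO,ROLLINGFILE
pulsar.log.dir=/var/log/pulsar
pulsar.log.file=pulsar.log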
Log4j shell
Name | Default |
---|---|
bookkeeper.root.logger | ERROR,CONSOLE |
log4j.rootLogger | ${bookkeeper.root.logger} |
log4j.appender.CONSOLE | org.apache.log4j.ConsoleAppender |
log4j.appender.CONSOLE.Threshold | DEBUG |
log4j.appender.CONSOLE.layout | org.apache.log4j.PatternLayout |
log4j.appender.CONSOLE.layout.ConversionPattern | %d{ABSOLUTE} %-5p %m%n |
log4j.logger.org.apache.zookeeper | ERROR |
log4j.logger.org.apache.bookkeeper | ERROR |
log4j.logger.org.apache.bookkeeper.bookie.BookieShell | INFO |
WebSocket
Name | Description | Default |
---|---|---|
configurationStoreServers | | |
zooKeeperSessionTimeoutMillis | | 30000 |
zooKeeperCacheExpirySeconds | ZooKeeper cache expiry time in seconds | 300 |
serviceUrl | | |
serviceUrlTls | | |
brokerServiceUrl | | |
brokerServiceUrlTls | | |
webServicePort | | 8080 |
webServicePortTls | | 8443 |
bindAddress | | 0.0.0.0 |
clusterName | | |
authenticationEnabled | | false |
authenticationProviders | | |
authorizationEnabled | | false |
superUserRoles | | |
brokerClientAuthenticationPlugin | | |
brokerClientAuthenticationParameters | | |
tlsEnabled | | false |
tlsAllowInsecureConnection | | false |
tlsCertificateFilePath | | |
tlsKeyFilePath | | |
tlsTrustCertsFilePath | | |
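As a minimal sketch, these options are typically set in conf/websocket.conf when the WebSocket service runs separately from the broker; the values below are placeholders, not defaults:
configurationStoreServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
clusterName=my-cluster
webServicePort=8080
bindAddress=0.0.0.0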
Pulsar proxy
The Pulsar proxy can be configured in the conf/proxy.conf file.
Name | Description | Default |
---|---|---|
forwardAuthorizationCredentials | Forward client authorization credentials to Broker for re-authorization, and make sure authentication is enabled for this to take effect. | false |
zookeeperServers | ZooKeeper quorum connection string (as a comma-separated list) | |
configurationStoreServers | Configuration store connection string (as a comma-separated list) | |
brokerServiceURL | The service URL pointing to the broker cluster. | |
brokerServiceURLTLS | The TLS service URL pointing to the broker cluster | |
brokerWebServiceURL | The Web service URL pointing to the broker cluster | |
brokerWebServiceURLTLS | The TLS Web service URL pointing to the broker cluster | |
functionWorkerWebServiceURL | The Web service URL pointing to the function worker cluster. It is only configured when you setup function workers in a separate cluster. | |
functionWorkerWebServiceURLTLS | The TLS Web service URL pointing to the function worker cluster. It is only configured when you setup function workers in a separate cluster. | |
zookeeperSessionTimeoutMs | ZooKeeper session timeout (in milliseconds) | 30000 |
zooKeeperCacheExpirySeconds | ZooKeeper cache expiry time in seconds | 300 |
advertisedAddress | Hostname or IP address the service advertises to the outside world. If not set, the value of InetAddress.getLocalHost().getHostname() is used. | N/A |
servicePort | The port to use for server binary Protobuf requests | 6650 |
servicePortTls | The port to use for server binary Protobuf TLS requests | 6651 |
statusFilePath | Path for the file used to determine the rotation status of the proxy instance when responding to service discovery health checks | |
proxyLogLevel | Proxy log level | 0 |
authenticationEnabled | Whether authentication is enabled for the Pulsar proxy | false |
authenticateMetricsEndpoint | Whether the '/metrics' endpoint requires authentication. Defaults to true. 'authenticationEnabled' must also be set for this to take effect. | true |
authenticationProviders | Authentication provider name list (a comma-separated list of class names) | |
authorizationEnabled | Whether authorization is enforced by the Pulsar proxy | false |
authorizationProvider | The fully qualified class name of the authorization provider | org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider |
anonymousUserRole | When this parameter is not empty, unauthenticated users perform as anonymousUserRole. | |
brokerClientAuthenticationPlugin | The authentication plugin used by the Pulsar proxy to authenticate with Pulsar brokers | |
brokerClientAuthenticationParameters | The authentication parameters used by the Pulsar proxy to authenticate with Pulsar brokers | |
brokerClientTrustCertsFilePath | The path to the trusted certificates used by the Pulsar proxy to authenticate with Pulsar brokers | |
superUserRoles | Role names that are treated as "super-users", meaning they can perform all admin operations | |
maxConcurrentInboundConnections | Max concurrent inbound connections. The proxy will reject requests beyond that. | 10000 |
maxConcurrentLookupRequests | Max concurrent outbound connections. The proxy will error out requests beyond that. | 50000 |
tlsEnabledInProxy | Deprecated - use servicePortTls and webServicePortTls instead. | false |
tlsEnabledWithBroker | Whether TLS is enabled when communicating with Pulsar brokers. | false |
tlsCertRefreshCheckDurationSec | TLS certificate refresh duration in seconds. If the value is set to 0, the TLS certificate is checked on every new connection. | 300 |
tlsCertificateFilePath | The path to the TLS certificate file | |
tlsKeyFilePath | The path to the TLS private key file | |
tlsTrustCertsFilePath | The path to the trusted TLS certificate PEM file | |
tlsHostnameVerificationEnabled | Whether to verify the hostname when the proxy creates a TLS connection with brokers | false |
tlsRequireTrustedClientCertOnConnect | Whether client certificates are required for TLS. Connections are rejected if the client certificate is not trusted. | false |
tlsProtocols | Specify the TLS protocols the broker uses to negotiate during the TLS handshake. Multiple values can be specified, separated by commas. Example: TLSv1.3, TLSv1.2 | |
tlsCiphers | Specify the TLS ciphers the broker uses to negotiate during the TLS handshake. Multiple values can be specified, separated by commas. Example: TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 | |
httpReverseProxyConfigs | HTTP directs to redirect to non-pulsar services | |
httpOutputBufferSize | HTTP output buffer size. The amount of data that will be buffered for HTTP requests before it is flushed to the channel. A larger buffer size may result in higher HTTP throughput though it may take longer for the client to see data. If using HTTP streaming via the reverse proxy, this should be set to the minimum value (1) so that clients see the data as soon as possible. | 32768 |
httpNumThreads | Number of threads to use for HTTP requests processing | 2 * Runtime.getRuntime().availableProcessors() |
tokenSecretKey | Configure the secret key to be used to validate auth tokens. The key can be specified like: tokenSecretKey=data:;base64,xxxxxxxxx or tokenSecretKey=file:///my/secret.key . Note: key file must be DER-encoded. | |
tokenPublicKey | Configure the public key to be used to validate auth tokens. The key can be specified like: tokenPublicKey=data:;base64,xxxxxxxxx or tokenPublicKey=file:///my/secret.key . Note: key file must be DER-encoded. | |
tokenAuthClaim | Specify the token claim that will be used as the authentication “principal” or “role”. The “subject” field will be used if this is left blank | |
tokenAudienceClaim | The token audience “claim” name, e.g. “aud”. It is used to get the audience from token. If it is not set, the audience is not verified. | |
tokenAudience | The token audience stands for this broker. The field tokenAudienceClaim of a valid token needs to contain this parameter. | |
haProxyProtocolEnabled | Enable or disable the HAProxy protocol. | false |
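Putting a few of these options together, a minimal conf/proxy.conf sketch for a proxy that discovers brokers through ZooKeeper might look like the following (host names are placeholders); alternatively, you can point the proxy at the brokers directly with brokerServiceURL and brokerWebServiceURL:
zookeeperServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
configurationStoreServers=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
servicePort=6650
webServicePort=8080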
ZooKeeper
ZooKeeper handles a broad range of essential configuration- and coordination-related tasks for Pulsar. The default configuration file for ZooKeeper is the conf/zookeeper.conf file in your Pulsar installation. The following parameters are available:
In addition to the parameters in the table above, configuring ZooKeeper for Pulsar involves adding a server.N line to the conf/zookeeper.conf file for each node in the ZooKeeper cluster, where N is the number of the ZooKeeper node. Here's an example for a three-node ZooKeeper cluster:
server.1=zk1.us-west.example.com:2888:3888
server.2=zk2.us-west.example.com:2888:3888