Pulsar WebSocket API

推荐使用单机模式的 Pulsar 进行开发,在环境中启用 WebSocket 服务。

在非单机模式下,有两种方法可以部署 WebSocket 服务:

  • 嵌入 Pulsar Broker
  • 作为一个的组件

在这种模式下,WebSocket 服务会使用已经在 broker 中运行的 HTTP 服务。 要启用此模式,需在安装目录下的 conf/broker.conf 文件中设置 参数。

作为一个独立的组件

在这种模式下,WebSocket 会作为单独的服务在 Pulsar 上运行。 运行此模式,需在 conf/websocket.conf 文件中进行配置。 You’ll need to set at least the following parameters:

下面是一个示例:

  1. webServicePort=8080
  2. clusterName=my-cluster

安全设置

在 WebSocket 服务上启用 TLS 加密:

  1. tlsEnabled=true
  2. tlsCertificateFilePath=/path/to/broker.cert.pem
  3. tlsKeyFilePath=/path/to/broker.key-pk8.pem
  4. tlsTrustCertsFilePath=/path/to/ca.cert.pem

配置完成后,你可以使用 命令来启动服务:

  1. $ bin/pulsar-daemon start websocket

Pulsar 的 WebSocket API 提供三个端点,用于生产消息、和阅读消息

所有通过 WebSocket API 的数据都使用 JSON 进行交互。

认证

Browser javascript WebSocket client

使用查询参数 token 传送身份验证令牌。

  1. ws://broker-service-url:8080/path?token=token

Producer 端

Producer 端需要在 URL 中指定租户、命名空间和 topic,例如:

  1. ws://broker-service-url:8080/ws/v2/producer/persistent/:tenant/:namespace/:topic
查询参数

发布消息

  1. {
  2. "payload": "SGVsbG8gV29ybGQ=",
  3. "properties": {"key1": "value1", "key2": "value2"},
  4. "context": "1"
  5. }
Key类型是否必需说明
payloadstringBase-64 编码的负载
properties键值对应用程序定义的属性
contextstring应用程序定义的请求标识符
keystring分区 topic 中使用的分区
replicationClusters数组根据名称允许添加到集群列表的副本
响应成功示例
  1. {
  2. "result": "ok",
  3. "messageId": "CAAQAw==",
  4. "context": "1"
  5. }
响应失败示例
  1. {
  2. "result": "send-error:3",
  3. "errorMsg": "Failed to de-serialize from JSON",
  4. "context": "1"
  5. }
Key类型是否必需说明
resultstring发送成功则为 ok,否则抛出异常
messageIdstring已发布消息的 Message ID
contextstring应用程序定义的请求标识符

Concumer 端要求在 URL 中指定租户、命名空间、topic 和订阅:

查询参数
Key类型是否必需说明
ackTimeoutMillislong设置未完成消息确认的超时时间(默认值:0)
subscriptionTypestring订阅类型独占灾备共享key共享
receiverQueueSizeintConsumer 接收队列的大小(默认:1000)
consumerNamestringConsumer 的名称
priorityLevelint指定 consumer 的
maxRedeliverCountint为 consumer 指定 maxRedeliverCount(默认值:0)。 启用 。
deadLetterTopicstring为 consumer 指定 deadLetterTopic(默认值:{topic}-{subscription}-DLQ)。 启用 。
pullModebooleanEnable pull mode (default: false). See “Flow Control” below.
negativeAckRedeliveryDelayintWhen a message is negatively acknowledged, it will be redelivered to the DLQ.
tokenstring身份验证令牌,这用于浏览器的 javascript 客户端
接收消息

Server will push messages on the WebSocket session:

  1. {
  2. "messageId": "CAMQADAA",
  3. "payload": "hvXcJvHW7kOSrUn17P2q71RA5SdiXwZBqw==",
  4. "properties": {},
  5. "publishTime": "2021-10-29T16:01:38.967-07:00",
  6. "redeliveryCount": 0,
  7. "encryptionContext": {
  8. "keys": {
  9. "client-rsa.pem": {
  10. "keyValue": "jEuwS+PeUzmCo7IfLNxqoj4h7txbLjCQjkwpaw5AWJfZ2xoIdMkOuWDkOsqgFmWwxiecakS6GOZHs94x3sxzKHQx9Oe1jpwBg2e7L4fd26pp+WmAiLm/ArZJo6JotTeFSvKO3u/yQtGTZojDDQxiqFOQ1ZbMdtMZA8DpSMuq+Zx7PqLo43UdW1+krjQfE5WD+y+qE3LJQfwyVDnXxoRtqWLpVsAROlN2LxaMbaftv5HckoejJoB4xpf/dPOUqhnRstwQHf6klKT5iNhjsY4usACt78uILT0pEPd14h8wEBidBz/vAlC/zVMEqiDVzgNS7dqEYS4iHbf7cnWVCn3Hxw==",
  11. "metadata": {}
  12. },
  13. "param": "Tfu1PxVm6S9D3+Hk",
  14. "compressionType": "NONE",
  15. "uncompressedMessageSize": 0,
  16. "batchSize": {
  17. "empty": false,
  18. "present": true
  19. }
  20. }

Below are the parameters in the WebSocket consumer response.

  • General parameters

    Key类型是否必需说明
    messageIdstring消息 ID
    payloadstringBase-64 编码的负载
    publishTimestring发布时间戳
    redeliveryCountnumber此消息已经发送的次数
    properties键值对应用程序定义的属性
    keystringProducer 设置的原始路由密钥
    EncryptionContextEncryption context that consumers can use to decrypt received messages
    paramstringInitialization vector for cipher (Base64 encoding)
    batchSizestringNumber of entries in a message (if it is a batch message)
    uncompressedMessageSizestringMessage size before compression
    compressionTypestringAlgorithm used to compress the message payload
  • encryptionContext related parameter

  • encryptionKey related parameters

    Key类型是否必需说明
    keyValuestringEncryption key (Base64 encoding)
    metadata键值对Application-defined metadata

ACK 确认消息

Consumer needs to acknowledge the successful processing of the message to have the Pulsar broker delete it.

  1. {
  2. "messageId": "CAAQAw=="
  3. }
Key类型是否必需说明
messageIdstring处理消息的消息ID

ACK 否认消息

  1. {
  2. "type": "negativeAcknowledge",
  3. "messageId": "CAAQAw=="
  4. }
Key类型是否必需说明
messageIdstring处理消息的消息ID

流量控制

推送模式

默认情况下(pullMode=false),consumer 端使用 receiverQueueSize 参数设置内部接收队列的大小,并限制传递到 WebSocket 客户端的未确认消息数。 在这种模式下,如果不发送消息确认,发送到 WebSocket 客户端的消息达到 receiverQueueSize时,Pulsar WebSocket 将停止发送消息。

拉取模式

如果设置 pullModetrue,则 WebSocket 客户端需要使用 permit 命令允许 Pulsar WebSocket 服务发送更多消息。

  1. {
  2. "type": "permit",
  3. "permitMessages": 100
  4. }
Key类型是否必需说明
typestringType of command. Must be permit
permitMessagesint允许的消息数量

注意:在这种模式下,可以在不同的连接中确认消息。

检查是否到达主题的结尾

消费者可以通过发送 isEndOfTopic 请求来检查它是否已经到达主题的结尾。

Request

  1. {
  2. "type": "isEndOfTopic"
  3. }

Response

  1. {
  2. "endOfTopic": "true/false"
  3. }

Reader 端

The reader endpoint requires you to specify a tenant, namespace, and topic in the URL:

  1. ws://broker-service-url:8080/ws/v2/reader/persistent/:tenant/:namespace/:topic
查询参数
Key类型是否必需说明
readerNamestringReader name
receiverQueueSizeintConsumer 接收队列的大小(默认:1000)
messageIdint or enumMessage ID to start from, earliest or latest (default: latest)
tokenstring身份验证令牌,这用于浏览器的 javascript 客户端
接收消息
  1. {
  2. "messageId": "CAAQAw==",
  3. "payload": "SGVsbG8gV29ybGQ=",
  4. "properties": {"key1": "value1", "key2": "value2"},
  5. "publishTime": "2016-08-30 16:45:57.785",
  6. "redeliveryCount": 4
  7. }
Key类型是否必需说明
messageIdstring消息 ID
payloadstringBase-64 编码的负载
publishTimestring发布时间戳
redeliveryCountnumber此消息已经发送的次数
properties键值对应用程序定义的属性
keystringProducer 设置的原始路由密钥

ACK 确认消息

In WebSocket, Reader needs to acknowledge the successful processing of the message to have the Pulsar WebSocket service update the number of pending messages. If you don’t send acknowledgements, Pulsar WebSocket service will stop sending messages after reaching the pendingMessages limit.

Key类型是否必需说明
messageIdstring处理消息的消息ID

检查是否到达主题的结尾

消费者可以通过发送 isEndOfTopic 请求来检查它是否已经到达主题的结尾。

Request

  1. {
  2. "type": "isEndOfTopic"
  3. }
Key类型是否必需说明
typestringType of command. Must be isEndOfTopic

Response

  1. {
  2. "endOfTopic": "true/false"
  3. }

错误代码

In case of error the server will close the WebSocket session using the following error codes:

应用程序负责在后台重新建立 WebSocket 连接。

Below you’ll find code examples for the Pulsar WebSocket API in and Node.js.

This example uses the package. You can install it using pip:

  1. $ pip install websocket-client

You can also download it from .

Python producer

Here’s an example Python producer that sends a simple message to a Pulsar :

  1. #!/usr/bin/python
  2. #coding=utf-8
  3. import websocket, base64, json
  4. # If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  5. enable_TLS = False
  6. scheme = 'ws'
  7. if enable_TLS:
  8. scheme = 'wss'
  9. TOPIC = scheme + '://localhost:8080/ws/v2/producer/persistent/public/default/my-topic'
  10. ws = websocket.create_connection(TOPIC)
  11. # Send one message as JSON
  12. ws.send(json.dumps({
  13. 'payload' : base64.b64encode('Hello World'),
  14. 'properties': {
  15. 'key1' : 'value1',
  16. },
  17. 'context' : 5
  18. }))
  19. response = json.loads(ws.recv())
  20. if response['result'] == 'ok':
  21. print 'Message published successfully'
  22. else:
  23. print 'Failed to publish message:', response
  24. ws.close()

Python consumer

Here’s an example Python consumer that listens on a Pulsar topic and prints the message ID whenever a message arrives:

  1. #!/usr/bin/python
  2. #coding=utf-8
  3. import websocket, base64, json
  4. enable_TLS = False
  5. scheme = 'ws'
  6. if enable_TLS:
  7. scheme = 'wss'
  8. TOPIC = scheme + '://localhost:8080/ws/v2/consumer/persistent/public/default/my-topic/my-sub'
  9. ws = websocket.create_connection(TOPIC)
  10. while True:
  11. msg = json.loads(ws.recv())
  12. if not msg: break
  13. print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
  14. # Acknowledge successful processing
  15. ws.send(json.dumps({'messageId' : msg['messageId']}))
  16. ws.close()

Python reader

Here’s an example Python reader that listens on a Pulsar topic and prints the message ID whenever a message arrives:

  1. #!/usr/bin/python
  2. #coding=utf-8
  3. import websocket, base64, json
  4. # If set enableTLS to true, your have to set tlsEnabled to true in conf/websocket.conf.
  5. enable_TLS = False
  6. scheme = 'ws'
  7. if enable_TLS:
  8. scheme = 'wss'
  9. TOPIC = scheme + '://localhost:8080/ws/v2/reader/persistent/public/default/my-topic'
  10. ws = websocket.create_connection(TOPIC)
  11. while True:
  12. msg = json.loads(ws.recv())
  13. if not msg: break
  14. print "Received: {} - payload: {}".format(msg, base64.b64decode(msg['payload']))
  15. # Acknowledge successful processing
  16. ws.send(json.dumps({'messageId' : msg['messageId']}))
  17. ws.close()

Node.js

This example uses the package. You can install it using npm:

  1. $ npm install ws

Node.js producer

  1. const WebSocket = require('ws');
  2. // 如果设置启用TLS 为 true,您必须设置 tls 启用为 conf/websocket.conf。
  3. const enableTLS = false;
  4. const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/producer/persistent/public/default/my-topic`;
  5. const ws = new WebSocket(topic);
  6. var message = {
  7. "payload" : new Buffer("Hello World").toString('base64'),
  8. "properties": {
  9. "key1" : "value1",
  10. "key2" : "value2"
  11. },
  12. "context" : "1"
  13. };
  14. ws.on('open', function() {
  15. // Send one message
  16. ws.send(JSON.stringify(message));
  17. });
  18. ws.on('message', function(message) {
  19. console.log('received ack: %s', message);
  20. });

Node.js consumer

Here’s an example Node.js consumer that listens on the same topic used by the producer above:

NodeJS reader

  1. const WebSocket = require('ws');
  2. // 如果设置启用TLS 为 true,您必须设置 tls 启用为 conf/websocket.conf。
  3. const enableTLS = false;
  4. const topic = `${enableTLS ? 'wss' : 'ws'}://localhost:8080/ws/v2/reader/persistent/public/default/my-topic`;
  5. const ws = new WebSocket(topic);
  6. ws.on('message', function(message) {
  7. var receiveMsg = JSON.parse(message);
  8. console.log('Received: %s - payload: %s', message, new Buffer(receiveMsg.payload, 'base64').toString());
  9. var ackMsg = {"messageId" : receiveMsg.messageId};
  10. });