Pulsar Clients
底层实现上,目前官方版的 Pulsar 客户端支持对 broker 的透明连接/重连、故障切换、未 ack 消息的缓冲、消息重传。
在应用程序创建生产者/消费者之前,Pulsar 客户端库需要启动一个设置阶段,包括两个步骤:
- 客户端将尝试通过向服务器(Broker)发送 HTTP 查找请求,来确定主题(Topic)所在的服务器(Broker)。 客户端通过查询 ZooKeeper 中 (缓存) 的元数据,来确定这条消息的 topic 在哪个 broker 上,如果该 topic 不在任何一个 broker 上,则把这个 topic 分配在负载最少的 broker 上。
每当 TCP 连接中断时, 客户端将立即重新启动上述步骤的设置阶段, 并将继续尝试使用指数退避重新建立生产者或消费者, 直到上述步骤执行成功为止。
Reader 接口
The reader interface for Pulsar enables applications to manually manage cursors. When you use a reader to connect to a topic—-rather than a consumer—-you need to specify which message the reader begins reading from when it connects to a topic. 当连接到一个 topic 时,reader 接口支持的开始位置包括:
- The earliest available message in the topic
- 在最早和最新之间的其他消息。 如果你选择此选项,则需要明确提供消息 ID。 你的应用程序将需要提前“知道”这个消息 ID,可能要从持久化存储或缓存中获取。
Reader 接口对流处理系统中,需要用到 effectively-once(仅仅一次) 语义的场景是很有帮助的。 Pulsar能够将主题的消息进行重放,并从重放后的位置开始读取消息,是满足流处理的场景的重要基础。 Reader 接口为 Pulsar 客户端在 Topic 内提供了一种能“手动管理起始位置”的底层抽象。
Reader 接口内部是作为一个使用独占、非持久化订阅的被随机命名的一个消费者来实现的。
[ IMPORTANT ]
Please also note that a reader can have a “backlog”, but the metric is only used for users to know how behind the reader is. The metric is not considered for any backlog quota calculations.
下面是一个Java语言实现的从主题上最早可用消息的位置开始消费的例子
创建一个从最新可用消息处开始读取消息的 reader: