通过订阅(Subscription)从 HStreamDB 消费数据
一个 stream 可以有多个订阅,但一个给定的订阅只属于一个 stream。同样地,一个订阅 对应一个具有多个消费者的 consumer group,但每个消费者只属于一个订阅。
请参考,了解关于创建和管理订阅的详细信息。
如何用一个订阅来消费数据
为了消费写入 stream 中的数据,HStreamDB 客户端库提供了异步 Consumer API,它将发 起请求加入指定订阅的 consumer group。
正如我们所介绍的,在 HStreamDB 中有两种 Record 类型,HRecord 和 Raw Record。当启 动一个消费者时,需要相应的 Receiver。在只设置了 HRecord Receiver 的情况下,当消 费者收到一条 raw record 时,消费者将忽略它并消费下一条 record。因此,原则上,我 们不建议在同一个 stream 中同时写入 HRecord 和 raw record。然而,这并没有在实现的 层面上严格禁止,用户仍然可以提供两种 receiver 来同时处理两种类型的 record。
// ExampleConsumer.go
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
"time"
)
func ExampleConsumer() error {
client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
if err != nil {
log.Fatalf("Creating client error: %s", err)
}
defer client.Close()
subId := "SubscriptionId0"
consumer := client.NewConsumer("consumer-1", subId)
defer consumer.Stop()
dataChan := consumer.StartFetch()
timer := time.NewTimer(3 * time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
log.Println("[consumer]: Streaming fetch stopped")
return nil
case recordMsg := <-dataChan:
if recordMsg.Err != nil {
log.Printf("[consumer]: Streaming fetch error: %s", err)
continue
}
log.Printf("[consumer]: Receive %s record: record id = %s, payload = %+v",
record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
record.Ack()
}
}
}
return nil
}
For better performance, Batched Ack is enabled by default with setting ackBufferSize
= 100 and ackAgeLimit
= 100, which you can change when initiating your consumers.
Consumer consumer =
client
.newConsumer()
.subscription("you_subscription_id")
.name("your_consumer_name")
.hRecordReceiver(your_receiver)
// When ack() is called, the consumer will not send it to servers immediately,
// the ack request will be buffered until the ack count reaches ackBufferSize
// or the consumer is stopping or reached ackAgelimit
.ackBufferSize(100)
.ackAgeLimit(100)
.build();
为了获得更好的性能,默认情况下启用了 Batched Ack,和 ackBufferSize = 100 和 ackAgeLimit = 100 的设置,你可以在启动你的消费者时更新它。
多个消费者和共享订阅
如先前提到的,在 HStream 中,一个订阅是对应了一个 consumer group 消费的。在这个 consumer group 中,可能会有多个消费者,并且他们共享订阅的进度。当想要提高从订阅 中消费数据的速度时,我们可以让一个新的消费者加入现有的订阅。这段代码是用来演示新 的消费者是如何加入 consumer group 的。更常见的情况是,用户使用来自不同客户端的消 费者去共同消费一个订阅。
// ConsumeDataSharedExample.java
package docs.code.examples;
import static java.util.concurrent.TimeUnit.SECONDS;
import io.hstream.Consumer;
import io.hstream.HRecordReceiver;
import io.hstream.HStreamClient;
import java.util.concurrent.TimeoutException;
public class ConsumeDataSharedExample {
public static void main(String[] args) throws Exception {
String serviceUrl = "127.0.0.1:6570";
if (System.getenv("serviceUrl") != null) {
serviceUrl = System.getenv("serviceUrl");
}
String subscription = "your_subscription_id";
String consumer1 = "your_consumer1_name";
String consumer2 = "your_consumer2-name";
HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
Thread t1 =
new Thread(() -> consumeDataFromSubscriptionSharedExample(client, subscription, consumer1));
Thread t2 =
t1.start();
t2.start();
t1.join();
t2.join();
client.close();
}
public static void consumeDataFromSubscriptionSharedExample(
HStreamClient client, String subscription, String consumerName) {
HRecordReceiver receiver =
((hRecord, responder) -> {
System.out.println("Received a record :" + hRecord.getHRecord());
responder.ack();
});
Consumer consumer =
client
.newConsumer()
.subscription(subscription)
.name(consumerName)
.hRecordReceiver(receiver)
.build();
try {
// sleep 5s for consuming records
consumer.startAsync().awaitRunning();
consumer.awaitTerminated(5, SECONDS);
} catch (TimeoutException e) {
// stop consumer
consumer.stopAsync().awaitTerminated();
}
}
}
一个常发生的状况是,消费者处理和确认数据的速度很可能跟不上服务器发送的速度,或者 一些意外的问题导致消费者无法确认收到的数据,这可能会导致以下问题:
为了缓解上述问题,使用订阅的 maxUnackedRecords
设置来控制消费者接收消息时允许 的未确认 records 的最大数量。一旦数量超过 maxUnackedRecords
,服务器将停止向当 前订阅的消费者们发送消息。
按顺序接收消息
注意:下面描述的接收顺序只针对单个消费者。如果一个订阅有多个消费者,在每个消费者 中仍然可以保证顺序,但如果我们把 consumer group 看成一个整体,那么顺序性就不再保 证了。
消费者将按照 HStream 服务器收到信息的顺序接收具有相同分区键的 record。由于 HStream 以至少一次的语义发送 hstream record,在某些情况下,当 HServer 可能没有收 到中间某些 record 的 ack 时,它将可能多次发送这条 record。而在这些情况下,我们也 不能保证顺序。
当消费者正在运行时,如果 receiver 失败了,默认的行为是消费者会将将捕获异常,打印 错误日志,并继续消费下一条记录而不是导致消费者也失败。
// add Listener for handling failed consumer
var threadPool = new ScheduledThreadPoolExecutor(1);
consumer.addListener(
new Service.Listener() {
public void failed(Service.State from, Throwable failure) {
System.out.println("consumer failed, with error: " + failure.getMessage());
}
},
threadPool);