通过订阅(Subscription)从 HStreamDB 消费数据

    一个 stream 可以有多个订阅,但一个给定的订阅只属于一个 stream。同样地,一个订阅 对应一个具有多个消费者的 consumer group,但每个消费者只属于一个订阅。

    请参考,了解关于创建和管理订阅的详细信息。

    如何用一个订阅来消费数据

    为了消费写入 stream 中的数据,HStreamDB 客户端库提供了异步 Consumer API,它将发 起请求加入指定订阅的 consumer group。

    正如我们所介绍的,在 HStreamDB 中有两种 Record 类型,HRecord 和 Raw Record。当启 动一个消费者时,需要相应的 Receiver。在只设置了 HRecord Receiver 的情况下,当消 费者收到一条 raw record 时,消费者将忽略它并消费下一条 record。因此,原则上,我 们不建议在同一个 stream 中同时写入 HRecord 和 raw record。然而,这并没有在实现的 层面上严格禁止,用户仍然可以提供两种 receiver 来同时处理两种类型的 record。

    1. // ExampleConsumer.go
    2. package examples
    3. import (
    4. "github.com/hstreamdb/hstreamdb-go/hstream"
    5. "log"
    6. "time"
    7. )
    8. func ExampleConsumer() error {
    9. client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
    10. if err != nil {
    11. log.Fatalf("Creating client error: %s", err)
    12. }
    13. defer client.Close()
    14. subId := "SubscriptionId0"
    15. consumer := client.NewConsumer("consumer-1", subId)
    16. defer consumer.Stop()
    17. dataChan := consumer.StartFetch()
    18. timer := time.NewTimer(3 * time.Second)
    19. defer timer.Stop()
    20. for {
    21. select {
    22. case <-timer.C:
    23. log.Println("[consumer]: Streaming fetch stopped")
    24. return nil
    25. case recordMsg := <-dataChan:
    26. if recordMsg.Err != nil {
    27. log.Printf("[consumer]: Streaming fetch error: %s", err)
    28. continue
    29. }
    30. log.Printf("[consumer]: Receive %s record: record id = %s, payload = %+v",
    31. record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
    32. record.Ack()
    33. }
    34. }
    35. }
    36. return nil
    37. }

    For better performance, Batched Ack is enabled by default with setting ackBufferSize = 100 and ackAgeLimit = 100, which you can change when initiating your consumers.

    1. Consumer consumer =
    2. client
    3. .newConsumer()
    4. .subscription("you_subscription_id")
    5. .name("your_consumer_name")
    6. .hRecordReceiver(your_receiver)
    7. // When ack() is called, the consumer will not send it to servers immediately,
    8. // the ack request will be buffered until the ack count reaches ackBufferSize
    9. // or the consumer is stopping or reached ackAgelimit
    10. .ackBufferSize(100)
    11. .ackAgeLimit(100)
    12. .build();

    为了获得更好的性能,默认情况下启用了 Batched Ack,和 ackBufferSize = 100 和 ackAgeLimit = 100 的设置,你可以在启动你的消费者时更新它。

    多个消费者和共享订阅

    如先前提到的,在 HStream 中,一个订阅是对应了一个 consumer group 消费的。在这个 consumer group 中,可能会有多个消费者,并且他们共享订阅的进度。当想要提高从订阅 中消费数据的速度时,我们可以让一个新的消费者加入现有的订阅。这段代码是用来演示新 的消费者是如何加入 consumer group 的。更常见的情况是,用户使用来自不同客户端的消 费者去共同消费一个订阅。

    1. // ConsumeDataSharedExample.java
    2. package docs.code.examples;
    3. import static java.util.concurrent.TimeUnit.SECONDS;
    4. import io.hstream.Consumer;
    5. import io.hstream.HRecordReceiver;
    6. import io.hstream.HStreamClient;
    7. import java.util.concurrent.TimeoutException;
    8. public class ConsumeDataSharedExample {
    9. public static void main(String[] args) throws Exception {
    10. String serviceUrl = "127.0.0.1:6570";
    11. if (System.getenv("serviceUrl") != null) {
    12. serviceUrl = System.getenv("serviceUrl");
    13. }
    14. String subscription = "your_subscription_id";
    15. String consumer1 = "your_consumer1_name";
    16. String consumer2 = "your_consumer2-name";
    17. HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    18. Thread t1 =
    19. new Thread(() -> consumeDataFromSubscriptionSharedExample(client, subscription, consumer1));
    20. Thread t2 =
    21. t1.start();
    22. t2.start();
    23. t1.join();
    24. t2.join();
    25. client.close();
    26. }
    27. public static void consumeDataFromSubscriptionSharedExample(
    28. HStreamClient client, String subscription, String consumerName) {
    29. HRecordReceiver receiver =
    30. ((hRecord, responder) -> {
    31. System.out.println("Received a record :" + hRecord.getHRecord());
    32. responder.ack();
    33. });
    34. Consumer consumer =
    35. client
    36. .newConsumer()
    37. .subscription(subscription)
    38. .name(consumerName)
    39. .hRecordReceiver(receiver)
    40. .build();
    41. try {
    42. // sleep 5s for consuming records
    43. consumer.startAsync().awaitRunning();
    44. consumer.awaitTerminated(5, SECONDS);
    45. } catch (TimeoutException e) {
    46. // stop consumer
    47. consumer.stopAsync().awaitTerminated();
    48. }
    49. }
    50. }

    一个常发生的状况是,消费者处理和确认数据的速度很可能跟不上服务器发送的速度,或者 一些意外的问题导致消费者无法确认收到的数据,这可能会导致以下问题:

    为了缓解上述问题,使用订阅的 maxUnackedRecords 设置来控制消费者接收消息时允许 的未确认 records 的最大数量。一旦数量超过 maxUnackedRecords,服务器将停止向当 前订阅的消费者们发送消息。

    按顺序接收消息

    注意:下面描述的接收顺序只针对单个消费者。如果一个订阅有多个消费者,在每个消费者 中仍然可以保证顺序,但如果我们把 consumer group 看成一个整体,那么顺序性就不再保 证了。

    消费者将按照 HStream 服务器收到信息的顺序接收具有相同分区键的 record。由于 HStream 以至少一次的语义发送 hstream record,在某些情况下,当 HServer 可能没有收 到中间某些 record 的 ack 时,它将可能多次发送这条 record。而在这些情况下,我们也 不能保证顺序。

    当消费者正在运行时,如果 receiver 失败了,默认的行为是消费者会将将捕获异常,打印 错误日志,并继续消费下一条记录而不是导致消费者也失败。

    1. // add Listener for handling failed consumer
    2. var threadPool = new ScheduledThreadPoolExecutor(1);
    3. consumer.addListener(
    4. new Service.Listener() {
    5. public void failed(Service.State from, Throwable failure) {
    6. System.out.println("consumer failed, with error: " + failure.getMessage());
    7. }
    8. },
    9. threadPool);