最佳实践

    如果不做流控,很可能产生很高的QPS峰值写,对Pegasus系统造成较大压力:

    • 写QPS太大,会影响读性能,造成读操作延迟上升;
    • 写QPS太大,可能会造成集群无法承受压力而停止服务;因此,强烈建议业务方在灌数据的时候对写QPS进行流量控制。

    客户端流控的思路就是:

    • 首先定好总的QPS限制是多少(譬如10000/s),有多少个并发的客户端访问线程(譬如50个),然后计算出每个线程的QPS限制(譬如10000/50=200)。
    • 对于单个客户端线程,通过流控工具将QPS限制在期望的范围内。如果超过了QPS限制,就采用简单的sleep方式来等待。我们提供了一个流控工具类com.xiaomi.infra.pegasus.tools.FlowController,把计算QPS和执行sleep的逻辑封装起来,方便用户使用。FlowController用法:

    • 构造函数接受一个QPS参数,用于指定流量限制,譬如单线程QPS只允许200/s,就传入200;

    • 用户在每次需要执行写操作之前调用cntl.getToken()方法,该方法产生两种可能:
      • 如果当前未达到流量控制,则无阻塞直接返回,继续执行后面的写操作;
      • 如果当前已经达到流量限制,则该方法会阻塞(sleep)一段时间才返回,以达到控制流量的效果。
    • 该工具尽量配合同步接口使用,对于异步接口可能效果没那么好。使用方法很简单:

    在分布式灌数据的场景下,用户可以先确定分布式的Task并发数,然后通过,得到单个Task的QPS限制,再使用FlowController进行控制。

    分页查询

    有时候业务需要分页查询功能,类似实现前端的分页功能。典型地,一个HashKey下有很多SortKey,一页只显示固定数量的SortKey,下一页时再显示接下来的固定数量的SortKey。

    分页查询在Pegasus下有多种实现方式:

    • 一次性获取HaskKey下的全部数据,在业务端缓存下来,由业务端自己实现分页逻辑。
    • 实现顺序分页,可以使用multiGet()和方法,这两者都支持SortKey的范围查询:
      • 查第一页:
        • startSortKey = null
        • startInclusive = true
        • stopSortKey = null
        • stopInclusive = false
        • maxFetchCount = countPerPage
      • 通过判断返回值(同步接口)或者allFetched(异步接口)可以知道是否还有更多数据。如果还有更多数据,则查下一页。
      • 查下一页:记录当前页的最大SortKey(假设为maxSortKey),通过MultiGetOptions或者ScanOptions指定:
        • startInclusive = false
        • stopSortKey = null
        • stopInclusive = false
        • maxFetchCount = countPerPage
    • 实现逆序分页,请使用multiGet()方法,其支持SortKey的逆序查询:
      • 查第一页:
        • startSortKey = null
        • startInclusive = true
        • stopSortKey = null
        • stopInclusive = false
        • maxFetchCount = countPerPage
        • reverse = true
      • 通过判断返回值(同步接口)或者allFetched(异步接口)可以知道是否还有更多数据。如果还有更多数据,则查下一页。
      • 查下一页:记录当前页的最小SortKey(假设为minSortKey)。如果还有更多数据,则获取下一页数据,可以在MultiGetOptions中指定:
        • startSortKey = null
        • startInclusive = true
        • stopSortKey = minSortKey
        • stopInclusive = false
        • maxFetchCount = countPerPage
        • reverse = true

    通常序列化有这些方式:

    • json:好处是数据可读性好;坏处是比较占空间。不推荐。
    • thrift:提供了多种Compact协议,常见的有binary协议。但是推荐用tcompact协议,因为这种协议的压缩率更高。
    • protobuf:与thrift类似,推荐序列化为binary格式。对于Thrift结构,使用tcompact协议进行序列化的样例:
    1. import org.apache.thrift.TSerializer;
    2. import org.apache.thrift.protocol.TCompactProtocol;
    3. TSerializer serializer = new TSerializer(new TCompactProtocol.Factory());
    4. byte[] bytes = serializer.serialize(data);

    数据压缩

    对于value较大(>=2kb)的业务,我们推荐在客户端使用facebook/Zstandard压缩算法(简称 Zstd)对数据进行压缩,以减少value的数据长度,提升Pegasus的服务稳定性和读写性能。Zstd算法在压缩比和压缩速率上取得较好的平衡,适合通用场景。

    从版本开始,我们提供了Zstd压缩工具类com.xiaomi.infra.pegasus.tools.ZstdWrapper,方便用户实现压缩功能。

    使用示例:

    也可以参考测试用例代码 。

    以上两个优化 数据序列化 和 可以在客户端同时使用,都是用客户端的CPU换取Pegasus集群的稳定性和读写性能。在通常情况下这都是值得的。

    有时候,业务方在开始使用Pegasus的时候,没有采用客户端压缩,但是在使用一段时间后,发现单条数据的value比较大,希望能通过压缩的办法改进性能。可以分两步:

    • 评估压缩收益:评估通过客户端压缩是否能够获得足够好的压缩率。
    • :升级业务端使用Pegasus Java客户端的逻辑,增加客户端压缩支持,同时兼容原来未压缩的数据。

    原料:

    • 业务集群:user_cluster,meta配置地址为${user_cluster_meta_list},其中用户表为user_table。
    • 测试集群:test_cluster,meta配置地址为${test_cluster_meta_list}
    • Shell工具:使用1.11.3及以上版本;修改配置文件,添加访问test_cluster集群的配置项。
    • :使用1.11.4及以上版本;修改配置文件pegasus.properties,设置meta_servers = ${test_cluster_meta_list}。步骤:

    • 使用Shell工具的create命令,在test_cluster集群中新建测试表user_table_no_compress和user_table_zstd_compress:

    1. >>> create user_table_no_compress -p 8 -r 3
    2. >>> create user_table_zstd_compress -p 8 -r 3
    • 使用Shell工具的copy_data命令,将业务集群的user_table表的部分数据复制到测试集群的user_table_no_compress表中(在复制足够条数后通过Ctrl-C中断执行):
    • 使用Java客户端工具的copy_data命令,将测试集群user_table_no_compress表的数据复制到user_table_zstd_compress表中,并设置数据写出时采用zstd压缩:
    1. copy_data file://./pegasus.properties user_table_zstd_compress none zstd
    • 使用Shell工具的count_data命令,分别统计两个测试表的数据大小,然后计算压缩率:

    使用兼容性压缩

    业务表原来已经有未压缩的数据,如果应用了客户端压缩,写入新的已压缩的数据,但是hashKey和sortKey保持不变,就会出现未压缩数据和已压缩数据混合存在的情况:有的value存储的是未压缩的数据,有的value存储的是已压缩的数据。

    这就要求业务端在读数据的时候保证兼容性:既能读取未压缩的数据,又能读取已压缩的数据。

    基于未压缩的数据采用zstd进行解压缩时基本都会失败这一事实,业务端读取的逻辑可以这样:

    • 首先,尝试将客户端读到的value数据进行解压缩,如果成功,则说明是已压缩的数据。
    • 如果上一步解压缩失败,则说明读到的是未压缩的数据,不需要解压。示例代码:
    1. // decompress the value
    2. byte[] decompressedValue = null;
    3. try {
    4. decompressedValue = ZstdWrapper.decompress(value);
    5. } catch (PException e) {
    6. // decompress fail
    7. }

    与此同时,可以使用后台工具将未压缩数据逐渐替换掉为已压缩数据,并在替换过程中保证数据的一致性:扫描表,逐条读取数据,如果数据是未压缩的,则将其转换为已压缩的,使用check_and_set原子操作进行数据替换。