This type of scenario is common with telecom gateway data and financial industry data: the volume produced is huge, and it must be inserted quickly into a large database for persistent storage. Separately, if you need real-time stream processing, see the PostgreSQL-based streaming solution in "How is real-time stream processing of 1 trillion records per day implemented? PostgreSQL 'IoT' applications - 1: a real-time streaming case (trillions per day)".

TEST CASE

  1. Average record length: 360 bytes (a fairly common size);
  2. An index is created on the time field;
  3. Each test round inserts 12TB of data; once 12TB has been inserted, the data is cleared and insertion resumes, in a loop;
  4. The test stops after running for a full 24 hours;
  5. The number of records inserted within 24 hours is counted.

12 test rounds completed within 24 hours, averaging 7,071 seconds per round: 5.06 million rows/s and 1.78 GB/s, for a total of 437.2 billion rows and 154TB of data inserted over the day.

Test hardware environment

  1. xfs
  2. PostgreSQL 9.5

Database build configuration

  ./configure --prefix=/home/digoal/pgsql9.5.1 --with-blocksize=32 --with-segsize=128 --with-wal-blocksize=32 --with-wal-segsize=64
  make && make install
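After initdb you can verify that the non-default 32KB block and WAL block sizes took effect; a quick check (assuming PGDATA points at the newly initialized cluster):

  # pg_controldata prints the compiled-in block sizes of the cluster
  pg_controldata $PGDATA | grep -i "block size"
  # Database block size:  32768
  # WAL block size:       32768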

How to enable huge pages for PostgreSQL
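The full method is covered in a separate article; the essence is reserving enough huge pages at the OS level for shared_buffers before starting PostgreSQL with huge_pages = on. A minimal sketch, assuming 2MB huge pages and the 256GB shared_buffers configured below (the exact page count is an assumption, sized with some headroom):

  # 256GB / 2MB = 131072 pages; reserve a little extra headroom
  sysctl -w vm.nr_hugepages=135000
  # persist the setting across reboots
  echo "vm.nr_hugepages = 135000" >> /etc/sysctl.conf
  # verify the reservation before starting PostgreSQL
  grep Huge /proc/meminfo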

Parameters

  listen_addresses = '0.0.0.0' # what IP address(es) to listen on;
  fsync = on
  port = 1921 # (change requires restart)
  max_connections = 600 # (change requires restart)
  superuser_reserved_connections = 13 # (change requires restart)
  unix_socket_directories = '.' # comma-separated list of directories
  unix_socket_permissions = 0700 # begin with 0 to use octal notation
  tcp_keepalives_idle = 60 # TCP_KEEPIDLE, in seconds;
  tcp_keepalives_interval = 10 # TCP_KEEPINTVL, in seconds;
  tcp_keepalives_count = 10 # TCP_KEEPCNT;
  shared_buffers = 256GB # min 128kB
  huge_pages = on # on, off, or try
  work_mem = 512MB # min 64kB
  maintenance_work_mem = 1GB # min 1MB
  autovacuum_work_mem = 1GB # min 1MB, or -1 to use maintenance_work_mem
  dynamic_shared_memory_type = posix # the default is the first option
  bgwriter_delay = 10ms # 10-10000ms between rounds
  bgwriter_lru_maxpages = 1000 # 0-1000 max buffers written/round
  bgwriter_lru_multiplier = 2.0
  synchronous_commit = off # synchronization level;
  full_page_writes = on # recover from partial page writes
  wal_buffers = 2047MB # min 32kB, -1 sets based on shared_buffers
  wal_writer_delay = 10ms # 1-10000 milliseconds
  checkpoint_timeout = 55min # range 30s-1h
  max_wal_size = 512GB
  checkpoint_completion_target = 0.9 # checkpoint target duration, 0.0 - 1.0
  effective_cache_size = 40GB
  log_destination = 'csvlog' # Valid values are combinations of
  logging_collector = on # Enable capturing of stderr and csvlog
  log_directory = 'pg_log' # directory where log files are written,
  log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' # log file name pattern,
  log_file_mode = 0600 # creation mode for log files,
  log_truncate_on_rotation = on # If on, an existing log file with the
  log_checkpoints = off
  log_connections = off
  log_disconnections = off
  log_error_verbosity = verbose # terse, default, or verbose messages
  log_timezone = 'PRC'
  log_autovacuum_min_duration = 0 # -1 disables, 0 logs all actions and
  datestyle = 'iso, mdy'
  timezone = 'PRC'
  lc_messages = 'C' # locale for system error message
  lc_monetary = 'C' # locale for monetary formatting
  lc_numeric = 'C' # locale for number formatting
  lc_time = 'C' # locale for time formatting
  default_text_search_config = 'pg_catalog.english'
  autovacuum = off

Create the test table: each 32KB block stores 89 records, each record 360 bytes.

  postgres=# select string_agg(i,'') from (select md5(random()::text) i from generate_series(1,10) t(i)) t(i);
  string_agg
  ----------------------------------------------------------------------
  53d3ec7adbeacc912a45bdd8557b435be848e4b1050dc0f5e46b75703d4745833541b5dabc177db460b6b1493961fc72c478daaaac74bcc89aec4f946a496028d9cff1cc4144f738e01ea36436455c216aa697d87fe1f87ceb49134a687dc69cba34c9951d0c9ce9ca82bba229d56874af40498dca5f
  d8dfb9c877546db76c35a3362d6bdba6472d3919289b6eaeeab58feb4f6e79592fc1dd8253fd4c588a29
  (1 row)
  postgres=# create unlogged table test(crt_time timestamp, info text default '53d3ec7adbeacc912a45bdd8557b435be848e4b1050dc0f5e46b75703d4745833541b5dabc177db460b6b1493961fc72c478daaaac74bcc89aec4f946a496028d9cff1cc4144f738e01ea36436455c216aa697d87fe1f87ceb49134a687dc69cba34c9951d0c9ce9ca82bba229d56874af40498dca5f
  d8dfb9c877546db76c35a3362d6bdba6472d3919289b6eaeeab58feb4f6e79592fc1dd8253fd4c588a29');
  postgres=# alter table test alter column info set storage plain;
  postgres=# select ctid from test limit 1000;
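The excerpt shows the ctid query but not the insert that must precede it on an empty table; to reproduce the 89-rows-per-32KB-block figure, insert a batch first and read the block number (the first field of ctid). A sketch, assuming default connection settings:

  psql <<EOF
  insert into test (crt_time) select now() from generate_series(1,1000);
  select ctid from test limit 100;
  -- expect (0,1) through (0,89) in block 0, then (1,1) starting block 1
  EOF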

Create multiple partition tables to reduce block extend contention, as sketched below.
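The table-creation statements are not shown in the excerpt; a minimal sketch that clones the test table (including its default value and plain storage setting) under the test1..test128 names used below:

  psql <<EOF
  do language plpgsql \$\$
  declare
    i int;
  begin
    for i in 1..128 loop
      execute 'create unlogged table test'||i||' (like test including defaults including storage)';
    end loop;
  end;
  \$\$;
  EOF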

PostgreSQL's secret weapon: the BRIN index

A BRIN (Block Range INdex) range index is used here; it is PostgreSQL's secret weapon for IoT-style streaming data. See the sketch below.
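The index-creation statements themselves are not in the excerpt; a sketch that builds a BRIN index on the time column of every partition, matching the idx_testN names in the \di listing below:

  psql <<EOF
  do language plpgsql \$\$
  declare
    i int;
  begin
    for i in 1..128 loop
      execute 'create index idx_test'||i||' on test'||i||' using brin (crt_time)';
    end loop;
  end;
  \$\$;
  EOF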

  postgres=# \di
             List of relations
   Schema |    Name     | Type  |  Owner   |  Table
  --------+-------------+-------+----------+---------
   public | idx_test1   | index | postgres | test1
   public | idx_test100 | index | postgres | test100
   public | idx_test101 | index | postgres | test101
   public | idx_test102 | index | postgres | test102
   public | idx_test103 | index | postgres | test103
   public | idx_test104 | index | postgres | test104
   public | idx_test105 | index | postgres | test105
   public | idx_test106 | index | postgres | test106
  ......
  ......
   public | idx_test90  | index | postgres | test90
   public | idx_test91  | index | postgres | test91
   public | idx_test92  | index | postgres | test92
   public | idx_test93  | index | postgres | test93
   public | idx_test94  | index | postgres | test94
   public | idx_test95  | index | postgres | test95
   public | idx_test96  | index | postgres | test96
   public | idx_test97  | index | postgres | test97
   public | idx_test98  | index | postgres | test98
   public | idx_test99  | index | postgres | test99
  (128 rows)

Generate the test scripts; each connection inserts 178 records per transaction (178 = 2 × 89 rows), occupying two 32KB blocks:

  vi test.sql
  insert into test(crt_time) values (now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now()),(now());
  for ((i=1;i<=128;i++)) do sed "s/test/test$i/" test.sql > ./test$i.sql; done

Clear the data before starting the test:

  do language plpgsql $$
  declare
    i int;
    sql text;
  begin
    for i in 1..128 loop
      sql := 'truncate test'||i;
      execute sql;
    end loop;
  end;
  $$;

Test method: each round inserts 12TB of data, controlled as follows:

  1. Use 128 parallel connections, each executing 1,572,864 transactions;
  2. 201,326,592 transactions are executed in total (each transaction inserts 178 records);
  3. 35,836,133,376 records (35.836 billion) are inserted in total, about 12TB of data, index space not included (see the quick check below).
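These totals follow directly from the parameters; a quick shell check:

  echo $((128 * 1572864))                       # 201326592 transactions
  echo $((128 * 1572864 * 178))                 # 35836133376 records
  echo $((128 * 1572864 * 178 * 360 / 10**12))  # ~12 TB of raw row data at 360 bytes/record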

Before the next round starts, the results are logged and all data is TRUNCATEd, then the test repeats. This continues until 24 hours have elapsed, at which point the statistics are output.

  vi test.sh
  #!/bin/bash
  if [ $# -ne 5 ]; then
    echo "please use: $0 ip port dbname user pwd"
    exit 1
  fi
  IP=$1
  PORT=$2
  DBNAME=$3
  USER=$4
  PASSWORD=$5
  export PGPASSWORD=$PASSWORD
  DEP_CMD="psql"
  which $DEP_CMD
  if [ $? -ne 0 ]; then
    echo -e "dep commands: $DEP_CMD not exist."
    exit 1
  fi
  # truncate all 128 test tables
  truncate() {
  psql -h $IP -p $PORT -U $USER $DBNAME <<EOF
  do language plpgsql \$\$
  declare
    i int;
    sql text;
  begin
    for i in 1..128 loop
      sql := 'truncate test'||i;
      execute sql;
    end loop;
  end;
  \$\$;
  \q
  EOF
  }
  # truncate data first
  truncate
  START=`date +%s`
  echo "`date +%F%T` $START"
  for ((x=1;x>0;x++))
  do
  # ------------------------------------------------------
  echo "Round $x test start: `date +%F%T` `date +%s`"
  # launch 128 pgbench clients, one per partition table
  for ((i=1;i<=128;i++))
  do
    pgbench -M prepared -n -r -f ./test$i.sql -h $IP -p $PORT -U $USER $DBNAME -c 1 -j 1 -t 1572864 >>./$i.log 2>&1 &
  done
  wait
  echo "Round $x test end: `date +%F%T` `date +%s`"
  # ------------------------------------------------------
  # stop once 24 hours have elapsed
  if [ $((`date +%s`-$START)) -gt 86400 ]; then
    echo "end `date +%F%T` `date +%s`"
    echo "duration second: $((`date +%s`-$START))"
    exit 0
  fi
  echo "Round $x test end, start truncate `date +%F%T` `date +%s`"
  truncate
  echo "Round $x test end, end truncate `date +%F%T` `date +%s`"
  done

Query test

  postgres=# select min(crt_time),max(crt_time) from test1;
              min             |            max
  ----------------------------+----------------------------
   2016-04-08 00:32:26.842728 | 2016-04-08 02:29:41.583367
  (1 row)
  postgres=# explain select count(*) from test1 where crt_time between '2016-04-08 00:32:00' and '2016-04-08 00:33:00';
                                                                             QUERY PLAN
  -------------------------------------------------------------------------------------------------------------------------------------------------------------------
   Aggregate  (cost=1183919.81..1183919.82 rows=1 width=0)
     ->  Bitmap Heap Scan on test1  (cost=14351.45..1180420.19 rows=1399849 width=0)
           Recheck Cond: ((crt_time >= '2016-04-08 00:32:00'::timestamp without time zone) AND (crt_time <= '2016-04-08 00:33:00'::timestamp without time zone))
           ->  Bitmap Index Scan on idx_test1  (cost=0.00..14001.49 rows=1399849 width=0)
                 Index Cond: ((crt_time >= '2016-04-08 00:32:00'::timestamp without time zone) AND (crt_time <= '2016-04-08 00:33:00'::timestamp without time zone))
  (5 rows)
  Time: 0.382 ms
  postgres=# select count(*) from test1 where crt_time between '2016-04-08 00:32:00' and '2016-04-08 00:33:00';
    count
  ---------
   2857968
  (1 row)
  Time: 554.474 ms
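Part of what makes BRIN attractive here is its tiny footprint compared with a btree; you can check the index size against the table it covers (a sketch):

  psql -c "select pg_size_pretty(pg_relation_size('idx_test1')) as brin_index_size,
                  pg_size_pretty(pg_relation_size('test1'))     as table_size;"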

Summary

  1. The main application scenario for this case is real-time bulk data ingestion, for example IoT workloads where huge numbers of sensors produce massive data volumes, or traditional telecom gateways with very large traffic and business data streams that must be ingested in real time. On the indexing side, PostgreSQL's BRIN secret weapon is used.

  2. Beyond real-time ingestion, if you need real-time stream processing, see the PostgreSQL-based streaming solution in "How is real-time stream processing of 1 trillion records per day implemented?"

  3. The bottleneck is still I/O. There are several symptoms: in top, many processes sit in the D state (uninterruptible sleep, typically waiting on I/O).

    w: S -- Process Status
    The status of the task which can be one of:
      D = uninterruptible sleep
      R = running
      S = sleeping
      T = traced or stopped
      Z = zombie

    All block devices are at 100% utilization. While clearing data:

    Device: rrqm/s wrqm/s r/s w/s rsec/s wsec/s avgrq-sz avgqu-sz await svctm %util
    dfa 0.00 0.00 5807.39 167576.65 1464080.93 1340613.23 16.18 535.69 3.02 0.01 116.77
    dfb 0.00 0.00 5975.10 185132.68 1506714.40 1481061.48 15.63 459.46 2.32 0.01 110.62
    dfc 0.00 0.00 5715.56 182584.05 1440771.98 1460672.37 15.41 568.02 2.93 0.01 112.37

    While inserting data:

    Device: rrqm/s wrqm/s r/s w/s rsec/s wsec/s avgrq-sz avgqu-sz await svctm %util
    dfa 0.00 0.00 0.00 235936.00 0.00 1887488.00 8.00 2676.34 11.17 0.00 99.10
    dfc 0.00 0.00 0.00 239830.00 0.00 1918632.00 8.00 10.66 0.04 0.00 101.30

A few tools you may find useful: perf, systemtap, and gprof. For a more complete analysis, build PostgreSQL with --enable-profiling for diagnostics.
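For example, a quick system-wide profile with perf while the insert load is running (a sketch; the sampling duration is arbitrary):

  # sample all CPUs with call graphs for 30 seconds, then inspect hotspots
  perf record -a -g -- sleep 30
  perf report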