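Take the avg aggregate as an example: computing it over a table in a single task and computing it across several parallel tasks require different algorithms.

PostgreSQL provides a standard interface that lets aggregate functions run in parallel.

Aggregate functions are created with CREATE AGGREGATE.

Compared with a non-parallel aggregate, the definition gains one extra step: the combinefunc, which merges the partial states produced by each task (this is what makes partial aggregation possible).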
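To make this concrete, here is a minimal sketch of an avg-like aggregate that declares a transition, a combine, and a final function. The names my_avg, my_avg_sfunc, my_avg_combine and my_avg_final are hypothetical and exist only for illustration; the state is assumed to be a two-element float8[] holding the running sum and the row count.

```sql
-- Hypothetical example: state is {running_sum, row_count}.

-- sfunc: fold one input value into the state.
CREATE FUNCTION my_avg_sfunc(float8[], float8) RETURNS float8[] AS $$
  SELECT ARRAY[$1[1] + $2, $1[2] + 1];
$$ LANGUAGE sql STRICT IMMUTABLE PARALLEL SAFE;

-- combinefunc: merge two partial states produced by different workers.
CREATE FUNCTION my_avg_combine(float8[], float8[]) RETURNS float8[] AS $$
  SELECT ARRAY[$1[1] + $2[1], $1[2] + $2[2]];
$$ LANGUAGE sql STRICT IMMUTABLE PARALLEL SAFE;

-- ffunc: turn the final state into the result (NULL when no rows were seen).
CREATE FUNCTION my_avg_final(float8[]) RETURNS float8 AS $$
  SELECT CASE WHEN $1[2] = 0 THEN NULL ELSE $1[1] / $1[2] END;
$$ LANGUAGE sql STRICT IMMUTABLE PARALLEL SAFE;

CREATE AGGREGATE my_avg(float8) (
    SFUNC       = my_avg_sfunc,
    STYPE       = float8[],
    COMBINEFUNC = my_avg_combine,
    FINALFUNC   = my_avg_final,
    INITCOND    = '{0,0}',
    PARALLEL    = SAFE
);
```

The point of the sketch is that the combine function adds sums and counts separately; averaging the per-worker averages would give a wrong answer whenever workers see different numbers of rows.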

The non-parallel aggregation flow is roughly:

    sfunc( internal-state, next-data-values ) ---> next-internal-state
    -- called once at the end (optional)
    ffunc( internal-state ) ---> aggregate-value

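The parallel aggregation flow is roughly as follows. If no combinefunc is defined, the aggregation itself is not parallelized and only the scan runs in parallel.

[Figure: parallel aggregation flow]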
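The two flows can be imitated by hand using the support functions behind the built-in avg(float8): float8_accum (sfunc), float8_combine (combinefunc) and float8_avg (ffunc). This is only a sketch; splitting four values across two imaginary workers is an assumption made for illustration, and it requires PostgreSQL 9.6 or later, where float8_combine exists.

```sql
-- Serial flow: one state is folded through sfunc row by row, then finalized.
SELECT float8_avg(
         float8_accum(float8_accum(float8_accum(float8_accum(
           '{0,0,0}'::float8[], 1), 2), 3), 4)
       ) AS serial_avg;

-- Parallel flow: two workers build separate partial states,
-- combinefunc merges them, and ffunc runs once on the merged state.
SELECT float8_avg(
         float8_combine(
           float8_accum(float8_accum('{0,0,0}'::float8[], 1), 2),  -- "worker 1"
           float8_accum(float8_accum('{0,0,0}'::float8[], 3), 4)   -- "worker 2"
         )
       ) AS parallel_avg;
```

Both queries should return 2.5, which shows that merging partial states yields the same result as a single serial pass.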

    postgres=# set max_parallel_workers=4;
    SET
    postgres=# set max_parallel_workers_per_gather =4;
    SET
    postgres=# set parallel_setup_cost =0;
    SET
    postgres=# set parallel_tuple_cost =0;
    SET
    postgres=# alter table test set (parallel_workers =4);
    ALTER TABLE
    postgres=# explain (analyze,verbose,timing,costs,buffers) select count(*) from test;
                                                                      QUERY PLAN
    -----------------------------------------------------------------------------------------------------------------------------------------------
    -- Final stage; whether it is needed depends on the aggregate's algorithm
     Finalize Aggregate (cost=15837.02..15837.03 rows=1 width=8) (actual time=57.296..57.296 rows=1 loops=1)
       Output: count(*)
       Buffers: shared hit=3060
       -> Gather (cost=15837.00..15837.01 rows=4 width=8) (actual time=57.287..57.292 rows=5 loops=1)
             Output: (PARTIAL count(*))
             Workers Planned: 4
             Workers Launched: 4
             Buffers: shared hit=3060
             -- Below: the partial aggregation each worker runs in parallel (shown as PARTIAL);
             -- the partial states are later merged by combinefunc in the Finalize step
             -> Partial Aggregate (cost=15837.00..15837.01 rows=1 width=8) (actual time=52.333..52.333 rows=1 loops=5)
                   Output: PARTIAL count(*)
                   Buffers: shared hit=12712
                   Worker 0: actual time=50.917..50.918 rows=1 loops=1
                     Buffers: shared hit=2397
                   Worker 1: actual time=51.293..51.294 rows=1 loops=1
                     Buffers: shared hit=2423
                   Worker 2: actual time=51.062..51.063 rows=1 loops=1
                     Buffers: shared hit=2400
                   Worker 3: actual time=51.436..51.436 rows=1 loops=1
                     Buffers: shared hit=2432
                   -> Parallel Seq Scan on public.test (cost=0.00..15212.00 rows=250000 width=0) (actual time=0.010..30.499 rows=200000 loops=5)
                         Buffers: shared hit=12712
                         Worker 0: actual time=0.013..30.343 rows=190269 loops=1
                           Buffers: shared hit=2397
                         Worker 1: actual time=0.010..30.401 rows=192268 loops=1
                           Buffers: shared hit=2423
                         Worker 2: actual time=0.013..30.467 rows=190350 loops=1
                           Buffers: shared hit=2400
                         Worker 3: actual time=0.009..30.221 rows=192861 loops=1
                           Buffers: shared hit=2432
     Planning time: 0.074 ms
     Execution time: 60.169 ms
    (31 rows)
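Whether a given aggregate can take part in a plan like the one above depends on whether it has a combine function. As a rough way to check, the pg_aggregate catalog can be queried; the choice of aggregates in the WHERE clause below is just an example.

```sql
-- List transition, combine and final functions for a few aggregates.
-- An aggregate without a combine function shows "-" in aggcombinefn.
SELECT p.oid::regprocedure AS aggregate,
       a.aggtransfn,
       a.aggcombinefn,
       a.aggfinalfn
FROM pg_aggregate a
JOIN pg_proc p ON p.oid = a.aggfnoid
WHERE p.proname IN ('count', 'avg', 'array_agg')
ORDER BY p.proname, p.oid;
```

The aggcombinefn column exists from PostgreSQL 9.6 onward; which aggregates have it populated varies by server version.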

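With the principle of parallel aggregation understood, we can write custom aggregate functions that compute in parallel.

For example, suppose we want an aggregate over arrays that also removes duplicate elements while aggregating.

1. Create the test table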
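The table definition is not shown. A definition consistent with the later INSERT and the queries on id and col would look like the sketch below; the exact column types are an assumption inferred from those statements.

```sql
-- Assumed definition: an integer grouping key and an integer array to aggregate.
CREATE TABLE test (
    id  int,
    col int[]
);
```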

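2. Generate test data

    -- gen_arr(n, k): returns an array of k random integers between 0 and n
    CREATE OR REPLACE FUNCTION public.gen_arr(integer, integer)
    RETURNS integer[]
    LANGUAGE sql
    AS $function$
      select array(select ($1*random())::int from generate_series(1,$2));
    $function$;

    -- 10,000 rows: a random id between 0 and 1000, plus a 10-element array of values between 0 and 500
    insert into test select random()*1000, gen_arr(500,10) from generate_series(1,10000);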

3. Create the aggregate function

Array deduplication function (uniq):
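The body of uniq is not shown here. One possible sketch, assuming a polymorphic SQL function that keeps each distinct element once, is given below. This is an illustration and not necessarily the author's definition; note that it does not preserve element order, and that the intarray extension ships its own uniq(int[]) with different semantics (it only removes adjacent duplicates).

```sql
-- Hypothetical definition: return the distinct elements of the input array.
CREATE OR REPLACE FUNCTION uniq(anyarray) RETURNS anyarray AS $$
  SELECT array(SELECT DISTINCT e FROM unnest($1) AS t(e));
$$ LANGUAGE sql STRICT PARALLEL SAFE;

-- Usage: returns the elements 1, 2 and 3 in some order.
SELECT uniq(ARRAY[1, 2, 2, 3, 1]);
```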

Array concatenation and deduplication function:

    postgres=# create or replace function array_uniq_cat(anyarray,anyarray) returns anyarray as $$
      select uniq(array_cat($1,$2));
    $$ language sql strict parallel safe;
    CREATE FUNCTION

Aggregate function (without COMBINEFUNC):

    create aggregate arragg (anyarray) (sfunc = array_uniq_cat, stype=anyarray, PARALLEL=safe);

    postgres=# set max_parallel_workers=4;
    SET
    postgres=# set max_parallel_workers_per_gather =4;
    SET
    postgres=# set parallel_setup_cost =0;
    SET
    postgres=# set parallel_tuple_cost =0;
    SET
    postgres=# alter table test set (parallel_workers =4);
    ALTER TABLE

Clearly, when COMBINEFUNC is not set, parallel aggregation is not used:

    postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;
                                                                QUERY PLAN
    -----------------------------------------------------------------------------------------------------------------------------------
     HashAggregate (cost=4139.74..4141.74 rows=200 width=36) (actual time=602.957..603.195 rows=1001 loops=1)
       Output: id, arragg(col)
       Group Key: test.id
       Buffers: shared hit=6
       -> Gather (cost=0.00..163.37 rows=15748 width=36) (actual time=0.328..43.734 rows=10000 loops=1)
             Output: id, col
             Workers Planned: 4
             Workers Launched: 4
             Buffers: shared hit=6
             -- Only the scan is parallel; the aggregation is not.
             -> Parallel Seq Scan on public.test (cost=0.00..163.37 rows=3937 width=36) (actual time=0.017..0.891 rows=2000 loops=5)
                   Output: id, col
                   Buffers: shared hit=124
                   Worker 0: actual time=0.019..0.177 rows=648 loops=1
                     Buffers: shared hit=8
                   Worker 1: actual time=0.022..0.180 rows=648 loops=1
                     Buffers: shared hit=8
                   Worker 2: actual time=0.017..3.772 rows=7570 loops=1
                     Buffers: shared hit=94
                   Worker 3: actual time=0.015..0.189 rows=648 loops=1
                     Buffers: shared hit=8
     Planning time: 0.084 ms
     Execution time: 603.450 ms
    (22 rows)
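The aggregate definition that includes COMBINEFUNC is not shown. Since array_uniq_cat takes two arrays and returns their deduplicated concatenation, it can presumably serve as both the transition and the combine function. The following is a sketch under that assumption, not a confirmed copy of the original statement.

```sql
-- Replace the earlier definition with one that also declares a combine function.
DROP AGGREGATE arragg(anyarray);

CREATE AGGREGATE arragg(anyarray) (
    SFUNC       = array_uniq_cat,
    STYPE       = anyarray,
    COMBINEFUNC = array_uniq_cat,   -- assumed: merging two partial arrays is the same operation
    PARALLEL    = SAFE
);
```

Reusing the transition function works here only because the state type and the input type are the same array type; aggregates whose state differs from their input need a separate combine function.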

With COMBINEFUNC set, parallel aggregation is used, and execution time drops from about 603 ms to about 286 ms:

    postgres=# explain (analyze,verbose,timing,costs,buffers) select id, arragg(col) from test group by id ;
                                                                   QUERY PLAN
    -----------------------------------------------------------------------------------------------------------------------------------------
     Finalize HashAggregate (cost=1361.46..1363.46 rows=200 width=36) (actual time=285.489..285.732 rows=1001 loops=1)
       Output: id, arragg(col)
       Group Key: test.id
       Buffers: shared hit=36
       -> Gather (...)
             Workers Planned: 4
             Workers Launched: 4
             Buffers: shared hit=36
             -- Parallel aggregation
             -> Partial HashAggregate (cost=1157.46..1159.46 rows=200 width=36) (actual time=57.367..57.727 rows=859 loops=5)
                   Output: id, PARTIAL arragg(col)
                   Group Key: test.id
                   Buffers: shared hit=886
                   Worker 0: actual time=54.788..54.997 rows=857 loops=1
                     Buffers: shared hit=213
                   Worker 1: actual time=56.881..57.255 rows=861 loops=1
                     Buffers: shared hit=213
                   Worker 2: actual time=55.415..55.813 rows=856 loops=1
                     Buffers: shared hit=212
                   Worker 3: actual time=56.453..56.854 rows=838 loops=1
                     Buffers: shared hit=212
                   -> Parallel Seq Scan on public.test (cost=0.00..163.37 rows=3937 width=36) (actual time=0.011..0.736 rows=2000 loops=5)
                         Output: id, col
                         Buffers: shared hit=124
                         Worker 0: actual time=0.009..0.730 rows=1981 loops=1
                           Buffers: shared hit=25
                         Worker 1: actual time=0.012..0.773 rows=2025 loops=1
                           Buffers: shared hit=25
                         Worker 2: actual time=0.015..0.741 rows=1944 loops=1
                           Buffers: shared hit=24
                         Worker 3: actual time=0.012..0.751 rows=1944 loops=1
                           Buffers: shared hit=24
     Planning time: 0.073 ms
     Execution time: 285.949 ms
    (34 rows)

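Parallel aggregation works on the same principle as the aggregation phases in a distributed database. For custom aggregates in a distributed database, see the article listed at the end.

PostgreSQL's built-in array_agg combines input arrays into a multidimensional array and requires all inputs to have the same dimensions, which does not fit some use cases.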

                               List of functions
       Schema   |   Name    | Result data type | Argument data types | Type
    ------------+-----------+------------------+---------------------+------
     pg_catalog | array_agg | anyarray         | anyarray            | agg
     pg_catalog | array_agg | anyarray         | anynonarray         | agg

    postgres=# \set VERBOSITY verbose
    postgres=# select array_agg(info) from (values(array[1,2,3]),(array[2,3,4,5])) t(info);
    ERROR: 2202E: cannot accumulate arrays of different dimensionality
    LOCATION: accumArrayResultArr, arrayfuncs.c:5270
    postgres=# select array_agg(info) from (values(array[1,2,3]),(array[3,4,5])) t(info);
         array_agg
    -------------------
     {{1,2,3},{3,4,5}}
    (1 row)

To merge the arrays into a one-dimensional array instead, define a custom aggregate like this:

    postgres=# create aggregate arragg (anyarray) (sfunc = array_cat, stype=anyarray, PARALLEL=safe);
    CREATE AGGREGATE
    postgres=# select arragg(info) from (values(array[1,2,3]),(array[3,4,5])) t(info);
        arragg
    ---------------
     {1,2,3,3,4,5}
    (1 row)
    postgres=# select arragg(info) from (values(array[1,2,3]),(array[2,3,4,5])) t(info);
         arragg
    -----------------
     {1,2,3,2,3,4,5}
    (1 row)
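As defined above, this flattening aggregate has no COMBINEFUNC, so by the earlier reasoning it would not be aggregated in parallel. A sketch of a parallel-capable variant follows, assuming PostgreSQL 10 as used in this article; the name arragg_flat is hypothetical, and on newer releases where array_cat is declared over a different polymorphic type the declaration may need adjusting.

```sql
-- Hypothetical variant: array_cat is used both to add a row and to merge partial states.
CREATE AGGREGATE arragg_flat(anyarray) (
    SFUNC       = array_cat,
    STYPE       = anyarray,
    COMBINEFUNC = array_cat,
    PARALLEL    = SAFE
);

SELECT arragg_flat(info)
FROM (VALUES (ARRAY[1,2,3]), (ARRAY[2,3,4,5])) t(info);
```

When the aggregation does run in parallel, the order in which partial arrays are concatenated depends on the workers, so the element order of the result should not be relied on.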

https://www.postgresql.org/docs/10/static/xaggr.html#XAGGR-PARTIAL-AGGREGATES

《Postgres-XC customized aggregate introduction》