首先,创建我们要用到的插件 dblink 和 oss_fdw(`create extension dblink; create extension oss_fdw;`),然后创建下面这些用于并行导入的函数。

  -- rds_oss_fdw_load_data_prepare: build the task list for a parallel OSS load.
  --
  -- Creates the status table oss_fdw_load_status if it does not exist yet and
  -- fills it with one row per OSS file that the foreign table t_from matches
  -- (as reported by oss_fdw_list_file). Rows are inserted largest file first,
  -- so the id order used by rds_oss_fdw_load_data_execute starts the biggest
  -- files earliest.
  --
  -- Parameters:
  --   t_from  oss_fdw foreign table whose matching OSS files are listed
  --   t_to    target table; unused here, kept so the signature mirrors
  --           rds_oss_fdw_load_data_execute
  -- Returns:
  --   true  when the task list was populated;
  --   false (with a NOTICE) when oss_fdw_load_status already holds tasks,
  --         or when t_from matches no file.
  CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_prepare(t_from text, t_to text)
  RETURNS bool AS
  $BODY$
  BEGIN
      CREATE TABLE IF NOT EXISTS oss_fdw_load_status (
          id       BIGSERIAL PRIMARY KEY,
          filename text,
          size     int8,
          rows     int8 DEFAULT 0,  -- rows loaded, set by rds_oss_fdw_load_single_file
          status   int  DEFAULT 0   -- 0 = not loaded (yet, or the load failed), 1 = loaded
      );

      -- Refuse to mix a new task list with one left over from an earlier run.
      IF EXISTS (SELECT 1 FROM oss_fdw_load_status) THEN
          RAISE NOTICE 'oss_fdw_load_status not empty';
          RETURN false;
      END IF;

      -- List the OSS files matched by foreign table t_from into the status table.
      INSERT INTO oss_fdw_load_status (filename, size)
      SELECT name, size
      FROM oss_fdw_list_file(t_from)
      ORDER BY size DESC;

      -- The table was empty before the INSERT, so "no row inserted" means
      -- "no file matched".
      IF NOT FOUND THEN
          RAISE NOTICE 'oss_fdw_load_status empty,not task found';
          RETURN false;
      END IF;

      RETURN true;
  END;
  $BODY$
  LANGUAGE plpgsql;
  -- rds_oss_fdw_load_data_execute: dispatcher for the parallel load.
  --
  -- For every task in oss_fdw_load_status (in id order) it opens a loopback
  -- dblink connection tagged application_name=oss_loader and asynchronously
  -- runs rds_oss_fdw_load_single_file on it, keeping at most num_work loader
  -- sessions busy at once. It then waits until no oss_loader session is busy.
  --
  -- Parameters:
  --   t_from    oss_fdw foreign table to read from
  --   t_to      target table to insert into
  --   num_work  maximum number of concurrently busy loader sessions (>= 1)
  --   pass      password of the current user for the loopback connections;
  --             NULL leaves the password out of the connection string
  -- Returns false (with a NOTICE) when the task table is empty or num_work is
  -- NULL / < 1; true once no loader session is busy any more.
  --
  -- Busy loaders are counted as non-idle sessions named oss_loader in
  -- pg_stat_activity, so loaders of another concurrent run are counted too.
  -- The count is taken over a fresh dblink connection because PostgreSQL keeps
  -- a per-transaction snapshot of session activity in the calling session.
  CREATE OR REPLACE FUNCTION rds_oss_fdw_load_data_execute(t_from text, t_to text, num_work int, pass text)
  RETURNS bool AS
  $BODY$
  DECLARE
      r           record;
      c           int;
      conn_name   text;
      conninfo    text;
      monitor_sql text := 'select count(*) from pg_stat_activity where application_name = ''oss_loader'' and state != ''idle''';
  BEGIN
      IF NOT EXISTS (SELECT 1 FROM oss_fdw_load_status) THEN
          RAISE NOTICE 'oss_fdw_load_status empty';
          RETURN false;
      END IF;

      -- With num_work < 1 (or NULL) the "c < num_work" test below never
      -- passes and the dispatcher would spin forever.
      IF num_work IS NULL OR num_work < 1 THEN
          RAISE NOTICE 'num_work must be at least 1, got %', num_work;
          RETURN false;
      END IF;

      -- Loopback libpq connection string. Every value is single-quoted with
      -- backslashes and quotes escaped, so e.g. a password containing spaces
      -- or quotes still parses; a NULL value (no password) is omitted.
      SELECT string_agg(k || '=''' || replace(replace(v, E'\\', E'\\\\'), '''', E'\\''') || '''', ' ')
        INTO conninfo
        FROM (VALUES ('dbname',   current_database()::text),
                     ('user',     current_user::text),
                     ('password', pass)) AS kv(k, v);

      -- Dispatch one task at a time, in id order.
      FOR r IN SELECT id, filename FROM oss_fdw_load_status ORDER BY id LOOP
          -- Count busy loader sessions; wait here while the limit is reached.
          LOOP
              SELECT a INTO c FROM dblink(conninfo, monitor_sql) AS t(a int);
              EXIT WHEN c < num_work;
              RAISE NOTICE 'current runing % loader', c;
              PERFORM pg_sleep(1);
          END LOOP;

          -- Close loopback connections whose task has finished, so the number
          -- of open connections stays near num_work instead of growing by one
          -- per file (which could exhaust max_connections).
          PERFORM dis_conn(x)
             FROM unnest(dblink_get_connections()) AS x
            WHERE x LIKE 'oss_loader%' AND dblink_is_busy(x) = 0;

          -- Start the task asynchronously on its own named connection.
          conn_name := 'oss_loader_' || r.id;
          PERFORM dis_conn(conn_name);
          PERFORM dblink_connect(conn_name, conninfo || ' application_name=oss_loader');
          -- Sent as a single statement without an explicit BEGIN/END: if the
          -- load fails, the remote session rolls back and returns to 'idle'.
          -- An aborted explicit transaction block would instead stay
          -- 'idle in transaction (aborted)' and be counted as busy forever.
          -- %L quotes the arguments safely even if they contain quotes.
          PERFORM dblink_send_query(conn_name,
              format('select rds_oss_fdw_load_single_file(%s, %L, %L, %L)',
                     r.id, t_from, t_to, r.filename));
          RAISE NOTICE 'runing loader task % filename %', r.id, r.filename;
      END LOOP;

      -- All tasks dispatched; wait until no loader session is busy.
      LOOP
          SELECT a INTO c FROM dblink(conninfo, monitor_sql) AS t(a int);
          EXIT WHEN c = 0;
          PERFORM pg_sleep(1);
      END LOOP;

      -- Release the finished loopback connections held by this session.
      PERFORM dis_conn(x)
         FROM unnest(dblink_get_connections()) AS x
        WHERE x LIKE 'oss_loader%' AND dblink_is_busy(x) = 0;

      RETURN true;
  END;
  $BODY$
  LANGUAGE plpgsql;
  -- rds_oss_fdw_load_single_file: load one OSS file into the target table.
  --
  -- Points oss_fdw at a single file through the transaction-local setting
  -- oss_fdw.rds_read_one_file, copies every row of foreign table t_from into
  -- t_to, and records the inserted row count with status = 1 in
  -- oss_fdw_load_status. Intended to be run by rds_oss_fdw_load_data_execute
  -- over a dblink connection.
  --
  -- Parameters:
  --   taskid    id of the task row in oss_fdw_load_status
  --   t_from    oss_fdw foreign table (plain or schema-qualified name)
  --   t_to      target table (plain or schema-qualified name)
  --   filepath  OSS file name, as listed by oss_fdw_list_file
  -- Raises 'run rds_oss_fdw_load_single_file with error' on any failure, with
  -- the underlying error message in DETAIL and its SQLSTATE preserved.
  CREATE OR REPLACE FUNCTION rds_oss_fdw_load_single_file(taskid int8, t_from text, t_to text, filepath text)
  RETURNS void AS
  $BODY$
  DECLARE
      rowscount    int8 = 0;
      current_file text;
  BEGIN
      -- is_local = true: the setting reverts when this transaction ends.
      PERFORM set_config('oss_fdw.rds_read_one_file', filepath, true);
      current_file := current_setting('oss_fdw.rds_read_one_file');
      RAISE NOTICE 'begin load %', current_file;

      -- Casting to regclass resolves and validates both table names and
      -- yields a correctly quoted (optionally schema-qualified) identifier,
      -- so the arguments cannot inject arbitrary SQL into the statement.
      EXECUTE format('insert into %s select * from %s', t_to::regclass, t_from::regclass);
      GET DIAGNOSTICS rowscount = ROW_COUNT;

      -- Record the result in the status table.
      RAISE NOTICE 'end load id % % to % % rows', taskid, filepath, t_to, rowscount;
      UPDATE oss_fdw_load_status SET rows = rowscount, status = 1 WHERE id = taskid;
  EXCEPTION
      WHEN others THEN
          -- Keep the original message text but do not lose the real cause.
          RAISE EXCEPTION 'run rds_oss_fdw_load_single_file with error'
              USING DETAIL = SQLERRM, ERRCODE = SQLSTATE;
  END;
  $BODY$
  LANGUAGE plpgsql;
  -- dis_conn: best-effort close of a named dblink connection.
  --
  -- Calls dblink_disconnect and silently ignores any error it raises
  -- (presumably e.g. when no connection of that name is open), so callers can
  -- invoke it unconditionally before (re)opening a connection with that name.
  -- Declared STRICT: a NULL name returns NULL without running the body.
  CREATE OR REPLACE FUNCTION dis_conn(name) RETURNS void AS $$
  DECLARE
      conn_name ALIAS FOR $1;
  BEGIN
      PERFORM dblink_disconnect(conn_name);
  EXCEPTION
      WHEN others THEN
          NULL;  -- deliberately swallowed: closing is best-effort
  END;
  $$ LANGUAGE plpgsql STRICT;

执行 `select rds_oss_fdw_load_data_prepare('oss_table','lineitem');` 后,表 oss_fdw_load_status 中会保存准备导入的所有文件列表(按文件大小从大到小排列),用户可以按需删减或调整。

2. 数据装载

  -- Load every task listed in oss_fdw_load_status from foreign table oss_table
  -- into lineitem, with at most 10 concurrent loader sessions. 'mypassword' is
  -- the current user's password, used for the loopback dblink connections.
  select rds_oss_fdw_load_data_execute('oss_table','lineitem',10,'mypassword');

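期间,我们可以通过下列 SQL 查看正在工作的异步会话状态:

  select pid, application_name, state, query from pg_stat_activity where application_name = 'oss_loader' and state != 'idle';

4. 管理状态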

同时,我们也可以随时中断数据导入工作

  -- Stop the import: terminate every loader session that is still working
  -- (the current session is excluded) and show what each one was running.
  -- Cancel the session running rds_oss_fdw_load_data_execute first (e.g. with
  -- pg_cancel_backend); otherwise the capacity freed here lets it start
  -- loaders for the remaining files.
  select pg_terminate_backend(pid), application_name, state, query
  from pg_stat_activity
  where state != 'idle'
    and pid != pg_backend_pid()
    and application_name = 'oss_loader';

6. 性能

使用 TPC-H 100 GB 的数据(lineitem 表)进行装载测试,耗时 10 分钟,平均约 170 MB/s。

  -- Build the task list, then load it with 10 concurrent loader sessions
  -- ('mypassword' stands for the current user's password).
  select rds_oss_fdw_load_data_prepare('t_oss2','lineitem');
  select rds_oss_fdw_load_data_execute('t_oss2','lineitem',10,'mypassword');

  -- Total size of the listed OSS files (MB, assuming size is reported in bytes).
  select sum(size)/1024/1024 from oss_fdw_load_status;
       ?column?
  --------------------
   22561.919849395752
  (1 row)

  -- On-disk size of the loaded table.
  select pg_size_pretty(pg_relation_size(oid)) from pg_class where relname = 'lineitem';
   pg_size_pretty
  ----------------
   101 GB
  (1 row)

本文使用 PL/pgSQL + dblink 的方式加速了 OSS 的数据导入。另外,大家也可以关注以下三点:

  • 1. PostgreSQL 默认的过程语言 PL/pgSQL 相当好用,与 SQL 引擎紧密结合且学习成本低,我们推荐用户用它实现业务逻辑。与在客户端执行 SQL 相比,使用过程语言消除了服务器和客户端之间的网络开销,有天然的性能优势。
  • 2. dblink 的异步接口(dblink_connect + dblink_send_query)可以把任务分发到多个后台会话中并行执行,配合 PL/pgSQL 就能方便地实现并行化处理。
  • 3. 阿里云开发的 oss_fdw 能在 PostgreSQL 和 OSS 之间快速交换数据。oss_fdw 支持读写 CSV 及压缩格式的 CSV 数据,并且很容易通过并行加速。与 JDBC INSERT 和 COPY 相比,oss_fdw 有压倒性的性能优势。