JDBC Driver

    SparkSQL can connect to and operate on SequoiaDB through a JDBC driver.

    1. Download and install Spark and the SequoiaDB database, then copy the Spark-SequoiaDB connector jar and the SequoiaDB Java driver jar into the corresponding directory under the Spark installation path.

    2. Create a new Java project and import the SparkSQL JDBC driver dependencies, which can be downloaded with Maven. A reference configuration follows:
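The Maven configuration itself is missing from this section; the sketch below shows what such a dependency block might look like, assuming the connection goes through the SparkSQL Thrift server using the Hive JDBC driver. The artifact versions are placeholders and must match your Spark/Hive deployment.

```xml
<!-- Hypothetical dependency sketch; versions are examples only and
     should be aligned with the Hive/Hadoop versions of your Spark build. -->
<dependencies>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
</dependencies>
```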

    The test data in SequoiaDB is as follows:

        > db.test.test.find()
        {
          "_id": {
            "$oid": "5d5911f41125bc9c9aa2bc0b"
          },
          "c1": 0,
          "c2": "mary",
          "c3": 15
        }
        {
          "_id": {
            "$oid": "5d5912041125bc9c9aa2bc0c"
          },
          "c1": 1,
          "c2": "lili",
          "c3": 25
        }

    Write and execute the sample code.
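The sample code is not shown in this section. A minimal sketch is given below, assuming the SparkSQL Thrift server is reachable at 192.168.30.81 on port 10000 (the HiveServer2 default), that the Hive JDBC driver is on the classpath, and that the SequoiaDB collection above has been mapped to a SparkSQL table named test; all of these names are assumptions, not values from the original text.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SparkSQLJdbcDemo {
    public static void main(String[] args) throws Exception {
        // Load the Hive JDBC driver (optional with JDBC 4 auto-loading).
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Hypothetical Thrift server address; adjust host/port for your cluster.
        String url = "jdbc:hive2://192.168.30.81:10000/default";

        try (Connection conn = DriverManager.getConnection(url, "", "")) {
            System.out.println("connection success!");

            String sql = "select * from test";
            System.out.println("Running:" + sql);

            try (Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                // Print each row's fields, matching the collection's c1/c2/c3 schema.
                while (rs.next()) {
                    System.out.println(rs.getInt("c1") + " "
                            + rs.getString("c2") + " "
                            + rs.getInt("c3"));
                }
            }
        }
    }
}
```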

    The result is as follows:

        connection success!
        Running:select * from test
        1 lili 25
        0 mary 15

    Connecting SparkSQL to SequoiaSQL

    SparkSQL can use DataFrames to read from and write to SequoiaSQL-MySQL or SequoiaSQL-PGSQL over JDBC.

    1. On the read instance, create the test database and test user, grant privileges, and prepare the data; on the write instance, create the test database and test user and grant privileges.
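For reference, the setup step might look like the following; the database, user, table, and column names are assumptions chosen to match the sample code and output below, not statements from the original text.

```sql
-- Hypothetical setup sketch for the read instance (192.168.30.81).
CREATE DATABASE sparktest;
CREATE USER 'sparktest'@'%' IDENTIFIED BY 'sparktest';
GRANT ALL PRIVILEGES ON sparktest.* TO 'sparktest'@'%';

-- Table layout inferred from the sample code's column names.
CREATE TABLE sparktest.people (
    id INT PRIMARY KEY,
    name VARCHAR(32),
    is_male BOOLEAN,
    height_in INT,
    weight_lb INT
);

INSERT INTO sparktest.people VALUES
    (1, 'Alice', false, 60, 125),
    (2, 'Brian', true, 64, 131);

-- On the write instance (192.168.30.82), run the same database, user,
-- and grant statements, but do not load any data.
```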

    1. Write the sample code:

        package com.sequoiadb.test;

        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.SparkSession;
        import java.util.Properties;

        public final class JDBCDemo {
            public static void main(String[] args) throws Exception {
                String readUrl = "jdbc:mysql://192.168.30.81/sparktest";
                String writeUrl = "jdbc:mysql://192.168.30.82/sparktest";

                SparkSession spark = SparkSession.builder().appName("JDBCDemo").getOrCreate();

                Properties dbProperties = new Properties();
                // Credentials of the test user created in the preparation step
                // (the user name here is assumed to match the password).
                dbProperties.setProperty("user", "sparktest");
                dbProperties.setProperty("password", "sparktest");

                System.out.println("A DataFrame loaded from the entire contents of a table over JDBC.");
                String where = "sparktest.people";
                Dataset<Row> entireDF = spark.read().jdbc(readUrl, where, dbProperties);
                entireDF.printSchema();

                System.out.println("Filtering the table to just show the males.");
                entireDF.filter("is_male = 1").show();

                System.out.println("Alternately, pre-filter the table for males before loading over JDBC.");
                where = "(select * from sparktest.people where is_male = 1) as subset";
                Dataset<Row> malesDF = spark.read().jdbc(readUrl, where, dbProperties);
                malesDF.show();

                System.out.println("Update weights by 2 pounds (results in a new DataFrame with same column names)");
                Dataset<Row> heavyDF = entireDF.withColumn("updated_weight_lb", entireDF.col("weight_lb").plus(2));
                Dataset<Row> updatedDF = heavyDF.select("id", "name", "is_male", "height_in", "updated_weight_lb")
                        .withColumnRenamed("updated_weight_lb", "weight_lb");
                updatedDF.show();

                System.out.println("Save the updated data to a new table with JDBC");
                where = "sparktest.updated_people";
                updatedDF.write().mode("error").jdbc(writeUrl, where, dbProperties);

                System.out.println("Load the new table into a new DataFrame to confirm that it was saved successfully.");
                Dataset<Row> retrievedDF = spark.read().jdbc(writeUrl, where, dbProperties);
                retrievedDF.show();

                spark.stop();
            }
        }
    2. Compile the code and submit the job. The output is as follows:
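The submit command is not shown in this section; a sketch is given below. The jar name and the location of the MySQL JDBC driver are placeholders, not values from the original text.

```shell
# Hypothetical submit command; paths are examples only.
# The MySQL JDBC driver must be visible to both the driver and the executors.
spark-submit \
  --class com.sequoiadb.test.JDBCDemo \
  --jars /path/to/mysql-connector-java.jar \
  --driver-class-path /path/to/mysql-connector-java.jar \
  jdbc-demo.jar
```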

        A DataFrame loaded from the entire contents of a table over JDBC.
        root
         |-- id: integer (nullable = true)
         |-- name: string (nullable = true)
         |-- is_male: boolean (nullable = true)
         |-- height_in: integer (nullable = true)
         |-- weight_lb: integer (nullable = true)
        +---+-------+-------+---------+---------+
        | id|   name|is_male|height_in|weight_lb|
        +---+-------+-------+---------+---------+
        |  1|  Alice|  false|       60|      125|
        |  2|  Brian|   true|       64|      131|
        |  3|Charlie|   true|       74|      183|
        |  4|  Doris|  false|       58|      102|
        |  5|  Ellen|  false|       66|      140|
        |  6|  Frank|   true|       66|      151|
        |  7| Gerard|   true|       68|      190|
        |  8| Harold|   true|       61|      128|
        +---+-------+-------+---------+---------+
        Filtering the table to just show the males.
        +---+-------+-------+---------+---------+
        | id|   name|is_male|height_in|weight_lb|
        +---+-------+-------+---------+---------+
        |  2|  Brian|   true|       64|      131|
        |  3|Charlie|   true|       74|      183|
        |  6|  Frank|   true|       66|      151|
        |  7| Gerard|   true|       68|      190|
        |  8| Harold|   true|       61|      128|
        +---+-------+-------+---------+---------+
        Alternately, pre-filter the table for males before loading over JDBC.
        +---+-------+-------+---------+---------+
        | id|   name|is_male|height_in|weight_lb|
        +---+-------+-------+---------+---------+
        |  2|  Brian|   true|       64|      131|
        |  3|Charlie|   true|       74|      183|
        |  6|  Frank|   true|       66|      151|
        |  7| Gerard|   true|       68|      190|
        |  8| Harold|   true|       61|      128|
        +---+-------+-------+---------+---------+
        Update weights by 2 pounds (results in a new DataFrame with same column names)
        +---+-------+-------+---------+---------+
        | id|   name|is_male|height_in|weight_lb|
        +---+-------+-------+---------+---------+
        |  1|  Alice|  false|       60|      127|
        |  2|  Brian|   true|       64|      133|
        |  3|Charlie|   true|       74|      185|
        |  4|  Doris|  false|       58|      104|
        |  5|  Ellen|  false|       66|      142|
        |  6|  Frank|   true|       66|      153|
        |  7| Gerard|   true|       68|      192|
        |  8| Harold|   true|       61|      130|
        +---+-------+-------+---------+---------+
        Save the updated data to a new table with JDBC
        Load the new table into a new DataFrame to confirm that it was saved successfully.
        +---+-------+-------+---------+---------+
        | id|   name|is_male|height_in|weight_lb|
        +---+-------+-------+---------+---------+
        |  1|  Alice|  false|       60|      127|
        |  2|  Brian|   true|       64|      133|
        |  3|Charlie|   true|       74|      185|
        |  4|  Doris|  false|       58|      104|
        |  5|  Ellen|  false|       66|      142|
        |  6|  Frank|   true|       66|      153|
        |  7| Gerard|   true|       68|      192|
        |  8| Harold|   true|       61|      130|
        +---+-------+-------+---------+---------+