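JDBC Driver
SparkSQL can connect to SequoiaDB through the JDBC driver and operate on its data.
Download and install Spark and SequoiaDB, then copy the jar packages of the Spark-SequoiaDB connector and the SequoiaDB Java driver to the directory under the Spark installation path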
Create a new Java project and import the SparkSQL JDBC driver dependencies, which can be downloaded with Maven
> db.test.test.find()
{
  "_id": {
    "$oid": "5d5911f41125bc9c9aa2bc0b"
  },
  "c1": 0,
  "c2": "mary",
  "c3": 15
}
{
  "_id": {
    "$oid": "5d5912041125bc9c9aa2bc0c"
  },
  "c1": 1,
  "c2": "lili",
  "c3": 25
}
Write and run the sample code
The output is as follows:
connection success!
Running:select * from test
1 lili 25
0 mary 15
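A client that prints output of this shape could look roughly like the sketch below. It is only an illustration under several assumptions that the text above does not state: the Spark Thrift Server is running and reachable on port 10000, the `hive-jdbc` driver is on the classpath, the database is `default`, no credentials are required, and a SparkSQL table named `test` is already mapped to the SequoiaDB collection `test.test`. The class name `SparkJdbcSketch` and its helper methods are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class SparkJdbcSketch {

    // Builds a HiveServer2-style JDBC URL, the scheme spoken by the Spark Thrift Server.
    static String buildUrl(String host, int port, String database) {
        return "jdbc:hive2://" + host + ":" + port + "/" + database;
    }

    // Renders the current row of a result set as tab-separated column values.
    static String formatRow(ResultSet rs) throws SQLException {
        int columns = rs.getMetaData().getColumnCount();
        StringBuilder sb = new StringBuilder();
        for (int i = 1; i <= columns; i++) {
            if (i > 1) {
                sb.append('\t');
            }
            sb.append(rs.getString(i));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.println("usage: SparkJdbcSketch <thrift-server-host>");
            return;
        }
        // Assumed port and database; adjust to the actual Thrift Server settings.
        String url = buildUrl(args[0], 10000, "default");
        String sql = "select * from test";

        // Driver class shipped with hive-jdbc (must be on the classpath).
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Empty user and password are an assumption for an unsecured test setup.
        try (Connection conn = DriverManager.getConnection(url, "", "");
             Statement stmt = conn.createStatement()) {
            System.out.println("connection success!");
            System.out.println("Running:" + sql);
            try (ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.println(formatRow(rs));
                }
            }
        }
    }
}
```

The sketch uses only `java.sql` interfaces, so the same code should work with any JDBC driver once the URL scheme and driver class are swapped.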
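Connecting SparkSQL to SequoiaSQL
SparkSQL can read from and write to SequoiaSQL-MySQL or SequoiaSQL-PGSQL through DataFrames over JDBC.
On the read instance, create the test database and test user, grant privileges, and prepare the data; on the write instance, create the test database and test user and grant privileges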
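The data preparation on the read instance can be sketched with plain JDBC as follows. This is a hedged illustration, not the original setup script: the column layout and the eight rows are inferred from the schema and tables printed by the sample job further down, while the column types, the class name `PrepareReadInstance`, and the command-line arguments are assumptions. Creating the database, the test user, and the grants is not covered here and would still need to be done with the instance's own SQL client.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class PrepareReadInstance {

    // Column names follow the schema printed by the sample job; types are assumed.
    static final String CREATE_TABLE =
            "CREATE TABLE IF NOT EXISTS people ("
            + "id INT, name VARCHAR(64), is_male BOOLEAN, "
            + "height_in INT, weight_lb INT)";

    static final String INSERT =
            "INSERT INTO people (id, name, is_male, height_in, weight_lb) "
            + "VALUES (?, ?, ?, ?, ?)";

    // id, name, is_male, height_in, weight_lb
    static final Object[][] ROWS = {
        {1, "Alice", false, 60, 125},
        {2, "Brian", true, 64, 131},
        {3, "Charlie", true, 74, 183},
        {4, "Doris", false, 58, 102},
        {5, "Ellen", false, 66, 140},
        {6, "Frank", true, 66, 151},
        {7, "Gerard", true, 68, 190},
        {8, "Harold", true, 61, 128},
    };

    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("usage: PrepareReadInstance <jdbc-url> <user> <password>");
            return;
        }
        try (Connection conn = DriverManager.getConnection(args[0], args[1], args[2])) {
            try (Statement st = conn.createStatement()) {
                st.executeUpdate(CREATE_TABLE);
            }
            // Insert all sample rows in a single batch.
            try (PreparedStatement ps = conn.prepareStatement(INSERT)) {
                for (Object[] row : ROWS) {
                    ps.setInt(1, (Integer) row[0]);
                    ps.setString(2, (String) row[1]);
                    ps.setBoolean(3, (Boolean) row[2]);
                    ps.setInt(4, (Integer) row[3]);
                    ps.setInt(5, (Integer) row[4]);
                    ps.addBatch();
                }
                ps.executeBatch();
            }
        }
    }
}
```

It would be run against the read instance only, for example with `jdbc:mysql://192.168.30.81/sparktest` as the first argument; the write instance needs no table, because the job creates `updated_people` itself.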
Write the sample code
package com.sequoiadb.test;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

public final class JDBCDemo {

    public static void main(String[] args) throws Exception {
        // Read instance and write instance
        String readUrl = "jdbc:mysql://192.168.30.81/sparktest";
        String writeUrl = "jdbc:mysql://192.168.30.82/sparktest";

        SparkSession spark = SparkSession.builder().appName("JDBCDemo").getOrCreate();

        // Credentials of the test user (the user name "sparktest" is an assumption;
        // use the user created on the read and write instances)
        Properties dbProperties = new Properties();
        dbProperties.setProperty("user", "sparktest");
        dbProperties.setProperty("password", "sparktest");

        System.out.println("A DataFrame loaded from the entire contents of a table over JDBC.");
        String where = "sparktest.people";
        Dataset<Row> entireDF = spark.read().jdbc(readUrl, where, dbProperties);
        entireDF.printSchema();
        entireDF.show();

        System.out.println("Filtering the table to just show the males.");
        entireDF.filter("is_male = 1").show();

        System.out.println("Alternately, pre-filter the table for males before loading over JDBC.");
        // A parenthesized subquery with an alias can stand in for the table name
        where = "(select * from sparktest.people where is_male = 1) as subset";
        Dataset<Row> malesDF = spark.read().jdbc(readUrl, where, dbProperties);
        malesDF.show();

        System.out.println("Update weights by 2 pounds (results in a new DataFrame with same column names)");
        Dataset<Row> heavyDF = entireDF.withColumn("updated_weight_lb", entireDF.col("weight_lb").plus(2));
        Dataset<Row> updatedDF = heavyDF.select("id", "name", "is_male", "height_in", "updated_weight_lb")
                .withColumnRenamed("updated_weight_lb", "weight_lb");
        updatedDF.show();

        System.out.println("Save the updated data to a new table with JDBC");
        where = "sparktest.updated_people";
        // Save mode "error" makes the write fail if the target table already exists
        updatedDF.write().mode("error").jdbc(writeUrl, where, dbProperties);

        System.out.println("Load the new table into a new DataFrame to confirm that it was saved successfully.");
        Dataset<Row> retrievedDF = spark.read().jdbc(writeUrl, where, dbProperties);
        retrievedDF.show();

        spark.stop();
    }
}
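The pre-filtering step in the sample relies on the fact that the table argument of `spark.read().jdbc(...)` accepts anything valid in a SQL `FROM` clause, so a parenthesized subquery with an alias can replace a table name; the database then evaluates the query and only its result is loaded. The small helper below shows how such an argument might be assembled. The class `JdbcTableExpr` and the method `asSubquery` are hypothetical names introduced for illustration and are not part of Spark.

```java
public class JdbcTableExpr {

    // Wraps a SELECT statement as an aliased derived table so that it can be
    // passed where a table name is expected. MySQL requires the alias.
    static String asSubquery(String select, String alias) {
        return "(" + select.trim() + ") as " + alias;
    }

    public static void main(String[] args) {
        String where = asSubquery(
                "select * from sparktest.people where is_male = 1", "subset");
        // prints: (select * from sparktest.people where is_male = 1) as subset
        System.out.println(where);
    }
}
```

The resulting string is the same value the sample assigns to `where` before loading `malesDF`.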
Compile and submit the job
A DataFrame loaded from the entire contents of a table over JDBC.
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- is_male: boolean (nullable = true)
 |-- height_in: integer (nullable = true)
 |-- weight_lb: integer (nullable = true)
+---+-------+-------+---------+---------+
| id|   name|is_male|height_in|weight_lb|
+---+-------+-------+---------+---------+
|  1|  Alice|  false|       60|      125|
|  2|  Brian|   true|       64|      131|
|  3|Charlie|   true|       74|      183|
|  4|  Doris|  false|       58|      102|
|  5|  Ellen|  false|       66|      140|
|  6|  Frank|   true|       66|      151|
|  7| Gerard|   true|       68|      190|
|  8| Harold|   true|       61|      128|
+---+-------+-------+---------+---------+
Filtering the table to just show the males.
+---+-------+-------+---------+---------+
| id|   name|is_male|height_in|weight_lb|
+---+-------+-------+---------+---------+
|  2|  Brian|   true|       64|      131|
|  3|Charlie|   true|       74|      183|
|  6|  Frank|   true|       66|      151|
|  7| Gerard|   true|       68|      190|
|  8| Harold|   true|       61|      128|
+---+-------+-------+---------+---------+
Alternately, pre-filter the table for males before loading over JDBC.
+---+-------+-------+---------+---------+
| id|   name|is_male|height_in|weight_lb|
+---+-------+-------+---------+---------+
|  2|  Brian|   true|       64|      131|
|  3|Charlie|   true|       74|      183|
|  6|  Frank|   true|       66|      151|
|  7| Gerard|   true|       68|      190|
|  8| Harold|   true|       61|      128|
+---+-------+-------+---------+---------+
Update weights by 2 pounds (results in a new DataFrame with same column names)
+---+-------+-------+---------+---------+
| id|   name|is_male|height_in|weight_lb|
+---+-------+-------+---------+---------+
|  1|  Alice|  false|       60|      127|
|  2|  Brian|   true|       64|      133|
|  3|Charlie|   true|       74|      185|
|  4|  Doris|  false|       58|      104|
|  5|  Ellen|  false|       66|      142|
|  6|  Frank|   true|       66|      153|
|  7| Gerard|   true|       68|      192|
|  8| Harold|   true|       61|      130|
+---+-------+-------+---------+---------+
Save the updated data to a new table with JDBC
Load the new table into a new DataFrame to confirm that it was saved successfully.
+---+-------+-------+---------+---------+
| id|   name|is_male|height_in|weight_lb|
+---+-------+-------+---------+---------+
|  1|  Alice|  false|       60|      127|
|  2|  Brian|   true|       64|      133|
|  3|Charlie|   true|       74|      185|
|  4|  Doris|  false|       58|      104|
|  5|  Ellen|  false|       66|      142|
|  6|  Frank|   true|       66|      153|
|  7| Gerard|   true|       68|      192|
|  8| Harold|   true|       61|      130|
+---+-------+-------+---------+---------+