Apache Spark
To use the connector in your Java application, add the following Maven dependency for Scala 2.11:
<dependency>
<groupId>com.yugabyte.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.5-yb-2</version>
</dependency>
You can run our Spark-based sample app with:
$ java -jar yb-sample-apps.jar --workload CassandraSparkWordCount --nodes 127.0.0.1:9042
The app reads data from a table of sentences (by default, it generates an input table ybdemo_keyspace.lines), computes the frequencies of the words, and writes the results to the output table ybdemo_keyspace.wordcounts.
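Once the run completes, you can inspect the output table directly. A hypothetical check using the Cassandra Java driver (the class name, contact point, and LIMIT are illustrative assumptions, not part of the sample app):
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class CheckWordCounts {
    public static void main(String[] args) {
        // Connect to the same YCQL endpoint the sample app used (assumed 127.0.0.1:9042).
        try (Cluster cluster = Cluster.builder()
                                      .addContactPoint("127.0.0.1")
                                      .withPort(9042)
                                      .build();
             Session session = cluster.connect()) {
            // Print a few rows from the output table.
            for (Row row : session.execute(
                    "SELECT word, count FROM ybdemo_keyspace.wordcounts LIMIT 10;")) {
                System.out.println(row.getString("word") + ": " + row.getInt("count"));
            }
        }
    }
}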
Examining the source code
You can examine the source code in one of two ways:
- view the source file in our GitHub source repo
- untar the jar java/yb-sample-apps-sources.jar in the download bundle
Most of the logic is in the CassandraSparkWordCount class (in the file src/main/java/com/yugabyte/sample/apps/CassandraSparkWordCount.java). Some key portions of the sample program are explained in the sections below.
The SparkConf object configures the Spark context and points the connector at the YCQL endpoint. A minimal sketch, assuming the connector's standard spark.cassandra.connection.host property (the exact values in the sample may differ):
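// A minimal sketch; the application name and host are assumptions.
SparkConf conf = new SparkConf()
    .setAppName("CassandraSparkWordCount")
    .setMaster("local[*]")                                  // run Spark locally on all cores
    .set("spark.cassandra.connection.host", "127.0.0.1");   // YCQL endpoint of the cluster
JavaSparkContext sc = new JavaSparkContext(conf);           // the sc used in the snippets below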
Setting the input source
To set the input data for Spark, you can do one of the following:
- Reading from a table with a column line as the input:
// Read rows from table and convert them to an RDD.
JavaRDD<String> rows = javaFunctions(sc).cassandraTable(keyspace, inputTable)
.map(row -> row.getString("line"));
- Reading from a file as the input:
// Read the input file and convert it to an RDD.
JavaRDD<String> rows = sc.textFile(inputFile);
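In both cases, the resulting rows RDD holds one sentence per element. The word frequencies are then computed into the counts RDD used in the next section; a minimal sketch of the standard Spark word-count transformation (the exact code in the sample may differ):
// Split each line into words, emit a (word, 1) pair per word, and sum the pairs per word.
// Assumes java.util.Arrays, scala.Tuple2, and org.apache.spark.api.java.JavaPairRDD are imported.
JavaPairRDD<String, Integer> counts =
    rows.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
        .mapToPair(word -> new Tuple2<>(word, 1))
        .reduceByKey((x, y) -> x + y);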
Setting the output table
The output is written to the output table (ybdemo_keyspace.wordcounts by default):
// Create the output table.
session.execute("CREATE TABLE IF NOT EXISTS " + outTable +
" (word VARCHAR PRIMARY KEY, count INT);");
// Save the output to the CQL table.
javaFunctions(counts).writerBuilder(keyspace, outputTable, mapTupleToRow(String.class, Integer.class))
.withColumnSelector(someColumns("word", "count"))
.saveToCassandra();
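Here, mapTupleToRow(String.class, Integer.class) maps the two fields of each (word, count) tuple, in order, onto the columns selected by someColumns("word", "count").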