Apache Avro Data Source Guide

    Deploying

    The spark-avro module is external and not included in spark-submit or spark-shell by default.

    As with any Spark application, spark-submit is used to launch your application. spark-avro_2.12 and its dependencies can be added directly to spark-submit using --packages, such as:

    ./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.4.0 ...

    For experimenting on spark-shell, you can also use --packages to add org.apache.spark:spark-avro_2.12 and its dependencies directly:

    ./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.4.0 ...

    See the Application Submission Guide for more details about submitting applications with external dependencies.

    Load and Save Functions

    Since the spark-avro module is external, there is no .avro API in DataFrameReader or DataFrameWriter.

    To load/save data in Avro format, you need to specify the data source option format as avro (or org.apache.spark.sql.avro).

    Scala:

    val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
    usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

    Python:

    df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
    df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

    R:

    df <- read.df("examples/src/main/resources/users.avro", "avro")
    write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

    to_avro() and from_avro()

    The Avro package provides the function to_avro to encode a column as binary in Avro format, and from_avro() to decode Avro binary data into a column. Both functions transform one column into another column, and the input/output SQL data type can be a complex type or a primitive type.

    Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.

    • If the “value” field that contains your data is in Avro, you could use from_avro() to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
    • to_avro() can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
    Java:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.avro.functions.*;

    import java.nio.file.Files;
    import java.nio.file.Paths;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.streaming.StreamingQuery;

    // `from_avro` requires the Avro schema in JSON string format.
    String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

    Dataset<Row> df = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load();

    // 1. Decode the Avro data into a struct;
    // 2. Filter by column `favorite_color`;
    // 3. Encode the column `name` in Avro format.
    Dataset<Row> output = df
      .select(from_avro(col("value"), jsonFormatSchema).as("user"))
      .where("user.favorite_color == \"red\"")
      .select(to_avro(col("user.name")).as("value"));

    StreamingQuery query = output
      .writeStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic2")
      .start();
    Python:

    from pyspark.sql.avro.functions import from_avro, to_avro

    # `from_avro` requires the Avro schema in JSON string format.
    jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

    df = spark\
      .readStream\
      .format("kafka")\
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
      .option("subscribe", "topic1")\
      .load()

    # 1. Decode the Avro data into a struct;
    # 2. Filter by column `favorite_color`;
    # 3. Encode the column `name` in Avro format.
    output = df\
      .select(from_avro("value", jsonFormatSchema).alias("user"))\
      .where('user.favorite_color == "red"')\
      .select(to_avro("user.name").alias("value"))

    query = output\
      .writeStream\
      .format("kafka")\
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
      .option("topic", "topic2")\
      .start()

    Data Source Option

    Data source options of Avro can be set via:

    • the .option method on DataFrameReader or DataFrameWriter.
    • the options parameter in function from_avro.
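    For illustration, here is a minimal Scala sketch of both mechanisms. It assumes an active SparkSession named spark and a DataFrame kafkaDF whose value column holds Avro-encoded bytes; those names and the paths are placeholders, not part of the API.

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.avro.functions.from_avro

    // Per-write option on DataFrameWriter: override the compression codec for this write only.
    spark.read.format("avro").load("examples/src/main/resources/users.avro")
      .write.format("avro")
      .option("compression", "deflate")
      .save("users_deflate.avro")

    // Options map passed to from_avro: control how malformed records are handled.
    val jsonFormatSchema = new String(java.nio.file.Files.readAllBytes(
      java.nio.file.Paths.get("examples/src/main/resources/user.avsc")))
    val parsed = kafkaDF.select(
      from_avro(col("value"), jsonFormatSchema, Map("mode" -> "PERMISSIVE").asJava).as("user"))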

    Configuration

    Configuration of Avro can be done via spark.conf.set on SparkSession or by running SET key=value commands in SQL.

    Property Name: spark.sql.legacy.replaceDatabricksSparkAvro.enabled
    Default: true
    Meaning: If set to true, the data source provider com.databricks.spark.avro is mapped to the built-in but external Avro data source module for backward compatibility.
    Note: the SQL config has been deprecated in Spark 3.2 and might be removed in the future.
    Since Version: 2.4.0

    Property Name: spark.sql.avro.compression.codec
    Default: snappy
    Meaning: Compression codec used when writing Avro files. Supported codecs: uncompressed, deflate, snappy, bzip2 and xz. The default codec is snappy.
    Since Version: 2.4.0

    Property Name: spark.sql.avro.deflate.level
    Default: -1
    Meaning: Compression level for the deflate codec used when writing Avro files. Valid values are 1 to 9 inclusive, or -1. The default value is -1, which corresponds to level 6 in the current implementation.
    Since Version: 2.4.0

    Property Name: spark.sql.avro.datetimeRebaseModeInRead
    Default: EXCEPTION
    Meaning: The rebasing mode for the values of the date, timestamp-micros and timestamp-millis logical types from the Julian to the Proleptic Gregorian calendar:
    • EXCEPTION: Spark fails the read if it sees ancient dates/timestamps that are ambiguous between the two calendars.
    • CORRECTED: Spark does not rebase and reads the dates/timestamps as they are.
    • LEGACY: Spark rebases dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar to the Proleptic Gregorian calendar when reading Avro files.
    This config is only effective if the writer info (like Spark, Hive) of the Avro files is unknown.
    Since Version: 3.0.0

    Property Name: spark.sql.avro.datetimeRebaseModeInWrite
    Default: EXCEPTION
    Meaning: The rebasing mode for the values of the date, timestamp-micros and timestamp-millis logical types from the Proleptic Gregorian to the Julian calendar:
    • EXCEPTION: Spark fails the write if it sees ancient dates/timestamps that are ambiguous between the two calendars.
    • CORRECTED: Spark does not rebase and writes the dates/timestamps as they are.
    • LEGACY: Spark rebases dates/timestamps from the Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when writing Avro files.
    Since Version: 3.0.0

    Property Name: spark.sql.avro.filterPushdown.enabled
    Default: true
    Meaning: When true, enables filter pushdown to the Avro data source.
    Since Version: 3.1.0
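    As a minimal sketch (assuming an active SparkSession named spark; the values chosen are placeholders), the same properties can be set programmatically or via SQL:

    // Programmatically, on the session's runtime configuration.
    spark.conf.set("spark.sql.avro.compression.codec", "deflate")
    spark.conf.set("spark.sql.avro.deflate.level", "5")

    // Equivalently via SQL.
    spark.sql("SET spark.sql.avro.datetimeRebaseModeInRead=CORRECTED")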

    Compatibility with Databricks spark-avro

    This Avro data source module is originally from, and compatible with, Databricks's open source repository spark-avro.

    By default, with the SQL configuration spark.sql.legacy.replaceDatabricksSparkAvro.enabled set to true, the data source provider com.databricks.spark.avro is mapped to this built-in Avro module. For Spark tables created with the Provider property com.databricks.spark.avro in the catalog meta store, this mapping is essential for loading those tables when you use the built-in Avro module.

    Note that in Databricks's spark-avro, the implicit classes AvroDataFrameWriter and AvroDataFrameReader were created for the shortcut function .avro(). In this built-in but external module, both implicit classes are removed. Please use .format("avro") in DataFrameWriter or DataFrameReader instead, which is clean and sufficient.
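    As a small illustrative sketch (the path is a placeholder): with the legacy mapping enabled, the old provider name and the built-in one resolve to the same module, so both reads below behave identically.

    // Works only while spark.sql.legacy.replaceDatabricksSparkAvro.enabled is true (the default).
    val legacyDF = spark.read.format("com.databricks.spark.avro")
      .load("examples/src/main/resources/users.avro")

    // Recommended spelling, independent of the legacy config.
    val builtinDF = spark.read.format("avro")
      .load("examples/src/main/resources/users.avro")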

    Supported types for Avro -> Spark SQL conversion

    Currently Spark supports reading all primitive types and complex types under records of Avro.

    Avro type | Spark SQL type
    boolean   | BooleanType
    int       | IntegerType
    long      | LongType
    float     | FloatType
    double    | DoubleType
    string    | StringType
    enum      | StringType
    fixed     | BinaryType
    bytes     | BinaryType
    record    | StructType
    array     | ArrayType
    map       | MapType
    union     | See below

    In addition to the types listed above, it supports reading union types. The following three types are considered basic union types:

    1. union(int, long) will be mapped to LongType.
    2. union(float, double) will be mapped to DoubleType.
    3. union(something, null), where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true.

    All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with the members of the union. This is consistent with the behavior when converting between Avro and Parquet. The sketch below illustrates these mappings.
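    A minimal sketch of these mappings, using the developer API org.apache.spark.sql.avro.SchemaConverters (the union schema strings are illustrative):

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    // Basic union of int and long -> LongType.
    val intLong = new Schema.Parser().parse("""["int", "long"]""")
    println(SchemaConverters.toSqlType(intLong).dataType)

    // union(string, null) -> StringType with nullable = true.
    val nullableStr = new Schema.Parser().parse("""["null", "string"]""")
    val t = SchemaConverters.toSqlType(nullableStr)
    println(s"${t.dataType}, nullable = ${t.nullable}")

    // Any other union is complex -> StructType with fields member0 (int) and member1 (string).
    val complexUnion = new Schema.Parser().parse("""["int", "string"]""")
    println(SchemaConverters.toSqlType(complexUnion).dataType)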

    It also supports reading the following Avro logical types:

    Avro logical type | Avro type | Spark SQL type
    date              | int       | DateType
    timestamp-millis  | long      | TimestampType
    timestamp-micros  | long      | TimestampType
    decimal           | fixed     | DecimalType
    decimal           | bytes     | DecimalType
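    A similar sketch with SchemaConverters (the schema string is illustrative) shows a logical type being picked up during schema conversion:

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters

    // A fixed(16) carrying the decimal logical type -> DecimalType(10, 2).
    val decimalSchema = new Schema.Parser().parse(
      """{"type": "fixed", "name": "amount", "size": 16, "logicalType": "decimal", "precision": 10, "scale": 2}""")
    println(SchemaConverters.toSqlType(decimalSchema).dataType)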

    At the moment, it ignores docs, aliases and other properties present in the Avro file.

    Supported types for Spark SQL -> Avro conversion

    Spark supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases, which are listed below:

    Spark SQL type | Avro type | Avro logical type
    ByteType       | int       |
    ShortType      | int       |
    BinaryType     | bytes     |
    DateType       | int       | date
    TimestampType  | long      | timestamp-micros
    DecimalType    | fixed     | decimal

    You can also specify the whole output Avro schema with the option avroSchema, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require a user-specified Avro schema:

    Spark SQL type | Avro type | Avro logical type
    BinaryType     | fixed     |
    StringType     | enum      |
    TimestampType  | long      | timestamp-millis
    DecimalType    | bytes     | decimal
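    For instance, a minimal sketch of such a user-specified schema (assuming a DataFrame df whose StringType column suit only contains the enum symbols below; the path and record name are illustrative), so that the column is written as an Avro enum rather than the default string:

    // User-specified output schema: `suit` becomes an Avro enum instead of a string.
    val enumAvroSchema = """
      {
        "type": "record",
        "name": "Card",
        "fields": [
          {
            "name": "suit",
            "type": {"type": "enum", "name": "Suit", "symbols": ["SPADES", "HEARTS", "DIAMONDS", "CLUBS"]}
          }
        ]
      }"""

    df.select("suit")
      .write.format("avro")
      .option("avroSchema", enumAvroSchema)
      .save("cards.avro")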