Scala API Extensions

    If you want to enjoy the full Scala experience, you can choose to opt in to extensions that enhance the Scala API via implicit conversions.

    To use all the available extensions, you can just add a simple import for the DataSet API

        import org.apache.flink.api.scala.extensions._

    or for the DataStream API

        import org.apache.flink.streaming.api.scala.extensions._

    Alternatively, you can import individual extensions à la carte to only use those you prefer.

    Normally, both the DataSet and DataStream APIs don’t accept anonymous pattern matching functions to deconstruct tuples, case classes or collections, like the following:

        val data: DataSet[(Int, String, Double)] = // [...]
        data.map {
          case (id, name, temperature) => // [...]
          // The previous line causes the following compilation error:
          // "The argument types of an anonymous function must be fully known. (SLS 8.5)"
        }
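    To see why the compiler rejects such a literal, here is a minimal plain-Scala sketch (no Flink required; `process` and `processWith` are hypothetical names). The overloads mimic Flink's `map`, which accepts both a Scala function and a `MapFunction`, so the argument type of a lone pattern-matching function cannot be inferred; a wrapper with a single, fully known parameter type, which is the approach the extensions take, makes the same literal compile:

    ```scala
    object Sls85Demo {
      // Two overloads, like Flink's map(fun: T => R) and map(mapper: MapFunction[T, R]):
      def process(f: ((Int, String)) => String): String = f((1, "one"))
      def process(f: Int => String): String = f(1)

      // process { case (_, name) => name }
      // would not compile: "missing parameter type for expanded function" (SLS 8.5)

      // A single non-overloaded method with a fully known parameter type works:
      def processWith(fun: ((Int, String)) => String): String = process(fun)
    }

    // Sls85Demo.processWith { case (_, name) => name } compiles and returns "one"
    ```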

    DataStream API

    Method: mapWith (original: map on DataStream)

        data.mapWith { case (_, value) => value.toString }

    Method: flatMapWith (original: flatMap on DataStream)

        data.flatMapWith { case (_, name, visits) => visits.map(name -> _) }

    Method: filterWith (original: filter on DataStream)

        data.filterWith { case Train(_, isOnTime) => isOnTime }

    Method: keyingBy (original: keyBy on DataStream)

        data.keyingBy { case (id, _, _) => id }

    Method: mapWith (original: map on ConnectedDataStream)

        data.mapWith(
          map1 = case (_, value) => value.toString,
          map2 = case (_, _, value, _) => value + 1)

    Method: flatMapWith (original: flatMap on ConnectedDataStream)

        data.flatMapWith(
          flatMap1 = case (_, json) => parse(json),
          flatMap2 = case (_, _, json, _) => parse(json))

    Method: keyingBy (original: keyBy on ConnectedDataStream)

        data.keyingBy(
          key1 = case (_, timestamp) => timestamp,
          key2 = case (id, _, _) => id)

    Method: reduceWith (original: reduce on KeyedStream, WindowedStream)

        data.reduceWith { case ((_, sum1), (_, sum2)) => sum1 + sum2 }

    Method: foldWith (original: fold on KeyedStream, WindowedStream)

        data.foldWith(User(bought = 0)) { case (User(b), (_, items)) => User(b + items.size) }

    Method: applyWith (original: apply on WindowedStream)

        data.applyWith(0)(
          foldFunction = case (sum, amount) => sum + amount,
          windowFunction = case (k, w, sum) => // [...])

    Method: projecting (original: apply on JoinedStream)

        data1.join(data2).
          whereClause(case (pk, _) => pk).
          isEqualTo(case (_, fk) => fk).
          projecting { case ((pk, tx), (products, fk)) => tx -> products }
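    Under the hood, these methods are implicit-conversion wrappers whose parameters are plain total functions, so a pattern-matching anonymous function passed to them has a fully known argument type. A minimal sketch of the idea on plain Scala collections (the `OnSeq` wrapper and its methods are illustrative, not Flink code):

    ```scala
    object AcceptPartialFunctionsSketch {
      // An implicit class adds *With variants that take a Function1;
      // a pattern-matching literal then has a known input type T.
      implicit class OnSeq[T](val self: Seq[T]) extends AnyVal {
        def mapWith[R](fun: T => R): Seq[R] = self.map(fun)
        def filterWith(fun: T => Boolean): Seq[T] = self.filter(fun)
      }

      def main(args: Array[String]): Unit = {
        val data = Seq((1, "a"), (2, "b"))
        println(data.mapWith { case (_, v) => v.toUpperCase })  // List(A, B)
        println(data.filterWith { case (id, _) => id > 1 })     // List((2,b))
      }
    }
    ```

    Flink's real extensions follow the same pattern, wrapping DataSet and DataStream instead of Seq.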

      For more information on the semantics of each method, please refer to the DataSet and DataStream API documentation.

      To use this extension exclusively, you can add the following import:

          import org.apache.flink.api.scala.extensions.acceptPartialFunctions

      for the DataSet extensions and

          import org.apache.flink.streaming.api.scala.extensions.acceptPartialFunctions

      for the DataStream extensions.

      The following snippet shows a minimal example of how to use these extension methods together (with the DataSet API):