
    influxdb-client-scala

    The reference Scala client for InfluxDB 2.0. It supports querying and writing and is built on Akka Streams.

    The QueryScalaApi is based on Akka Streams. Streaming can be configured with:

    • bufferSize - Size of the buffer for incoming responses. Default 10000.
    • overflowStrategy - Strategy used when an incoming response does not fit in the buffer. Default akka.stream.OverflowStrategies.Backpressure.
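
    As a rough illustration of what these two settings mean in Akka Streams terms, the sketch below applies a bounded buffer with a backpressure strategy to a plain in-memory source. It does not use the InfluxDB client at all: BufferBackpressureSketch, the element range and the buffer size of 16 are made up for the example, and akka-stream 2.6 is assumed to be on the classpath.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-alone demo; not part of influxdb-client-scala.
object BufferBackpressureSketch {

  /** Streams 1..50 through a bounded buffer and collects what arrives. */
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("buffer-sketch")
    try {
      val collected = Source(1 to 50)
        // bounded buffer: holds at most 16 elements; when it is full,
        // backpressure slows the upstream down instead of dropping data
        .buffer(16, OverflowStrategy.backpressure)
        .runWith(Sink.seq)
      Await.result(collected, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"received ${run().size} elements")
}
```

    With a backpressure strategy nothing is discarded, so every element emitted by the source reaches the sink. The dropping strategies in akka.stream.OverflowStrategy trade that guarantee for a producer that is never slowed down.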

    The following example demonstrates querying using the Flux language:

        package example

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.Sink
        import com.influxdb.client.scala.InfluxDBClientScalaFactory
        import com.influxdb.query.FluxRecord

        import scala.concurrent.Await
        import scala.concurrent.duration.Duration

        object InfluxDB2ScalaExample {

          implicit val system: ActorSystem = ActorSystem("it-tests")

          def main(args: Array[String]): Unit = {

            val influxDBClient = InfluxDBClientScalaFactory
              .create("http://localhost:8086", "my-token".toCharArray, "my-org")

            val fluxQuery = ("from(bucket: \"my-bucket\")\n"
              + " |> range(start: -1d)"
              + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")

            // the result is returned as a stream
            val results = influxDBClient.getQueryScalaApi().query(fluxQuery)

            // example of additional result stream processing on the client side
            val sink = results
              // filter on the client side using the built-in `filter` operator
              .filter(it => "cpu0" == it.getValueByKey("cpu"))
              // take the first 20 records
              .take(20)
              // print results
              .runWith(Sink.foreach[FluxRecord](it =>
                println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

            // wait to finish
            Await.result(sink, Duration.Inf)

            // release the client's resources before shutting down the actor system
            influxDBClient.close()
            system.terminate()
          }
        }

    It is possible to parse a result line-by-line using the queryRaw method:

        package example

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.Sink
        import com.influxdb.client.scala.InfluxDBClientScalaFactory

        import scala.concurrent.Await
        import scala.concurrent.duration.Duration

        object InfluxDB2ScalaExampleRaw {

          implicit val system: ActorSystem = ActorSystem("it-tests")

          def main(args: Array[String]): Unit = {

            val influxDBClient = InfluxDBClientScalaFactory
              .create("http://localhost:8086", "my-token".toCharArray, "my-org")

            val fluxQuery = ("from(bucket: \"my-bucket\")\n"
              + " |> range(start: -5m)"
              + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
              + " |> sample(n: 5, pos: 1)")

            val sink = influxDBClient.getQueryScalaApi().queryRaw(fluxQuery)
              // print each raw response line
              .runWith(Sink.foreach[String](it => println(s"Line: $it")))

            // wait to finish
            Await.result(sink, Duration.Inf)

            influxDBClient.close()
            system.terminate()
          }
        }

    A client can be configured via a configuration file. The file must be named influx2.properties and must be located in the root of the classpath.

    The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout properties accept ms, s and m as units. The default unit is milliseconds.

    Configuration example

        influx2.url=http://localhost:8086
        influx2.org=my-org
        influx2.bucket=my-bucket
        influx2.token=my-token
        influx2.logLevel=BODY
        influx2.readTimeout=5s
        influx2.writeTimeout=10s
        influx2.connectTimeout=5s

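    and then create the client with the no-argument InfluxDBClientScalaFactory.create(), which reads these properties.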

    Client connection string

    A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

        val influxDBClient = InfluxDBClientScalaFactory
          .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", "my-token".toCharArray)
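
    To make the encoding concrete, here is a minimal sketch, using only the Scala standard library, of how such a connection string splits into a server URL and option pairs. ConnectionStringSketch and parseConnectionString are hypothetical names chosen for illustration; this is not the client's own parser, and it does not decode URL-escaped values.

```scala
// Hypothetical helper for illustration; not the client's actual parser.
object ConnectionStringSketch {

  /** Splits "http://host:port?key=value&..." into (server URL, options). */
  def parseConnectionString(connectionString: String): (String, Map[String, String]) = {
    // everything before the first '?' is the server URL
    val (baseUrl, rest) = connectionString.span(_ != '?')
    val options = rest
      .drop(1) // drop the leading '?', if there is one
      .split("&")
      .filter(_.nonEmpty)
      .map { pair =>
        val parts = pair.split("=", 2)
        parts(0) -> (if (parts.length > 1) parts(1) else "")
      }
      .toMap
    (baseUrl, options)
  }

  def main(args: Array[String]): Unit = {
    val (url, options) = parseConnectionString(
      "http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC")
    println(url)
    options.foreach { case (key, value) => println(s"$key = $value") }
  }
}
```

    For the connection string used above, the sketch yields the server URL http://localhost:8086 and the three options readTimeout, connectTimeout and logLevel.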

    The following options are supported:

    Property name    Default     Description
    org              -           default destination organization for writes and queries
    bucket           -           default destination bucket for writes
    token            -           the token to use for the authorization
    logLevel         NONE        rest client verbosity level
    readTimeout      10000 ms    read timeout
    writeTimeout     10000 ms    write timeout
    connectTimeout   10000 ms    socket timeout

    The readTimeout, writeTimeout and connectTimeout options accept ms, s and m as units. The default unit is milliseconds.
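
    As a worked example of the unit rule, the following sketch converts values such as 250ms, 5s, 2m or a bare 10000 into milliseconds. TimeoutSketch and toMillis are hypothetical names; the client has its own parsing, which may differ in details such as whitespace handling or error reporting.

```scala
// Hypothetical helper for illustration; not part of the client API.
object TimeoutSketch {

  // a number followed by an optional unit: ms, s or m
  private val Timeout = """(\d+)(ms|s|m)?""".r

  /** Converts "250ms", "5s", "2m" or a bare number (milliseconds) to ms. */
  def toMillis(value: String): Long = value.trim match {
    case Timeout(number, unit) =>
      val multiplier = unit match {
        case "s" => 1000L
        case "m" => 60000L
        case _   => 1L // "ms" or no unit at all: already milliseconds
      }
      number.toLong * multiplier
    case other =>
      throw new IllegalArgumentException(s"Unsupported timeout value: $other")
  }

  def main(args: Array[String]): Unit =
    Seq("10000", "250ms", "5s", "2m").foreach { v =>
      println(s"$v -> ${toMillis(v)} ms")
    }
}
```

    Under this reading, readTimeout=5s and readTimeout=5000 describe the same timeout.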

    Gzip compression of HTTP requests can be enabled with:

        influxDBClient.enableGzip()

    Log HTTP Request and Response

    Requests and responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADERS and BODY. Note that applying the BODY LogLevel disables chunking while streaming and loads the whole response into memory.

        influxDBClient.setLogLevel(LogLevel.HEADERS)

    Server availability can be checked using the influxDBClient.health() endpoint.

    Construct queries using the flux-dsl query builder

    The latest version for Maven dependency:

        <dependency>
          <groupId>com.influxdb</groupId>
          <artifactId>influxdb-client-scala</artifactId>
          <version>1.13.0</version>
        </dependency>

    Or when using with Gradle:

        dependencies {
            // `compile` was removed in Gradle 7; `implementation` replaces it
            implementation "com.influxdb:influxdb-client-scala:1.13.0"
        }

    The snapshots are deployed into the OSS Snapshot repository.

    Maven

        <repository>
          <id>ossrh</id>
          <name>OSS Snapshot repository</name>
          <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>

    Gradle