influxdb-client-scala
The reference Scala client that allows querying and writing to InfluxDB 2.0, built on Akka Streams.
The QueryScalaApi is based on Akka Streams. The streaming can be configured by:
- bufferSize - Size of the buffer for incoming responses. Default 10000.
- overflowStrategy - Strategy that is used when an incoming response cannot fit inside the buffer. Default akka.stream.OverflowStrategies.Backpressure.
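These two settings describe the client's internal buffering of incoming responses. For illustration only, a comparable buffer can also be added on the consumer side with plain Akka Streams operators; the helper below is hypothetical and not part of the client API:

import akka.NotUsed
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import com.influxdb.query.FluxRecord

// Hypothetical helper: buffer up to 10000 records and back-pressure upstream when the
// buffer is full, mirroring the client defaults named above.
def buffered(records: Source[FluxRecord, NotUsed]): Source[FluxRecord, NotUsed] =
  records.buffer(10000, OverflowStrategy.backpressure)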
The following example demonstrates querying using the Flux language:
package example

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import com.influxdb.client.scala.InfluxDBClientScalaFactory
import com.influxdb.query.FluxRecord

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object InfluxDB2ScalaExample {

  implicit val system: ActorSystem = ActorSystem("it-tests")

  def main(args: Array[String]): Unit = {

    val influxDBClient = InfluxDBClientScalaFactory
      .create("http://localhost:8086", "my-token".toCharArray, "my-org")

    val fluxQuery = ("from(bucket: \"my-bucket\")\n"
      + " |> range(start: -1d)"
      + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")

    val results = influxDBClient.getQueryScalaApi().query(fluxQuery)

    // Example of additional result stream processing on the client side
    val sink = results
      // filter on the client side using the `filter` built-in operator
      .filter(it => "cpu0" == it.getValueByKey("cpu"))
      // take the first 20 records
      .take(20)
      // print results
      .runWith(Sink.foreach[FluxRecord](it => println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

    // wait for the stream to finish
    Await.result(sink, Duration.Inf)

    influxDBClient.close()
    system.terminate()
  }
}
It is possible to parse a result line-by-line using the queryRaw method:
package example

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import com.influxdb.client.scala.InfluxDBClientScalaFactory

import scala.concurrent.Await
import scala.concurrent.duration.Duration

object InfluxDB2ScalaExampleRaw {

  implicit val system: ActorSystem = ActorSystem("it-tests")

  def main(args: Array[String]): Unit = {

    val influxDBClient = InfluxDBClientScalaFactory
      .create("http://localhost:8086", "my-token".toCharArray, "my-org")

    val fluxQuery = ("from(bucket: \"my-bucket\")\n"
      + " |> range(start: -5m)"
      + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
      + " |> sample(n: 5, pos: 1)")

    val sink = influxDBClient.getQueryScalaApi().queryRaw(fluxQuery)
      // print the raw response line by line
      .runWith(Sink.foreach[String](it => println(s"Line: $it")))

    // wait for the stream to finish
    Await.result(sink, Duration.Inf)

    influxDBClient.close()
    system.terminate()
  }
}
A client can be configured via a configuration file. The configuration file has to be named influx2.properties and has to be in the root of the classpath.
The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout options support ms, s and m as units. The default is milliseconds.
Configuration example
influx2.url=http://localhost:8086
influx2.org=my-org
influx2.bucket=my-bucket
influx2.token=my-token
influx2.logLevel=BODY
influx2.readTimeout=5s
influx2.writeTimeout=10s
influx2.connectTimeout=5s
and then the client can be created without passing connection parameters explicitly:
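A minimal sketch, assuming the no-argument factory overload reads influx2.properties from the classpath as its Java counterpart does:

val influxDBClient = InfluxDBClientScalaFactory.create()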
Client connection string
A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.
val influxDBClient = InfluxDBClientScalaFactory
.create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token)
The following options are supported:
Property name | default | description |
---|---|---|
org | - | default destination organization for writes and queries |
bucket | - | default destination bucket for writes |
token | - | the token to use for the authorization |
logLevel | NONE | rest client verbosity level |
readTimeout | 10000 ms | read timeout |
writeTimeout | 10000 ms | write timeout |
connectTimeout | 10000 ms | socket timeout |
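As an alternative to encoding everything in the connection string, the same parameters can be set programmatically. The sketch below assumes the Scala factory also accepts a pre-built com.influxdb.client.InfluxDBClientOptions, mirroring the Java client; verify the factory overloads for your client version:

import com.influxdb.client.InfluxDBClientOptions
import com.influxdb.client.scala.InfluxDBClientScalaFactory

// Assumption: InfluxDBClientScalaFactory.create(options) accepts the shared options object.
val options = InfluxDBClientOptions.builder()
  .url("http://localhost:8086")
  .authenticateToken("my-token".toCharArray)
  .org("my-org")
  .bucket("my-bucket")
  .build()

val influxDBClient = InfluxDBClientScalaFactory.create(options)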
The readTimeout, writeTimeout and connectTimeout options support ms, s and m as units. The default is milliseconds.
Gzip support
Gzip compression of requests and responses can be enabled on the client:
influxDBClient.enableGzip()
Log HTTP Request and Response
Requests and responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADERS, BODY. Note that applying the BODY LogLevel will disable chunking while streaming and will load the whole response into memory.
influxDBClient.setLogLevel(LogLevel.HEADERS)
Server availability can be checked using the influxDBClient.health() endpoint.
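A minimal usage sketch, assuming health() returns the shared HealthCheck model with a getStatus accessor, as in the Java client:

// Check server health and print the reported status (e.g. "pass")
val health = influxDBClient.health()
println(s"Health status: ${health.getStatus}")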
Construct queries using the flux-dsl query builder
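A sketch of building a query programmatically; it assumes the separate com.influxdb:flux-dsl artifact is on the classpath, and the bucket, measurement and field names are illustrative:

import java.time.temporal.ChronoUnit

import com.influxdb.query.dsl.Flux
import com.influxdb.query.dsl.functions.restriction.Restrictions

// Equivalent of: from(bucket: "my-bucket") |> range(start: -30m) |> filter(...)
val fluxQuery = Flux.from("my-bucket")
  .range(-30L, ChronoUnit.MINUTES)
  .filter(Restrictions.and(
    Restrictions.measurement().equal("cpu"),
    Restrictions.field().equal("usage_system")))

// The builder renders to a Flux source string accepted by the query API
val results = influxDBClient.getQueryScalaApi().query(fluxQuery.toString())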
The latest version of the client for use as a Maven dependency:
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-scala</artifactId>
<version>1.13.0</version>
</dependency>
Or when using Gradle:
dependencies {
compile "com.influxdb:influxdb-client-scala:1.13.0"
}
The snapshots are deployed into the OSS Snapshot repository.
Maven
<repository>
<id>ossrh</id>
<name>OSS Snapshot repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>