Migrate continuous queries to tasks
If using the command, by default, all continuous queries are output to ~/continuous_queries.txt
during the upgrade process. To customize the destination path of the continuous queries file, use the --continuous-query-export-path
flag with the influxd upgrade
command.
To manually output continuous queries:
Use the InfluxDB 1.x
influx
interactive shell to runSHOW CONTINUOUS QUERIES
:$ influx
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.8.10
> SHOW CONTINUOUS QUERIES
Copy and save the displayed continuous queries.
To migrate InfluxDB 1.x continuous queries to InfluxDB 2.1 tasks, convert the InfluxQL query syntax to Flux. The majority of continuous queries are simple downsampling queries and can be converted quickly using the aggregateWindow() function. For example:
Example continuous query
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
INTO "my-db"."example-rp"."average-example-measurement"
FROM "example-measurement"
GROUP BY time(1h)
END
Equivalent Flux task
option task = {name: "downsample-daily", every: 1d}
from(bucket: "my-db/")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field")
|> aggregateWindow(every: 1h, fn: mean)
|> set(key: "_measurement", value: "average-example-measurement")
|> to(org: "example-org", bucket: "my-db/example-rp")
Review the following statements and clauses to see how to convert your CQs to Flux:
ON clause
The ON
clause defines the database to query. In InfluxDB OSS 2.1, database and retention policy combinations are mapped to specific buckets (for more information, see Database and retention policy mapping).
InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
-- ...
Flux
from(bucket: "my-db/")
// ...
SELECT statement
The SELECT
statement queries data by field, tag, and time from a specific measurement. SELECT
statements can take many different forms and converting them to Flux depends on your use case. For information about Flux and InfluxQL function parity, see Flux vs InfluxQL. See .
INTO clause
The INTO
clause defines the measurement to write results to. INTO
also supports fully-qualified measurements that include the database and retention policy. In InfluxDB OSS 2.1, database and retention policy combinations are mapped to specific buckets (for more information, see ).
To write to a measurement different than the measurement queried, use set() or to change the measurement name. Use the to()
function to specify the bucket to write results to.
InfluxQL
-- ...
INTO "example-db"."example-rp"."example-measurement"
-- ...
Flux
// ...
|> to(bucket: "example-db/example-rp")
Write pivoted data to InfluxDB
InfluxDB 1.x query results include a column for each field. InfluxDB 2.1 does not do this by default, but it is possible with or schema.fieldsAsCols().
If you use to()
to write pivoted data back to InfluxDB 2.1, each field column is stored as a tag. To write pivoted fields back to InfluxDB as fields, import the experimental
package and use the .
InfluxQL
CREATE CONTINUOUS QUERY "downsample-daily" ON "my-db"
BEGIN
SELECT mean("example-field-1"), mean("example-field-2")
INTO "example-db"."example-rp"."example-measurement"
FROM "example-measurement"
GROUP BY time(1h)
END
Flux
// ...
from(bucket: "my-db/")
|> range(start: -task.every)
|> filter(fn: (r) => r._measurement == "example-measurement")
|> filter(fn: (r) => r._field == "example-field-1" or r._field == "example-field-2")
|> aggregateWindow(every: task.every, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> experimental.to(bucket: "example-db/example-rp")
FROM clause
The from clause defines the measurement to query. Use the to specify the measurement to query.
InfluxQL
-- ...
FROM "example-measurement"
-- ...
Flux
|> filter(fn: (r) => r._measurement == "example-measurement")
AS clause
InfluxQL
-- ...
AS newfield
-- ...
Flux
// ...
|> set(key: "_field", value: "newfield")
// ...
|> map(fn: (r) => ({ r with _field: "newfield"}))
WHERE clause
The WHERE
clause uses predicate logic to filter results based on fields, tags, or timestamps. Use the filter() function and Flux to filter results based on fields and tags. Use the range() function to filter results based on timestamps.
InfluxQL
Flux
// ...
|> range(start: -7d)
|> filter(fn: (r) => r["example-tag"] == "foo")
GROUP BY clause
The InfluxQL GROUP BY
clause groups data by specific tags or by time (typically to calculate an aggregate value for windows of time).
Group by tags
Use the to modify the group key and change how data is grouped.
InfluxQL
-- ...
GROUP BY "location"
Flux
|> group(columns: ["location"])
Group by time
Use the aggregateWindow() function to group data into time windows and perform an aggregation on each window. In CQs, the interval specified in the GROUP BY time()
clause determines the CQ execution interval. Use the GROUP BY time()
interval to set the every
task option.
InfluxQL
-- ...
SELECT MEAN("example-field")
FROM "example-measurement"
GROUP BY time(1h)
Flux
option task = {name: "task-name", every: 1h}
// ...
|> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field")
|> aggregateWindow(every: task.every, fn: mean)
RESAMPLE clause
The CQ RESAMPLE
clause uses data from the last specified duration to calculate a new aggregate point. The EVERY
interval in RESAMPLE
defines how often the CQ runs. The FOR
interval defines the total time range queried by the CQ.
To accomplish this same functionality in a Flux task, set the start
parameter in the range()
function to the negative FOR
duration. Define the task execution interval in the task
options. For example:
InfluxQL
CREATE CONTINUOUS QUERY "resample-example" ON "my-db"
RESAMPLE EVERY 1m FOR 30m
BEGIN
SELECT exponential_moving_average(mean("example-field"), 30)
INTO "resample-average-example-measurement"
FROM "example-measurement"
WHERE region = 'example-region'
GROUP BY time(1m)
END
Flux
option task = {name: "resample-example", every: 1m}
from(bucket: "my-db/")
|> range(start: -30m)
|> filter(fn: (r) => r._measurement == "example-measurement" and r._field == "example-field" and r.region == "example-region")
|> aggregateWindow(every: 1m, fn: mean)
|> exponentialMovingAverage(n: 30)
|> set(key: "_measurement", value: "resample-average-example-measurement")
|> to(bucket: "my-db/")
The following resources are available and may be helpful when converting continuous queries to Flux tasks.
Documentation
Community
- Post in the
- Ask in the InfluxDB Community Slack