TDengine Rust Connector

taos is the official Rust connector for TDengine. With it, Rust developers can build applications that access data in TDengine instances.

taos provides two ways to establish connections. One is the native connection, which connects to TDengine instances via the TDengine client driver (taosc). The other is the WebSocket connection, which connects to TDengine instances via the WebSocket interface provided by taosAdapter. You select the connection type with Cargo features; by default, both types are enabled. The WebSocket connection can be used on any platform. The native connection can be used on any platform that the TDengine client driver supports.

The source code for the Rust connectors is located on .

Native connections are supported on the same platforms as the TDengine client driver. WebSocket connections are supported on all platforms that can run Go.

Version support

Please refer to the version support list.

The Rust Connector is still under rapid development and is not guaranteed to be backward compatible before 1.0. We recommend using TDengine version 3.0 or higher to avoid known issues.

  • Install the Rust development toolchain
  • If using the native connection, please install the TDengine client driver. Please refer to

Add taos dependency

Depending on the connection method, add the dependency in your Rust project as follows:

  • Both connection types (default): in Cargo.toml, add taos:

    [dependencies]
    taos = "*"

  • WebSocket only: in Cargo.toml, add taos and enable the ws feature:

    [dependencies]
    taos = { version = "*", default-features = false, features = ["ws"] }

  • Native connection only: in Cargo.toml, add taos and enable the native feature:

    [dependencies]
    taos = { version = "*", default-features = false, features = ["native"] }

Establishing a connection

TaosBuilder creates a connection builder from a DSN connection description string.

    let builder = TaosBuilder::from_dsn("taos://")?;

You can now use this object to create a connection.

    let conn = builder.build()?;

The same builder can create more than one connection.

    let conn1 = builder.build()?;
    let conn2 = builder.build()?;

The structure of the DSN description string is as follows:

    <driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
    |------|------------|---|-----------|-----------|------|------|------------|-----------------------|
    |driver|  protocol  |   | username  | password  | host | port |  database  | params                |

The parameters are described as follows:

  • driver: Specifies a driver name so that the connector can choose which method to use to establish the connection. Supported driver names are as follows:
    • taos: Use the TDengine connector driver.
    • tmq: Use TMQ to subscribe to data.
    • http/ws: Use WebSocket to establish connections.
    • https/wss: Use WebSocket to establish connections, and enable SSL/TLS.
  • protocol: Specifies which connection method to use. For example, taos+ws://localhost:6041 uses WebSocket to establish connections.
  • username/password: Username and password used to create connections.
  • host/port: Specifies the server and port to connect to. If you do not specify a hostname or port, native connections default to localhost:6030 and WebSocket connections default to localhost:6041.
  • database: Specifies the default database to connect to. Optional.
  • params: Optional parameters.
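To make the DSN structure concrete, the following is a minimal sketch of how such a string could be split into the fields listed above, using only the Rust standard library. `DsnParts`, `parse_dsn`, and `non_empty` are hypothetical names introduced for illustration; they are not part of the taos crate, which does its own parsing inside TaosBuilder::from_dsn. The sketch assumes hosts contain no colons (so no IPv6 literals) and does no percent-decoding.

```rust
use std::collections::BTreeMap;

/// Components of a DSN, named after the fields in the structure above.
/// (Hypothetical type for illustration, not part of the taos crate.)
#[derive(Debug, Default, PartialEq)]
pub struct DsnParts {
    pub driver: String,
    pub protocol: Option<String>,
    pub username: Option<String>,
    pub password: Option<String>,
    pub host: Option<String>,
    pub port: Option<u16>,
    pub database: Option<String>,
    pub params: BTreeMap<String, String>,
}

/// Maps an empty component to `None`.
fn non_empty(s: &str) -> Option<String> {
    if s.is_empty() { None } else { Some(s.to_string()) }
}

/// Splits a DSN into its parts. Returns `None` when the `://` separator
/// is missing or the port is not a valid number.
pub fn parse_dsn(dsn: &str) -> Option<DsnParts> {
    let (scheme, rest) = dsn.split_once("://")?;
    let (driver, protocol) = scheme.split_once('+').unwrap_or((scheme, ""));

    // Peel off "?k=v&k=v" first, then "/database"; the remainder is
    // "[username:password@]host:port".
    let (rest, query) = rest.split_once('?').unwrap_or((rest, ""));
    let (authority, database) = rest.split_once('/').unwrap_or((rest, ""));
    let (userinfo, hostport) = authority.rsplit_once('@').unwrap_or(("", authority));
    let (username, password) = userinfo.split_once(':').unwrap_or((userinfo, ""));
    let (host, port) = hostport.rsplit_once(':').unwrap_or((hostport, ""));
    let port = if port.is_empty() { None } else { Some(port.parse::<u16>().ok()?) };

    let mut params = BTreeMap::new();
    for pair in query.split('&').filter(|p| !p.is_empty()) {
        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
        params.insert(key.to_string(), value.to_string());
    }

    Some(DsnParts {
        driver: driver.to_string(),
        protocol: non_empty(protocol),
        username: non_empty(username),
        password: non_empty(password),
        host: non_empty(host),
        port,
        database: non_empty(database),
        params,
    })
}
```

With this sketch, `parse_dsn("taos://")` yields a driver of `taos` and every optional field unset, which is the case where the connector falls back to its defaults.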

A sample DSN description string is as follows:

  1. taos+ws://localhost:6041/test

This indicates that the WebSocket connection method is used to connect to the server localhost on port 6041, with test as the default database.
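The default address rule described above (localhost:6030 for native connections, localhost:6041 for WebSocket connections) can be sketched as a small function. `default_endpoint` is a hypothetical helper, not a taos API. Treating `http`, `https`, `ws`, and `wss` alike in the protocol position is an assumption; the text above only shows `+ws` as a protocol.

```rust
/// Hypothetical helper: returns the (host, port) a DSN falls back to when
/// it names neither, following the defaults stated in the parameter list.
pub fn default_endpoint(driver: &str, protocol: Option<&str>) -> (&'static str, u16) {
    // WebSocket is selected either by the driver name itself or by a
    // "+ws"-style protocol suffix (assumption: the same names apply to both).
    let is_websocket = |s: &str| matches!(s, "ws" | "wss" | "http" | "https");
    if is_websocket(driver) || protocol.map_or(false, is_websocket) {
        ("localhost", 6041)
    } else {
        ("localhost", 6030)
    }
}
```

For example, `default_endpoint("taos", None)` gives the native default, while `default_endpoint("taos", Some("ws"))` and `default_endpoint("ws", None)` both give the WebSocket default.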

You can create DSNs to connect to servers in your environment.

    use taos::*;

    // Use the native protocol.
    let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
    let conn1 = builder.build()?;

    // Use the WebSocket protocol.
    let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?.build()?;

After the connection is established, you can use it to run SQL statements. The following function prepares a database, writes data, and starts a query:

    async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
        // prepare database
        taos.exec_many([
            format!("DROP DATABASE IF EXISTS `{db}`"),
            format!("CREATE DATABASE `{db}`"),
            format!("USE `{db}`"),
        ])
        .await?;

        let inserted = taos.exec_many([
            // create super table
            "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
             TAGS (`groupid` INT, `location` BINARY(24))",
            // create child table
            "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngeles')",
            // insert into child table
            "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
            // insert with NULL values
            "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
            // insert and automatically create table with tags if not exists
            "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
            // insert many records in a single sql
            "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
        ]).await?;

        // the four INSERT statements write 1 + 1 + 1 + 3 rows
        assert_eq!(inserted, 6);
        let mut result = taos.query("select * from `meters`").await?;

        for field in result.fields() {
            println!("got field: {}", field.name());
        }

        // Read the rows with one of the two query options described below.
        Ok(())
    }

There are two ways to query data: using built-in types or the deserialization framework.

    // These fragments run inside an async function that returns a Result,
    // with `taos` and `result` in scope.

    // Query option 1: use the rows stream.
    let mut rows = result.rows();
    while let Some(row) = rows.try_next().await? {
        for (name, value) in row {
            println!("got value of {}: {}", name, value);
        }
    }

    // Query option 2: use deserialization with serde.
    // Requires `use chrono::{DateTime, Local};` at the top of the file.
    #[derive(Debug, serde::Deserialize)]
    #[allow(dead_code)]
    struct Record {
        // deserialize timestamp to chrono::DateTime<Local>
        ts: DateTime<Local>,
        // float to f32
        current: Option<f32>,
        // int to i32
        voltage: Option<i32>,
        phase: Option<f32>,
        groupid: i32,
        // binary/varchar to String
        location: String,
    }

    let records: Vec<Record> = taos
        .query("select * from `meters`")
        .await?
        .deserialize()
        .try_collect()
        .await?;

    dbg!(records);
    Ok(())

Write data

SQL Write

STMT Write

    use taos::*;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        let taos = TaosBuilder::from_dsn("taos://")?.build()?;
        taos.create_database("power").await?;
        taos.use_database("power").await?;
        taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;

        let mut stmt = Stmt::init(&taos)?;
        stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
        // bind table name and tags
        stmt.set_tbname_tags(
            "d1001",
            &[
                Value::VarChar("California.SanFrancisco".into()),
                Value::Int(2),
            ],
        )?;
        // bind values.
        let values = vec![
            ColumnView::from_millis_timestamp(vec![1648432611249]),
            ColumnView::from_floats(vec![10.3]),
            ColumnView::from_ints(vec![219]),
            ColumnView::from_floats(vec![0.31]),
        ];
        stmt.bind(&values)?;
        // bind one more row
        let values2 = vec![
            ColumnView::from_millis_timestamp(vec![1648432611749]),
            ColumnView::from_floats(vec![12.6]),
            ColumnView::from_ints(vec![218]),
            ColumnView::from_floats(vec![0.33]),
        ];
        stmt.bind(&values2)?;

        stmt.add_batch()?;

        // execute.
        let rows = stmt.execute()?;
        assert_eq!(rows, 2);
        Ok(())
    }

The following example queries data with the synchronous API:

    use taos::sync::*;

    fn main() -> anyhow::Result<()> {
        let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
        let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;

        // print column names
        let meta = result.fields();
        println!("{}", meta.iter().map(|field| field.name()).join("\t"));

        // print rows
        let rows = result.rows();
        for row in rows {
            let row = row?;
            for (_name, value) in row {
                print!("{}\t", value);
            }
            println!();
        }
        Ok(())
    }

    // output (assuming you are in the +8 timezone):
    // ts current
    // 2018-10-03T14:38:05+08:00 10.3
    // 2018-10-03T14:38:15+08:00 12.6

API Reference

Connector Constructor

You create a connector constructor by using a DSN.

    You use the builder object to create multiple connections.

        let conn: Taos = builder.build()?;

    In complex applications, we recommend enabling connection pools. taos implements connection pools based on r2d2.

    A connection pool with default parameters can be created as follows.

        let pool = TaosBuilder::from_dsn(dsn)?.pool()?;

    You can also set the connection pool parameters using the connection pool’s constructor.

        let dsn = "taos://localhost:6030";

        let opts = PoolBuilder::new()
            .max_size(5000) // max connections
            .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
            .min_idle(Some(1000)) // minimal idle connections
            .connection_timeout(Duration::from_secs(2));

        let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;

    In the application code, use pool.get()? to get a Taos connection object.

        let taos = pool.get()?;
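    To illustrate what a pool does behind pool.get(), here is a deliberately small, standard-library-only sketch. It is not how r2d2 or taos implement pooling; `Pool`, `Pooled`, and `State` are hypothetical types for illustration. The sketch shows two ideas: the pool never hands out more than max_size connections, and a connection returns to the pool automatically when its handle is dropped. Unlike r2d2, which waits up to the connection timeout, this sketch returns `None` immediately when the pool is exhausted.

```rust
use std::ops::Deref;
use std::sync::{Arc, Mutex};

/// Idle connections plus the number of connections opened so far.
struct State<C> {
    idle: Vec<C>,
    created: usize,
}

/// Toy pool that hands out at most `max_size` connections of type `C`.
pub struct Pool<C> {
    state: Arc<Mutex<State<C>>>,
    max_size: usize,
    connect: fn() -> C,
}

/// Handle to a checked-out connection; returns it to the pool on drop.
pub struct Pooled<C> {
    conn: Option<C>,
    state: Arc<Mutex<State<C>>>,
}

impl<C> Pool<C> {
    pub fn new(max_size: usize, connect: fn() -> C) -> Self {
        let state = State { idle: Vec::new(), created: 0 };
        Pool { state: Arc::new(Mutex::new(state)), max_size, connect }
    }

    /// Reuses an idle connection, or opens a new one while under `max_size`.
    pub fn get(&self) -> Option<Pooled<C>> {
        let mut state = self.state.lock().unwrap();
        let popped = state.idle.pop();
        let conn = match popped {
            Some(conn) => conn,
            None if state.created < self.max_size => {
                state.created += 1;
                (self.connect)()
            }
            None => return None, // exhausted: every connection is checked out
        };
        Some(Pooled { conn: Some(conn), state: Arc::clone(&self.state) })
    }

    /// Number of connections currently waiting in the pool.
    pub fn idle_count(&self) -> usize {
        self.state.lock().unwrap().idle.len()
    }
}

impl<C> Deref for Pooled<C> {
    type Target = C;
    fn deref(&self) -> &C {
        self.conn.as_ref().expect("connection is present until drop")
    }
}

impl<C> Drop for Pooled<C> {
    fn drop(&mut self) {
        // Hand the connection back so the next `get` can reuse it.
        if let Some(conn) = self.conn.take() {
            self.state.lock().unwrap().idle.push(conn);
        }
    }
}
```

    The drop-based return is why application code typically only needs to call pool.get()? and let the handle go out of scope.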

    Connectors

    The Taos object provides an API to perform operations on multiple databases.

    1. exec: Executes non-query SQL statements, such as CREATE, ALTER, and INSERT.

    2. exec_many: Runs multiple SQL statements simultaneously or in order.

          taos.exec_many([
              "CREATE DATABASE test",
              "USE test",
              "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
          ]).await?;

    3. query: Runs a query statement and returns a [ResultSet] object.

          let mut result = taos.query("select * from log.logs").await?;

      The [ResultSet] object stores the query result data and the names, types, and lengths of the returned columns.

      You can obtain column information by using [.fields()].

      Rows are fetched one at a time.

          let mut rows = result.rows();
          let mut nrows = 0;
          while let Some(row) = rows.try_next().await? {
              for (col, (name, value)) in row.enumerate() {
                  println!(
                      "[{}] got value in col {} (named `{:>8}`): {}",
                      nrows, col, name, value
                  );
              }
              nrows += 1;
          }

      Or use the serde deserialization framework.

          #[derive(Debug, Deserialize)]
          struct Record {
              // deserialize timestamp to chrono::DateTime<Local>
              ts: DateTime<Local>,
              // float to f32
              current: Option<f32>,
              // int to i32
              voltage: Option<i32>,
              phase: Option<f32>,
              groupid: i32,
              // binary/varchar to String
              location: String,
          }

          let records: Vec<Record> = taos
              .query("select * from `meters`")
              .await?
              .deserialize()
              .try_collect()
              .await?;

    Taos also provides Rust methods for some SQL statements to reduce the number of format! calls.

    • .describe(table: &str): Executes DESCRIBE and returns a Rust data structure.
    • .create_database(database: &str): Executes the CREATE DATABASE statement.
    • .use_database(database: &str): Executes the USE statement.

    In addition, this structure is also the entry point for the parameter binding interface and the line protocol interface. Please refer to the specific API descriptions for usage.

    Bind Interface

    Similar to the C interface, the Rust connector provides a wrapper for the bind interface. First, use the Taos object to create a parameter binding object, Stmt, for an SQL statement.

        let mut stmt = Stmt::init(&taos).await?;
        stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;

    The bind object provides a set of interfaces for implementing parameter binding.

    .set_tbname(name)

    Binds the table name.

        let mut stmt = taos.stmt("insert into ? values(?, ?)")?;
        stmt.set_tbname("d0")?;

    .set_tags(&[tag])

    Binds the subtable name and tag values when the SQL statement uses a super table.

        let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(?, ?)")?;
        stmt.set_tbname("d0")?;
        stmt.set_tags(&[Value::VarChar("taos".to_string())])?;

    .bind(&[column])

    Binds column values. Use the [ColumnView] structure to create and bind values of the required types.

        let params = vec![
            ColumnView::from_millis_timestamp(vec![164000000000]),
            ColumnView::from_bools(vec![true]),
            ColumnView::from_tiny_ints(vec![i8::MAX]),
            ColumnView::from_small_ints(vec![i16::MAX]),
            ColumnView::from_ints(vec![i32::MAX]),
            ColumnView::from_big_ints(vec![i64::MAX]),
            ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
            ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
            ColumnView::from_unsigned_ints(vec![u32::MAX]),
            ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
            ColumnView::from_floats(vec![f32::MAX]),
            ColumnView::from_doubles(vec![f64::MAX]),
            ColumnView::from_varchar(vec!["ABC"]),
            ColumnView::from_nchar(vec!["涛思数据"]),
        ];
        let rows = stmt.bind(&params)?.add_batch()?.execute()?;
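    Because .bind takes one value vector per column, application data that arrives row by row has to be regrouped by column first. The sketch below shows that regrouping with the standard library only. `Reading`, `Columns`, and `to_columns` are hypothetical names that mirror the columns of the meters table; they are not part of the taos crate. The resulting vectors have the element types used with ColumnView::from_millis_timestamp, ColumnView::from_floats, and ColumnView::from_ints in the examples. Whether a single bind call accepts vectors holding more than one row is not stated in this document, so treat that as an assumption to verify.

```rust
/// One meter reading, mirroring the columns of the `meters` table.
/// (Hypothetical type for illustration.)
#[derive(Debug, Clone, PartialEq)]
pub struct Reading {
    pub ts_millis: i64,
    pub current: f32,
    pub voltage: i32,
    pub phase: f32,
}

/// Column-oriented form of a batch of readings: one vector per column.
#[derive(Debug, Default, PartialEq)]
pub struct Columns {
    pub ts_millis: Vec<i64>,
    pub current: Vec<f32>,
    pub voltage: Vec<i32>,
    pub phase: Vec<f32>,
}

/// Regroups row-oriented readings into per-column vectors, preserving
/// row order so that index `i` in every vector belongs to the same row.
pub fn to_columns(rows: &[Reading]) -> Columns {
    let mut cols = Columns::default();
    for row in rows {
        cols.ts_millis.push(row.ts_millis);
        cols.current.push(row.current);
        cols.voltage.push(row.voltage);
        cols.phase.push(row.phase);
    }
    cols
}
```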

    .execute()

    Executes the SQL statement. Stmt objects can be reused: after execution, you can bind new values and execute again. Before execution, ensure that all data has been added to the queue with .add_batch.

        stmt.execute()?;

        // next bind cycle.
        //stmt.set_tbname()?;
        //stmt.bind()?;
        //stmt.execute()?;

    For a working example, see GitHub.

    TDengine starts subscriptions through TMQ.

    You create a TMQ connector by using a DSN.

        let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;

    Create a consumer:

        let mut consumer = tmq.build()?;

    A single consumer can subscribe to one or more topics.

        consumer.subscribe(["tmq_meters"]).await?;

    The TMQ is of type. You can use the corresponding API to consume each message in the queue and then use .commit to mark them as consumed.
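    The role of .commit can be illustrated with a toy model of a topic. This is only a sketch of at-least-once delivery in general, not TDengine's implementation; `ToyTopic`, `poll`, and `commit` here are hypothetical names and do not correspond to taos APIs. Messages that were delivered but never committed are delivered again on the next poll, so a consumer may see a message more than once but does not lose one.

```rust
/// Toy topic: an append-only log plus the committed position of a single
/// consumer group. (Hypothetical type for illustration.)
pub struct ToyTopic {
    log: Vec<String>,
    committed: usize, // number of messages marked as consumed
}

impl ToyTopic {
    pub fn new(messages: &[&str]) -> Self {
        ToyTopic {
            log: messages.iter().map(|m| m.to_string()).collect(),
            committed: 0,
        }
    }

    /// Returns every message after the committed position, paired with its
    /// offset, as a consumer that (re)starts polling would see them.
    pub fn poll(&self) -> Vec<(usize, &str)> {
        self.log
            .iter()
            .enumerate()
            .skip(self.committed)
            .map(|(offset, message)| (offset, message.as_str()))
            .collect()
    }

    /// Marks everything up to and including `offset` as consumed.
    pub fn commit(&mut self, offset: usize) {
        self.committed = self.committed.max(offset + 1).min(self.log.len());
    }
}
```

    In this model, polling twice without a commit in between returns the same messages both times, which is why processing should tolerate duplicates.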

    Unsubscribe:

        consumer.unsubscribe().await;

    The following parameters can be configured for the TMQ DSN. Only group.id is mandatory.

    • group.id: Within a consumer group, load balancing is implemented by consuming messages on an at-least-once basis.
    • client.id: Subscriber client ID.
    • auto.offset.reset: Initial point of subscription. earliest subscribes from the beginning, and latest subscribes from the newest message. The default is earliest. Note: This parameter is set per consumer group.
    • enable.auto.commit: Enables automatic commits. This can be enabled when data consistency is not essential.
    • auto.commit.interval.ms: Interval for automatic commits.
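    As a small sketch of how these parameters end up in the DSN, the function below appends them as a query string and rejects a parameter set without group.id, the only mandatory one. `tmq_dsn` is a hypothetical helper, not part of the taos crate, and it assumes keys and values need no URL encoding.

```rust
/// Hypothetical helper: appends TMQ parameters to a base DSN as a query
/// string. Fails when `group.id` is missing or empty.
pub fn tmq_dsn(base: &str, params: &[(&str, &str)]) -> Result<String, String> {
    let has_group_id = params
        .iter()
        .any(|(key, value)| *key == "group.id" && !value.is_empty());
    if !has_group_id {
        return Err("group.id is mandatory for a TMQ DSN".to_string());
    }

    // Join the pairs as key=value separated by '&', in the order given.
    let query: Vec<String> = params
        .iter()
        .map(|(key, value)| format!("{key}={value}"))
        .collect();
    Ok(format!("{}/?{}", base.trim_end_matches('/'), query.join("&")))
}
```

    For example, `tmq_dsn("taos://localhost:6030", &[("group.id", "test")])` produces the DSN used with TmqBuilder::from_dsn earlier in this section.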

    For more information, see GitHub sample file.

    For information about other structure APIs, see the .