TDengine Rust Connector

    taos 是 TDengine 的官方 Rust 语言连接器。Rust 开发人员可以通过它开发存取 TDengine 数据库的应用软件。

    taos 提供两种建立连接的方式。一种是原生连接,它通过 TDengine 客户端驱动程序(taosc)连接 TDengine 运行实例。另外一种是 Websocket 连接,它通过 taosAdapter 的 Websocket 接口连接 TDengine 运行实例。你可以通过不同的 “特性(即 Cargo 关键字 features)” 来指定使用哪种连接器(默认同时支持)。Websocket 连接支持任何平台,原生连接支持所有 TDengine 客户端能运行的平台。

    该 Rust 连接器的源码托管在 GitHub

    原生连接支持的平台和 TDengine 客户端驱动支持的平台一致。 Websocket 连接支持所有能运行 Rust 的平台。

    版本支持

    请参考

    Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。

    • 安装 Rust 开发工具链
    • 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考安装客户端驱动

    添加 taos 依赖

    根据选择的连接方式,按照如下说明在 Rust 项目中添加 依赖:

    • 同时支持
    • 仅 Websocket
    • 仅原生连接

    Cargo.toml 文件中添加 taos

    Cargo.toml 文件中添加 ,并启用 ws 特性。

    1. [dependencies]
    2. taos = { version = "*", default-features = false, features = ["ws"] }

    Cargo.toml 文件中添加 taos,并启用 native 特性:

    1. [dependencies]
    2. taos = { version = "*", default-features = false, features = ["native"] }

    建立连接

    TaosBuilder 通过 DSN 连接描述字符串创建一个连接构造器。

    1. let builder = TaosBuilder::from_dsn("taos://")?;

    现在您可以使用该对象创建连接:

    1. let conn = builder.build()?;

    连接对象可以创建多个:

    1. let conn1 = builder.build()?;
    2. let conn2 = builder.build()?;

    DSN 描述字符串基本结构如下:

    1. <driver>[+<protocol>]://[[<username>:<password>@]<host>:<port>][/<database>][?<p1>=<v1>[&<p2>=<v2>]]
    2. |------|------------|---|-----------|-----------|------|------|------------|-----------------------|
    3. |driver| protocol | | username | password | host | port | database | params |

    各部分意义见下表:

    • driver: 必须指定驱动名以便连接器选择何种方式创建连接,支持如下驱动名:
      • taos: 表名使用 TDengine 连接器驱动。
      • tmq: 使用 TMQ 订阅数据。
      • http/ws: 使用 Websocket 创建连接。
      • https/wss: 在 Websocket 连接方式下显示启用 SSL/TLS 连接。
    • protocol: 显示指定以何种方式建立连接,例如:taos+ws://localhost:6041 指定以 Websocket 方式建立连接。
    • username/password: 用于创建连接的用户名及密码。
    • host/port: 指定创建连接的服务器及端口,当不指定服务器地址及端口时(taos://),原生连接默认为 localhost:6030,Websocket 连接默认为 localhost:6041
    • database: 指定默认连接的数据库名,可选参数。
    • params:其他可选参数。

    一个完整的 DSN 描述字符串示例如下:

    1. taos+ws://localhost:6041/test

    表示使用 Websocket(ws)方式通过 6041 端口连接服务器 localhost,并指定默认数据库为 test

    这使得用户可以通过 DSN 指定连接方式:

    1. use taos::*;
    2. // use native protocol.
    3. let builder = TaosBuilder::from_dsn("taos://localhost:6030")?;
    4. let conn1 = builder.build();
    5. // use websocket protocol.
    6. let conn2 = TaosBuilder::from_dsn("taos+ws://localhost:6041")?;
    1. async fn demo(taos: &Taos, db: &str) -> Result<(), Error> {
    2. // prepare database
    3. taos.exec_many([
    4. format!("DROP DATABASE IF EXISTS `{db}`"),
    5. format!("CREATE DATABASE `{db}`"),
    6. format!("USE `{db}`"),
    7. ])
    8. .await?;
    9. let inserted = taos.exec_many([
    10. // create super table
    11. "CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) \
    12. TAGS (`groupid` INT, `location` BINARY(24))",
    13. // create child table
    14. "CREATE TABLE `d0` USING `meters` TAGS(0, 'California.LosAngles')",
    15. // insert into child table
    16. "INSERT INTO `d0` values(now - 10s, 10, 116, 0.32)",
    17. // insert with NULL values
    18. "INSERT INTO `d0` values(now - 8s, NULL, NULL, NULL)",
    19. // insert and automatically create table with tags if not exists
    20. "INSERT INTO `d1` USING `meters` TAGS(1, 'California.SanFrancisco') values(now - 9s, 10.1, 119, 0.33)",
    21. // insert many records in a single sql
    22. "INSERT INTO `d1` values (now-8s, 10, 120, 0.33) (now - 6s, 10, 119, 0.34) (now - 4s, 11.2, 118, 0.322)",
    23. ]).await?;
    24. assert_eq!(inserted, 6);
    25. let mut result = taos.query("select * from `meters`").await?;
    26. for field in result.fields() {
    27. println!("got field: {}", field.name());
    28. }
    29. let values = result.
    30. }

    查询数据可以通过两种方式:使用内建类型或 序列化框架。

    1. // Query option 1, use rows stream.
    2. let mut rows = result.rows();
    3. while let Some(row) = rows.try_next().await? {
    4. for (name, value) in row {
    5. println!("got value of {}: {}", name, value);
    6. }
    7. // Query options 2, use deserialization with serde.
    8. #[derive(Debug, serde::Deserialize)]
    9. #[allow(dead_code)]
    10. struct Record {
    11. // deserialize timestamp to chrono::DateTime<Local>
    12. ts: DateTime<Local>,
    13. // float to f32
    14. current: Option<f32>,
    15. // int to i32
    16. voltage: Option<i32>,
    17. phase: Option<f32>,
    18. groupid: i32,
    19. // binary/varchar to String
    20. location: String,
    21. let records: Vec<Record> = taos
    22. .query("select * from `meters`")
    23. .await?
    24. .deserialize()
    25. .try_collect()
    26. .await?;
    27. dbg!(records);
    28. Ok(())

    写入数据

    SQL 写入

    STMT 写入

    1. use taos::*;
    2. #[tokio::main]
    3. async fn main() -> anyhow::Result<()> {
    4. let taos = TaosBuilder::from_dsn("taos://")?.build()?;
    5. taos.create_database("power").await?;
    6. taos.use_database("power").await?;
    7. taos.exec("CREATE STABLE IF NOT EXISTS meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)").await?;
    8. let mut stmt = Stmt::init(&taos)?;
    9. stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;
    10. // bind table name and tags
    11. stmt.set_tbname_tags(
    12. "d1001",
    13. &[
    14. Value::VarChar("California.SanFransico".into()),
    15. Value::Int(2),
    16. ],
    17. )?;
    18. // bind values.
    19. let values = vec![
    20. ColumnView::from_millis_timestamp(vec![1648432611249]),
    21. ColumnView::from_floats(vec![10.3]),
    22. ColumnView::from_ints(vec![219]),
    23. ColumnView::from_floats(vec![0.31]),
    24. ];
    25. stmt.bind(&values)?;
    26. // bind one more row
    27. let values2 = vec![
    28. ColumnView::from_millis_timestamp(vec![1648432611749]),
    29. ColumnView::from_floats(vec![12.6]),
    30. ColumnView::from_ints(vec![218]),
    31. ColumnView::from_floats(vec![0.33]),
    32. ];
    33. stmt.bind(&values2)?;
    34. stmt.add_batch()?;
    35. // execute.
    36. let rows = stmt.execute()?;
    37. assert_eq!(rows, 2);
    38. Ok(())
    39. }

    1. use taos::sync::*;
    2. fn main() -> anyhow::Result<()> {
    3. let taos = TaosBuilder::from_dsn("ws:///power")?.build()?;
    4. let mut result = taos.query("SELECT ts, current FROM meters LIMIT 2")?;
    5. // print column names
    6. let meta = result.fields();
    7. println!("{}", meta.iter().map(|field| field.name()).join("\t"));
    8. // print rows
    9. let rows = result.rows();
    10. for row in rows {
    11. let row = row?;
    12. for (_name, value) in row {
    13. print!("{}\t", value);
    14. }
    15. println!();
    16. }
    17. Ok(())
    18. }
    19. // output(suppose you are in +8 timezone):
    20. // ts current
    21. // 2018-10-03T14:38:05+08:00 10.3
    22. // 2018-10-03T14:38:15+08:00 12.6

    API 参考

    连接构造器

    通过 DSN 来构建一个连接器构造器。

      使用 builder 对象创建多个连接:

      1. let conn: Taos = cfg.build();

      连接池

      在复杂应用中,建议启用连接池。 的连接池使用 r2d2 实现。

      如下,可以生成一个默认参数的连接池。

      1. let pool = TaosBuilder::from_dsn(dsn)?.pool()?;

      同样可以使用连接池的构造器,对连接池参数进行设置:

      1. let dsn = "taos://localhost:6030";
      2. let opts = PoolBuilder::new()
      3. .max_size(5000) // max connections
      4. .max_lifetime(Some(Duration::from_secs(60 * 60))) // lifetime of each connection
      5. .min_idle(Some(1000)) // minimal idle connections
      6. .connection_timeout(Duration::from_secs(2));
      7. let pool = TaosBuilder::from_dsn(dsn)?.with_pool_builder(opts)?;

      在应用代码中,使用 pool.get()? 来获取一个连接对象 。

      1. let taos = pool.get()?;

      对象提供了多个数据库操作的 API:

      1. exec: 执行某个非查询类 SQL 语句,例如 CREATE,,INSERT 等。

        1. let affected_rows = taos.exec("INSERT INTO tb1 VALUES(now, NULL)").await?;
      2. exec_many: 同时(顺序)执行多个 SQL 语句。

        1. taos.exec_many([
        2. "CREATE DATABASE test",
        3. "USE test",
        4. "CREATE TABLE `tb1` (`ts` TIMESTAMP, `val` INT)",
        5. ]).await?;
      3. query:执行查询语句,返回 [ResultSet] 对象。

        1. let mut q = taos.query("select * from log.logs").await?;

        [ResultSet] 对象存储了查询结果数据和返回的列的基本信息(列名,类型,长度):

        列信息使用 [.fields()] 方法获取:

        逐行获取数据:

        1. let mut rows = result.rows();
        2. let mut nrows = 0;
        3. while let Some(row) = rows.try_next().await? {
        4. for (col, (name, value)) in row.enumerate() {
        5. println!(
        6. "[{}] got value in col {} (named `{:>8}`): {}",
        7. nrows, col, name, value
        8. );
        9. }
        10. nrows += 1;
        11. }

        或使用 serde 序列化框架。

        1. #[derive(Debug, Deserialize)]
        2. struct Record {
        3. // deserialize timestamp to chrono::DateTime<Local>
        4. ts: DateTime<Local>,
        5. // float to f32
        6. current: Option<f32>,
        7. // int to i32
        8. voltage: Option<i32>,
        9. phase: Option<f32>,
        10. groupid: i32,
        11. // binary/varchar to String
        12. location: String,
        13. }
        14. let records: Vec<Record> = taos
        15. .query("select * from `meters`")
        16. .await?
        17. .deserialize()
        18. .try_collect()
        19. .await?;

      提供部分 SQL 的 Rust 方法化以减少 format! 代码块的频率:

      • .describe(table: &str): 执行 DESCRIBE 并返回一个 Rust 数据结构。
      • .create_database(database: &str): 执行 CREATE DATABASE 语句。
      • .use_database(database: &str): 执行 USE 语句。

      除此之外,该结构也是 参数绑定 和 的入口,使用方法请参考具体的 API 说明。

      参数绑定接口

      与 C 接口类似,Rust 提供参数绑定接口。首先,通过 对象创建一个 SQL 语句的参数绑定对象 Stmt

      1. let mut stmt = Stmt::init(&taos).await?;
      2. stmt.prepare("INSERT INTO ? USING meters TAGS(?, ?) VALUES(?, ?, ?, ?)")?;

      参数绑定对象提供了一组接口用于实现参数绑定:

      .set_tbname(name)

      用于绑定表名。

      1. let mut stmt = taos.stmt("insert into ? values(? ,?)")?;
      2. stmt.set_tbname("d0")?;

      .set_tags(&[tag])

      当 SQL 语句使用超级表时,用于绑定子表表名和标签值:

      1. let mut stmt = taos.stmt("insert into ? using stb0 tags(?) values(? ,?)")?;
      2. stmt.set_tbname("d0")?;
      3. stmt.set_tags(&[Value::VarChar("涛思".to_string())])?;

      .bind(&[column])

      用于绑定值类型。使用 [ColumnView] 结构体构建需要的类型并绑定:

      1. let params = vec![
      2. ColumnView::from_millis_timestamp(vec![164000000000]),
      3. ColumnView::from_bools(vec![true]),
      4. ColumnView::from_tiny_ints(vec![i8::MAX]),
      5. ColumnView::from_small_ints(vec![i16::MAX]),
      6. ColumnView::from_ints(vec![i32::MAX]),
      7. ColumnView::from_big_ints(vec![i64::MAX]),
      8. ColumnView::from_unsigned_tiny_ints(vec![u8::MAX]),
      9. ColumnView::from_unsigned_small_ints(vec![u16::MAX]),
      10. ColumnView::from_unsigned_ints(vec![u32::MAX]),
      11. ColumnView::from_unsigned_big_ints(vec![u64::MAX]),
      12. ColumnView::from_floats(vec![f32::MAX]),
      13. ColumnView::from_doubles(vec![f64::MAX]),
      14. ColumnView::from_varchar(vec!["ABC"]),
      15. ColumnView::from_nchar(vec!["涛思数据"]),
      16. ];
      17. let rows = stmt.bind(&params)?.add_batch()?.execute()?;

      .execute()

      执行 SQL。 对象可以复用,在执行后可以重新绑定并执行。执行前请确保所有数据已通过 .add_batch 加入到执行队列中。

      1. stmt.execute()?;
      2. // next bind cycle.
      3. //stmt.set_tbname()?;
      4. //stmt.bind()?;
      5. //stmt.execute()?;

      一个可运行的示例请见 GitHub 上的示例

      订阅

      TDengine 通过消息队列 TMQ 启动一个订阅。

      从 DSN 开始,构建一个 TMQ 连接器。

      1. let tmq = TmqBuilder::from_dsn("taos://localhost:6030/?group.id=test")?;

      创建消费者:

      1. let mut consumer = tmq.build()?;

      消费者可订阅一个或多个 TOPIC

      1. consumer.subscribe(["tmq_meters"]).await?;

      TMQ 消息队列是一个 类型,可以使用相应 API 对每个消息进行消费,并通过 .commit 进行已消费标记。

      停止订阅:

      1. consumer.unsubscribe().await;

      对于 TMQ DSN, 有以下配置项可以进行设置,需要注意的是,group.id 是必须的。

      • group.id: 同一个消费者组,将以至少消费一次的方式进行消息负载均衡。
      • client.id: 可选的订阅客户端识别项。
      • auto.offset.reset: 可选初始化订阅起点, earliest 为从头开始订阅, latest 为仅从最新数据开始订阅,默认为从头订阅。注意,此选项在同一个 group.id 中仅生效一次。
      • enable.auto.commit: 当设置为 时,将启用自动标记模式,当对数据一致性不敏感时,可以启用此方式。
      • auto.commit.interval.ms: 自动标记的时间间隔。

      完整订阅示例参见 GitHub 示例文件.

      其他相关结构体 API 使用说明请移步 Rust 文档托管网页:。