Streams

Tokio provides stream support in a separate crate: tokio-stream.

Iteration

Currently, the Rust programming language does not support async for loops. Instead, iterating streams is done using a while let loop paired with StreamExt::next().

  1. use tokio_stream::StreamExt;
  2. #[tokio::main]
  3. async fn main() {
  4. let mut stream = tokio_stream::iter(&[1, 2, 3]);
  5. while let Some(v) = stream.next().await {
  6. println!("GOT = {:?}", v);
  7. }
  8. }

Like iterators, the next() method returns Option<T> where T is the stream’s value type. Receiving None indicates that stream iteration is terminated.

Let’s go over a slightly more complicated example using the Mini-Redis client.

Full code can be found .

  1. use tokio_stream::StreamExt;
  2. use mini_redis::client;
  3. async fn publish() -> mini_redis::Result<()> {
  4. let mut client = client::connect("127.0.0.1:6379").await?;
  5. // Publish some data
  6. client.publish("numbers", "1".into()).await?;
  7. client.publish("numbers", "two".into()).await?;
  8. client.publish("numbers", "3".into()).await?;
  9. client.publish("numbers", "four".into()).await?;
  10. client.publish("numbers", "five".into()).await?;
  11. client.publish("numbers", "6".into()).await?;
  12. Ok(())
  13. }
  14. async fn subscribe() -> mini_redis::Result<()> {
  15. let client = client::connect("127.0.0.1:6379").await?;
  16. let subscriber = client.subscribe(vec!["numbers".to_string()]).await?;
  17. let messages = subscriber.into_stream();
  18. tokio::pin!(messages);
  19. while let Some(msg) = messages.next().await {
  20. println!("got = {:?}", msg);
  21. }
  22. Ok(())
  23. }
  24. #[tokio::main]
  25. async fn main() -> mini_redis::Result<()> {
  26. tokio::spawn(async {
  27. publish().await
  28. subscribe().await?;
  29. println!("DONE");
  30. Ok(())

A task is spawned to publish messages to the Mini-Redis server on the “numbers” channel. Then, on the main task, we subscribe to the “numbers” channel and display received messages.

After subscribing, into_stream() is called on the returned subscriber. This consumes the Subscriber, returning a stream that yields messages as they arrive. Before we start iterating the messages, note that the stream is to the stack using tokio::pin!. Calling next() on a stream requires the stream to be . The into_stream() function returns a stream that is not pinned, we must explicitly pin it in order to iterate it.

A Rust value is “pinned” when it can no longer be moved in memory. A key property of a pinned value is that pointers can be taken to the pinned data and the caller can be confident the pointer stays valid. This feature is used by async/await to support borrowing data across .await points.

If we forget to pin the stream, we get an error like this:

  1. error[E0277]: `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>` cannot be unpinned
  2. --> streams/src/main.rs:29:36
  3. |
  4. 29 | while let Some(msg) = messages.next().await {
  5. | ^^^^ within `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Subscriber::into_stream::{closure#0} for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {ResumeTy, &'r mut Subscriber, Subscriber, impl Future, (), std::result::Result<Option<Message>, Box<(dyn std::error::Error + Send + Sync + 't0)>>, Box<(dyn std::error::Error + Send + Sync + 't1)>, &'t2 mut async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't3)>>>, async_stream::yielder::Sender<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't4)>>>, std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 't5)>>, impl Future, Option<Message>, Message}]>`
  6. |
  7. = note: required because it appears within the type `impl Future`
  8. = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<Message, Box<(dyn std::error::Error + Send + Sync + 'static)>>, impl Future>`
  9. = note: required because it appears within the type `impl Stream`
  10. = note: required because it appears within the type `tokio_stream::filter::_::__Origin<'_, impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
  11. = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>`
  12. = note: required because it appears within the type `tokio_stream::map::_::__Origin<'_, tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
  13. = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>`
  14. = note: required because it appears within the type `tokio_stream::take::_::__Origin<'_, tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`
  15. = note: required because of the requirements on the impl of `Unpin` for `tokio_stream::take::Take<tokio_stream::map::Map<tokio_stream::filter::Filter<impl Stream, [closure@streams/src/main.rs:22:17: 25:10]>, [closure@streams/src/main.rs:26:14: 26:40]>>`

Before trying to run this, start the Mini-Redis server:

  1. $ mini-redis-server

Then try running the code. We will see the messages outputted to STDOUT.

Some early messages may be dropped as there is a race between subscribing and publishing. The program never exits. A subscription to a Mini-Redis channel stays active as long as the server is active.

Let’s see how we can work with streams to expand on this program.

Adapters

Functions that take a and return another Stream are often called ‘stream adapters’, as they’re a form of the ‘adapter pattern’. Common stream adapters include , take, and .

Lets update the Mini-Redis so that it will exit. After receiving three messages, stop iterating messages. This is done using take. This adapter limits the stream to yield at most n messages.

  1. let messages = subscriber
  2. .into_stream()
  3. .take(3);

Running the program again, we get:

  1. got = Ok(Message { channel: "numbers", content: b"1" })
  2. got = Ok(Message { channel: "numbers", content: b"two" })
  3. got = Ok(Message { channel: "numbers", content: b"3" })

This time the program ends.

Now, let’s limit the stream to single digit numbers. We will check this by checking for the message length. We use the adapter to drop any message that does not match the predicate.

  1. let messages = subscriber
  2. .into_stream()
  3. .filter(|msg| match msg {
  4. Ok(msg) if msg.content.len() == 1 => true,
  5. _ => false,
  6. })
  7. .take(3);

Running the program again, we get:

  1. got = Ok(Message { channel: "numbers", content: b"1" })
  2. got = Ok(Message { channel: "numbers", content: b"3" })
  3. got = Ok(Message { channel: "numbers", content: b"6" })

Finally, we will tidy up the output by stripping the Ok(Message { ... }) part of the output. This is done with map. Because this is applied after filter, we know the message is Ok, so we can use unwrap().

Now, the output is:

  1. got = b"1"
  2. got = b"3"
  3. got = b"6"

Another option would be to combine the and map steps into a single call using .

There are more available adapters. See the list here.

Implementing Stream

The Stream trait is very similar to the trait.

  1. use std::pin::Pin;
  2. use std::task::{Context, Poll};
  3. pub trait Stream {
  4. type Item;
  5. fn poll_next(
  6. cx: &mut Context<'_>
  7. ) -> Poll<Option<Self::Item>>;
  8. fn size_hint(&self) -> (usize, Option<usize>) {
  9. (0, None)
  10. }
  11. }

The Stream::poll_next() function is much like Future::poll, except it can be called repeatedly to receive many values from the stream. Just as we saw in Async in depth, when a stream is not ready to return a value, Poll::Pending is returned instead. The task’s waker is registered. Once the stream should be polled again, the waker is notified.

The size_hint() method is used the same way as it is with .

Usually, when manually implementing a Stream, it is done by composing futures and other streams. As an example, let’s build off of the Delay future we implemented in Async in depth. We will convert it to a stream that yields () three times at 10 ms intervals

  1. use tokio_stream::Stream;
  2. use std::pin::Pin;
  3. use std::task::{Context, Poll};
  4. use std::time::Duration;
  5. struct Interval {
  6. rem: usize,
  7. delay: Delay,
  8. }
  9. impl Stream for Interval {
  10. type Item = ();
  11. fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
  12. -> Poll<Option<()>>
  13. {
  14. if self.rem == 0 {
  15. // No more delays
  16. return Poll::Ready(None);
  17. }
  18. match Pin::new(&mut self.delay).poll(cx) {
  19. Poll::Ready(_) => {
  20. let when = self.delay.when + Duration::from_millis(10);
  21. self.delay = Delay { when };
  22. self.rem -= 1;
  23. Poll::Ready(Some(()))
  24. }
  25. Poll::Pending => Poll::Pending,
  26. }
  27. }
  28. }

async-stream

Manually implementing streams using the Stream trait can be tedious. Unfortunately, the Rust programming language does not yet support async/await syntax for defining streams. This is in the works, but not yet ready.

The crate is available as a temporary solution. This crate provides an async_stream! macro that transforms the input into a stream. Using this crate, the above interval can be implemented like this:

  1. use async_stream::stream;
  2. use std::time::{Duration, Instant};
  3. stream! {
  4. let mut when = Instant::now();
  5. for _ in 0..3 {
  6. let delay = Delay { when };
  7. delay.await;
  8. yield ();
  9. when += Duration::from_millis(10);