Loops
This example selects over the three channel receivers. When a message is received on any channel, it is written to STDOUT. When a channel is closed, recv()
returns with None
. By using pattern matching, the select!
macro continues waiting on the remaining channels. When all channels are closed, the else
branch is evaluated and the loop is terminated.
The select!
macro randomly picks branches to check first for readiness. When multiple channels have pending values, a random channel will be picked to receive from. This is to handle the case where the receive loop processes messages slower than they are pushed into the channels, meaning that the channels start to fill up. If select!
did not randomly pick a branch to check first, on each iteration of the loop, rx1
would be checked first. If rx1
always contained a new message, the remaining channels would never be checked.
Now we will show how to run an asynchronous operation across multiple calls to select!
. In this example, we have an MPSC channel with item type i32
, and an asynchronous function. We want to run the asynchronous function until it completes or an even integer is received on the channel.
Note how, instead of calling action()
in the select!
macro, it is called outside the loop. The return of action()
is assigned to operation
without calling .await
. Then we call tokio::pin!
on operation
.
The other select!
branch receives a message from the channel. If the message is even, we are done looping. Otherwise, start the select!
again.
This is the first time we use tokio::pin!
. We aren’t going to get into the details of pinning yet. The thing to note is that, to .await
a reference, the value being referenced must be pinned or implement Unpin
.
If we remove the tokio::pin!
line and try to compile, we get the following error:
This error isn’t very clear and we haven’t talked much about Future
yet either. For now, think of Future
as the trait that must be implemented by a value in order to call .await
on it. If you hit an error about Future
not being implemented when attempting to call .await
on a reference, then the future probably needs to be pinned.
Read more about Pin
on the .
Modifying a branch
- An async operation to perform on
i32
values.
The logic we want to implement is:
- Wait for an even number on the channel.
- Wait for the operation, but at the same time listen for more even numbers on the channel.
- If a new even number is received before the existing operation completes, abort the existing operation and start it over with the new even number.
We use a similar strategy as the previous example. The async fn is called outside of the loop and assigned to operation
. The operation
variable is pinned. The loop selects on both operation
and the channel receiver.
Notice how action
takes Option<i32>
as an argument. Before we receive the first even number, we need to instantiate operation
to something. We make action
take Option
and return Option
. If is passed in, None
is returned. The first loop iteration, operation
completes immediately with None
.
This example uses some new syntax. The first branch includes , if !done
. This is a branch precondition. Before explaining how it works, lets look at what happens if the precondition is omitted. Leaving out , if !done
and running the example results in the following output:
This error happens when attempting to use operation
after it has already completed. Usually, when using .await
, the value being awaited is consumed. In this example, we await on a reference. This means operation
is still around after it has completed.