Buffered writes
Consider a bulk stream frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame is a frame head, which consists of the $ character followed by the data length in bytes. The majority of the frame is the contents of the Bytes value. If the data is large, copying it to an intermediate buffer would be costly.
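To make the wire format concrete, here is a minimal sketch that encodes a bulk frame into bytes using only the standard library. The function name encode_bulk is hypothetical and not part of mini-redis, and the sketch assumes the RESP convention that both the frame head and the payload are terminated by \r\n. Note that it copies the payload into a new Vec, which is exactly the cost the text warns about for large values.

```rust
/// Encode `data` as a RESP bulk frame: `$<len>\r\n<data>\r\n`.
/// `encode_bulk` is a hypothetical helper used for illustration only.
fn encode_bulk(data: &[u8]) -> Vec<u8> {
    // Frame head: '$' followed by the payload length in decimal.
    let head = format!("${}\r\n", data.len());
    let mut out = Vec::with_capacity(head.len() + data.len() + 2);
    out.extend_from_slice(head.as_bytes());
    // Payload: the raw bytes, copied here for simplicity.
    out.extend_from_slice(data);
    // Frame terminator.
    out.extend_from_slice(b"\r\n");
    out
}
```

For example, encode_bulk(b"hello") yields the bytes $5\r\nhello\r\n: a short head followed by the payload, which makes up most of the frame.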
To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.
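The buffering behavior can be observed with a small experiment. The sketch below uses the synchronous std::io::BufWriter as a stand-in for Tokio's asynchronous BufWriter (an assumption made so the example needs no runtime; the buffering idea is the same). CountingWriter and both demo functions are hypothetical names introduced here.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical inner writer that records how often `write` is called
/// and which bytes it received.
#[derive(Default)]
struct CountingWriter {
    writes: usize,
    data: Vec<u8>,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.writes += 1;
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Issue three small writes, then flush. Returns the number of inner
/// `write` calls before and after the flush, plus the bytes received.
fn small_writes_demo() -> io::Result<(usize, usize, Vec<u8>)> {
    let mut w = BufWriter::with_capacity(64, CountingWriter::default());
    w.write_all(b"+")?;
    w.write_all(b"OK")?;
    w.write_all(b"\r\n")?;
    // Nothing has reached the inner writer yet.
    let before = w.get_ref().writes;
    w.flush()?;
    let after = w.get_ref().writes;
    Ok((before, after, w.get_ref().data.clone()))
}

/// Write a payload larger than the buffer without flushing. Returns how
/// many bytes reached the inner writer anyway.
fn large_write_demo() -> io::Result<usize> {
    let mut w = BufWriter::with_capacity(8, CountingWriter::default());
    w.write_all(&[0u8; 32])?;
    Ok(w.get_ref().data.len())
}
```

In small_writes_demo the three small writes are held in the buffer and reach the inner writer only when flush is called. In large_write_demo the payload does not fit in the 8-byte buffer, so it is handed to the inner writer without waiting for a flush, which illustrates the kind of bypass mentioned above.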
First, the Connection struct is updated so that its stream field wraps the socket in a BufWriter.
Next, write_frame() is implemented.
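use tokio::io::{self, AsyncWriteExt};
use mini_redis::Frame;

// Method on Connection; self.stream is the BufWriter-wrapped socket.
async fn write_frame(&mut self, frame: &Frame)
    -> io::Result<()>
{
    match frame {
        Frame::Simple(val) => {
            self.stream.write_u8(b'+').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Error(val) => {
            self.stream.write_u8(b'-').await?;
            self.stream.write_all(val.as_bytes()).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        Frame::Integer(val) => {
            self.stream.write_u8(b':').await?;
            // write_decimal also writes the trailing "\r\n".
            self.write_decimal(*val).await?;
        }
        Frame::Null => {
            self.stream.write_all(b"$-1\r\n").await?;
        }
        Frame::Bulk(val) => {
            let len = val.len();

            // Frame head: '$' followed by the payload length.
            self.stream.write_u8(b'$').await?;
            self.write_decimal(len as u64).await?;
            self.stream.write_all(val).await?;
            self.stream.write_all(b"\r\n").await?;
        }
        // Array frames are not handled in this example.
        Frame::Array(_val) => unimplemented!(),
    }

    // Push any buffered bytes to the socket and propagate errors.
    self.stream.flush().await?;

    Ok(())
}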
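The functions used here are provided by AsyncWriteExt, except for write_decimal:
- write_u8 writes a single byte to the writer.
- write_all writes the entire slice to the writer.
- write_decimal is a helper implemented by mini-redis.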
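The write_decimal helper is called in the listing but not shown. As a rough, synchronous sketch (assuming the helper formats the number as decimal digits and appends \r\n, and using std::io::Write in place of the async writer), it could look like this. The generic dst parameter is an assumption made so the sketch is self-contained.

```rust
use std::io::{self, Cursor, Write};

/// Synchronous sketch of a `write_decimal`-style helper: writes `val`
/// as decimal digits followed by "\r\n".
fn write_decimal<W: Write>(dst: &mut W, val: u64) -> io::Result<()> {
    // u64::MAX has 20 decimal digits, so a 20-byte stack buffer suffices
    // and no heap allocation is needed.
    let mut buf = [0u8; 20];
    let mut cursor = Cursor::new(&mut buf[..]);
    write!(cursor, "{}", val)?;

    // Only the bytes written so far are valid.
    let pos = cursor.position() as usize;
    dst.write_all(&cursor.get_ref()[..pos])?;
    dst.write_all(b"\r\n")?;
    Ok(())
}
```

Formatting into a fixed-size stack buffer keeps the helper allocation-free, which matters because it runs for every integer and bulk frame.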
The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.
An alternative would be to not call flush() in write_frame(). Instead, provide a flush() function on Connection. This would allow the caller to queue multiple small frames in the write buffer, then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis’ goals, so we decided to include the flush().await call in write_frame().
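For comparison, the alternative design could be sketched as follows. This is a synchronous illustration only: it uses std::io::BufWriter instead of Tokio's, an in-memory Vec stands in for the socket, and the names write_simple and batch_demo are hypothetical.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical connection type that leaves flushing to the caller.
struct Connection<W: Write> {
    stream: BufWriter<W>,
}

impl<W: Write> Connection<W> {
    fn new(inner: W) -> Self {
        Connection { stream: BufWriter::new(inner) }
    }

    /// Queue a simple-string frame in the write buffer without flushing.
    fn write_simple(&mut self, val: &str) -> io::Result<()> {
        self.stream.write_all(b"+")?;
        self.stream.write_all(val.as_bytes())?;
        self.stream.write_all(b"\r\n")
    }

    /// Push everything queued so far to the underlying writer.
    fn flush(&mut self) -> io::Result<()> {
        self.stream.flush()
    }
}

/// Queue two frames, then flush once. Returns the number of bytes the
/// inner writer held before the flush and its contents afterwards.
fn batch_demo() -> io::Result<(usize, Vec<u8>)> {
    let mut conn = Connection::new(Vec::new());
    conn.write_simple("OK")?;
    conn.write_simple("PONG")?;
    let before = conn.stream.get_ref().len();
    conn.flush()?;
    Ok((before, conn.stream.get_ref().clone()))
}
```

Both frames reach the inner writer together on the single flush() call. The cost is that every caller must remember to flush, which is the added API complexity described above.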