Buffered writes

    Consider a bulk stream frame. The value being written is Frame::Bulk(Bytes). The wire format of a bulk frame is a frame head, which consists of the $ character followed by the data length in bytes. The majority of the frame is the contents of the Bytes value. If the data is large, copying it to an intermediate buffer would be costly.

    To implement buffered writes, we will use the BufWriter struct. This struct is initialized with a T: AsyncWrite and implements AsyncWrite itself. When write is called on BufWriter, the write does not go directly to the inner writer, but to a buffer. When the buffer is full, the contents are flushed to the inner writer and the inner buffer is cleared. There are also optimizations that allow bypassing the buffer in certain cases.

    First, the Connection struct is updated:

    Next, write_frame() is implemented.

    1. use tokio::io::{self, AsyncWriteExt};
    2. use mini_redis::Frame;
    3. async fn write_value(&mut self, frame: &Frame)
    4. -> io::Result<()>
    5. {
    6. match frame {
    7. Frame::Simple(val) => {
    8. self.stream.write_u8(b'+').await?;
    9. self.stream.write_all(b"\r\n").await?;
    10. }
    11. Frame::Error(val) => {
    12. self.stream.write_all(val.as_bytes()).await?;
    13. self.stream.write_all(b"\r\n").await?;
    14. }
    15. Frame::Integer(val) => {
    16. self.stream.write_u8(b':').await?;
    17. self.write_decimal(*val).await?;
    18. }
    19. Frame::Null => {
    20. self.stream.write_all(b"$-1\r\n").await?;
    21. }
    22. Frame::Bulk(val) => {
    23. let len = val.len();
    24. self.write_decimal(len as u64).await?;
    25. self.stream.write_all(val).await?;
    26. self.stream.write_all(b"\r\n").await?;
    27. }
    28. }
    29. self.stream.flush().await;
    30. Ok(())
    31. }
    • writes a single byte to the writer.
    • write_all writes the entire slice to the writer.

    The function ends with a call to self.stream.flush().await. Because BufWriter stores writes in an intermediate buffer, calls to write do not guarantee that the data is written to the socket. Before returning, we want the frame to be written to the socket. The call to flush() writes any data pending in the buffer to the socket.

    Another alternative would be to not call flush() in write_frame(). Instead, provide a flush() function on Connection. This would allow the caller to write queue multiple small frames in the write buffer then write them all to the socket with one write syscall. Doing this complicates the Connection API. Simplicity is one of Mini-Redis’ goals, so we decided to include the flush().await call in .