From fa08337aa1a6b285becbf88ca4cdd23068e11ef1 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Wed, 4 May 2022 12:51:56 -0400 Subject: [PATCH] Introduce `Invalid` pseudo-command. This helps draw a distinction between errors caused by bugs in the server, and errors caused by bugs in the client. This latter kind, at least as `Command` parse errors are concerned, is handled now with an `Invalid` pseudo-command (akin to `Unknown`). Drawing this distinction is pedagogically useful, as it helps set up an example usage of `warn!` in `process_frame` (in contrast to other kinds of errors, which are still traced with `error!` in `Listener::run`). --- src/cmd/get.rs | 4 ++-- src/cmd/invalid.rs | 25 +++++++++++++++++++++++++ src/cmd/mod.rs | 34 +++++++++++++++++++++++++--------- src/cmd/ping.rs | 2 +- src/cmd/publish.rs | 4 ++-- src/cmd/set.rs | 2 +- src/cmd/subscribe.rs | 2 +- src/parse.rs | 2 +- src/server.rs | 44 ++++++++++++++++++++++++++------------------ 9 files changed, 84 insertions(+), 35 deletions(-) create mode 100644 src/cmd/invalid.rs diff --git a/src/cmd/get.rs b/src/cmd/get.rs index ef9df3c..146d8b0 100644 --- a/src/cmd/get.rs +++ b/src/cmd/get.rs @@ -1,4 +1,4 @@ -use crate::{Connection, Db, Frame, Parse}; +use crate::{Connection, Db, Frame, Parse, ParseError}; use bytes::Bytes; use tracing::{debug, instrument}; @@ -48,7 +48,7 @@ impl Get { /// GET key /// ``` #[instrument(level = "trace", name = "Get::parse_frames", skip(parse))] - pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result { + pub(crate) fn parse_frames(parse: &mut Parse) -> Result { // The `GET` string has already been consumed. The next value is the // name of the key to get. If the next value is not a string or the // input is fully consumed, then an error is returned. diff --git a/src/cmd/invalid.rs b/src/cmd/invalid.rs new file mode 100644 index 0000000..8093e22 --- /dev/null +++ b/src/cmd/invalid.rs @@ -0,0 +1,25 @@ +use crate::{Connection, Frame, ParseError}; + +use tracing::instrument; + +/// Represents a malformed frame. This is not a real `Redis` command. +#[derive(Debug)] +pub struct Invalid { + error: ParseError, +} + +impl Invalid { + /// Create a new `Invalid` command which responds to frames that could not + /// be successfully parsed as commands. + pub(crate) fn new(error: ParseError) -> Self { + Self { error } + } + + /// Responds to the client, indicating the command could not be parsed. + #[instrument(level = "trace", name = "ParseError::apply", skip(dst))] + pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> { + let response = Frame::Error(self.error.to_string()); + dst.write_frame(&response).await?; + Ok(()) + } +} diff --git a/src/cmd/mod.rs b/src/cmd/mod.rs index 80690e8..180c287 100644 --- a/src/cmd/mod.rs +++ b/src/cmd/mod.rs @@ -16,6 +16,9 @@ pub use ping::Ping; mod unknown; pub use unknown::Unknown; +mod invalid; +pub use invalid::Invalid; + use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown}; use tracing::instrument; @@ -31,6 +34,7 @@ pub enum Command { Unsubscribe(Unsubscribe), Ping(Ping), Unknown(Unknown), + Invalid(Invalid), } impl Command { @@ -42,8 +46,13 @@ impl Command { /// # Returns /// /// On success, the command value is returned, otherwise, `Err` is returned. - #[instrument(level = "trace", name = "Command::from_frame", skip(frame), err)] - pub fn from_frame(frame: Frame) -> crate::Result { + /// + /// # Traces + /// + /// Generates a TRACE-level span named `Command::from_frame` that includes + /// the `Debug`-representation of `frame` as a field. + #[instrument(level = "trace", name = "Command::from_frame")] + pub fn from_frame(frame: Frame) -> Result { // The frame value is decorated with `Parse`. `Parse` provides a // "cursor" like API which makes parsing the command easier. // @@ -87,16 +96,21 @@ impl Command { Ok(command) } + /// Construct an `Invalid` response command from a `ParseError`. + pub(crate) fn from_error(err: ParseError) -> Command { + Command::Invalid(invalid::Invalid::new(err)) + } + /// Apply the command to the specified `Db` instance. /// /// The response is written to `dst`. This is called by the server in order /// to execute a received command. - #[instrument( - level = "trace", - name = "Command::apply", - skip(self, db, dst, shutdown), - err - )] + /// + /// # Traces + /// + /// Generates a `TRACE`-level span that includes the `Debug`-serializaiton + /// of `self` (the `Command` being applied) as a field. + #[instrument(level = "trace", name = "Command::apply", skip(db, dst, shutdown))] pub(crate) async fn apply( self, db: &Db, @@ -112,9 +126,10 @@ impl Command { Subscribe(cmd) => cmd.apply(db, dst, shutdown).await, Ping(cmd) => cmd.apply(dst).await, Unknown(cmd) => cmd.apply(dst).await, + Invalid(cmd) => cmd.apply(dst).await, // `Unsubscribe` cannot be applied. It may only be received from the // context of a `Subscribe` command. - Unsubscribe(_) => Err("`Unsubscribe` is unsupported in this context".into()), + Unsubscribe(_) => Result::Err("`Unsubscribe` is unsupported in this context".into()), } } @@ -128,6 +143,7 @@ impl Command { Command::Unsubscribe(_) => "unsubscribe", Command::Ping(_) => "ping", Command::Unknown(cmd) => cmd.get_name(), + Command::Invalid(_) => "err", } } } diff --git a/src/cmd/ping.rs b/src/cmd/ping.rs index c2da0f8..f29eb2f 100644 --- a/src/cmd/ping.rs +++ b/src/cmd/ping.rs @@ -40,7 +40,7 @@ impl Ping { /// PING [message] /// ``` #[instrument(level = "trace", name = "Ping::parse_frames", skip(parse))] - pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result { + pub(crate) fn parse_frames(parse: &mut Parse) -> Result { match parse.next_string() { Ok(msg) => Ok(Ping::new(Some(msg))), Err(ParseError::EndOfStream) => Ok(Ping::default()), diff --git a/src/cmd/publish.rs b/src/cmd/publish.rs index 1edcb55..abda68c 100644 --- a/src/cmd/publish.rs +++ b/src/cmd/publish.rs @@ -1,4 +1,4 @@ -use crate::{Connection, Db, Frame, Parse}; +use crate::{Connection, Db, Frame, Parse, ParseError}; use bytes::Bytes; use tracing::instrument; @@ -49,7 +49,7 @@ impl Publish { /// PUBLISH channel message /// ``` #[instrument(level = "trace", name = "Publish::parse_frames", skip(parse))] - pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result { + pub(crate) fn parse_frames(parse: &mut Parse) -> Result { // The `PUBLISH` string has already been consumed. Extract the `channel` // and `message` values from the frame. // diff --git a/src/cmd/set.rs b/src/cmd/set.rs index 36664db..8c08a5c 100644 --- a/src/cmd/set.rs +++ b/src/cmd/set.rs @@ -78,7 +78,7 @@ impl Set { /// SET key value [EX seconds|PX milliseconds] /// ``` #[instrument(level = "trace", name = "Set::parse_frames", skip(parse))] - pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result { + pub(crate) fn parse_frames(parse: &mut Parse) -> Result { use ParseError::EndOfStream; // Read the key to set. This is a required field diff --git a/src/cmd/subscribe.rs b/src/cmd/subscribe.rs index e12c3d8..1603437 100644 --- a/src/cmd/subscribe.rs +++ b/src/cmd/subscribe.rs @@ -62,7 +62,7 @@ impl Subscribe { /// SUBSCRIBE channel [channel ...] /// ``` #[instrument(level = "trace", name = "Subscribe::parse_frames", skip(parse))] - pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result { + pub(crate) fn parse_frames(parse: &mut Parse) -> Result { use ParseError::EndOfStream; // The `SUBSCRIBE` string has already been consumed. At this point, diff --git a/src/parse.rs b/src/parse.rs index 70bfc5e..a4f0367 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -21,7 +21,7 @@ pub(crate) struct Parse { /// Only `EndOfStream` errors are handled at runtime. All other errors result in /// the connection being terminated. #[derive(Debug)] -pub(crate) enum ParseError { +pub enum ParseError { /// Attempting to extract a value failed due to the frame being fully /// consumed. EndOfStream, diff --git a/src/server.rs b/src/server.rs index ba91de7..a4679e5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -11,7 +11,7 @@ use std::sync::Arc; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::{broadcast, mpsc, Semaphore}; use tokio::time::{self, Duration}; -use tracing::{debug, error, info, instrument}; +use tracing::{error, info, instrument, warn}; /// Server listener state. Created in the `run` call. It includes a `run` method /// which performs the TCP listening and initialization of per-connection state. @@ -349,7 +349,7 @@ impl Handler { } /// Process a single connection. - #[instrument(level = "debug", name = "Handler::process_frame", skip(self), err)] + #[instrument(level = "debug", name = "Handler::process_frame", skip(self))] async fn process_frame(&mut self) -> crate::Result> { // While reading a request frame, also listen for the shutdown // signal. @@ -370,23 +370,31 @@ impl Handler { None => return Ok(ControlFlow::Break(())), }; - debug!(?frame); - // Convert the redis frame into a command struct. This returns an - // error if the frame is not a valid redis command or it is an - // unsupported command. - let cmd = Command::from_frame(frame)?; - - // Logs the `cmd` object. The syntax here is a shorthand provided by - // the `tracing` crate. It can be thought of as similar to: - // - // ``` - // debug!(cmd = format!("{:?}", cmd)); - // ``` - // - // `tracing` provides structured logging, so information is "logged" - // as key-value pairs. - debug!(?cmd); + // error if the frame is not a valid redis command. + let cmd = match Command::from_frame(frame) { + Ok(cmd) => cmd, + Err(cause) => { + // The frame was malformed and could not be parsed. This is + // probably indicative of an issue with the client (as opposed + // to our server), so we (1) emit a warning... + // + // The syntax here is a shorthand provided by the `tracing` + // crate. It can be thought of as similar to: + // warn! { + // cause = format!("{}", cause), + // "failed to parse command from frame" + // }; + // `tracing` provides structured logging, so information is + // "logged" as key-value pairs. + warn! { + %cause, + "failed to parse command from frame" + }; + // ...and (2) respond to the client with the error: + Command::from_error(cause) + } + }; // Perform the work needed to apply the command. This may mutate the // database state as a result.