Skip to content

Commit

Permalink
Introduce Invalid pseudo-command.
Browse files Browse the repository at this point in the history
This helps draw a distinction between errors caused by bugs in the
server, and errors caused by bugs in the client. This latter kind,
at least as `Command` parse errors are concerned, is handled now
with an `Invalid` pseudo-command (akin to `Unknown`). Drawing this
distinction is pedagogically useful, as it helps set up an example
usage of `warn!` in `process_frame` (in contrast to other kinds of
errors, which are still traced with `error!` in `Listener::run`).
  • Loading branch information
jswrenn committed May 25, 2022
1 parent fc4314d commit fa08337
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 35 deletions.
4 changes: 2 additions & 2 deletions src/cmd/get.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Connection, Db, Frame, Parse};
use crate::{Connection, Db, Frame, Parse, ParseError};

use bytes::Bytes;
use tracing::{debug, instrument};
Expand Down Expand Up @@ -48,7 +48,7 @@ impl Get {
/// GET key
/// ```
#[instrument(level = "trace", name = "Get::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Get> {
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
// The `GET` string has already been consumed. The next value is the
// name of the key to get. If the next value is not a string or the
// input is fully consumed, then an error is returned.
Expand Down
25 changes: 25 additions & 0 deletions src/cmd/invalid.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::{Connection, Frame, ParseError};

use tracing::instrument;

/// Represents a malformed frame. This is not a real `Redis` command.
#[derive(Debug)]
pub struct Invalid {
error: ParseError,
}

impl Invalid {
/// Create a new `Invalid` command which responds to frames that could not
/// be successfully parsed as commands.
pub(crate) fn new(error: ParseError) -> Self {
Self { error }
}

/// Responds to the client, indicating the command could not be parsed.
#[instrument(level = "trace", name = "ParseError::apply", skip(dst))]
pub(crate) async fn apply(self, dst: &mut Connection) -> crate::Result<()> {
let response = Frame::Error(self.error.to_string());
dst.write_frame(&response).await?;
Ok(())
}
}
34 changes: 25 additions & 9 deletions src/cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub use ping::Ping;
mod unknown;
pub use unknown::Unknown;

mod invalid;
pub use invalid::Invalid;

use crate::{Connection, Db, Frame, Parse, ParseError, Shutdown};
use tracing::instrument;

Expand All @@ -31,6 +34,7 @@ pub enum Command {
Unsubscribe(Unsubscribe),
Ping(Ping),
Unknown(Unknown),
Invalid(Invalid),
}

impl Command {
Expand All @@ -42,8 +46,13 @@ impl Command {
/// # Returns
///
/// On success, the command value is returned, otherwise, `Err` is returned.
#[instrument(level = "trace", name = "Command::from_frame", skip(frame), err)]
pub fn from_frame(frame: Frame) -> crate::Result<Command> {
///
/// # Traces
///
/// Generates a TRACE-level span named `Command::from_frame` that includes
/// the `Debug`-representation of `frame` as a field.
#[instrument(level = "trace", name = "Command::from_frame")]
pub fn from_frame(frame: Frame) -> Result<Command, ParseError> {
// The frame value is decorated with `Parse`. `Parse` provides a
// "cursor" like API which makes parsing the command easier.
//
Expand Down Expand Up @@ -87,16 +96,21 @@ impl Command {
Ok(command)
}

/// Construct an `Invalid` response command from a `ParseError`.
pub(crate) fn from_error(err: ParseError) -> Command {
Command::Invalid(invalid::Invalid::new(err))
}

/// Apply the command to the specified `Db` instance.
///
/// The response is written to `dst`. This is called by the server in order
/// to execute a received command.
#[instrument(
level = "trace",
name = "Command::apply",
skip(self, db, dst, shutdown),
err
)]
///
/// # Traces
///
/// Generates a `TRACE`-level span that includes the `Debug`-serializaiton
/// of `self` (the `Command` being applied) as a field.
#[instrument(level = "trace", name = "Command::apply", skip(db, dst, shutdown))]
pub(crate) async fn apply(
self,
db: &Db,
Expand All @@ -112,9 +126,10 @@ impl Command {
Subscribe(cmd) => cmd.apply(db, dst, shutdown).await,
Ping(cmd) => cmd.apply(dst).await,
Unknown(cmd) => cmd.apply(dst).await,
Invalid(cmd) => cmd.apply(dst).await,
// `Unsubscribe` cannot be applied. It may only be received from the
// context of a `Subscribe` command.
Unsubscribe(_) => Err("`Unsubscribe` is unsupported in this context".into()),
Unsubscribe(_) => Result::Err("`Unsubscribe` is unsupported in this context".into()),
}
}

Expand All @@ -128,6 +143,7 @@ impl Command {
Command::Unsubscribe(_) => "unsubscribe",
Command::Ping(_) => "ping",
Command::Unknown(cmd) => cmd.get_name(),
Command::Invalid(_) => "err",
}
}
}
2 changes: 1 addition & 1 deletion src/cmd/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Ping {
/// PING [message]
/// ```
#[instrument(level = "trace", name = "Ping::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Ping> {
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
match parse.next_string() {
Ok(msg) => Ok(Ping::new(Some(msg))),
Err(ParseError::EndOfStream) => Ok(Ping::default()),
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/publish.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Connection, Db, Frame, Parse};
use crate::{Connection, Db, Frame, Parse, ParseError};

use bytes::Bytes;
use tracing::instrument;
Expand Down Expand Up @@ -49,7 +49,7 @@ impl Publish {
/// PUBLISH channel message
/// ```
#[instrument(level = "trace", name = "Publish::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Publish> {
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
// The `PUBLISH` string has already been consumed. Extract the `channel`
// and `message` values from the frame.
//
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Set {
/// SET key value [EX seconds|PX milliseconds]
/// ```
#[instrument(level = "trace", name = "Set::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Set> {
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
use ParseError::EndOfStream;

// Read the key to set. This is a required field
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Subscribe {
/// SUBSCRIBE channel [channel ...]
/// ```
#[instrument(level = "trace", name = "Subscribe::parse_frames", skip(parse))]
pub(crate) fn parse_frames(parse: &mut Parse) -> crate::Result<Subscribe> {
pub(crate) fn parse_frames(parse: &mut Parse) -> Result<Self, ParseError> {
use ParseError::EndOfStream;

// The `SUBSCRIBE` string has already been consumed. At this point,
Expand Down
2 changes: 1 addition & 1 deletion src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub(crate) struct Parse {
/// Only `EndOfStream` errors are handled at runtime. All other errors result in
/// the connection being terminated.
#[derive(Debug)]
pub(crate) enum ParseError {
pub enum ParseError {
/// Attempting to extract a value failed due to the frame being fully
/// consumed.
EndOfStream,
Expand Down
44 changes: 26 additions & 18 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{broadcast, mpsc, Semaphore};
use tokio::time::{self, Duration};
use tracing::{debug, error, info, instrument};
use tracing::{error, info, instrument, warn};

/// Server listener state. Created in the `run` call. It includes a `run` method
/// which performs the TCP listening and initialization of per-connection state.
Expand Down Expand Up @@ -349,7 +349,7 @@ impl Handler {
}

/// Process a single connection.
#[instrument(level = "debug", name = "Handler::process_frame", skip(self), err)]
#[instrument(level = "debug", name = "Handler::process_frame", skip(self))]
async fn process_frame(&mut self) -> crate::Result<ControlFlow<(), ()>> {
// While reading a request frame, also listen for the shutdown
// signal.
Expand All @@ -370,23 +370,31 @@ impl Handler {
None => return Ok(ControlFlow::Break(())),
};

debug!(?frame);

// Convert the redis frame into a command struct. This returns an
// error if the frame is not a valid redis command or it is an
// unsupported command.
let cmd = Command::from_frame(frame)?;

// Logs the `cmd` object. The syntax here is a shorthand provided by
// the `tracing` crate. It can be thought of as similar to:
//
// ```
// debug!(cmd = format!("{:?}", cmd));
// ```
//
// `tracing` provides structured logging, so information is "logged"
// as key-value pairs.
debug!(?cmd);
// error if the frame is not a valid redis command.
let cmd = match Command::from_frame(frame) {
Ok(cmd) => cmd,
Err(cause) => {
// The frame was malformed and could not be parsed. This is
// probably indicative of an issue with the client (as opposed
// to our server), so we (1) emit a warning...
//
// The syntax here is a shorthand provided by the `tracing`
// crate. It can be thought of as similar to:
// warn! {
// cause = format!("{}", cause),
// "failed to parse command from frame"
// };
// `tracing` provides structured logging, so information is
// "logged" as key-value pairs.
warn! {
%cause,
"failed to parse command from frame"
};
// ...and (2) respond to the client with the error:
Command::from_error(cause)
}
};

// Perform the work needed to apply the command. This may mutate the
// database state as a result.
Expand Down

0 comments on commit fa08337

Please sign in to comment.