Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit

Permalink
feat(flags): Do token validation and extract distinct id (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
neilkakkar authored May 29, 2024
1 parent ebaf596 commit f71dc08
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 66 deletions.
39 changes: 34 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions feature-flags/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ redis = { version = "0.23.3", features = [
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
serde-pickle = { version = "1.1.1"}

[lints]
workspace = true
Expand Down
9 changes: 9 additions & 0 deletions feature-flags/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ pub enum FlagError {

#[error("rate limited")]
RateLimited,

#[error("failed to parse redis cache data")]
DataParsingError,
#[error("redis unavailable")]
RedisUnavailable,
}

impl IntoResponse for FlagError {
Expand All @@ -52,6 +57,10 @@ impl IntoResponse for FlagError {
}

FlagError::RateLimited => (StatusCode::TOO_MANY_REQUESTS, self.to_string()),

FlagError::DataParsingError | FlagError::RedisUnavailable => {
(StatusCode::SERVICE_UNAVAILABLE, self.to_string())
}
}
.into_response()
}
Expand Down
2 changes: 1 addition & 1 deletion feature-flags/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use envconfig::Envconfig;

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "127.0.0.1:0")]
#[envconfig(default = "127.0.0.1:3001")]
pub address: SocketAddr,

#[envconfig(default = "postgres://posthog:posthog@localhost:15432/test_database")]
Expand Down
9 changes: 9 additions & 0 deletions feature-flags/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@ pub mod config;
pub mod redis;
pub mod router;
pub mod server;
pub mod team;
pub mod v0_endpoint;
pub mod v0_request;

// Test modules don't need to be compiled with main binary
// #[cfg(test)]
// TODO: To use in integration tests, we need to compile with binary
// or make it a separate feature using cfg(feature = "integration-tests")
// and then use this feature only in tests.
// For now, ok to just include in binary
pub mod test_utils;
73 changes: 46 additions & 27 deletions feature-flags/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,38 @@ use std::time::Duration;

use anyhow::Result;
use async_trait::async_trait;
use redis::AsyncCommands;
use redis::{AsyncCommands, RedisError};
use thiserror::Error;
use tokio::time::timeout;

// average for all commands is <10ms, check grafana
const REDIS_TIMEOUT_MILLISECS: u64 = 10;

#[derive(Error, Debug)]
pub enum CustomRedisError {
#[error("Not found in redis")]
NotFound,

#[error("Pickle error: {0}")]
PickleError(#[from] serde_pickle::Error),

#[error("Redis error: {0}")]
Other(#[from] RedisError),

#[error("Timeout error")]
Timeout(#[from] tokio::time::error::Elapsed),
}
/// A simple redis wrapper
/// Copied from capture/src/redis.rs.
/// TODO: Modify this to support hincrby, get, and set commands.
/// TODO: Modify this to support hincrby
#[async_trait]
pub trait Client {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result<Vec<String>>;

async fn get(&self, k: String) -> Result<String, CustomRedisError>;
async fn set(&self, k: String, v: String) -> Result<()>;
}

pub struct RedisClient {
Expand All @@ -40,38 +58,39 @@ impl Client for RedisClient {

Ok(fut?)
}
}

// TODO: Find if there's a better way around this.
#[derive(Clone)]
pub struct MockRedisClient {
zrangebyscore_ret: Vec<String>,
}
async fn get(&self, k: String) -> Result<String, CustomRedisError> {
let mut conn = self.client.get_async_connection().await?;

impl MockRedisClient {
pub fn new() -> MockRedisClient {
MockRedisClient {
zrangebyscore_ret: Vec::new(),
let results = conn.get(k);
let fut: Result<Vec<u8>, RedisError> =
timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?;

// return NotFound error when empty or not found
if match &fut {
Ok(v) => v.is_empty(),
Err(_) => false,
} {
return Err(CustomRedisError::NotFound);
}
}

pub fn zrangebyscore_ret(&mut self, ret: Vec<String>) -> Self {
self.zrangebyscore_ret = ret;
// TRICKY: We serialise data to json, then django pickles it.
// Here we deserialize the bytes using serde_pickle, to get the json string.
let string_response: String = serde_pickle::from_slice(&fut?, Default::default())?;

self.clone()
Ok(string_response)
}
}

impl Default for MockRedisClient {
fn default() -> Self {
Self::new()
}
}
async fn set(&self, k: String, v: String) -> Result<()> {
// TRICKY: We serialise data to json, then django pickles it.
// Here we serialize the json string to bytes using serde_pickle.
let bytes = serde_pickle::to_vec(&v, Default::default())?;

#[async_trait]
impl Client for MockRedisClient {
// A very simplified wrapper, but works for our usage
async fn zrangebyscore(&self, _k: String, _min: String, _max: String) -> Result<Vec<String>> {
Ok(self.zrangebyscore_ret.clone())
let mut conn = self.client.get_async_connection().await?;

let results = conn.set(k, bytes);
let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?;

Ok(fut?)
}
}
139 changes: 139 additions & 0 deletions feature-flags/src/team.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::instrument;

use crate::{
api::FlagError,
redis::{Client, CustomRedisError},
};

// TRICKY: This cache data is coming from django-redis. If it ever goes out of sync, we'll bork.
// TODO: Add integration tests across repos to ensure this doesn't happen.
pub const TEAM_TOKEN_CACHE_PREFIX: &str = "posthog:1:team_token:";

#[derive(Debug, Deserialize, Serialize)]
pub struct Team {
pub id: i64,
pub name: String,
pub api_token: String,
}

impl Team {
/// Validates a token, and returns a team if it exists.
#[instrument(skip_all)]
pub async fn from_redis(
client: Arc<dyn Client + Send + Sync>,
token: String,
) -> Result<Team, FlagError> {
// TODO: Instead of failing here, i.e. if not in redis, fallback to pg
let serialized_team = client
.get(format!("{TEAM_TOKEN_CACHE_PREFIX}{}", token))
.await
.map_err(|e| match e {
CustomRedisError::NotFound => FlagError::TokenValidationError,
CustomRedisError::PickleError(_) => {
tracing::error!("failed to fetch data: {}", e);
FlagError::DataParsingError
}
_ => {
tracing::error!("Unknown redis error: {}", e);
FlagError::RedisUnavailable
}
})?;

let team: Team = serde_json::from_str(&serialized_team).map_err(|e| {
tracing::error!("failed to parse data to team: {}", e);
FlagError::DataParsingError
})?;

Ok(team)
}
}

#[cfg(test)]
mod tests {
use rand::Rng;
use redis::AsyncCommands;

use super::*;
use crate::{
team,
test_utils::{insert_new_team_in_redis, random_string, setup_redis_client},
};

#[tokio::test]
async fn test_fetch_team_from_redis() {
let client = setup_redis_client(None);

let team = insert_new_team_in_redis(client.clone()).await.unwrap();

let target_token = team.api_token;

let team_from_redis = Team::from_redis(client.clone(), target_token.clone())
.await
.unwrap();
assert_eq!(team_from_redis.api_token, target_token);
assert_eq!(team_from_redis.id, team.id);
}

#[tokio::test]
async fn test_fetch_invalid_team_from_redis() {
let client = setup_redis_client(None);

match Team::from_redis(client.clone(), "banana".to_string()).await {
Err(FlagError::TokenValidationError) => (),
_ => panic!("Expected TokenValidationError"),
};
}

#[tokio::test]
async fn test_cant_connect_to_redis_error_is_not_token_validation_error() {
let client = setup_redis_client(Some("redis://localhost:1111/".to_string()));

match Team::from_redis(client.clone(), "banana".to_string()).await {
Err(FlagError::RedisUnavailable) => (),
_ => panic!("Expected RedisUnavailable"),
};
}

#[tokio::test]
async fn test_corrupted_data_in_redis_is_handled() {
// TODO: Extend this test with fallback to pg
let id = rand::thread_rng().gen_range(0..10_000_000);
let token = random_string("phc_", 12);
let team = Team {
id,
name: "team".to_string(),
api_token: token,
};
let serialized_team = serde_json::to_string(&team).expect("Failed to serialise team");

// manually insert non-pickled data in redis
let client =
redis::Client::open("redis://localhost:6379/").expect("Failed to create redis client");
let mut conn = client
.get_async_connection()
.await
.expect("Failed to get redis connection");
conn.set::<String, String, ()>(
format!(
"{}{}",
team::TEAM_TOKEN_CACHE_PREFIX,
team.api_token.clone()
),
serialized_team,
)
.await
.expect("Failed to write data to redis");

// now get client connection for data
let client = setup_redis_client(None);

match Team::from_redis(client.clone(), team.api_token.clone()).await {
Err(FlagError::DataParsingError) => (),
Err(other) => panic!("Expected DataParsingError, got {:?}", other),
Ok(_) => panic!("Expected DataParsingError"),
};
}
}
Loading

0 comments on commit f71dc08

Please sign in to comment.