Skip to content

Commit

Permalink
Temporary support writing/decoding headers in old format
Browse files Browse the repository at this point in the history
  • Loading branch information
khvzak committed Mar 19, 2024
1 parent 16c075c commit 0ad7502
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 8 deletions.
1 change: 1 addition & 0 deletions casper-server/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ pub async fn buffer_body(mut body: impl MessageBody) -> Result<Bytes, Box<dyn St
pub(crate) mod proxy;
pub(crate) mod trace;
pub(crate) mod websocket;
pub(crate) mod serde;
123 changes: 123 additions & 0 deletions casper-server/src/http/serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//! Based on `http-serde` crate

/// For `HeaderMap`
///
/// `#[serde(with = "casper_server::http::serde::header_map")]`
pub mod header_map {
use std::borrow::Cow;
use std::fmt;

use ntex::http::header::{HeaderMap, HeaderName, HeaderValue};
use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor};
use serde::ser::SerializeSeq;
use serde::{Serialize, Serializer};

struct ToSeq<'a>(&'a HeaderMap, &'a HeaderName);

impl<'a> Serialize for ToSeq<'a> {
fn serialize<S: Serializer>(&self, ser: S) -> Result<S::Ok, S::Error> {
let name = self.1;
let count = self.0.get_all(name).count();
if ser.is_human_readable() {
if count == 1 {
let v = self.0.get(name).expect("header is present");
if let Ok(s) = v.to_str() {
return ser.serialize_str(s);
}
}
ser.collect_seq(self.0.get_all(name).filter_map(|v| v.to_str().ok()))
} else {
let mut seq = ser.serialize_seq(Some(count))?;
for v in self.0.get_all(name) {
seq.serialize_element(v.as_bytes())?;
}
seq.end()
}
}
}

/// Implementation detail. Use derive annotations instead.
pub fn serialize<S: Serializer>(headers: &HeaderMap, ser: S) -> Result<S::Ok, S::Error> {
ser.collect_map(headers.keys().map(|k| (k.as_str(), ToSeq(headers, k))))
}

#[derive(serde::Deserialize)]
#[serde(untagged)]
enum OneOrMore<'a> {
One(Cow<'a, str>),
Strings(Vec<Cow<'a, str>>),
Bytes(Vec<Cow<'a, [u8]>>),
}

struct HeaderMapVisitor {
is_human_readable: bool,
}

impl<'de> Visitor<'de> for HeaderMapVisitor {
type Value = HeaderMap;

// Format a message stating what data this Visitor expects to receive.
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("lots of things can go wrong with HeaderMap")
}

fn visit_map<M>(self, mut access: M) -> Result<Self::Value, M::Error>
where
M: MapAccess<'de>,
{
let mut map = HeaderMap::with_capacity(access.size_hint().unwrap_or(0));

if !self.is_human_readable {
while let Some((key, arr)) = access.next_entry::<Cow<str>, Vec<Cow<[u8]>>>()? {
let key = HeaderName::from_bytes(key.as_bytes())
.map_err(|_| de::Error::invalid_value(Unexpected::Str(&key), &self))?;
for val in arr {
let val = HeaderValue::from_bytes(&val).map_err(|_| {
de::Error::invalid_value(Unexpected::Bytes(&val), &self)
})?;
map.append(key.clone(), val);
}
}
} else {
while let Some((key, val)) = access.next_entry::<Cow<str>, OneOrMore>()? {
let key = HeaderName::from_bytes(key.as_bytes())
.map_err(|_| de::Error::invalid_value(Unexpected::Str(&key), &self))?;
match val {
OneOrMore::One(val) => {
let val = val.parse().map_err(|_| {
de::Error::invalid_value(Unexpected::Str(&val), &self)
})?;
map.insert(key, val);
}
OneOrMore::Strings(arr) => {
for val in arr {
let val = val.parse().map_err(|_| {
de::Error::invalid_value(Unexpected::Str(&val), &self)
})?;
map.append(key.clone(), val);
}
}
OneOrMore::Bytes(arr) => {
for val in arr {
let val = HeaderValue::from_bytes(&val).map_err(|_| {
de::Error::invalid_value(Unexpected::Bytes(&val), &self)
})?;
map.append(key.clone(), val);
}
}
};
}
}
Ok(map)
}
}

/// Implementation detail.
pub fn deserialize<'de, D>(de: D) -> Result<HeaderMap, D::Error>
where
D: Deserializer<'de>,
{
let is_human_readable = de.is_human_readable();
de.deserialize_map(HeaderMapVisitor { is_human_readable })
}
}
4 changes: 2 additions & 2 deletions casper-server/src/storage/backends/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl Storage for MemoryBackend {
let resp = memory
.get_unexpired(&key)
.map(|value| {
let headers = decode_headers(&value.headers)?;
let headers = decode_headers(&value.headers, true)?;
let body = Body::Bytes(value.body.clone());

let mut resp = Response::with_body(value.status, body);
Expand Down Expand Up @@ -224,7 +224,7 @@ impl Storage for MemoryBackend {
let result = (|| {
let value = Value {
status: item.status,
headers: encode_headers(&item.headers)?,
headers: encode_headers(&item.headers, true)?,
body: item.body,
expires: SystemTime::now() + item.ttl,
surrogate_keys: item.surrogate_keys,
Expand Down
7 changes: 5 additions & 2 deletions casper-server/src/storage/backends/redis/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,15 @@ bitflags! {
const HEADERS_COMPRESSED = 0b00000010; // Headers compression
const BODY_COMPRESSED = 0b00000100; // Body compression
const ENCRYPTED = 0b00001000;
const HEADERS_V2 = 0b00010000; // Headers format version 2 (temporary)
}
}

const EX_COMPRESSED: Flags = Flags::EX_COMPRESSED; // Deprecated
const HEADERS_COMPRESSED: Flags = Flags::HEADERS_COMPRESSED;
const BODY_COMPRESSED: Flags = Flags::BODY_COMPRESSED;
const ENCRYPTED: Flags = Flags::ENCRYPTED;
const HEADERS_V2: Flags = Flags::HEADERS_V2;

struct RedisMetrics {
pub internal_cache_counter: Counter<u64>,
Expand Down Expand Up @@ -264,7 +266,8 @@ impl RedisBackend {
}

// Decode them
let headers = decode_headers(&raw_headers).context("failed to decode headers")?;
let v2format = flags.contains(HEADERS_V2);
let headers = decode_headers(&raw_headers, v2format).context("failed to decode headers")?;

// If we have only one chunk, decode it in-place
if response_item.num_chunks == 1 {
Expand Down Expand Up @@ -380,7 +383,7 @@ impl RedisBackend {
}

async fn store_response_inner<'a>(&self, item: Item<'a>) -> Result<()> {
let mut headers = Bytes::from(encode_headers(&item.headers)?);
let mut headers = Bytes::from(encode_headers(&item.headers, false)?);
let mut body = item.body;
let body_length = body.len();

Expand Down
24 changes: 20 additions & 4 deletions casper-server/src/storage/common.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
use ntex::http::header::HeaderMap;
use serde::{Deserialize, Serialize};

pub fn encode_headers(headers: &HeaderMap) -> Result<Vec<u8>, flexbuffers::SerializationError> {
use crate::http::serde as http_serde;

pub fn encode_headers(
headers: &HeaderMap,
v2: bool,
) -> Result<Vec<u8>, flexbuffers::SerializationError> {
let mut serializer = flexbuffers::FlexbufferSerializer::new();
headers.serialize(&mut serializer)?;
if v2 {
headers.serialize(&mut serializer)?;
} else {
http_serde::header_map::serialize(headers, &mut serializer)?;
}
Ok(serializer.take_buffer())
}

pub fn decode_headers(data: &[u8]) -> Result<HeaderMap, flexbuffers::DeserializationError> {
pub fn decode_headers(
data: &[u8],
v2: bool,
) -> Result<HeaderMap, flexbuffers::DeserializationError> {
let deserializer = flexbuffers::Reader::get_root(data)?;
HeaderMap::deserialize(deserializer)
if v2 {
HeaderMap::deserialize(deserializer)
} else {
http_serde::header_map::deserialize(deserializer)
}
}

0 comments on commit 0ad7502

Please sign in to comment.