Skip to content

Commit

Permalink
Fix otlp hex fields serialization
Browse files Browse the repository at this point in the history
Co-authored-by: François Massot <[email protected]>
  • Loading branch information
ddelemeny and fmassot committed Dec 22, 2023
1 parent f3a2b50 commit 6333f4c
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 39 deletions.
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions quickwit/quickwit-opentelemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ documentation = "https://quickwit.io/docs/"
anyhow = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
hex = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }
serde = { workspace = true }
Expand Down
32 changes: 13 additions & 19 deletions quickwit/quickwit-opentelemetry/src/otlp/span_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,13 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SpanId([u8; 8]);

impl SpanId {
pub const BASE64_LENGTH: usize = 12;
pub const HEX_LENGTH: usize = 16;

pub fn new(bytes: [u8; 8]) -> Self {
Self(bytes)
Expand All @@ -42,33 +40,29 @@ impl SpanId {

impl Serialize for SpanId {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let b64span_id = BASE64_STANDARD.encode(self.0);
serializer.serialize_str(&b64span_id)
let hexspan_id = hex::encode(self.0);
serializer.serialize_str(&hexspan_id)
}
}

impl<'de> Deserialize<'de> for SpanId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
let b64span_id = String::deserialize(deserializer)?;
let hexspan_id = String::deserialize(deserializer)?;

if b64span_id.len() != SpanId::BASE64_LENGTH {
if hexspan_id.len() != SpanId::HEX_LENGTH {
let message = format!(
"base64 span ID must be {} bytes long, got {}",
SpanId::BASE64_LENGTH,
b64span_id.len()
"hex span ID must be {} bytes long, got {}",
SpanId::HEX_LENGTH,
hexspan_id.len()
);
return Err(de::Error::custom(message));
}
let mut span_id = [0u8; 8];
BASE64_STANDARD
// Using the unchecked version here because otherwise the engine gets the wrong size
// estimate and fails.
.decode_slice_unchecked(b64span_id.as_bytes(), &mut span_id)
.map_err(|error| {
let message = format!("failed to decode base64 span ID: {:?}", error);
de::Error::custom(message)
})?;
hex::decode_to_slice(hexspan_id, &mut span_id).map_err(|error| {
let message = format!("failed to decode hex span ID: {:?}", error);
de::Error::custom(message)
})?;
Ok(SpanId(span_id))
}
}
Expand Down Expand Up @@ -104,7 +98,7 @@ mod tests {
fn test_span_id_serde() {
let expected_span_id = SpanId::new([1; 8]);
let span_id_json = serde_json::to_string(&expected_span_id).unwrap();
assert_eq!(span_id_json, r#""AQEBAQEBAQE=""#);
assert_eq!(span_id_json, r#""0101010101010101""#);

let span_id = serde_json::from_str::<SpanId>(&span_id_json).unwrap();
assert_eq!(span_id, expected_span_id,);
Expand Down
31 changes: 13 additions & 18 deletions quickwit/quickwit-opentelemetry/src/otlp/trace_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@
use base64::display::Base64Display;
use base64::engine::GeneralPurpose;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct TraceId([u8; 16]);

impl TraceId {
pub const BASE64_LENGTH: usize = 24;
pub const HEX_LENGTH: usize = 32;

pub fn new(bytes: [u8; 16]) -> Self {
Self(bytes)
Expand All @@ -49,8 +48,8 @@ impl TraceId {
impl Serialize for TraceId {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
if serializer.is_human_readable() {
let b64trace_id = BASE64_STANDARD.encode(self.0);
serializer.serialize_str(&b64trace_id)
let hextrace_id = hex::encode(self.0);
serializer.serialize_str(&hextrace_id)
} else {
self.0.serialize(serializer)
}
Expand All @@ -61,24 +60,20 @@ impl<'de> Deserialize<'de> for TraceId {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de> {
if deserializer.is_human_readable() {
let b64trace_id = String::deserialize(deserializer)?;
if b64trace_id.len() != TraceId::BASE64_LENGTH {
let hextrace_id = String::deserialize(deserializer)?;
if hextrace_id.len() != TraceId::HEX_LENGTH {
let message = format!(
"base64 trace ID must be {} bytes long, got {}",
TraceId::BASE64_LENGTH,
b64trace_id.len()
"hex trace ID must be {} bytes long, got {}",
TraceId::HEX_LENGTH,
hextrace_id.len()
);
return Err(de::Error::custom(message));
}
let mut trace_id_bytes = [0u8; 16];
BASE64_STANDARD
// Using the unchecked version here because otherwise the engine gets the wrong size
// estimate and fails.
.decode_slice_unchecked(b64trace_id.as_bytes(), &mut trace_id_bytes)
.map_err(|error| {
let message = format!("failed to decode base64 trace ID: {:?}", error);
de::Error::custom(message)
})?;
hex::decode_to_slice(hextrace_id, &mut trace_id_bytes).map_err(|error| {
let message = format!("failed to decode hex span ID: {:?}", error);
de::Error::custom(message)
})?;
Ok(TraceId(trace_id_bytes))
} else {
let trace_id_bytes: [u8; 16] = <[u8; 16]>::deserialize(deserializer)?;
Expand Down Expand Up @@ -118,7 +113,7 @@ mod tests {
fn test_trace_id_serde() {
let expected_trace_id = TraceId::new([1; 16]);
let trace_id_json = serde_json::to_string(&expected_trace_id).unwrap();
assert_eq!(trace_id_json, r#""AQEBAQEBAQEBAQEBAQEBAQ==""#);
assert_eq!(trace_id_json, r#""01010101010101010101010101010101""#);

let trace_id = serde_json::from_str::<TraceId>(&trace_id_json).unwrap();
assert_eq!(trace_id, expected_trace_id,);
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-search/src/find_trace_ids_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl SegmentCollector for FindTraceIdsSegmentCollector {
}

fn harvest(self) -> Self::Fruit {
let mut buffer = Vec::with_capacity(TraceId::BASE64_LENGTH);
let mut buffer = Vec::with_capacity(TraceId::HEX_LENGTH);
self.select_trace_ids
.harvest()
.into_iter()
Expand Down
4 changes: 3 additions & 1 deletion quickwit/quickwit-search/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ async fn test_single_node_without_timestamp_with_query_start_timestamp_enabled(
let start_timestamp = OffsetDateTime::now_utc().unix_timestamp();
for i in 0..30 {
let body = format!("info @ t:{}", i + 1);
docs.push(json!({"body": body}));
docs.push(json!({ "body": body }));
}
test_sandbox.add_documents(docs).await?;

Expand Down Expand Up @@ -1782,6 +1782,8 @@ async fn test_single_node_find_trace_ids_collector() {
- name: trace_id
type: bytes
fast: true
input_format: hex
output_format: hex
- name: span_timestamp_secs
type: datetime
fast: true
Expand Down

0 comments on commit 6333f4c

Please sign in to comment.