Replace handwritten getters and use a macro instead
guilload committed May 30, 2024
1 parent 3916104 commit 80351ae
Showing 13 changed files with 130 additions and 186 deletions.
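In a nutshell: the proto-generated message types model required fields as `Option<T>`, so each type previously carried a handwritten accessor that unwraps the option. This commit replaces those accessors with declarative macros that stamp the getters out. A minimal before/after sketch, condensed from the diff below:

    // Before: one handwritten getter per message type.
    impl IndexingTask {
        pub fn pipeline_uid(&self) -> PipelineUid {
            self.pipeline_uid
                .expect("`pipeline_uid` should be a required field")
        }
    }

    // After: the same getter is generated by a macro invocation.
    generate_copy_getters! {
        impl fn pipeline_uid() -> PipelineUid {} for

        IndexingTask
    }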
2 changes: 1 addition & 1 deletion quickwit/quickwit-control-plane/src/control_plane.rs
@@ -425,7 +425,7 @@ impl Handler<ShardPositionsUpdate> for ControlPlane {
         for (shard_id, position) in shard_positions_update.updated_shard_positions {
             if let Some(shard) = shard_entries.get_mut(&shard_id) {
                 shard.publish_position_inclusive =
-                    Some(shard.publish_position_inclusive().max(&position).clone());
+                    Some(shard.publish_position_inclusive().max(position.clone()));
                 if position.is_eof() {
                     // identify shards that have reached EOF but have not yet been removed.
                     info!(shard_id=%shard_id, position=?position, "received eof shard via gossip");
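The reshuffled clone above works because `Ord::max` takes both operands by value and returns an owned result: now that `publish_position_inclusive()` returns an owned `Position` rather than a reference, the clone belongs on the argument that is still needed afterwards, not on the result. A standalone sketch, with `String` standing in for `Position` and illustrative values:

    fn main() {
        let current = String::from("00000000000000000041"); // stand-in for the shard's position
        let incoming = String::from("00000000000000000042"); // stand-in for the gossiped position
        // `incoming` is still used after this line, so it is the operand that gets cloned.
        let newest = current.max(incoming.clone());
        assert_eq!(newest, incoming);
    }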
@@ -816,8 +816,7 @@ impl IngestController {
 
         for shard_id in shard_ids.shard_ids {
             if let Some(shard_entry) = shard_entries.get(&shard_id) {
-                let publish_position_inclusive =
-                    shard_entry.publish_position_inclusive().clone();
+                let publish_position_inclusive = shard_entry.publish_position_inclusive();
 
                 shard_positions_to_truncate.push(ShardIdPosition {
                     shard_id: Some(shard_id),
@@ -842,9 +841,8 @@ impl IngestController {
                 });
             }
         }
-
         if enabled!(Level::DEBUG) {
-            let shards_to_truncate: Vec<(&str, &Position)> = shards_to_truncate
+            let shards_to_truncate: Vec<(&str, Position)> = shards_to_truncate
                 .iter()
                 .flat_map(|shard_positions| {
                     shard_positions
9 changes: 4 additions & 5 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
@@ -218,8 +218,8 @@ impl IngestSource {
         assigned_shard.status = IndexingStatus::Active;
 
         let partition_id = assigned_shard.partition_id.clone();
-        let from_position_exclusive = fetch_payload.from_position_exclusive().clone();
-        let to_position_inclusive = fetch_payload.to_position_inclusive().clone();
+        let from_position_exclusive = fetch_payload.from_position_exclusive();
+        let to_position_inclusive = fetch_payload.to_position_inclusive();
 
         for mrecord in decoded_mrecords(mrecord_batch) {
             match mrecord {
@@ -257,7 +257,7 @@
 
         let partition_id = assigned_shard.partition_id.clone();
         let from_position_exclusive = assigned_shard.current_position_inclusive.clone();
-        let to_position_inclusive = fetch_eof.eof_position().clone();
+        let to_position_inclusive = fetch_eof.eof_position();
 
         batch_builder
             .checkpoint_delta
@@ -575,8 +575,7 @@ impl Source for IngestSource {
         for acquired_shard in acquire_shards_response.acquired_shards {
             let index_uid = acquired_shard.index_uid().clone();
             let shard_id = acquired_shard.shard_id().clone();
-            let mut current_position_inclusive =
-                acquired_shard.publish_position_inclusive().clone();
+            let mut current_position_inclusive = acquired_shard.publish_position_inclusive();
             let leader_id: NodeId = acquired_shard.leader_id.into();
             let follower_id_opt: Option<NodeId> = acquired_shard.follower_id.map(Into::into);
             let source_id: SourceId = acquired_shard.source_id;
8 changes: 4 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/fetch.rs
@@ -171,7 +171,7 @@ impl FetchStreamTask {
         };
         let batch_size = mrecord_batch.estimate_size();
         let fetch_payload = FetchPayload {
-            index_uid: self.index_uid.clone().into(),
+            index_uid: Some(self.index_uid.clone()),
             source_id: self.source_id.clone(),
             shard_id: Some(self.shard_id.clone()),
             mrecord_batch: Some(mrecord_batch),
@@ -209,7 +209,7 @@ impl FetchStreamTask {
         let eof_position = to_position_inclusive.as_eof();
 
         let fetch_eof = FetchEof {
-            index_uid: self.index_uid.clone().into(),
+            index_uid: Some(self.index_uid.clone()),
             source_id: self.source_id.clone(),
             shard_id: Some(self.shard_id.clone()),
             eof_position: Some(eof_position),
@@ -553,7 +553,7 @@ async fn fault_tolerant_fetch_stream(
         Ok(fetch_message) => match &fetch_message.message {
             Some(fetch_message::Message::Payload(fetch_payload)) => {
                 let batch_size = fetch_payload.estimate_size();
-                let to_position_inclusive = fetch_payload.to_position_inclusive().clone();
+                let to_position_inclusive = fetch_payload.to_position_inclusive();
                 let in_flight_value = InFlightValue::new(
                     fetch_message,
                     batch_size,
@@ -566,7 +566,7 @@
                 *from_position_exclusive = to_position_inclusive;
             }
             Some(fetch_message::Message::Eof(fetch_eof)) => {
-                let eof_position = fetch_eof.eof_position().clone();
+                let eof_position = fetch_eof.eof_position();
                 let in_flight_value = InFlightValue::new(
                     fetch_message,
                     ByteSize(0),
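The two `index_uid` changes in this file swap an `.into()` for an explicit `Some(...)`. Both forms build the same value, via the standard library's blanket `impl<T> From<T> for Option<T>`; the explicit form simply states the intent. A quick sketch with `String` standing in for `IndexUid`:

    fn main() {
        let index_uid = String::from("test-index:00000000"); // stand-in for an IndexUid
        let implicit: Option<String> = index_uid.clone().into();
        let explicit = Some(index_uid);
        assert_eq!(implicit, explicit);
    }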
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
@@ -94,7 +94,7 @@ const MIN_RESET_SHARDS_INTERVAL: Duration = if cfg!(any(test, feature = "testsuite")) {
 /// Duration after which persist requests time out with
 /// [`quickwit_proto::ingest::IngestV2Error::Timeout`].
 pub(super) const PERSIST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) {
-    Duration::from_millis(100)
+    Duration::from_millis(500)
 } else {
     Duration::from_secs(6)
 };
@@ -1184,7 +1184,7 @@ impl EventSubscriber<ShardPositionsUpdate> for WeakIngesterState {
             if shard_position.is_eof() {
                 state_guard.delete_shard(&queue_id).await;
             } else if !shard_position.is_beginning() {
-                state_guard.truncate_shard(&queue_id, &shard_position).await;
+                state_guard.truncate_shard(&queue_id, shard_position).await;
             }
         }
     }
1 change: 1 addition & 0 deletions quickwit/quickwit-ingest/src/ingest_v2/mrecordlog_utils.rs
@@ -177,6 +177,7 @@ pub(super) fn queue_position_range(
 mod tests {
     use super::*;
 
+    #[cfg(not(feature = "failpoints"))]
     #[tokio::test]
     async fn test_append_non_empty_doc_batch() {
         let tempdir = tempfile::tempdir().unwrap();
5 changes: 3 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/replication.rs
@@ -50,7 +50,7 @@ pub(super) const SYN_REPLICATION_STREAM_CAPACITY: usize = 5;
 
 /// Duration after which replication requests time out with [`ReplicationError::Timeout`].
 const REPLICATION_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) {
-    Duration::from_millis(100)
+    Duration::from_millis(250)
 } else {
     Duration::from_secs(3)
 };
@@ -555,7 +555,7 @@ impl ReplicationTask {
 
         for subrequest in replicate_request.subrequests {
             let queue_id = subrequest.queue_id();
-            let from_position_exclusive = subrequest.from_position_exclusive().clone();
+            let from_position_exclusive = subrequest.from_position_exclusive();
 
             let Some(shard) = state_guard.shards.get(&queue_id) else {
                 let replicate_failure = ReplicateFailure {
@@ -1376,6 +1376,7 @@ mod tests {
         );
     }
 
+    #[cfg(not(feature = "failpoints"))]
     #[tokio::test]
     async fn test_replication_task_deletes_dangling_shard() {
         let leader_id: NodeId = "test-leader".into();
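The two `#[cfg(not(feature = "failpoints"))]` attributes added here and in `mrecordlog_utils.rs` rely on the same mechanism: the annotated test is compiled out entirely whenever the crate is built with the `failpoints` feature. A minimal sketch of the gating pattern:

    // Compiled (and run) only when the crate is built without `--features failpoints`.
    #[cfg(not(feature = "failpoints"))]
    #[test]
    fn test_that_assumes_no_failpoints() {
        assert_eq!(2 + 2, 4);
    }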
6 changes: 3 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
@@ -355,7 +355,7 @@ impl FullyLockedIngesterState<'_> {
     pub async fn truncate_shard(
         &mut self,
         queue_id: &QueueId,
-        truncate_up_to_position_inclusive: &Position,
+        truncate_up_to_position_inclusive: Position,
     ) {
         // TODO: Replace with if-let-chains when stabilized.
         let Some(truncate_up_to_offset_inclusive) = truncate_up_to_position_inclusive.as_u64()
         else {
             return;
         };
         let Some(shard) = self.inner.shards.get_mut(queue_id) else {
             return;
         };
-        if shard.truncation_position_inclusive >= *truncate_up_to_position_inclusive {
+        if shard.truncation_position_inclusive >= truncate_up_to_position_inclusive {
             return;
         }
         match self
             .mrecordlog
             .truncate(queue_id, truncate_up_to_offset_inclusive)
             .await
         {
             Ok(_) => {
-                shard.truncation_position_inclusive = truncate_up_to_position_inclusive.clone();
                 info!("truncated shard `{queue_id}` at {truncate_up_to_position_inclusive}");
+                shard.truncation_position_inclusive = truncate_up_to_position_inclusive;
             }
             Err(TruncateError::MissingQueue(_)) => {
                 error!("failed to truncate shard `{queue_id}`: WAL queue not found");
@@ -76,7 +76,7 @@ impl Shards {
         for shard in shards_vec {
             let shard_id = shard.shard_id().clone();
             let partition_id = PartitionId::from(shard_id.as_str());
-            let position = shard.publish_position_inclusive().clone();
+            let position = shard.publish_position_inclusive();
             checkpoint.add_partition(partition_id, position);
             shards.insert(shard_id, shard);
         }
116 changes: 106 additions & 10 deletions quickwit/quickwit-proto/src/getters.rs
@@ -6,13 +6,13 @@ use crate::ingest::*;
 use crate::metastore::*;
 use crate::types::*;
 
-macro_rules! generate_getters{
+macro_rules! generate_getters {
     (impl fn $field:ident() -> $type:ty {} for $($struct:ty),+) => {
         $(
             impl $struct {
                 // we track caller so the reported line isn't the macro invocation below
                 #[track_caller]
-                pub fn $field(&self) -> &$type {
+                pub fn $field(&self) -> $type {
                     self.$field
                         .as_ref()
                         .expect(concat!("`",
                             stringify!($field), "` should be a required field"))
                 }
             }
         )*
     }
 }

+macro_rules! generate_clone_getters {
+    (impl fn $field:ident() -> $type:ty {} for $($struct:ty),+) => {
+        $(
+            impl $struct {
+                // we track caller so the reported line isn't the macro invocation below
+                #[track_caller]
+                pub fn $field(&self) -> $type {
+                    self.$field
+                        .clone()
+                        .expect(concat!("`",
+                            stringify!($field), "` should be a required field"))
+                }
+            }
+        )*
+    }
+}
+
+macro_rules! generate_copy_getters {
+    (impl fn $field:ident() -> $type:ty {} for $($struct:ty),+) => {
+        $(
+            impl $struct {
+                // we track caller so the reported line isn't the macro invocation below
+                #[track_caller]
+                pub fn $field(&self) -> $type {
+                    self.$field
+                        .expect(concat!("`",
+                            stringify!($field), "` should be a required field"))
+                }
+            }
+        )*
+    }
+}
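To make the three macro variants concrete: they differ only in how the inner `Option` is unwrapped (`as_ref` for borrowing getters, `clone` for the `Arc`-backed types, a plain move for `Copy` types). Hand-expanding one invocation from further down this file, `generate_clone_getters!` for `FetchEof::eof_position`, yields roughly:

    impl FetchEof {
        #[track_caller]
        pub fn eof_position(&self) -> Position {
            self.eof_position
                .clone()
                .expect("`eof_position` should be a required field")
        }
    }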

+// [`IndexUid`] getters
 generate_getters! {
-    impl fn index_uid() -> IndexUid {} for
+    impl fn index_uid() -> &IndexUid {} for
     // Control Plane API
     GetOrCreateOpenShardsSuccess,
 
     ReplicateSuccess,
     RetainShardsForSource,
     Shard,
+    ShardIdPositions,
     ShardIds,
     ShardPKey,
     TruncateShardsSubrequest,
@@ -73,23 +108,84 @@
     UpdateSplitsDeleteOpstampRequest
 }

-generate_getters! {
-    impl fn shard() -> Shard {} for
+// [`PipelineUid`] getters
+generate_copy_getters! {
+    impl fn pipeline_uid() -> PipelineUid {} for
 
-    InitShardSubrequest,
-    InitShardSuccess
+    IndexingTask
 }

+// [`Position`] getters. We use `clone` because `Position` is an `Arc` under the hood.
+generate_clone_getters! {
+    impl fn eof_position() -> Position {} for
+
+    FetchEof
+}
+
+generate_clone_getters! {
+    impl fn from_position_exclusive() -> Position {} for
+
+    FetchPayload,
+    OpenFetchStreamRequest,
+    ReplicateSubrequest
+}
+
+generate_clone_getters! {
+    impl fn to_position_inclusive() -> Position {} for
+
+    FetchPayload
+}
+
+generate_clone_getters! {
+    impl fn publish_position_inclusive() -> Position {} for
+
+    Shard,
+    ShardIdPosition
+}
+
+generate_clone_getters! {
+    impl fn replication_position_inclusive() -> Position {} for
+
+    ReplicateSuccess
+}
+
+generate_clone_getters! {
+    impl fn truncate_up_to_position_inclusive() -> Position {} for
+
+    TruncateShardsSubrequest
+}
+
+// [`Shard`] getters
 generate_getters! {
-    impl fn open_shard() -> Shard {} for
+    impl fn open_shard() -> &Shard {} for
 
     OpenShardSubresponse
 }
 
 generate_getters! {
-    impl fn shard_id() -> ShardId {} for
+    impl fn shard() -> &Shard {} for
 
+    InitShardSubrequest,
+    InitShardSuccess
+}

+// [`ShardId`] getters
+generate_getters! {
+    impl fn shard_id() -> &ShardId {} for
+
     FetchEof,
     FetchPayload,
     InitShardFailure,
     OpenFetchStreamRequest,
     OpenShardSubrequest,
-    ShardPKey
+    PersistFailure,
+    PersistSubrequest,
+    PersistSuccess,
+    ReplicateFailure,
+    ReplicateSubrequest,
+    ReplicateSuccess,
+    Shard,
+    ShardIdPosition,
+    ShardPKey,
+    TruncateShardsSubrequest
 }
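One more detail worth calling out: every generated getter carries `#[track_caller]`, so when the `expect` fires, the panic is reported at the call site of the getter rather than inside the macro-generated code. A self-contained sketch of the attribute's effect, with hypothetical names:

    #[track_caller]
    fn required(field: Option<u64>) -> u64 {
        field.expect("`field` should be a required field")
    }

    fn main() {
        // The panic message points at the line below, not at the body of `required`.
        let _ = required(None);
    }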
7 changes: 0 additions & 7 deletions quickwit/quickwit-proto/src/indexing/mod.rs
@@ -320,13 +320,6 @@ pub struct ShardPositionsUpdate {
 
 impl Event for ShardPositionsUpdate {}
 
-impl IndexingTask {
-    pub fn pipeline_uid(&self) -> PipelineUid {
-        self.pipeline_uid
-            .expect("`pipeline_uid` should be a required field")
-    }
-}
-
 impl RpcName for ApplyIndexingPlanRequest {
     fn rpc_name() -> &'static str {
         "apply_indexing_plan"