Skip to content

Commit

Permalink
Put mrecordlog behind its own lock
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Jan 22, 2024
1 parent 8b7d723 commit 9bf5cd3
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 334 deletions.
29 changes: 10 additions & 19 deletions quickwit/quickwit-ingest/src/ingest_v2/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::{BTreeMap, BTreeSet};
use std::sync::Weak;
use std::time::Duration;

use bytesize::ByteSize;
Expand All @@ -30,12 +29,11 @@ use quickwit_common::tower::Rate;
use quickwit_proto::ingest::ShardState;
use quickwit_proto::types::{split_queue_id, NodeId, QueueId, ShardId, SourceUid};
use serde::{Deserialize, Serialize, Serializer};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tracing::{debug, warn};

use super::ingester::IngesterState;
use super::metrics::INGEST_V2_METRICS;
use super::state::WeakIngesterState;
use crate::RateMibPerSec;

const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) {
Expand Down Expand Up @@ -149,11 +147,11 @@ impl LocalShardsSnapshot {
/// broadcasts it to other nodes via Chitchat.
pub(super) struct BroadcastLocalShardsTask {
cluster: Cluster,
weak_state: Weak<RwLock<IngesterState>>,
weak_state: WeakIngesterState,
}

impl BroadcastLocalShardsTask {
pub fn spawn(cluster: Cluster, weak_state: Weak<RwLock<IngesterState>>) -> JoinHandle<()> {
pub fn spawn(cluster: Cluster, weak_state: WeakIngesterState) -> JoinHandle<()> {
let mut broadcaster = Self {
cluster,
weak_state,
Expand All @@ -163,7 +161,7 @@ impl BroadcastLocalShardsTask {

async fn snapshot_local_shards(&self) -> Option<LocalShardsSnapshot> {
let state = self.weak_state.upgrade()?;
let mut state_guard = state.write().await;
let mut state_guard = state.lock_partially().await;

let mut per_source_shard_infos: BTreeMap<SourceUid, ShardInfos> = BTreeMap::new();

Expand Down Expand Up @@ -331,21 +329,22 @@ pub async fn setup_local_shards_update_listener(

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use mrecordlog::MultiRecordLog;
use quickwit_cluster::{create_cluster_for_test, ChannelTransport};
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_proto::ingest::ingester::{IngesterStatus, ObservationMessage};
use quickwit_proto::ingest::ingester::ObservationMessage;
use quickwit_proto::ingest::ShardState;
use quickwit_proto::types::{queue_id, Position};
use tokio::sync::watch;

use super::*;
use crate::ingest_v2::models::IngesterShard;
use crate::ingest_v2::rate_meter::RateMeter;
use crate::ingest_v2::state::IngesterState;

#[test]
fn test_shard_info_serde() {
Expand Down Expand Up @@ -473,24 +472,16 @@ mod tests {
let tempdir = tempfile::tempdir().unwrap();
let mrecordlog = MultiRecordLog::open(tempdir.path()).await.unwrap();
let (observation_tx, _observation_rx) = watch::channel(Ok(ObservationMessage::default()));
let state = Arc::new(RwLock::new(IngesterState {
mrecordlog,
shards: HashMap::new(),
rate_trackers: HashMap::new(),
replication_streams: HashMap::new(),
replication_tasks: HashMap::new(),
status: IngesterStatus::Ready,
observation_tx,
}));
let weak_state = Arc::downgrade(&state);
let state = IngesterState::new(mrecordlog, observation_tx);
let weak_state = state.weak();
let task = BroadcastLocalShardsTask {
cluster,
weak_state,
};
let previous_snapshot = task.snapshot_local_shards().await.unwrap();
assert!(previous_snapshot.per_source_shard_infos.is_empty());

let mut state_guard = state.write().await;
let mut state_guard = state.lock_partially().await;

let queue_id_01 = queue_id("test-index:0", "test-source", &ShardId::from(1));
let shard =
Expand Down
Loading

0 comments on commit 9bf5cd3

Please sign in to comment.