Skip to content

Commit

Permalink
Merge branch 'main' into forward_error
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz authored Dec 19, 2023
2 parents 3f3024c + fb1fe6a commit 0660ea1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 38 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/publish_docker_images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ jobs:

- name: Export digest
run: |
mkdir -p /tmp/digests/${{ matrix.platform }}
mkdir -p /tmp/digests
digest="${{ steps.build.outputs.digest }}"
touch "/tmp/digests/${digest#sha256:}"
- name: Upload digest
uses: actions/upload-artifact@v4
uses: actions/upload-artifact@v3
with:
name: digest
path: /tmp/digests/${{ matrix.platform }}/*
path: /tmp/digests/*
if-no-files-found: error
retention-days: 1

Expand All @@ -89,10 +89,10 @@ jobs:
needs: [docker]
steps:
- name: Download digests
uses: actions/download-artifact@v4
uses: actions/download-artifact@v3
with:
name: digest
path: /tmp/digests/${{ matrix.platform }}
path: /tmp/digests

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
Expand All @@ -114,7 +114,7 @@ jobs:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_ACCESS_TOKEN }}
- name: Create manifest list and push tags
working-directory: /tmp/digests/${{ matrix.platform }}
working-directory: /tmp/digests
run: |
docker buildx imagetools create $(jq -cr '.tags | map("-t " + .) | join(" ")' <<< "$DOCKER_METADATA_OUTPUT_JSON") \
$(printf '${{ env.REGISTRY_IMAGE }}@sha256:%s ' *)
Expand Down
83 changes: 51 additions & 32 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,12 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
.flat_map(|shard_entry| shard_entry.ingester_nodes())
.collect();

self.model.delete_index(&index_uid);

self.ingest_controller
.sync_with_ingesters(&ingester_needing_resync, &self.model)
.await;

self.model.delete_index(&index_uid);

// TODO: Refine the event. Notify index will have the effect to reload the entire state from
// the metastore. We should update the state of the control plane.
self.indexing_scheduler
Expand Down Expand Up @@ -567,6 +567,7 @@ impl EventSubscriber<ShardPositionsUpdate> for ControlPlaneEventSubscriber {

#[cfg(test)]
mod tests {
use mockall::Sequence;
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
use quickwit_config::{IndexConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_indexing::IndexingService;
Expand Down Expand Up @@ -1301,46 +1302,33 @@ mod tests {

let ingester_pool = IngesterPool::default();
let mut ingester_mock = IngesterServiceClient::mock();
ingester_mock
.expect_retain_shards()
.times(2)
.returning(|mut request| {
assert_eq!(request.retain_shards_for_sources.len(), 1);
let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
assert_eq!(&retain_shards_for_source.shard_ids, &[15]);
Ok(RetainShardsResponse {})
});
ingester_pool.insert("node1".into(), ingester_mock.into());
let mut seq = Sequence::new();

let mut index_0 = IndexMetadata::for_test("test-index-0", "ram:///test-index-0");
let index_uid_clone = index_0.index_uid.clone();

let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore.expect_delete_index().return_once(
move |delete_index_request: DeleteIndexRequest| {
assert_eq!(delete_index_request.index_uid, index_uid_clone.to_string());
Ok(EmptyResponse {})
},
);

let mut source = SourceConfig::ingest_v2_default();
source.enabled = true;
index_0.add_source(source.clone()).unwrap();

let index_uid_clone = index_0.index_uid.clone();
let index_0_clone = index_0.clone();
mock_metastore.expect_list_indexes_metadata().return_once(
move |list_indexes_request: ListIndexesMetadataRequest| {

let mut mock_metastore = MetastoreServiceClient::mock();
mock_metastore
.expect_list_indexes_metadata()
.times(1)
.in_sequence(&mut seq)
.returning(move |list_indexes_request: ListIndexesMetadataRequest| {
assert_eq!(list_indexes_request, ListIndexesMetadataRequest::all());
Ok(ListIndexesMetadataResponse::try_from_indexes_metadata(vec![
index_0_clone.clone()
])
.unwrap())
},
);

let index_uid_clone = index_0.index_uid.clone();
mock_metastore.expect_list_shards().return_once(
move |_list_shards_request: ListShardsRequest| {
});
mock_metastore
.expect_list_shards()
.times(1)
.in_sequence(&mut seq)
.returning(move |_list_shards_request: ListShardsRequest| {
let list_shards_resp = ListShardsResponse {
subresponses: vec![ListShardsSubresponse {
index_uid: index_uid_clone.to_string(),
Expand All @@ -1359,8 +1347,39 @@ mod tests {
}],
};
Ok(list_shards_resp)
},
);
});

ingester_mock
.expect_retain_shards()
.times(1)
.in_sequence(&mut seq)
.returning(|mut request| {
assert_eq!(request.retain_shards_for_sources.len(), 1);
let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
assert_eq!(&retain_shards_for_source.shard_ids, &[15]);
Ok(RetainShardsResponse {})
});

let index_uid_clone = index_0.index_uid.clone();
mock_metastore
.expect_delete_index()
.times(1)
.in_sequence(&mut seq)
.returning(move |delete_index_request: DeleteIndexRequest| {
assert_eq!(delete_index_request.index_uid, index_uid_clone.to_string());
Ok(EmptyResponse {})
});
ingester_mock
.expect_retain_shards()
.times(1)
.in_sequence(&mut seq)
.returning(|mut request| {
assert_eq!(request.retain_shards_for_sources.len(), 1);
let retain_shards_for_source = request.retain_shards_for_sources.pop().unwrap();
assert!(&retain_shards_for_source.shard_ids.is_empty());
Ok(RetainShardsResponse {})
});
ingester_pool.insert("node1".into(), ingester_mock.into());

let (control_plane_mailbox, _control_plane_handle) = ControlPlane::spawn(
&universe,
Expand Down

0 comments on commit 0660ea1

Please sign in to comment.