General code review #1

Closed
wants to merge 83 commits into from

Contributor

@Wiezzel Wiezzel commented Feb 5, 2024

Make /status report all chunks scheduled for download
instead of only those currently being downloaded
src/main.rs Outdated
Comment on lines 40 to 84
async fn ping_forever(
    state_manager: Arc<StateManager>,
    transport: Arc<impl Transport>,
    interval: Duration,
) {
    loop {
        let status = state_manager.current_status().await;
        let result = transport
            .send_ping(transport::State {
                datasets: status.available,
            })
            .await;
        if let Err(err) = result {
            warn!("Couldn't send ping: {:?}", err);
        }
        tokio::time::sleep(interval).await;
    }
}

async fn handle_updates_forever(
    state_manager: Arc<StateManager>,
    mut updates: impl Stream<Item = Ranges> + Unpin,
) {
    while let Some(ranges) = updates.next().await {
        let result = state_manager.set_desired_ranges(ranges).await;
        if let Err(err) = result {
            warn!("Couldn't schedule update: {:?}", err)
        }
    }
    unreachable!("Updates receiver closed unexpectedly");
}

#[instrument(skip(state_manager, transport))]
async fn process_assignments(
    state_manager: Arc<StateManager>,
    transport: Arc<impl Transport>,
    interval: Duration,
) {
    let state_updates = transport.stream_assignments();
    tokio::pin!(state_updates);
    futures::join!(
        ping_forever(state_manager.clone(), transport, interval),
        handle_updates_forever(state_manager, state_updates),
    );
}
Collaborator

This should probably be moved to a separate file, controller.rs.

I'm not sure what would be a good way to handle the dependencies here. I don't want the Transport to know anything about the StateManager, but I do want status requests to be made by a direct function call rather than through one-shot channels or callbacks. One option would be a Controller struct that forwards status() calls to its StateManager and passes a reference to itself to the Transport. That way the Transport and the StateManager each know only about the Controller, not about each other.
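
A rough sketch of that Controller idea, for illustration only. It is synchronous and std-only to stay self-contained, whereas the code under review is async. The `Status` struct, the `StatusProvider` trait, the `build_ping` method, and the concrete `Transport` struct are all hypothetical simplifications (in the PR, `Transport` is a trait); only the names `StateManager`, `Transport`, `Controller`, `status()` and `current_status()` come from the discussion above.

```rust
use std::sync::Arc;

/// Hypothetical status snapshot; the real type is not shown in this thread.
#[derive(Debug, Clone, PartialEq)]
pub struct Status {
    pub available: Vec<String>,
}

/// Hypothetical trait: the only thing the transport needs to know about
/// is "something I can ask for the current status".
pub trait StatusProvider {
    fn status(&self) -> Status;
}

/// Simplified stand-in for the StateManager; it knows nothing about the transport.
pub struct StateManager {
    available: Vec<String>,
}

impl StateManager {
    pub fn new(available: Vec<String>) -> Self {
        Self { available }
    }

    pub fn current_status(&self) -> Status {
        Status {
            available: self.available.clone(),
        }
    }
}

/// The Controller owns the StateManager and forwards status() calls to it.
pub struct Controller {
    state_manager: StateManager,
}

impl Controller {
    pub fn new(state_manager: StateManager) -> Arc<Self> {
        Arc::new(Self { state_manager })
    }
}

impl StatusProvider for Controller {
    fn status(&self) -> Status {
        self.state_manager.current_status()
    }
}

/// Simplified stand-in for a transport; it depends only on the trait,
/// never on StateManager directly.
pub struct Transport {
    provider: Arc<dyn StatusProvider>,
}

impl Transport {
    pub fn new(provider: Arc<dyn StatusProvider>) -> Self {
        Self { provider }
    }

    /// Builds the payload for a ping with a direct function call,
    /// without channels or callbacks.
    pub fn build_ping(&self) -> Status {
        self.provider.status()
    }
}
```

Wiring would then look like `Transport::new(controller.clone())`, where `controller` is the `Arc<Controller>` returned by `Controller::new`. Whether a trait object or a generic parameter fits better here is an open design choice.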

- Move components interaction into controller.rs
- Stop the process if any background task stops
- Move query related code out of http_server.rs
- Don't limit concurrent downloads for individual files
- Cancel dir download if any file failed to download
- Cancel running downloads on shutdown
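
As an illustration of the "cancel dir download if any file failed" behaviour listed above, here is a minimal std-only sketch using threads and a shared flag. It is not the implementation from these commits, which is async: `download_file`, `download_dir`, and the rule that names ending in `.bad` fail are all hypothetical stand-ins.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for downloading one file.
/// Names ending in ".bad" simulate a failed download.
fn download_file(name: &str, cancelled: &AtomicBool) -> Result<(), String> {
    // Give up early if a sibling download has already failed.
    if cancelled.load(Ordering::SeqCst) {
        return Err(format!("{name}: cancelled"));
    }
    if name.ends_with(".bad") {
        return Err(format!("{name}: download failed"));
    }
    Ok(())
}

/// Downloads all files concurrently; the first failure sets a shared flag
/// so that downloads which have not started their work yet are skipped.
pub fn download_dir(files: &[&str]) -> Result<(), String> {
    let cancelled = Arc::new(AtomicBool::new(false));

    let handles: Vec<_> = files
        .iter()
        .map(|name| {
            let name = name.to_string();
            let cancelled = Arc::clone(&cancelled);
            thread::spawn(move || {
                let result = download_file(&name, &cancelled);
                if result.is_err() {
                    cancelled.store(true, Ordering::SeqCst);
                }
                result
            })
        })
        .collect();

    // Wait for every worker and remember the first error seen in join order.
    let mut first_error: Option<String> = None;
    for handle in handles {
        let outcome = match handle.join() {
            Ok(result) => result,
            Err(_) => Err("download thread panicked".to_string()),
        };
        if let Err(err) = outcome {
            if first_error.is_none() {
                first_error = Some(err);
            }
        }
    }

    match first_error {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
```

Which error is reported first depends on thread scheduling, so callers of a sketch like this should only rely on success versus failure, not on the exact message.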
Reorganizing crate into a library was required
DataFusion does not optimize aliased unions, so expressions like
`df.union(df.filter(...))` result in two scans.
However, it does optimize expressions like
`df.filter(or(true, ...))` into just `df`.
@Wiezzel Wiezzel closed this May 14, 2024