From f310221e424ce2f53137bcc584ae02cf17482237 Mon Sep 17 00:00:00 2001 From: deanlee Date: Thu, 13 Jun 2024 20:03:56 +0800 Subject: [PATCH] avoid dead locks and improve responsive --- tools/cabana/chart/chart.cc | 2 +- tools/cabana/chart/chartswidget.cc | 24 ++++--- tools/cabana/chart/chartswidget.h | 16 ++--- tools/cabana/mainwin.cc | 1 - tools/cabana/streams/abstractstream.cc | 22 ++++++- tools/cabana/streams/abstractstream.h | 8 ++- tools/cabana/streams/replaystream.cc | 7 +- tools/cabana/streams/replaystream.h | 3 - tools/cabana/videowidget.cc | 23 ++++--- tools/cabana/videowidget.h | 8 +-- tools/replay/camera.cc | 1 - tools/replay/logreader.cc | 6 +- tools/replay/replay.cc | 88 +++++++++++++++----------- tools/replay/replay.h | 11 ++-- tools/replay/util.cc | 36 ++++------- tools/replay/util.h | 2 +- 16 files changed, 135 insertions(+), 123 deletions(-) diff --git a/tools/cabana/chart/chart.cc b/tools/cabana/chart/chart.cc index c8537845b164ff..7a056c7674527b 100644 --- a/tools/cabana/chart/chart.cc +++ b/tools/cabana/chart/chart.cc @@ -513,7 +513,7 @@ void ChartView::mouseReleaseEvent(QMouseEvent *event) { // no rubber dragged, seek to mouse position can->seekTo(min); } else if (rubber->width() > 10 && (max - min) > MIN_ZOOM_SECONDS) { - charts_widget->zoom_undo_stack->push(new ZoomCommand(charts_widget, {min, max})); + charts_widget->zoom_undo_stack->push(new ZoomCommand({min, max})); } else { viewport()->update(); } diff --git a/tools/cabana/chart/chartswidget.cc b/tools/cabana/chart/chartswidget.cc index a7bdd746465ec9..59c188afbcdad8 100644 --- a/tools/cabana/chart/chartswidget.cc +++ b/tools/cabana/chart/chartswidget.cc @@ -103,6 +103,8 @@ ChartsWidget::ChartsWidget(QWidget *parent) : QFrame(parent) { QObject::connect(dbc(), &DBCManager::DBCFileChanged, this, &ChartsWidget::removeAll); QObject::connect(can, &AbstractStream::eventsMerged, this, &ChartsWidget::eventsMerged); QObject::connect(can, &AbstractStream::msgsReceived, this, &ChartsWidget::updateState); + QObject::connect(can, &AbstractStream::seeking, this, &ChartsWidget::updateState); + QObject::connect(can, &AbstractStream::timeRangeChanged, this, &ChartsWidget::timeRangeChanged); QObject::connect(range_slider, &QSlider::valueChanged, this, &ChartsWidget::setMaxChartRange); QObject::connect(new_plot_btn, &QToolButton::clicked, this, &ChartsWidget::newChart); QObject::connect(remove_all_btn, &QToolButton::clicked, this, &ChartsWidget::removeAll); @@ -159,16 +161,13 @@ void ChartsWidget::eventsMerged(const MessageEventsMap &new_events) { } } -void ChartsWidget::setZoom(double min, double max) { - zoomed_range = {min, max}; - is_zoomed = zoomed_range != display_range; +void ChartsWidget::timeRangeChanged(const std::optional> &time_range) { updateToolBar(); updateState(); - emit rangeChanged(min, max, is_zoomed); } void ChartsWidget::zoomReset() { - setZoom(display_range.first, display_range.second); + can->setTimeRange(std::nullopt); zoom_undo_stack->clear(); } @@ -186,8 +185,9 @@ void ChartsWidget::showValueTip(double sec) { void ChartsWidget::updateState() { if (charts.isEmpty()) return; + const auto &time_range = can->timeRange(); const double cur_sec = can->currentSec(); - if (!is_zoomed) { + if (!time_range.has_value()) { double pos = (cur_sec - display_range.first) / std::max(1.0, max_chart_range); if (pos < 0 || pos > 0.8) { display_range.first = std::max(0.0, cur_sec - max_chart_range * 0.1); @@ -195,13 +195,9 @@ void ChartsWidget::updateState() { double max_sec = std::min(display_range.first + max_chart_range, can->totalSeconds()); display_range.first = std::max(0.0, max_sec - max_chart_range); display_range.second = display_range.first + max_chart_range; - } else if (cur_sec < (zoomed_range.first - 0.1) || cur_sec >= zoomed_range.second) { - // loop in zoomed range - QTimer::singleShot(0, [ts = zoomed_range.first]() { can->seekTo(ts);}); - return; } - const auto &range = is_zoomed ? zoomed_range : display_range; + const auto &range = time_range ? *time_range : display_range; for (auto c : charts) { c->updatePlot(cur_sec, range.first, range.second); } @@ -217,12 +213,14 @@ void ChartsWidget::updateToolBar() { title_label->setText(tr("Charts: %1").arg(charts.size())); columns_action->setText(tr("Column: %1").arg(column_count)); range_lb->setText(utils::formatSeconds(max_chart_range)); + + bool is_zoomed = can->timeRange().has_value(); range_lb_action->setVisible(!is_zoomed); range_slider_action->setVisible(!is_zoomed); undo_zoom_action->setVisible(is_zoomed); redo_zoom_action->setVisible(is_zoomed); reset_zoom_action->setVisible(is_zoomed); - reset_zoom_btn->setText(is_zoomed ? tr("%1-%2").arg(zoomed_range.first, 0, 'f', 2).arg(zoomed_range.second, 0, 'f', 2) : ""); + reset_zoom_btn->setText(is_zoomed ? tr("%1-%2").arg(can->timeRange()->first, 0, 'f', 2).arg(can->timeRange()->second, 0, 'f', 2) : ""); remove_all_btn->setEnabled(!charts.isEmpty()); dock_btn->setIcon(docking ? "arrow-up-right-square" : "arrow-down-left-square"); dock_btn->setToolTip(docking ? tr("Undock charts") : tr("Dock charts")); @@ -252,7 +250,7 @@ ChartView *ChartsWidget::findChart(const MessageId &id, const cabana::Signal *si } ChartView *ChartsWidget::createChart() { - auto chart = new ChartView(is_zoomed ? zoomed_range : display_range, this); + auto chart = new ChartView(can->timeRange().value_or(display_range), this); chart->setFixedHeight(settings.chart_height); chart->setMinimumWidth(CHART_MIN_WIDTH); chart->setSizePolicy(QSizePolicy::MinimumExpanding, QSizePolicy::Fixed); diff --git a/tools/cabana/chart/chartswidget.h b/tools/cabana/chart/chartswidget.h index a39b4d73a59bf6..a9ac05db23ec29 100644 --- a/tools/cabana/chart/chartswidget.h +++ b/tools/cabana/chart/chartswidget.h @@ -46,11 +46,10 @@ class ChartsWidget : public QFrame { public slots: void setColumnCount(int n); void removeAll(); - void setZoom(double min, double max); + void timeRangeChanged(const std::optional> &time_range); signals: void dock(bool floating); - void rangeChanged(double min, double max, bool is_zommed); void seriesChanged(); private: @@ -102,9 +101,7 @@ public slots: ChartsContainer *charts_container; QScrollArea *charts_scroll; uint32_t max_chart_range = 0; - bool is_zoomed = false; std::pair display_range; - std::pair zoomed_range; QAction *columns_action; int column_count = 1; int current_column_count = 0; @@ -119,12 +116,11 @@ public slots: class ZoomCommand : public QUndoCommand { public: - ZoomCommand(ChartsWidget *charts, std::pair range) : charts(charts), range(range), QUndoCommand() { - prev_range = charts->is_zoomed ? charts->zoomed_range : charts->display_range; + ZoomCommand(std::pair range) : range(range), QUndoCommand() { + prev_range = can->timeRange(); setText(QObject::tr("Zoom to %1-%2").arg(range.first, 0, 'f', 2).arg(range.second, 0, 'f', 2)); } - void undo() override { charts->setZoom(prev_range.first, prev_range.second); } - void redo() override { charts->setZoom(range.first, range.second); } - ChartsWidget *charts; - std::pair prev_range, range; + void undo() override { can->setTimeRange(prev_range); } + void redo() override { can->setTimeRange(range); } + std::optional> prev_range, range; }; diff --git a/tools/cabana/mainwin.cc b/tools/cabana/mainwin.cc index e6c9b49ca17a0c..d099fcec9955b3 100644 --- a/tools/cabana/mainwin.cc +++ b/tools/cabana/mainwin.cc @@ -192,7 +192,6 @@ void MainWindow::createDockWidgets() { video_splitter = new QSplitter(Qt::Vertical, this); video_widget = new VideoWidget(this); video_splitter->addWidget(video_widget); - QObject::connect(charts_widget, &ChartsWidget::rangeChanged, video_widget, &VideoWidget::updateTimeRange); video_splitter->addWidget(charts_container); video_splitter->setStretchFactor(1, 1); diff --git a/tools/cabana/streams/abstractstream.cc b/tools/cabana/streams/abstractstream.cc index 2e6ca9662e237a..2584106ce4848b 100644 --- a/tools/cabana/streams/abstractstream.cc +++ b/tools/cabana/streams/abstractstream.cc @@ -92,13 +92,23 @@ void AbstractStream::updateLastMessages() { std::set msgs; { std::lock_guard lk(mutex_); + double max_sec = 0; for (const auto &id : new_msgs_) { const auto &can_data = messages_[id]; - current_sec_ = std::max(current_sec_, can_data.ts); + max_sec = std::max(max_sec, can_data.ts); last_msgs[id] = can_data; sources.insert(id.source); } - msgs = std::move(new_msgs_); + + if (!new_msgs_.empty()) { + msgs = std::move(new_msgs_); + current_sec_ = max_sec; + } + } + + if (time_range_ && (current_sec_ < time_range_->first || current_sec_ >= time_range_->second)) { + seekTo(time_range_->first); + return; } if (sources.size() != prev_src_size) { @@ -108,6 +118,14 @@ void AbstractStream::updateLastMessages() { emit msgsReceived(&msgs, prev_msg_size != last_msgs.size()); } +void AbstractStream::setTimeRange(const std::optional> &range) { + time_range_ = range; + if (time_range_ && (current_sec_ < time_range_->first || current_sec_ >= time_range_->second)) { + seekTo(time_range_->first); + } + emit timeRangeChanged(time_range_); +} + void AbstractStream::updateEvent(const MessageId &id, double sec, const uint8_t *data, uint8_t size) { std::lock_guard lk(mutex_); messages_[id].compute(id, data, size, sec, getSpeed(), masks_[id]); diff --git a/tools/cabana/streams/abstractstream.h b/tools/cabana/streams/abstractstream.h index cfd423b36819fe..822dd03d8478b3 100644 --- a/tools/cabana/streams/abstractstream.h +++ b/tools/cabana/streams/abstractstream.h @@ -3,8 +3,10 @@ #include #include #include +#include #include #include +#include #include #include @@ -77,6 +79,8 @@ class AbstractStream : public QObject { virtual double getSpeed() { return 1; } virtual bool isPaused() const { return false; } virtual void pause(bool pause) {} + void setTimeRange(const std::optional> &range); + const std::optional> &timeRange() const { return time_range_; } inline const std::unordered_map &lastMessages() const { return last_msgs; } inline const MessageEventsMap &eventsMap() const { return events_; } @@ -91,8 +95,9 @@ class AbstractStream : public QObject { signals: void paused(); void resume(); - void seekingTo(double sec); + void seeking(double sec); void seekedTo(double sec); + void timeRangeChanged(const std::optional> &range); void streamStarted(); void eventsMerged(const MessageEventsMap &events_map); void msgsReceived(const std::set *new_msgs, bool has_new_ids); @@ -110,6 +115,7 @@ class AbstractStream : public QObject { std::vector all_events_; double current_sec_ = 0; + std::optional> time_range_; uint64_t lastest_event_ts = 0; private: diff --git a/tools/cabana/streams/replaystream.cc b/tools/cabana/streams/replaystream.cc index 5fda6b04870f6c..5d4925a67ac84e 100644 --- a/tools/cabana/streams/replaystream.cc +++ b/tools/cabana/streams/replaystream.cc @@ -53,9 +53,9 @@ bool ReplayStream::loadRoute(const QString &route, const QString &data_dir, uint {}, nullptr, replay_flags, data_dir, this)); replay->setSegmentCacheLimit(settings.max_cached_minutes); replay->installEventFilter(event_filter, this); + QObject::connect(replay.get(), &Replay::seeking, this, &AbstractStream::seeking); QObject::connect(replay.get(), &Replay::seekedTo, this, &AbstractStream::seekedTo); QObject::connect(replay.get(), &Replay::segmentsMerged, this, &ReplayStream::mergeSegments); - QObject::connect(replay.get(), &Replay::qLogLoaded, this, &ReplayStream::qLogLoaded, Qt::QueuedConnection); return replay->load(); } @@ -92,12 +92,7 @@ bool ReplayStream::eventFilter(const Event *event) { } void ReplayStream::seekTo(double ts) { - // Update timestamp and notify receivers of the time change. current_sec_ = ts; - std::set new_msgs; - msgsReceived(&new_msgs, false); - - // Seek to the specified timestamp replay->seekTo(std::max(double(0), ts), false); } diff --git a/tools/cabana/streams/replaystream.h b/tools/cabana/streams/replaystream.h index 049ccddafb7756..ced78680d1e155 100644 --- a/tools/cabana/streams/replaystream.h +++ b/tools/cabana/streams/replaystream.h @@ -34,9 +34,6 @@ class ReplayStream : public AbstractStream { void pause(bool pause) override; static AbstractOpenStreamWidget *widget(AbstractStream **stream); -signals: - void qLogLoaded(int segnum, std::shared_ptr qlog); - private: void mergeSegments(); std::unique_ptr replay = nullptr; diff --git a/tools/cabana/videowidget.cc b/tools/cabana/videowidget.cc index 7fca45c3936dea..d2f78babc19060 100644 --- a/tools/cabana/videowidget.cc +++ b/tools/cabana/videowidget.cc @@ -38,6 +38,8 @@ VideoWidget::VideoWidget(QWidget *parent) : QFrame(parent) { QObject::connect(can, &AbstractStream::paused, this, &VideoWidget::updatePlayBtnState); QObject::connect(can, &AbstractStream::resume, this, &VideoWidget::updatePlayBtnState); QObject::connect(can, &AbstractStream::msgsReceived, this, &VideoWidget::updateState); + QObject::connect(can, &AbstractStream::seeking, this, &VideoWidget::updateState); + QObject::connect(can, &AbstractStream::timeRangeChanged, this, &VideoWidget::timeRangeChanged); updatePlayBtnState(); setWhatsThis(tr(R"( @@ -150,14 +152,16 @@ QWidget *VideoWidget::createCameraWidget() { setMaximumTime(can->totalSeconds()); QObject::connect(slider, &QSlider::sliderReleased, [this]() { can->seekTo(slider->currentSecond()); }); - QObject::connect(slider, &Slider::updateMaximumTime, this, &VideoWidget::setMaximumTime, Qt::QueuedConnection); QObject::connect(can, &AbstractStream::eventsMerged, this, [this]() { slider->update(); }); - QObject::connect(static_cast(can), &ReplayStream::qLogLoaded, slider, &Slider::parseQLog); QObject::connect(cam_widget, &CameraWidget::clicked, []() { can->pause(!can->isPaused()); }); QObject::connect(cam_widget, &CameraWidget::vipcAvailableStreamsUpdated, this, &VideoWidget::vipcAvailableStreamsUpdated); QObject::connect(camera_tab, &QTabBar::currentChanged, [this](int index) { if (index != -1) cam_widget->setStreamType((VisionStreamType)camera_tab->tabData(index).toInt()); }); + + auto replay = static_cast(can)->getReplay(); + QObject::connect(replay, &Replay::qLogLoaded, slider, &Slider::parseQLog, Qt::QueuedConnection); + QObject::connect(replay, &Replay::totalSecondsUpdated, this, &VideoWidget::setMaximumTime, Qt::QueuedConnection); return w; } @@ -198,13 +202,13 @@ void VideoWidget::setMaximumTime(double sec) { slider->setTimeRange(0, sec); } -void VideoWidget::updateTimeRange(double min, double max, bool is_zoomed) { +void VideoWidget::timeRangeChanged(const std::optional> &time_range) { if (can->liveStreaming()) { - skip_to_end_btn->setEnabled(!is_zoomed); + skip_to_end_btn->setEnabled(!time_range.has_value()); return; } - is_zoomed ? slider->setTimeRange(min, max) - : slider->setTimeRange(0, maximum_time); + time_range ? slider->setTimeRange(time_range->first, time_range->second) + : slider->setTimeRange(0, maximum_time); } QString VideoWidget::formatTime(double sec, bool include_milliseconds) { @@ -255,12 +259,7 @@ void Slider::setTimeRange(double min, double max) { setRange(min * factor, max * factor); } -void Slider::parseQLog(int segnum, std::shared_ptr qlog) { - const auto &segments = qobject_cast(can)->route()->segments(); - if (segments.size() > 0 && segnum == segments.rbegin()->first && !qlog->events.empty()) { - emit updateMaximumTime(qlog->events.back().mono_time / 1e9 - can->routeStartTime()); - } - +void Slider::parseQLog(std::shared_ptr qlog) { std::mutex mutex; QtConcurrent::blockingMap(qlog->events.cbegin(), qlog->events.cend(), [&mutex, this](const Event &e) { if (e.which == cereal::Event::Which::THUMBNAIL) { diff --git a/tools/cabana/videowidget.h b/tools/cabana/videowidget.h index b2039e09a46324..7bd55cbce4103d 100644 --- a/tools/cabana/videowidget.h +++ b/tools/cabana/videowidget.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -40,13 +41,10 @@ class Slider : public QSlider { void setTimeRange(double min, double max); AlertInfo alertInfo(double sec); QPixmap thumbnail(double sec); - void parseQLog(int segnum, std::shared_ptr qlog); + void parseQLog(std::shared_ptr qlog); const double factor = 1000.0; -signals: - void updateMaximumTime(double); - private: void mousePressEvent(QMouseEvent *e) override; void mouseMoveEvent(QMouseEvent *e) override; @@ -63,11 +61,11 @@ class VideoWidget : public QFrame { public: VideoWidget(QWidget *parnet = nullptr); - void updateTimeRange(double min, double max, bool is_zommed); void setMaximumTime(double sec); protected: QString formatTime(double sec, bool include_milliseconds = false); + void timeRangeChanged(const std::optional> &time_range); void updateState(); void updatePlayBtnState(); QWidget *createCameraWidget(); diff --git a/tools/replay/camera.cc b/tools/replay/camera.cc index 9e711149c5eff4..b6d4ddc3a29c90 100644 --- a/tools/replay/camera.cc +++ b/tools/replay/camera.cc @@ -60,7 +60,6 @@ void CameraServer::cameraThread(Camera &cam) { capnp::FlatArrayMessageReader reader(event->data); auto evt = reader.getRoot(); auto eidx = capnp::AnyStruct::Reader(evt).getPointerSection()[0].getAs(); - if (eidx.getType() != cereal::EncodeIndex::Type::FULL_H_E_V_C) continue; int segment_id = eidx.getSegmentId(); uint32_t frame_id = eidx.getFrameId(); diff --git a/tools/replay/logreader.cc b/tools/replay/logreader.cc index 0f1638145fd424..9b726e067d8515 100644 --- a/tools/replay/logreader.cc +++ b/tools/replay/logreader.cc @@ -42,10 +42,10 @@ bool LogReader::load(const char *data, size_t size, std::atomic *abort) { evt.which == cereal::Event::DRIVER_ENCODE_IDX || evt.which == cereal::Event::WIDE_ROAD_ENCODE_IDX) { auto idx = capnp::AnyStruct::Reader(event).getPointerSection()[0].getAs(); - if (uint64_t sof = idx.getTimestampSof()) { - mono_time = sof; + if (idx.getType() == cereal::EncodeIndex::Type::FULL_H_E_V_C) { + uint64_t sof = idx.getTimestampSof(); + events.emplace_back(which, sof ? sof : mono_time, event_data, idx.getSegmentNum()); } - events.emplace_back(which, mono_time, event_data, idx.getSegmentNum()); } } } catch (const kj::Exception &e) { diff --git a/tools/replay/replay.cc b/tools/replay/replay.cc index 8616437188afa0..c5ad2aa5488b9b 100644 --- a/tools/replay/replay.cc +++ b/tools/replay/replay.cc @@ -55,11 +55,11 @@ void Replay::stop() { rInfo("shutdown: in progress..."); if (stream_thread_ != nullptr) { exit_ = true; - paused_ = true; + pauseStreamThread(); stream_cv_.notify_one(); stream_thread_->quit(); stream_thread_->wait(); - delete stream_thread_; + stream_thread_->deleteLater(); stream_thread_ = nullptr; } timeline_future.waitForFinished(); @@ -104,28 +104,40 @@ void Replay::updateEvents(const std::function &update_events_function) { void Replay::seekTo(double seconds, bool relative) { updateEvents([&]() { - seeking_to_seconds_ = relative ? seconds + currentSeconds() : seconds; - seeking_to_seconds_ = std::max(double(0.0), seeking_to_seconds_); - int target_segment = (int)seeking_to_seconds_ / 60; + double target_time = relative ? seconds + currentSeconds() : seconds; + target_time = std::max(double(0.0), target_time); + int target_segment = (int)target_time / 60; if (segments_.count(target_segment) == 0) { - rWarning("can't seek to %d s segment %d is invalid", (int)seeking_to_seconds_, target_segment); + rWarning("Can't seek to %d s segment %d is invalid", (int)target_time, target_segment); return true; } - rInfo("seeking to %d s, segment %d", (int)seeking_to_seconds_, target_segment); + rInfo("Seeking to %d s, segment %d", (int)target_time, target_segment); current_segment_ = target_segment; - cur_mono_time_ = route_start_ts_ + seeking_to_seconds_ * 1e9; - bool segment_merged = isSegmentMerged(target_segment); - if (segment_merged) { - emit seekedTo(seeking_to_seconds_); - // Reset seeking_to_seconds_ to indicate completion of seek - seeking_to_seconds_ = -1; - } - return segment_merged; + cur_mono_time_ = route_start_ts_ + target_time * 1e9; + seeking_to_ = target_time; + return false; }); + + checkSeekProgress(); updateSegmentsCache(); } +void Replay::checkSeekProgress() { + if (seeking_to_) { + auto it = segments_.find(int(*seeking_to_ / 60)); + if (it != segments_.end() && it->second && it->second->isLoaded()) { + emit seekedTo(*seeking_to_); + seeking_to_ = std::nullopt; + // wake up stream thread + updateEvents([]() { return true; }); + } else { + // Emit signal indicating the ongoing seek operation + emit seeking(*seeking_to_); + } + } +} + void Replay::seekToFlag(FindFlag flag) { if (auto next = find(flag)) { seekTo(*next - 2, false); // seek to 2 seconds before next @@ -150,8 +162,9 @@ void Replay::buildTimeline() { const auto &route_segments = route_->segments(); for (auto it = route_segments.cbegin(); it != route_segments.cend() && !exit_; ++it) { std::shared_ptr log(new LogReader()); - if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3)) continue; + if (!log->load(it->second.qlog.toStdString(), &exit_, !hasFlag(REPLAY_FLAG_NO_FILE_CACHE), 0, 3) || log->events.empty()) continue; + std::vector> timeline; for (const Event &e : log->events) { if (e.which == cereal::Event::Which::CONTROLS_STATE) { capnp::FlatArrayMessageReader reader(e.data); @@ -160,7 +173,6 @@ void Replay::buildTimeline() { if (engaged != cs.getEnabled()) { if (engaged) { - std::lock_guard lk(timeline_lock); timeline.push_back({toSeconds(engaged_begin), toSeconds(e.mono_time), TimelineType::Engaged}); } engaged_begin = e.mono_time; @@ -169,7 +181,6 @@ void Replay::buildTimeline() { if (alert_type != cs.getAlertType().cStr() || alert_status != cs.getAlertStatus()) { if (!alert_type.empty() && alert_size != cereal::ControlsState::AlertSize::NONE) { - std::lock_guard lk(timeline_lock); timeline.push_back({toSeconds(alert_begin), toSeconds(e.mono_time), timeline_types[(int)alert_status]}); } alert_begin = e.mono_time; @@ -178,12 +189,20 @@ void Replay::buildTimeline() { alert_status = cs.getAlertStatus(); } } else if (e.which == cereal::Event::Which::USER_FLAG) { - std::lock_guard lk(timeline_lock); timeline.push_back({toSeconds(e.mono_time), toSeconds(e.mono_time), TimelineType::UserFlag}); } } - std::sort(timeline.begin(), timeline.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); }); - emit qLogLoaded(it->first, log); + + { + std::lock_guard lk(timeline_lock); + timeline_.insert(timeline_.end(), timeline.begin(), timeline.end()); + std::sort(timeline_.begin(), timeline_.end(), [](auto &l, auto &r) { return std::get<2>(l) < std::get<2>(r); }); + } + + if (it->first == route_segments.rbegin()->first) { + emit totalSecondsUpdated(toSeconds(log->events.back().mono_time)); + } + emit qLogLoaded(log); } } @@ -260,14 +279,13 @@ void Replay::updateSegmentsCache() { const auto &cur_segment = cur->second; if (stream_thread_ == nullptr && cur_segment->isLoaded()) { startStream(cur_segment.get()); - emit streamStarted(); } } void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator cur, SegmentMap::iterator end) { - auto loadNext = [this](auto begin, auto end) { - auto it = std::find_if(begin, end, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); }); - if (it != end && !it->second) { + auto loadNextSegment = [this](auto first, auto last) { + auto it = std::find_if(first, last, [](const auto &seg_it) { return !seg_it.second || !seg_it.second->isLoaded(); }); + if (it != last && !it->second) { rDebug("loading segment %d...", it->first); it->second = std::make_unique(it->first, route_->at(it->first), flags_, filters_); QObject::connect(it->second.get(), &Segment::loadFinished, this, &Replay::segmentLoadFinished); @@ -276,9 +294,9 @@ void Replay::loadSegmentInRange(SegmentMap::iterator begin, SegmentMap::iterator return false; }; - // Load forward segments, then try reverse - if (!loadNext(cur, end)) { - loadNext(std::make_reverse_iterator(cur), segments_.rend()); + // Try loading forward segments, then reverse segments + if (!loadNextSegment(cur, end)) { + loadNextSegment(std::make_reverse_iterator(cur), std::make_reverse_iterator(begin)); } } @@ -316,15 +334,10 @@ void Replay::mergeSegments(const SegmentMap::iterator &begin, const SegmentMap:: updateEvents([&]() { events_.swap(new_events); merged_segments_ = segments_to_merge; - // Check if seeking is in progress - int target_segment = int(seeking_to_seconds_ / 60); - if (seeking_to_seconds_ >= 0 && segments_to_merge.count(target_segment) > 0) { - emit seekedTo(seeking_to_seconds_); - seeking_to_seconds_ = -1; // Reset seeking_to_seconds_ to indicate completion of seek - } // Wake up the stream thread if the current segment is loaded or invalid. - return isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0); + return !seeking_to_ && (isSegmentMerged(current_segment_) || (segments_.count(current_segment_) == 0)); }); + checkSeekProgress(); } void Replay::startStream(const Segment *cur_segment) { @@ -379,6 +392,7 @@ void Replay::startStream(const Segment *cur_segment) { stream_thread_->start(); timeline_future = QtConcurrent::run(this, &Replay::buildTimeline); + emit streamStarted(); } void Replay::publishMessage(const Event *e) { @@ -473,6 +487,7 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con // Skip events if socket is not present if (!sockets_[evt.which]) continue; + cur_mono_time_ = evt.mono_time; const uint64_t current_nanos = nanos_since_boot(); const int64_t time_diff = (evt.mono_time - evt_start_ts) / speed_ - (current_nanos - loop_start_ts); @@ -484,12 +499,11 @@ std::vector::const_iterator Replay::publishEvents(std::vector::con loop_start_ts = current_nanos; prev_replay_speed = speed_; } else if (time_diff > 0) { - precise_nano_sleep(time_diff); + precise_nano_sleep(time_diff, paused_); } if (paused_) break; - cur_mono_time_ = evt.mono_time; if (evt.eidx_segnum == -1) { publishMessage(&evt); } else if (camera_server_) { diff --git a/tools/replay/replay.h b/tools/replay/replay.h index 4adbc14df8558d..8b51ab054d06db 100644 --- a/tools/replay/replay.h +++ b/tools/replay/replay.h @@ -85,14 +85,16 @@ class Replay : public QObject { inline const std::string &carFingerprint() const { return car_fingerprint_; } inline const std::vector> getTimeline() { std::lock_guard lk(timeline_lock); - return timeline; + return timeline_; } signals: void streamStarted(); void segmentsMerged(); + void seeking(double sec); void seekedTo(double sec); - void qLogLoaded(int segnum, std::shared_ptr qlog); + void qLogLoaded(std::shared_ptr qlog); + void totalSecondsUpdated(double sec); protected slots: void segmentLoadFinished(bool success); @@ -112,6 +114,7 @@ protected slots: void publishMessage(const Event *e); void publishFrame(const Event *e); void buildTimeline(); + void checkSeekProgress(); inline bool isSegmentMerged(int n) const { return merged_segments_.count(n) > 0; } pthread_t stream_thread_id = 0; @@ -120,7 +123,7 @@ protected slots: bool user_paused_ = false; std::condition_variable stream_cv_; std::atomic current_segment_ = 0; - double seeking_to_seconds_ = -1; + std::optional seeking_to_; SegmentMap segments_; // the following variables must be protected with stream_lock_ std::atomic exit_ = false; @@ -143,7 +146,7 @@ protected slots: std::mutex timeline_lock; QFuture timeline_future; - std::vector> timeline; + std::vector> timeline_; std::string car_fingerprint_; std::atomic speed_ = 1.0; replayEventFilter event_filter = nullptr; diff --git a/tools/replay/util.cc b/tools/replay/util.cc index c8203fd79d815a..a08b3b3d5eab72 100644 --- a/tools/replay/util.cc +++ b/tools/replay/util.cc @@ -318,33 +318,23 @@ std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic return {}; } -void precise_nano_sleep(int64_t nanoseconds) { -#ifdef __APPLE__ - const long estimate_ns = 1 * 1e6; // 1ms - struct timespec req = {.tv_nsec = estimate_ns}; - uint64_t start_sleep = nanos_since_boot(); - while (nanoseconds > estimate_ns) { - nanosleep(&req, nullptr); - uint64_t end_sleep = nanos_since_boot(); - nanoseconds -= (end_sleep - start_sleep); - start_sleep = end_sleep; - } - // spin wait - if (nanoseconds > 0) { - while ((nanos_since_boot() - start_sleep) <= nanoseconds) { - std::this_thread::yield(); - } - } -#else +void precise_nano_sleep(int64_t nanoseconds, std::atomic &should_exit) { struct timespec req, rem; - - req.tv_sec = nanoseconds / 1e9; - req.tv_nsec = nanoseconds % (int64_t)1e9; - while (clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem) && errno == EINTR) { + req.tv_sec = nanoseconds / 1000000000; + req.tv_nsec = nanoseconds % 1000000000; + while (!should_exit) { +#ifdef __APPLE_ + int ret = nanosleep(&req, &rem); + if (ret == 0 || errno != EINTR) + break; +#else + int ret = clock_nanosleep(CLOCK_MONOTONIC, 0, &req, &rem); + if (ret == 0 || ret != EINTR) + break; +#endif // Retry sleep if interrupted by a signal req = rem; } -#endif } std::string sha256(const std::string &str) { diff --git a/tools/replay/util.h b/tools/replay/util.h index 750c13355520aa..97516e18147600 100644 --- a/tools/replay/util.h +++ b/tools/replay/util.h @@ -37,7 +37,7 @@ class MonotonicBuffer { }; std::string sha256(const std::string &str); -void precise_nano_sleep(int64_t nanoseconds); +void precise_nano_sleep(int64_t nanoseconds, std::atomic &should_exit); std::string decompressBZ2(const std::string &in, std::atomic *abort = nullptr); std::string decompressBZ2(const std::byte *in, size_t in_size, std::atomic *abort = nullptr); std::string getUrlWithoutQuery(const std::string &url);