From 3baba050ef5955f3d7864fa30efc083c27484c82 Mon Sep 17 00:00:00 2001 From: cobalt-github-releaser-bot <95661244+cobalt-github-releaser-bot@users.noreply.github.com> Date: Tue, 28 Nov 2023 16:54:58 -0800 Subject: [PATCH] Cherry pick PR #1965: Implement unshipped messages in message port. (#1998) Refer to the original PR: https://github.com/youtube/cobalt/pull/1965 Implement storage of unshipped message in message port, to allow a message port to receive and hold messages before the destination is ready to receive them. Use the above functionality to unblock worker.postMessage calls to no longer wait until the worker script completes loading and executing. b/310240306 b/226640425 Co-authored-by: Jelle Foks --- cobalt/web/BUILD.gn | 1 + cobalt/web/event_target.cc | 27 ++++++++ cobalt/web/event_target.h | 16 ++++- cobalt/web/message_port.cc | 104 +++++++++++++++++++++--------- cobalt/web/message_port.h | 37 ++++++++--- cobalt/web/message_port_test.cc | 45 +++++++++++++ cobalt/worker/client.cc | 10 +-- cobalt/worker/dedicated_worker.cc | 3 +- cobalt/worker/worker.cc | 30 +++------ cobalt/worker/worker.h | 15 ++--- 10 files changed, 205 insertions(+), 83 deletions(-) create mode 100644 cobalt/web/message_port_test.cc diff --git a/cobalt/web/BUILD.gn b/cobalt/web/BUILD.gn index acd599c18393..c00435d922cc 100644 --- a/cobalt/web/BUILD.gn +++ b/cobalt/web/BUILD.gn @@ -187,6 +187,7 @@ target(gtest_target_type, "web_test") { "event_target_test.cc", "event_test.cc", "message_event_test.cc", + "message_port_test.cc", "url_test.cc", "url_utils_test.cc", "window_timers_test.cc", diff --git a/cobalt/web/event_target.cc b/cobalt/web/event_target.cc index 2622b4b402fb..4d2fc0a61da2 100644 --- a/cobalt/web/event_target.cc +++ b/cobalt/web/event_target.cc @@ -282,9 +282,22 @@ void EventTarget::TraceMembers(script::Tracer* tracer) { // instead. } +void EventTarget::AddEventListenerRegistrationCallback( + void* object, base::Token token, base::OnceClosure callback) { + base::AutoLock lock(event_listener_registration_mutex_); + event_listener_registration_callbacks_[token][object] = std::move(callback); +} + +void EventTarget::RemoveEventListenerRegistrationCallbacks(void* object) { + base::AutoLock lock(event_listener_registration_mutex_); + for (auto& token : event_listener_registration_callbacks_) + token.second.erase(object); +} + void EventTarget::AddEventListenerInternal( std::unique_ptr listener_info) { DCHECK_CALLED_ON_VALID_THREAD(thread_checker_); + DCHECK(listener_info); // Remove existing attribute listener of the same type. if (listener_info->is_attribute()) { @@ -314,7 +327,21 @@ void EventTarget::AddEventListenerInternal( debugger_hooks().AsyncTaskScheduled( listener_info->task(), listener_info->type().c_str(), base::DebuggerHooks::AsyncTaskFrequency::kRecurring); + + base::Token type = listener_info->type(); event_listener_infos_.push_back(std::move(listener_info)); + + { + // Call the event listener registration callback. + base::AutoLock lock(event_listener_registration_mutex_); + auto callbacks = event_listener_registration_callbacks_.find(type); + if (callbacks != event_listener_registration_callbacks_.end()) { + for (auto& object : callbacks->second) { + std::move(object.second).Run(); + } + event_listener_registration_callbacks_.erase(type); + } + } } bool EventTarget::HasEventListener(base::Token type) { diff --git a/cobalt/web/event_target.h b/cobalt/web/event_target.h index 1663f80f0422..f61ae4191b98 100644 --- a/cobalt/web/event_target.h +++ b/cobalt/web/event_target.h @@ -15,6 +15,7 @@ #ifndef COBALT_WEB_EVENT_TARGET_H_ #define COBALT_WEB_EVENT_TARGET_H_ +#include #include #include #include @@ -536,7 +537,6 @@ class EventTarget : public script::Wrappable, web::EnvironmentSettings* environment_settings() const { return environment_settings_; } - std::set& event_listener_event_types() const { static std::set event_listener_event_types; for (auto& event_listener_info : event_listener_infos_) { @@ -545,6 +545,12 @@ class EventTarget : public script::Wrappable, return event_listener_event_types; } + // Register a callback to be called when an event listener is added for the + // given type. + void AddEventListenerRegistrationCallback(void* object, base::Token type, + base::OnceClosure callback); + void RemoveEventListenerRegistrationCallbacks(void* object); + protected: virtual ~EventTarget() { environment_settings_ = nullptr; } @@ -569,8 +575,12 @@ class EventTarget : public script::Wrappable, // the special case of window.onerror handling. bool unpack_onerror_events_; - // Thread checker ensures all calls to the EventTarget are made from the same - // thread that it is created in. + base::Lock event_listener_registration_mutex_; + std::map> + event_listener_registration_callbacks_; + + // Thread checker ensures all calls to the EventTarget are made from the + // same thread that it is created in. THREAD_CHECKER(thread_checker_); }; diff --git a/cobalt/web/message_port.cc b/cobalt/web/message_port.cc index 871874b208d7..f6de8e2c998c 100644 --- a/cobalt/web/message_port.cc +++ b/cobalt/web/message_port.cc @@ -23,7 +23,6 @@ #include "base/logging.h" #include "base/memory/ptr_util.h" #include "base/memory/ref_counted.h" -#include "base/message_loop/message_loop.h" #include "base/task_runner.h" #include "cobalt/script/environment_settings.h" #include "cobalt/web/context.h" @@ -34,31 +33,68 @@ namespace cobalt { namespace web { -MessagePort::MessagePort(web::EventTarget* event_target) - : event_target_(event_target) { - if (!event_target_) { - return; +void MessagePort::EntangleWithEventTarget(web::EventTarget* event_target) { + DCHECK(!event_target_); + { + base::AutoLock lock(mutex_); + event_target_ = event_target; + if (!event_target_) { + enabled_ = false; + return; + } } - Context* context = event_target_->environment_settings()->context(); - base::MessageLoop* message_loop = context->message_loop(); - if (!message_loop) { - return; - } - message_loop->task_runner()->PostTask( + target_task_runner()->PostTask( FROM_HERE, base::BindOnce(&Context::AddEnvironmentSettingsChangeObserver, - base::Unretained(context), base::Unretained(this))); + base::Unretained(context()), base::Unretained(this))); remove_environment_settings_change_observer_ = base::BindOnce(&Context::RemoveEnvironmentSettingsChangeObserver, - base::Unretained(context), base::Unretained(this)); + base::Unretained(context()), base::Unretained(this)); + + target_task_runner()->PostTask( + FROM_HERE, + base::Bind( + [](MessagePort* message_port, + web::EventTarget* + event_target) { // The first time a MessagePort object's + // onmessage IDL attribute is set, the + // port's port message queue must be enabled, as if the start() + // method had been called. + // https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#messageport + if (event_target->HasEventListener(base::Tokens::message())) { + message_port->Start(); + } else { + event_target->AddEventListenerRegistrationCallback( + message_port, base::Tokens::message(), + base::BindOnce(&MessagePort::Start, + base::Unretained(message_port))); + } + }, + base::Unretained(this), base::Unretained(event_target))); } -MessagePort::~MessagePort() { Close(); } +void MessagePort::Start() { + // The start() method steps are to enable this's port message queue, if it is + // not already enabled. + // https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#dom-messageport-start + base::AutoLock lock(mutex_); + if (!event_target_) { + return; + } + enabled_ = true; + for (auto& message : unshipped_messages_) { + PostMessageSerializedLocked(std::move(message)); + } + unshipped_messages_.clear(); +} void MessagePort::Close() { + base::AutoLock lock(mutex_); + unshipped_messages_.clear(); if (!event_target_) { return; } + event_target_->RemoveEventListenerRegistrationCallbacks(this); if (remove_environment_settings_change_observer_) { std::move(remove_environment_settings_change_observer_).Run(); } @@ -66,32 +102,36 @@ void MessagePort::Close() { } void MessagePort::PostMessage(const script::ValueHandleHolder& message) { - PostMessageSerialized(std::make_unique(message)); + auto structured_clone = std::make_unique(message); + { + base::AutoLock lock(mutex_); + if (!(event_target_ && enabled_)) { + unshipped_messages_.push_back(std::move(structured_clone)); + return; + } + PostMessageSerializedLocked(std::move(structured_clone)); + } } -void MessagePort::PostMessageSerialized( +void MessagePort::PostMessageSerializedLocked( std::unique_ptr structured_clone) { - if (!event_target_ || !structured_clone) { - return; - } - // TODO: Forward the location of the origating API call to the PostTask call. - base::MessageLoop* message_loop = - event_target_->environment_settings()->context()->message_loop(); - if (!message_loop) { + if (!structured_clone || !event_target_ || !enabled_) { return; } + // TODO: Forward the location of the origating API call to the PostTask + // call. // https://html.spec.whatwg.org/multipage/workers.html#handler-worker-onmessage // TODO: Update MessageEvent to support more types. (b/227665847) // TODO: Remove dependency of MessageEvent on net iobuffer (b/227665847) - message_loop->task_runner()->PostTask( - FROM_HERE, base::BindOnce(&MessagePort::DispatchMessage, AsWeakPtr(), - std::move(structured_clone))); -} - -void MessagePort::DispatchMessage( - std::unique_ptr structured_clone) { - event_target_->DispatchEvent(new web::MessageEvent( - base::Tokens::message(), std::move(structured_clone))); + target_task_runner()->PostTask( + FROM_HERE, + base::BindOnce( + [](base::WeakPtr event_target, + std::unique_ptr structured_clone) { + event_target->DispatchEvent(new web::MessageEvent( + base::Tokens::message(), std::move(structured_clone))); + }, + event_target_->AsWeakPtr(), std::move(structured_clone))); } } // namespace web diff --git a/cobalt/web/message_port.h b/cobalt/web/message_port.h index cee51093e722..71ef61556a22 100644 --- a/cobalt/web/message_port.h +++ b/cobalt/web/message_port.h @@ -17,6 +17,8 @@ #include #include +#include +#include #include "base/callback_forward.h" #include "base/memory/weak_ptr.h" @@ -34,14 +36,16 @@ namespace cobalt { namespace web { class MessagePort : public script::Wrappable, - public base::SupportsWeakPtr, public Context::EnvironmentSettingsChangeObserver { public: - explicit MessagePort(web::EventTarget* event_target); - ~MessagePort(); + MessagePort() = default; + ~MessagePort() { Close(); } + MessagePort(const MessagePort&) = delete; MessagePort& operator=(const MessagePort&) = delete; + void EntangleWithEventTarget(web::EventTarget* event_target); + void OnEnvironmentSettingsChanged(bool context_valid) override { if (!context_valid) { Close(); @@ -53,10 +57,8 @@ class MessagePort : public script::Wrappable, // -> void PostMessage(const script::ValueHandleHolder& message, // script::Sequence transfer) {} void PostMessage(const script::ValueHandleHolder& message); - void PostMessageSerialized( - std::unique_ptr structured_clone); - void Start() {} + void Start(); void Close(); const web::EventTargetListenerInfo::EventListenerScriptValue* onmessage() @@ -91,15 +93,32 @@ class MessagePort : public script::Wrappable, event_listener); } - web::EventTarget* event_target() { return event_target_; } + EventTarget* event_target() const { return event_target_; } + Context* context() const { + return event_target_ ? event_target_->environment_settings()->context() + : nullptr; + } + base::TaskRunner* target_task_runner() const { + return event_target_ ? event_target_->environment_settings() + ->context() + ->message_loop() + ->task_runner() + : nullptr; + } DEFINE_WRAPPABLE_TYPE(MessagePort); private: - void DispatchMessage( + void PostMessageSerializedLocked( std::unique_ptr structured_clone); - // The event target to dispatch events to. + base::Lock mutex_; + std::vector> unshipped_messages_; + + // A port message queue can be enabled or disabled, and is initially disabled. + // https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#message-ports + bool enabled_ = false; + web::EventTarget* event_target_ = nullptr; base::OnceClosure remove_environment_settings_change_observer_; }; diff --git a/cobalt/web/message_port_test.cc b/cobalt/web/message_port_test.cc new file mode 100644 index 000000000000..140c2919d053 --- /dev/null +++ b/cobalt/web/message_port_test.cc @@ -0,0 +1,45 @@ +// Copyright 2023 The Cobalt Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "cobalt/web/message_port.h" + +#include "base/strings/string_util.h" +#include "cobalt/web/testing/test_with_javascript.h" + + +#define EXPECT_SUBSTRING(needle, haystack) \ + EXPECT_PRED_FORMAT2(::testing::IsSubstring, (needle), (haystack)) + +namespace cobalt { +namespace web { + +namespace { +class MessagePortTestWithJavaScript : public testing::TestWebWithJavaScript {}; +} // namespace + +TEST_P(MessagePortTestWithJavaScript, MessagePortIsNotConstructible) { + std::string result; + EXPECT_FALSE(EvaluateScript("var event = new MessagePort();", &result)) + << "Failed to evaluate script."; + EXPECT_SUBSTRING("TypeError: MessagePort is not constructible", result) + << result; +} + +INSTANTIATE_TEST_CASE_P( + MessagePortTestsWithJavaScript, MessagePortTestWithJavaScript, + ::testing::ValuesIn(testing::TestWebWithJavaScript::GetWebTypes()), + testing::TestWebWithJavaScript::GetTypeName); + +} // namespace web +} // namespace cobalt diff --git a/cobalt/worker/client.cc b/cobalt/worker/client.cc index 7ca45bec2711..707958867994 100644 --- a/cobalt/worker/client.cc +++ b/cobalt/worker/client.cc @@ -22,12 +22,12 @@ namespace cobalt { namespace worker { -Client::Client(web::EnvironmentSettings* client) - : MessagePort(client->context() - ->GetWindowOrWorkerGlobalScope() - ->navigator_base() - ->service_worker()) { +Client::Client(web::EnvironmentSettings* client) { DCHECK(client); + EntangleWithEventTarget(client->context() + ->GetWindowOrWorkerGlobalScope() + ->navigator_base() + ->service_worker()); // Algorithm for Create Client: // https://www.w3.org/TR/2022/CRD-service-workers-20220712/#create-client // 1. Let clientObject be a new Client object. diff --git a/cobalt/worker/dedicated_worker.cc b/cobalt/worker/dedicated_worker.cc index 783cead9bef5..af3e62b5614c 100644 --- a/cobalt/worker/dedicated_worker.cc +++ b/cobalt/worker/dedicated_worker.cc @@ -85,7 +85,8 @@ void DedicatedWorker::Initialize(script::ExceptionState* exception_state) { // 6. Let worker be a new Worker object. // 7. Let outside port be a new MessagePort in outside settings's Realm. // 8. Associate the outside port with worker. - outside_port_ = new web::MessagePort(this); + outside_port_ = new web::MessagePort(); + outside_port_->EntangleWithEventTarget(this); // 9. Run this step in parallel: // 1. Run a worker given worker, worker URL, outside settings, outside // port, and options. diff --git a/cobalt/worker/worker.cc b/cobalt/worker/worker.cc index 28ee68eefaba..a126d864abac 100644 --- a/cobalt/worker/worker.cc +++ b/cobalt/worker/worker.cc @@ -19,7 +19,6 @@ #include "base/location.h" #include "base/logging.h" -#include "base/message_loop/message_loop.h" #include "base/task_runner.h" #include "base/threading/thread.h" #include "cobalt/browser/user_agent_platform_info.h" @@ -40,6 +39,7 @@ namespace cobalt { namespace worker { Worker::Worker(const char* name, const Options& options) : options_(options) { + message_port_ = new web::MessagePort(); // Algorithm for 'run a worker' // https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#run-a-worker // 1. Let is shared be true if worker is a SharedWorker object, and false @@ -71,7 +71,6 @@ void Worker::WillDestroyCurrentMessageLoop() { // Destroy members that were constructed in the worker thread. loader_.reset(); worker_global_scope_ = nullptr; - message_port_ = nullptr; content_.reset(); } @@ -265,7 +264,7 @@ void Worker::Execute(const std::string& content, // Done at step 8. // 16. Let inside port be a new MessagePort object in inside settings's Realm. // 17. Associate inside port with worker global scope. - message_port_ = new web::MessagePort(worker_global_scope_); + message_port_->EntangleWithEventTarget(worker_global_scope_); // 18. Entangle outside port and inside port. // TODO(b/226640425): Implement this when Message Ports can be entangled. // 19. Create a new WorkerLocation object and associate it with worker global @@ -280,7 +279,6 @@ void Worker::Execute(const std::string& content, // worker until such time as either the closing flag switches to true or // the worker stops being a suspendable worker. // 22. Set inside settings's execution ready flag. - execution_ready_.Signal(); // 23. If script is a classic script, then run the classic script script. // Otherwise, it is a module script; run the module script script. @@ -314,11 +312,11 @@ void Worker::Abort() { // Algorithm for 'run a worker' // https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#run-a-worker // 29. Clear the worker global scope's map of active timers. - if (worker_global_scope_ && message_loop()) { - message_loop()->task_runner()->PostBlockingTask( + if (worker_global_scope_ && task_runner()) { + task_runner()->PostTask( FROM_HERE, base::Bind( - [](WorkerGlobalScope* worker_global_scope) { - worker_global_scope->DestroyTimers(); + [](web::WindowOrWorkerGlobalScope* global_scope) { + global_scope->DestroyTimers(); }, base::Unretained(worker_global_scope_.get()))); } @@ -357,20 +355,8 @@ void Worker::Terminate() { } void Worker::PostMessage(const script::ValueHandleHolder& message) { - DCHECK(message_loop()); - auto structured_clone = std::make_unique(message); - if (base::MessageLoop::current() != message_loop()) { - // Block until the worker thread is ready to execute code to handle the - // event. - execution_ready_.Wait(); - message_loop()->task_runner()->PostTask( - FROM_HERE, base::BindOnce(&web::MessagePort::PostMessageSerialized, - message_port()->AsWeakPtr(), - std::move(structured_clone))); - } else { - DCHECK(execution_ready_.IsSignaled()); - message_port()->PostMessageSerialized(std::move(structured_clone)); - } + DCHECK(message_port_); + message_port_->PostMessage(message); } } // namespace worker diff --git a/cobalt/worker/worker.h b/cobalt/worker/worker.h index 364837a2fd01..5cce8cb6514e 100644 --- a/cobalt/worker/worker.h +++ b/cobalt/worker/worker.h @@ -24,6 +24,7 @@ #include "base/memory/scoped_refptr.h" #include "base/message_loop/message_loop_current.h" #include "base/synchronization/waitable_event.h" +#include "base/task_runner.h" #include "base/threading/thread.h" #include "cobalt/base/source_location.h" #include "cobalt/csp/content_security_policy.h" @@ -81,11 +82,9 @@ class Worker : public base::MessageLoop::DestructionObserver { void Terminate(); - web::MessagePort* message_port() const { return message_port_.get(); } - - // The message loop this object is running on. - base::MessageLoop* message_loop() const { - return web_agent_ ? web_agent_->message_loop() : nullptr; + // The task runner for this object. + base::TaskRunner* task_runner() const { + return web_agent_ ? web_agent_->message_loop()->task_runner() : nullptr; } void PostMessage(const script::ValueHandleHolder& message); @@ -138,12 +137,6 @@ class Worker : public base::MessageLoop::DestructionObserver { // Content of the script. Released after Execute is called. std::unique_ptr content_; - - // The execution ready flag. - // https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#concept-environment-execution-ready-flag - base::WaitableEvent execution_ready_ = { - base::WaitableEvent::ResetPolicy::MANUAL, - base::WaitableEvent::InitialState::NOT_SIGNALED}; }; } // namespace worker