Skip to content

Commit

Permalink
Implement unshipped messages in message port. (#1965)
Browse files Browse the repository at this point in the history
Implement storage of unshipped message in message port, to allow a
message port to receive and hold messages before the destination is
ready to receive them.

Use the above functionality to unblock worker.postMessage calls to no
longer wait until the worker script completes loading and executing.

b/310240306
b/226640425
  • Loading branch information
jellefoks committed Nov 17, 2023
1 parent b7dd62f commit fbe1ba5
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 83 deletions.
1 change: 1 addition & 0 deletions cobalt/web/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ target(gtest_target_type, "web_test") {
"event_target_test.cc",
"event_test.cc",
"message_event_test.cc",
"message_port_test.cc",
"url_test.cc",
"url_utils_test.cc",
"window_timers_test.cc",
Expand Down
27 changes: 27 additions & 0 deletions cobalt/web/event_target.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,22 @@ void EventTarget::TraceMembers(script::Tracer* tracer) {
// instead.
}

void EventTarget::AddEventListenerRegistrationCallback(
void* object, base::Token token, base::OnceClosure callback) {
base::AutoLock lock(event_listener_registration_mutex_);
event_listener_registration_callbacks_[token][object] = std::move(callback);
}

void EventTarget::RemoveEventListenerRegistrationCallbacks(void* object) {
base::AutoLock lock(event_listener_registration_mutex_);
for (auto& token : event_listener_registration_callbacks_)
token.second.erase(object);
}

void EventTarget::AddEventListenerInternal(
std::unique_ptr<EventTargetListenerInfo> listener_info) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(listener_info);

// Remove existing attribute listener of the same type.
if (listener_info->is_attribute()) {
Expand Down Expand Up @@ -314,7 +327,21 @@ void EventTarget::AddEventListenerInternal(
debugger_hooks().AsyncTaskScheduled(
listener_info->task(), listener_info->type().c_str(),
base::DebuggerHooks::AsyncTaskFrequency::kRecurring);

base::Token type = listener_info->type();
event_listener_infos_.push_back(std::move(listener_info));

{
// Call the event listener registration callback.
base::AutoLock lock(event_listener_registration_mutex_);
auto callbacks = event_listener_registration_callbacks_.find(type);
if (callbacks != event_listener_registration_callbacks_.end()) {
for (auto& object : callbacks->second) {
std::move(object.second).Run();
}
event_listener_registration_callbacks_.erase(type);
}
}
}

bool EventTarget::HasEventListener(base::Token type) {
Expand Down
16 changes: 13 additions & 3 deletions cobalt/web/event_target.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef COBALT_WEB_EVENT_TARGET_H_
#define COBALT_WEB_EVENT_TARGET_H_

#include <map>
#include <memory>
#include <set>
#include <string>
Expand Down Expand Up @@ -536,7 +537,6 @@ class EventTarget : public script::Wrappable,
web::EnvironmentSettings* environment_settings() const {
return environment_settings_;
}

std::set<base::Token>& event_listener_event_types() const {
static std::set<base::Token> event_listener_event_types;
for (auto& event_listener_info : event_listener_infos_) {
Expand All @@ -545,6 +545,12 @@ class EventTarget : public script::Wrappable,
return event_listener_event_types;
}

// Register a callback to be called when an event listener is added for the
// given type.
void AddEventListenerRegistrationCallback(void* object, base::Token type,
base::OnceClosure callback);
void RemoveEventListenerRegistrationCallbacks(void* object);

protected:
virtual ~EventTarget() { environment_settings_ = nullptr; }

Expand All @@ -569,8 +575,12 @@ class EventTarget : public script::Wrappable,
// the special case of window.onerror handling.
bool unpack_onerror_events_;

// Thread checker ensures all calls to the EventTarget are made from the same
// thread that it is created in.
base::Lock event_listener_registration_mutex_;
std::map<base::Token, std::map<void*, base::OnceClosure>>
event_listener_registration_callbacks_;

// Thread checker ensures all calls to the EventTarget are made from the
// same thread that it is created in.
THREAD_CHECKER(thread_checker_);
};

Expand Down
104 changes: 72 additions & 32 deletions cobalt/web/message_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "base/task_runner.h"
#include "cobalt/script/environment_settings.h"
#include "cobalt/web/context.h"
Expand All @@ -34,64 +33,105 @@
namespace cobalt {
namespace web {

MessagePort::MessagePort(web::EventTarget* event_target)
: event_target_(event_target) {
if (!event_target_) {
return;
void MessagePort::EntangleWithEventTarget(web::EventTarget* event_target) {
DCHECK(!event_target_);
{
base::AutoLock lock(mutex_);
event_target_ = event_target;
if (!event_target_) {
enabled_ = false;
return;
}
}
Context* context = event_target_->environment_settings()->context();
base::MessageLoop* message_loop = context->message_loop();
if (!message_loop) {
return;
}
message_loop->task_runner()->PostTask(
target_task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&Context::AddEnvironmentSettingsChangeObserver,
base::Unretained(context), base::Unretained(this)));
base::Unretained(context()), base::Unretained(this)));
remove_environment_settings_change_observer_ =
base::BindOnce(&Context::RemoveEnvironmentSettingsChangeObserver,
base::Unretained(context), base::Unretained(this));
base::Unretained(context()), base::Unretained(this));

target_task_runner()->PostTask(
FROM_HERE,
base::Bind(
[](MessagePort* message_port,
web::EventTarget*
event_target) { // The first time a MessagePort object's
// onmessage IDL attribute is set, the
// port's port message queue must be enabled, as if the start()
// method had been called.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#messageport
if (event_target->HasEventListener(base::Tokens::message())) {
message_port->Start();
} else {
event_target->AddEventListenerRegistrationCallback(
message_port, base::Tokens::message(),
base::BindOnce(&MessagePort::Start,
base::Unretained(message_port)));
}
},
base::Unretained(this), base::Unretained(event_target)));
}

MessagePort::~MessagePort() { Close(); }
void MessagePort::Start() {
// The start() method steps are to enable this's port message queue, if it is
// not already enabled.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#dom-messageport-start
base::AutoLock lock(mutex_);
if (!event_target_) {
return;
}
enabled_ = true;
for (auto& message : unshipped_messages_) {
PostMessageSerializedLocked(std::move(message));
}
unshipped_messages_.clear();
}

void MessagePort::Close() {
base::AutoLock lock(mutex_);
unshipped_messages_.clear();
if (!event_target_) {
return;
}
event_target_->RemoveEventListenerRegistrationCallbacks(this);
if (remove_environment_settings_change_observer_) {
std::move(remove_environment_settings_change_observer_).Run();
}
event_target_ = nullptr;
}

void MessagePort::PostMessage(const script::ValueHandleHolder& message) {
PostMessageSerialized(std::make_unique<script::StructuredClone>(message));
auto structured_clone = std::make_unique<script::StructuredClone>(message);
{
base::AutoLock lock(mutex_);
if (!(event_target_ && enabled_)) {
unshipped_messages_.push_back(std::move(structured_clone));
return;
}
PostMessageSerializedLocked(std::move(structured_clone));
}
}

void MessagePort::PostMessageSerialized(
void MessagePort::PostMessageSerializedLocked(
std::unique_ptr<script::StructuredClone> structured_clone) {
if (!event_target_ || !structured_clone) {
return;
}
// TODO: Forward the location of the origating API call to the PostTask call.
base::MessageLoop* message_loop =
event_target_->environment_settings()->context()->message_loop();
if (!message_loop) {
if (!structured_clone || !event_target_ || !enabled_) {
return;
}
// TODO: Forward the location of the origating API call to the PostTask
// call.
// https://html.spec.whatwg.org/multipage/workers.html#handler-worker-onmessage
// TODO: Update MessageEvent to support more types. (b/227665847)
// TODO: Remove dependency of MessageEvent on net iobuffer (b/227665847)
message_loop->task_runner()->PostTask(
FROM_HERE, base::BindOnce(&MessagePort::DispatchMessage, AsWeakPtr(),
std::move(structured_clone)));
}

void MessagePort::DispatchMessage(
std::unique_ptr<script::StructuredClone> structured_clone) {
event_target_->DispatchEvent(new web::MessageEvent(
base::Tokens::message(), std::move(structured_clone)));
target_task_runner()->PostTask(
FROM_HERE,
base::BindOnce(
[](base::WeakPtr<web::EventTarget> event_target,
std::unique_ptr<script::StructuredClone> structured_clone) {
event_target->DispatchEvent(new web::MessageEvent(
base::Tokens::message(), std::move(structured_clone)));
},
event_target_->AsWeakPtr(), std::move(structured_clone)));
}

} // namespace web
Expand Down
37 changes: 28 additions & 9 deletions cobalt/web/message_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "base/callback_forward.h"
#include "base/memory/weak_ptr.h"
Expand All @@ -34,14 +36,16 @@ namespace cobalt {
namespace web {

class MessagePort : public script::Wrappable,
public base::SupportsWeakPtr<MessagePort>,
public Context::EnvironmentSettingsChangeObserver {
public:
explicit MessagePort(web::EventTarget* event_target);
~MessagePort();
MessagePort() = default;
~MessagePort() { Close(); }

MessagePort(const MessagePort&) = delete;
MessagePort& operator=(const MessagePort&) = delete;

void EntangleWithEventTarget(web::EventTarget* event_target);

void OnEnvironmentSettingsChanged(bool context_valid) override {
if (!context_valid) {
Close();
Expand All @@ -53,10 +57,8 @@ class MessagePort : public script::Wrappable,
// -> void PostMessage(const script::ValueHandleHolder& message,
// script::Sequence<script::ValueHandle*> transfer) {}
void PostMessage(const script::ValueHandleHolder& message);
void PostMessageSerialized(
std::unique_ptr<script::StructuredClone> structured_clone);

void Start() {}
void Start();
void Close();

const web::EventTargetListenerInfo::EventListenerScriptValue* onmessage()
Expand Down Expand Up @@ -91,15 +93,32 @@ class MessagePort : public script::Wrappable,
event_listener);
}

web::EventTarget* event_target() { return event_target_; }
EventTarget* event_target() const { return event_target_; }
Context* context() const {
return event_target_ ? event_target_->environment_settings()->context()
: nullptr;
}
base::TaskRunner* target_task_runner() const {
return event_target_ ? event_target_->environment_settings()
->context()
->message_loop()
->task_runner()
: nullptr;
}

DEFINE_WRAPPABLE_TYPE(MessagePort);

private:
void DispatchMessage(
void PostMessageSerializedLocked(
std::unique_ptr<script::StructuredClone> structured_clone);

// The event target to dispatch events to.
base::Lock mutex_;
std::vector<std::unique_ptr<script::StructuredClone>> unshipped_messages_;

// A port message queue can be enabled or disabled, and is initially disabled.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#message-ports
bool enabled_ = false;

web::EventTarget* event_target_ = nullptr;
base::OnceClosure remove_environment_settings_change_observer_;
};
Expand Down
45 changes: 45 additions & 0 deletions cobalt/web/message_port_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 The Cobalt Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "cobalt/web/message_port.h"

#include "base/strings/string_util.h"
#include "cobalt/web/testing/test_with_javascript.h"


#define EXPECT_SUBSTRING(needle, haystack) \
EXPECT_PRED_FORMAT2(::testing::IsSubstring, (needle), (haystack))

namespace cobalt {
namespace web {

namespace {
class MessagePortTestWithJavaScript : public testing::TestWebWithJavaScript {};
} // namespace

TEST_P(MessagePortTestWithJavaScript, MessagePortIsNotConstructible) {
std::string result;
EXPECT_FALSE(EvaluateScript("var event = new MessagePort();", &result))
<< "Failed to evaluate script.";
EXPECT_SUBSTRING("TypeError: MessagePort is not constructible", result)
<< result;
}

INSTANTIATE_TEST_CASE_P(
MessagePortTestsWithJavaScript, MessagePortTestWithJavaScript,
::testing::ValuesIn(testing::TestWebWithJavaScript::GetWebTypes()),
testing::TestWebWithJavaScript::GetTypeName);

} // namespace web
} // namespace cobalt
10 changes: 5 additions & 5 deletions cobalt/worker/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

namespace cobalt {
namespace worker {
Client::Client(web::EnvironmentSettings* client)
: MessagePort(client->context()
->GetWindowOrWorkerGlobalScope()
->navigator_base()
->service_worker()) {
Client::Client(web::EnvironmentSettings* client) {
DCHECK(client);
EntangleWithEventTarget(client->context()
->GetWindowOrWorkerGlobalScope()
->navigator_base()
->service_worker());
// Algorithm for Create Client:
// https://www.w3.org/TR/2022/CRD-service-workers-20220712/#create-client
// 1. Let clientObject be a new Client object.
Expand Down
3 changes: 2 additions & 1 deletion cobalt/worker/dedicated_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ void DedicatedWorker::Initialize(script::ExceptionState* exception_state) {
// 6. Let worker be a new Worker object.
// 7. Let outside port be a new MessagePort in outside settings's Realm.
// 8. Associate the outside port with worker.
outside_port_ = new web::MessagePort(this);
outside_port_ = new web::MessagePort();
outside_port_->EntangleWithEventTarget(this);
// 9. Run this step in parallel:
// 1. Run a worker given worker, worker URL, outside settings, outside
// port, and options.
Expand Down
Loading

0 comments on commit fbe1ba5

Please sign in to comment.