Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement unshipped messages in message port. #1965

Merged
merged 3 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cobalt/web/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ target(gtest_target_type, "web_test") {
"event_target_test.cc",
"event_test.cc",
"message_event_test.cc",
"message_port_test.cc",
"url_test.cc",
"url_utils_test.cc",
"window_timers_test.cc",
Expand Down
27 changes: 27 additions & 0 deletions cobalt/web/event_target.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,22 @@ void EventTarget::TraceMembers(script::Tracer* tracer) {
// instead.
}

void EventTarget::AddEventListenerRegistrationCallback(
void* object, base::Token token, base::OnceClosure callback) {
base::AutoLock lock(event_listener_registration_mutex_);
event_listener_registration_callbacks_[token][object] = std::move(callback);
}

void EventTarget::RemoveEventListenerRegistrationCallbacks(void* object) {
base::AutoLock lock(event_listener_registration_mutex_);
for (auto& token : event_listener_registration_callbacks_)
token.second.erase(object);
}

void EventTarget::AddEventListenerInternal(
std::unique_ptr<EventTargetListenerInfo> listener_info) {
DCHECK_CALLED_ON_VALID_THREAD(thread_checker_);
DCHECK(listener_info);

// Remove existing attribute listener of the same type.
if (listener_info->is_attribute()) {
Expand Down Expand Up @@ -314,7 +327,21 @@ void EventTarget::AddEventListenerInternal(
debugger_hooks().AsyncTaskScheduled(
listener_info->task(), listener_info->type().c_str(),
base::DebuggerHooks::AsyncTaskFrequency::kRecurring);

base::Token type = listener_info->type();
event_listener_infos_.push_back(std::move(listener_info));

{
// Call the event listener registration callback.
base::AutoLock lock(event_listener_registration_mutex_);
auto callbacks = event_listener_registration_callbacks_.find(type);
if (callbacks != event_listener_registration_callbacks_.end()) {
for (auto& object : callbacks->second) {
std::move(object.second).Run();
}
event_listener_registration_callbacks_.erase(type);
}
}
}

bool EventTarget::HasEventListener(base::Token type) {
Expand Down
16 changes: 13 additions & 3 deletions cobalt/web/event_target.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef COBALT_WEB_EVENT_TARGET_H_
#define COBALT_WEB_EVENT_TARGET_H_

#include <map>
#include <memory>
#include <set>
#include <string>
Expand Down Expand Up @@ -536,7 +537,6 @@ class EventTarget : public script::Wrappable,
web::EnvironmentSettings* environment_settings() const {
return environment_settings_;
}

std::set<base::Token>& event_listener_event_types() const {
static std::set<base::Token> event_listener_event_types;
for (auto& event_listener_info : event_listener_infos_) {
Expand All @@ -545,6 +545,12 @@ class EventTarget : public script::Wrappable,
return event_listener_event_types;
}

// Register a callback to be called when an event listener is added for the
// given type.
void AddEventListenerRegistrationCallback(void* object, base::Token type,
base::OnceClosure callback);
void RemoveEventListenerRegistrationCallbacks(void* object);

protected:
virtual ~EventTarget() { environment_settings_ = nullptr; }

Expand All @@ -569,8 +575,12 @@ class EventTarget : public script::Wrappable,
// the special case of window.onerror handling.
bool unpack_onerror_events_;

// Thread checker ensures all calls to the EventTarget are made from the same
// thread that it is created in.
base::Lock event_listener_registration_mutex_;
std::map<base::Token, std::map<void*, base::OnceClosure>>
event_listener_registration_callbacks_;

// Thread checker ensures all calls to the EventTarget are made from the
// same thread that it is created in.
THREAD_CHECKER(thread_checker_);
};

Expand Down
104 changes: 72 additions & 32 deletions cobalt/web/message_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include "base/logging.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_loop.h"
#include "base/task_runner.h"
#include "cobalt/script/environment_settings.h"
#include "cobalt/web/context.h"
Expand All @@ -34,64 +33,105 @@
namespace cobalt {
namespace web {

MessagePort::MessagePort(web::EventTarget* event_target)
: event_target_(event_target) {
if (!event_target_) {
return;
void MessagePort::EntangleWithEventTarget(web::EventTarget* event_target) {
DCHECK(!event_target_);
{
base::AutoLock lock(mutex_);
event_target_ = event_target;
if (!event_target_) {
enabled_ = false;
return;
}
}
Context* context = event_target_->environment_settings()->context();
base::MessageLoop* message_loop = context->message_loop();
if (!message_loop) {
return;
}
message_loop->task_runner()->PostTask(
target_task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&Context::AddEnvironmentSettingsChangeObserver,
base::Unretained(context), base::Unretained(this)));
base::Unretained(context()), base::Unretained(this)));
remove_environment_settings_change_observer_ =
base::BindOnce(&Context::RemoveEnvironmentSettingsChangeObserver,
base::Unretained(context), base::Unretained(this));
base::Unretained(context()), base::Unretained(this));

target_task_runner()->PostTask(
FROM_HERE,
base::Bind(
[](MessagePort* message_port,
web::EventTarget*
event_target) { // The first time a MessagePort object's
// onmessage IDL attribute is set, the
// port's port message queue must be enabled, as if the start()
// method had been called.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#messageport
if (event_target->HasEventListener(base::Tokens::message())) {
message_port->Start();
} else {
event_target->AddEventListenerRegistrationCallback(
message_port, base::Tokens::message(),
base::BindOnce(&MessagePort::Start,
base::Unretained(message_port)));
}
},
base::Unretained(this), base::Unretained(event_target)));
}

MessagePort::~MessagePort() { Close(); }
void MessagePort::Start() {
// The start() method steps are to enable this's port message queue, if it is
// not already enabled.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#dom-messageport-start
base::AutoLock lock(mutex_);
if (!event_target_) {
return;
}
enabled_ = true;
for (auto& message : unshipped_messages_) {
PostMessageSerializedLocked(std::move(message));
}
unshipped_messages_.clear();
}

void MessagePort::Close() {
base::AutoLock lock(mutex_);
unshipped_messages_.clear();
if (!event_target_) {
return;
}
event_target_->RemoveEventListenerRegistrationCallbacks(this);
if (remove_environment_settings_change_observer_) {
std::move(remove_environment_settings_change_observer_).Run();
}
event_target_ = nullptr;
}

void MessagePort::PostMessage(const script::ValueHandleHolder& message) {
PostMessageSerialized(std::make_unique<script::StructuredClone>(message));
auto structured_clone = std::make_unique<script::StructuredClone>(message);
{
base::AutoLock lock(mutex_);
if (!(event_target_ && enabled_)) {
unshipped_messages_.push_back(std::move(structured_clone));
return;
}
PostMessageSerializedLocked(std::move(structured_clone));
}
}

void MessagePort::PostMessageSerialized(
void MessagePort::PostMessageSerializedLocked(
std::unique_ptr<script::StructuredClone> structured_clone) {
if (!event_target_ || !structured_clone) {
return;
}
// TODO: Forward the location of the origating API call to the PostTask call.
base::MessageLoop* message_loop =
event_target_->environment_settings()->context()->message_loop();
if (!message_loop) {
if (!structured_clone || !event_target_ || !enabled_) {
return;
}
// TODO: Forward the location of the origating API call to the PostTask
// call.
// https://html.spec.whatwg.org/multipage/workers.html#handler-worker-onmessage
// TODO: Update MessageEvent to support more types. (b/227665847)
// TODO: Remove dependency of MessageEvent on net iobuffer (b/227665847)
message_loop->task_runner()->PostTask(
FROM_HERE, base::BindOnce(&MessagePort::DispatchMessage, AsWeakPtr(),
std::move(structured_clone)));
}

void MessagePort::DispatchMessage(
std::unique_ptr<script::StructuredClone> structured_clone) {
event_target_->DispatchEvent(new web::MessageEvent(
base::Tokens::message(), std::move(structured_clone)));
target_task_runner()->PostTask(
FROM_HERE,
base::BindOnce(
[](base::WeakPtr<web::EventTarget> event_target,
std::unique_ptr<script::StructuredClone> structured_clone) {
event_target->DispatchEvent(new web::MessageEvent(
base::Tokens::message(), std::move(structured_clone)));
},
event_target_->AsWeakPtr(), std::move(structured_clone)));
}

} // namespace web
Expand Down
37 changes: 28 additions & 9 deletions cobalt/web/message_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "base/callback_forward.h"
#include "base/memory/weak_ptr.h"
Expand All @@ -34,14 +36,16 @@ namespace cobalt {
namespace web {

class MessagePort : public script::Wrappable,
public base::SupportsWeakPtr<MessagePort>,
public Context::EnvironmentSettingsChangeObserver {
public:
explicit MessagePort(web::EventTarget* event_target);
~MessagePort();
MessagePort() = default;
~MessagePort() { Close(); }

MessagePort(const MessagePort&) = delete;
MessagePort& operator=(const MessagePort&) = delete;

void EntangleWithEventTarget(web::EventTarget* event_target);

void OnEnvironmentSettingsChanged(bool context_valid) override {
if (!context_valid) {
Close();
Expand All @@ -53,10 +57,8 @@ class MessagePort : public script::Wrappable,
// -> void PostMessage(const script::ValueHandleHolder& message,
// script::Sequence<script::ValueHandle*> transfer) {}
void PostMessage(const script::ValueHandleHolder& message);
void PostMessageSerialized(
std::unique_ptr<script::StructuredClone> structured_clone);

void Start() {}
void Start();
void Close();

const web::EventTargetListenerInfo::EventListenerScriptValue* onmessage()
Expand Down Expand Up @@ -91,15 +93,32 @@ class MessagePort : public script::Wrappable,
event_listener);
}

web::EventTarget* event_target() { return event_target_; }
EventTarget* event_target() const { return event_target_; }
Context* context() const {
return event_target_ ? event_target_->environment_settings()->context()
: nullptr;
}
base::TaskRunner* target_task_runner() const {
return event_target_ ? event_target_->environment_settings()
->context()
->message_loop()
->task_runner()
: nullptr;
}

DEFINE_WRAPPABLE_TYPE(MessagePort);

private:
void DispatchMessage(
void PostMessageSerializedLocked(
std::unique_ptr<script::StructuredClone> structured_clone);

// The event target to dispatch events to.
base::Lock mutex_;
std::vector<std::unique_ptr<script::StructuredClone>> unshipped_messages_;

// A port message queue can be enabled or disabled, and is initially disabled.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#message-ports
bool enabled_ = false;

web::EventTarget* event_target_ = nullptr;
base::OnceClosure remove_environment_settings_change_observer_;
};
Expand Down
45 changes: 45 additions & 0 deletions cobalt/web/message_port_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2023 The Cobalt Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "cobalt/web/message_port.h"

#include "base/strings/string_util.h"
#include "cobalt/web/testing/test_with_javascript.h"


#define EXPECT_SUBSTRING(needle, haystack) \
EXPECT_PRED_FORMAT2(::testing::IsSubstring, (needle), (haystack))

namespace cobalt {
namespace web {

namespace {
class MessagePortTestWithJavaScript : public testing::TestWebWithJavaScript {};
} // namespace

TEST_P(MessagePortTestWithJavaScript, MessagePortIsNotConstructible) {
std::string result;
EXPECT_FALSE(EvaluateScript("var event = new MessagePort();", &result))
<< "Failed to evaluate script.";
EXPECT_SUBSTRING("TypeError: MessagePort is not constructible", result)
<< result;
}

INSTANTIATE_TEST_CASE_P(
MessagePortTestsWithJavaScript, MessagePortTestWithJavaScript,
::testing::ValuesIn(testing::TestWebWithJavaScript::GetWebTypes()),
testing::TestWebWithJavaScript::GetTypeName);

} // namespace web
} // namespace cobalt
10 changes: 5 additions & 5 deletions cobalt/worker/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

namespace cobalt {
namespace worker {
Client::Client(web::EnvironmentSettings* client)
: MessagePort(client->context()
->GetWindowOrWorkerGlobalScope()
->navigator_base()
->service_worker()) {
Client::Client(web::EnvironmentSettings* client) {
DCHECK(client);
EntangleWithEventTarget(client->context()
->GetWindowOrWorkerGlobalScope()
->navigator_base()
->service_worker());
// Algorithm for Create Client:
// https://www.w3.org/TR/2022/CRD-service-workers-20220712/#create-client
// 1. Let clientObject be a new Client object.
Expand Down
3 changes: 2 additions & 1 deletion cobalt/worker/dedicated_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ void DedicatedWorker::Initialize(script::ExceptionState* exception_state) {
// 6. Let worker be a new Worker object.
// 7. Let outside port be a new MessagePort in outside settings's Realm.
// 8. Associate the outside port with worker.
outside_port_ = new web::MessagePort(this);
outside_port_ = new web::MessagePort();
outside_port_->EntangleWithEventTarget(this);
// 9. Run this step in parallel:
// 1. Run a worker given worker, worker URL, outside settings, outside
// port, and options.
Expand Down
Loading