Skip to content

Commit

Permalink
Implement unshipped messages in message port.
Browse files Browse the repository at this point in the history
Implement storage of unshipped message in message port, to allow a
message port to receive and hold messages before the destination is
ready to receive them.

Use the above functionality to unblock worker.postMessage calls to no
longer wait until the worker script completes loading and executing.

b/310240306
  • Loading branch information
jellefoks committed Nov 11, 2023
1 parent c3e9811 commit a9529d7
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 30 deletions.
17 changes: 12 additions & 5 deletions cobalt/web/message_port.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
namespace cobalt {
namespace web {

MessagePort::MessagePort(web::EventTarget* event_target)
: event_target_(event_target) {
void MessagePort::SetEventTarget(web::EventTarget* event_target) {
DCHECK(!event_target_);
event_target_ = event_target;
if (!event_target_) {
return;
}
Expand All @@ -51,9 +52,11 @@ MessagePort::MessagePort(web::EventTarget* event_target)
remove_environment_settings_change_observer_ =
base::BindOnce(&Context::RemoveEnvironmentSettingsChangeObserver,
base::Unretained(context), base::Unretained(this));
}

MessagePort::~MessagePort() { Close(); }
for (auto& message : unshipped_messages_) {
PostMessageSerialized(std::move(message));
}
}

void MessagePort::Close() {
if (!event_target_) {
Expand All @@ -71,7 +74,11 @@ void MessagePort::PostMessage(const script::ValueHandleHolder& message) {

void MessagePort::PostMessageSerialized(
std::unique_ptr<script::StructuredClone> structured_clone) {
if (!event_target_ || !structured_clone) {
if (!structured_clone) {
return;
}
if (!event_target_) {
unshipped_messages_.push_back(std::move(structured_clone));
return;
}
// TODO: Forward the location of the origating API call to the PostTask call.
Expand Down
12 changes: 9 additions & 3 deletions cobalt/web/message_port.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "base/callback_forward.h"
#include "base/memory/weak_ptr.h"
Expand All @@ -37,11 +39,14 @@ class MessagePort : public script::Wrappable,
public base::SupportsWeakPtr<MessagePort>,
public Context::EnvironmentSettingsChangeObserver {
public:
explicit MessagePort(web::EventTarget* event_target);
~MessagePort();
MessagePort() {}
~MessagePort() { Close(); }

MessagePort(const MessagePort&) = delete;
MessagePort& operator=(const MessagePort&) = delete;

void SetEventTarget(web::EventTarget* event_target);

void OnEnvironmentSettingsChanged(bool context_valid) override {
if (!context_valid) {
Close();
Expand Down Expand Up @@ -99,7 +104,8 @@ class MessagePort : public script::Wrappable,
void DispatchMessage(
std::unique_ptr<script::StructuredClone> structured_clone);

// The event target to dispatch events to.
std::vector<std::unique_ptr<script::StructuredClone>> unshipped_messages_;

web::EventTarget* event_target_ = nullptr;
base::OnceClosure remove_environment_settings_change_observer_;
};
Expand Down
10 changes: 5 additions & 5 deletions cobalt/worker/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@

namespace cobalt {
namespace worker {
Client::Client(web::EnvironmentSettings* client)
: MessagePort(client->context()
->GetWindowOrWorkerGlobalScope()
->navigator_base()
->service_worker()) {
Client::Client(web::EnvironmentSettings* client) {
DCHECK(client);
SetEventTarget(client->context()
->GetWindowOrWorkerGlobalScope()
->navigator_base()
->service_worker());
// Algorithm for Create Client:
// https://www.w3.org/TR/2022/CRD-service-workers-20220712/#create-client
// 1. Let clientObject be a new Client object.
Expand Down
3 changes: 2 additions & 1 deletion cobalt/worker/dedicated_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ void DedicatedWorker::Initialize(script::ExceptionState* exception_state) {
// 6. Let worker be a new Worker object.
// 7. Let outside port be a new MessagePort in outside settings's Realm.
// 8. Associate the outside port with worker.
outside_port_ = new web::MessagePort(this);
outside_port_ = new web::MessagePort();
outside_port_->SetEventTarget(this);
// 9. Run this step in parallel:
// 1. Run a worker given worker, worker URL, outside settings, outside
// port, and options.
Expand Down
11 changes: 3 additions & 8 deletions cobalt/worker/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ void Worker::WillDestroyCurrentMessageLoop() {
// Destroy members that were constructed in the worker thread.
loader_.reset();
worker_global_scope_ = nullptr;
message_port_ = nullptr;
content_.reset();
}

Expand Down Expand Up @@ -265,7 +264,7 @@ void Worker::Execute(const std::string& content,
// Done at step 8.
// 16. Let inside port be a new MessagePort object in inside settings's Realm.
// 17. Associate inside port with worker global scope.
message_port_ = new web::MessagePort(worker_global_scope_);
message_port_.SetEventTarget(worker_global_scope_);
// 18. Entangle outside port and inside port.
// TODO(b/226640425): Implement this when Message Ports can be entangled.
// 19. Create a new WorkerLocation object and associate it with worker global
Expand All @@ -280,7 +279,6 @@ void Worker::Execute(const std::string& content,
// worker until such time as either the closing flag switches to true or
// the worker stops being a suspendable worker.
// 22. Set inside settings's execution ready flag.
execution_ready_.Signal();
// 23. If script is a classic script, then run the classic script script.
// Otherwise, it is a module script; run the module script script.

Expand Down Expand Up @@ -362,14 +360,11 @@ void Worker::PostMessage(const script::ValueHandleHolder& message) {
if (base::MessageLoop::current() != message_loop()) {
// Block until the worker thread is ready to execute code to handle the
// event.
execution_ready_.Wait();
message_loop()->task_runner()->PostTask(
FROM_HERE, base::BindOnce(&web::MessagePort::PostMessageSerialized,
message_port()->AsWeakPtr(),
std::move(structured_clone)));
&message_port_, std::move(structured_clone)));
} else {
DCHECK(execution_ready_.IsSignaled());
message_port()->PostMessageSerialized(std::move(structured_clone));
message_port_.PostMessageSerialized(std::move(structured_clone));
}
}

Expand Down
10 changes: 2 additions & 8 deletions cobalt/worker/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class Worker : public base::MessageLoop::DestructionObserver {

void Terminate();

web::MessagePort* message_port() const { return message_port_.get(); }
// web::MessagePort& message_port() { return message_port_; }

// The message loop this object is running on.
base::MessageLoop* message_loop() const {
Expand Down Expand Up @@ -131,19 +131,13 @@ class Worker : public base::MessageLoop::DestructionObserver {
scoped_refptr<WorkerGlobalScope> worker_global_scope_;

// Inner message port.
scoped_refptr<web::MessagePort> message_port_;
web::MessagePort message_port_;

// The loader that is used for asynchronous loads.
std::unique_ptr<loader::Loader> loader_;

// Content of the script. Released after Execute is called.
std::unique_ptr<std::string> content_;

// The execution ready flag.
// https://html.spec.whatwg.org/commit-snapshots/465a6b672c703054de278b0f8133eb3ad33d93f4/#concept-environment-execution-ready-flag
base::WaitableEvent execution_ready_ = {
base::WaitableEvent::ResetPolicy::MANUAL,
base::WaitableEvent::InitialState::NOT_SIGNALED};
};

} // namespace worker
Expand Down

0 comments on commit a9529d7

Please sign in to comment.