Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use posix socket APIs #4206

Open
wants to merge 6 commits into
base: 25.lts.1+
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions base/files/file_util_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,17 @@ FilePath MakeAbsoluteFilePath(const FilePath& input) {
return input;
}

bool SetNonBlocking(int fd) {
const int flags = fcntl(fd, F_GETFL);
if (flags == -1)
return false;
if (flags & O_NONBLOCK)
return true;
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
return false;
return true;
}

namespace internal {

bool MoveUnsafe(const FilePath& from_path, const FilePath& to_path) {
Expand Down
141 changes: 139 additions & 2 deletions base/message_loop/message_pump_io_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

#include "base/message_loop/message_pump_io_starboard.h"

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#endif

#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
Expand All @@ -28,17 +34,46 @@ namespace base {
MessagePumpIOStarboard::SocketWatcher::SocketWatcher(const Location& from_here)
: created_from_location_(from_here),
interests_(kSbSocketWaiterInterestNone),
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
socket_(-1),
#else
socket_(kSbSocketInvalid),
#endif
pump_(nullptr),
watcher_(nullptr),
weak_factory_(this) {}

MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
if (socket_ >= 0) {
StopWatchingFileDescriptor();
}
#else
if (SbSocketIsValid(socket_)) {
StopWatchingSocket();
}
#endif
}

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
bool MessagePumpIOStarboard::SocketWatcher::StopWatchingFileDescriptor() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
if (socket_ < 0) {
pump_ = nullptr;
// If this watcher is not watching anything, no-op and return success.
return true;
}
int socket = Release();
bool result = true;
if (socket >= 0) {
DCHECK(pump_);
result = pump_->StopWatchingFileDescriptor(socket);
}
pump_ = nullptr;
return result;
}
#else
bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
Expand All @@ -64,21 +99,59 @@ bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
pump_ = nullptr;
return result;
}
#endif

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
void MessagePumpIOStarboard::SocketWatcher::Init(int socket,
bool persistent) {
DCHECK(socket >= 0);
DCHECK(socket_ < 0);
#else
void MessagePumpIOStarboard::SocketWatcher::Init(SbSocket socket,
bool persistent) {
DCHECK(socket);
DCHECK(!socket_);
#endif
socket_ = socket;
persistent_ = persistent;
}

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
int MessagePumpIOStarboard::SocketWatcher::Release() {
int socket = socket_;
socket_ = -1;
return socket;
}
#else
SbSocket MessagePumpIOStarboard::SocketWatcher::Release() {
SbSocket socket = socket_;
socket_ = kSbSocketInvalid;
return socket;
}
#endif

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
void MessagePumpIOStarboard::SocketWatcher::OnFileCanReadWithoutBlocking(
int socket,
MessagePumpIOStarboard* pump) {
if (!watcher_)
return;
pump->WillProcessIOEvent();
watcher_->OnFileCanReadWithoutBlocking(socket);
pump->DidProcessIOEvent();
}

void MessagePumpIOStarboard::SocketWatcher::OnFileCanWriteWithoutBlocking(
int socket,
MessagePumpIOStarboard* pump) {
if (!watcher_)
return;
pump->WillProcessIOEvent();
watcher_->OnFileCanWriteWithoutBlocking(socket);
pump->DidProcessIOEvent();
}

#else
void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToRead(
SbSocket socket,
MessagePumpIOStarboard* pump) {
Expand All @@ -98,6 +171,7 @@ void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite(
watcher_->OnSocketReadyToWrite(socket);
pump->DidProcessIOEvent();
}
#endif

MessagePumpIOStarboard::MessagePumpIOStarboard()
: keep_running_(true),
Expand All @@ -109,12 +183,21 @@ MessagePumpIOStarboard::~MessagePumpIOStarboard() {
SbSocketWaiterDestroy(waiter_);
}

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
bool MessagePumpIOStarboard::WatchFileDescriptor(int socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(socket >= 0);
#else
bool MessagePumpIOStarboard::Watch(SbSocket socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(SbSocketIsValid(socket));
#endif
DCHECK(controller);
DCHECK(delegate);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
Expand All @@ -130,8 +213,13 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
interests |= kSbSocketWaiterInterestWrite;
}

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
int old_socket = controller->Release();
if (old_socket >= 0) {
#else
SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
#endif
// It's illegal to use this function to listen on 2 separate fds with the
// same |controller|.
if (old_socket != socket) {
Expand All @@ -148,12 +236,24 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
interests |= old_interest_mask;

// Must disarm the event before we can reuse it.
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
SbPosixSocketWaiterRemove(waiter_, old_socket);
#else
SbSocketWaiterRemove(waiter_, old_socket);
#endif // SB_API_VERSION >= 16 && !defined(_MSC_VER)
}

// Set current interest mask and waiter for this event.
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent)) {
bool result = false;
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
result = SbPosixSocketWaiterAdd(waiter_, socket, controller,
OnPosixSocketWaiterNotification, interests, persistent);

#else
result = SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent);
#endif // SB_API_VERSION >= 16 && !defined(_MSC_VER)
if (result == false) {
return false;
}

Expand All @@ -164,9 +264,15 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
return true;
}

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
bool MessagePumpIOStarboard::StopWatchingFileDescriptor(int socket) {
return SbPosixSocketWaiterRemove(waiter_, socket);
}
#else
bool MessagePumpIOStarboard::StopWatching(SbSocket socket) {
return SbSocketWaiterRemove(waiter_, socket);
}
#endif // SB_API_VERSION >= 16 && !defined(_MSC_VER)

void MessagePumpIOStarboard::AddIOObserver(IOObserver* obs) {
io_observers_.AddObserver(obs);
Expand Down Expand Up @@ -259,6 +365,36 @@ void MessagePumpIOStarboard::DidProcessIOEvent() {
}
}

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)

// static
void MessagePumpIOStarboard::OnPosixSocketWaiterNotification(SbSocketWaiter waiter,
int socket,
void* context,
int ready_interests) {
base::WeakPtr<SocketWatcher> controller =
static_cast<SocketWatcher*>(context)->weak_factory_.GetWeakPtr();
DCHECK(controller.get());

MessagePumpIOStarboard* pump = controller->pump();
pump->processed_io_events_ = true;

// If not persistent, the watch has been released at this point.
if (!controller->persistent()) {
controller->Release();
}

if (ready_interests & kSbSocketWaiterInterestWrite) {
controller->OnFileCanWriteWithoutBlocking(socket, pump);
}

// Check |controller| in case it's been deleted previously.
if (controller.get() && ready_interests & kSbSocketWaiterInterestRead) {
controller->OnFileCanReadWithoutBlocking(socket, pump);
}
}

#else
// static
void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
SbSocket socket,
Expand Down Expand Up @@ -286,4 +422,5 @@ void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
}
}

#endif // SB_API_VERSION >= 16 && !defined(_MSC_VER)
} // namespace base
43 changes: 43 additions & 0 deletions base/message_loop/message_pump_io_starboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef BASE_MESSAGE_PUMP_IO_STARBOARD_H_
#define BASE_MESSAGE_PUMP_IO_STARBOARD_H_

#include "starboard/configuration.h"

#include "base/compiler_specific.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump.h"
Expand Down Expand Up @@ -49,8 +51,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
public:
// These methods are called from MessageLoop::Run when a socket can be
// interacted with without blocking.
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
virtual void OnFileCanReadWithoutBlocking(int socket) {}
virtual void OnFileCanWriteWithoutBlocking(int socket) {}
#else
virtual void OnSocketReadyToRead(SbSocket socket) {}
virtual void OnSocketReadyToWrite(SbSocket socket) {}
#endif

protected:
virtual ~Watcher() {}
Expand All @@ -70,7 +77,11 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Stops watching the socket, always safe to call. No-op if there's nothing
// to do.
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
bool StopWatchingFileDescriptor();
#else
bool StopWatchingSocket();
#endif

bool persistent() const { return persistent_; }

Expand All @@ -79,8 +90,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
friend class MessagePumpIOStarboardTest;

// Called by MessagePumpIOStarboard.
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
void Init(int socket, bool persistent);
int Release();
#else
void Init(SbSocket socket, bool persistent);
SbSocket Release();
#endif

int interests() const { return interests_; }
void set_interests(int interests) { interests_ = interests; }
Expand All @@ -90,12 +106,21 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

void set_watcher(Watcher* watcher) { watcher_ = watcher; }

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
void OnFileCanReadWithoutBlocking(int socket, MessagePumpIOStarboard* pump);
void OnFileCanWriteWithoutBlocking(int socket, MessagePumpIOStarboard* pump);
#else
void OnSocketReadyToRead(SbSocket socket, MessagePumpIOStarboard* pump);
void OnSocketReadyToWrite(SbSocket socket, MessagePumpIOStarboard* pump);
#endif

const Location created_from_location_;
int interests_;
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
int socket_;
#else
SbSocket socket_;
#endif
bool persistent_;
MessagePumpIOStarboard* pump_;
Watcher* watcher_;
Expand Down Expand Up @@ -123,6 +148,16 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
// If an error occurs while calling this method in a cumulative fashion, the
// event previously attached to |controller| is aborted. Returns true on
// success. Must be called on the same thread the message_pump is running on.
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
bool WatchFileDescriptor(int socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate);

// Stops watching the socket.
bool StopWatchingFileDescriptor(int socket);
#else
bool Watch(SbSocket socket,
bool persistent,
int mode,
Expand All @@ -131,6 +166,7 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Stops watching the socket.
bool StopWatching(SbSocket socket);
#endif

void AddIOObserver(IOObserver* obs);
void RemoveIOObserver(IOObserver* obs);
Expand All @@ -149,10 +185,17 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Called by SbSocketWaiter to tell us a registered socket can be read and/or
// written to.
#if SB_API_VERSION >= 16 && !defined(_MSC_VER)
static void OnPosixSocketWaiterNotification(SbSocketWaiter waiter,
int socket,
void* context,
int ready_interests);
#else
static void OnSocketWaiterNotification(SbSocketWaiter waiter,
SbSocket socket,
void* context,
int ready_interests);
#endif

bool should_quit() const { return !keep_running_; }

Expand Down
Loading
Loading