Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore recvmsg in udp_socket_posix #4151

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 69 additions & 6 deletions net/socket/socket_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
#endif // BUILDFLAG(IS_FUCHSIA)

namespace net {


namespace {

int MapAcceptError(int os_error) {
Expand Down Expand Up @@ -71,7 +69,13 @@ int MapConnectError(int os_error) {

SocketPosix::SocketPosix()
: socket_fd_(kInvalidSocket),
#if defined(STARBOARD)
socket_watcher_(FROM_HERE) {}
#else
accept_socket_watcher_(FROM_HERE),
read_socket_watcher_(FROM_HERE),
write_socket_watcher_(FROM_HERE) {}
#endif

SocketPosix::~SocketPosix() {
Close();
Expand Down Expand Up @@ -143,7 +147,7 @@ int SocketPosix::Bind(const SockaddrStorage& address) {

int rv = bind(socket_fd_, address.addr, address.addr_len);
if (rv < 0) {
PLOG(ERROR) << "bind() failed -errno- :" << errno;
PLOG(ERROR) << "bind() failed";
return MapSystemError(errno);
}

Expand Down Expand Up @@ -178,7 +182,11 @@ int SocketPosix::Accept(std::unique_ptr<SocketPosix>* socket,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_READ,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&accept_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on accept";
return MapSystemError(errno);
}
Expand All @@ -203,7 +211,11 @@ int SocketPosix::Connect(const SockaddrStorage& address,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_WRITE,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&write_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on connect";
return MapSystemError(errno);
}
Expand All @@ -223,7 +235,11 @@ int SocketPosix::Connect(const SockaddrStorage& address,

rv = MapConnectError(errno);
if (rv != OK && rv != ERR_IO_PENDING) {
#if defined(STARBOARD)
ClearWatcherIfOperationsNotPending();
#else
write_socket_watcher_.StopWatchingFileDescriptor();
#endif
return rv;
}

Expand Down Expand Up @@ -299,7 +315,11 @@ int SocketPosix::ReadIfReady(IOBuffer* buf,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_READ,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&read_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on read";
return MapSystemError(errno);
}
Expand All @@ -311,7 +331,11 @@ int SocketPosix::ReadIfReady(IOBuffer* buf,
int SocketPosix::CancelReadIfReady() {
DCHECK(read_if_ready_callback_);

#if defined(STARBOARD)
bool ok = ClearWatcherIfOperationsNotPending();
#else
bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);

read_if_ready_callback_.Reset();
Expand Down Expand Up @@ -349,7 +373,11 @@ int SocketPosix::WaitForWrite(IOBuffer* buf,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_WRITE,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&write_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on write";
return MapSystemError(errno);
}
Expand Down Expand Up @@ -413,7 +441,12 @@ void SocketPosix::OnFileCanReadWithoutBlocking(int fd) {
"SocketPosix::OnFileCanReadWithoutBlocking");
if (!accept_callback_.is_null()) {
AcceptCompleted();
} else if (!read_if_ready_callback_.is_null()){
#if defined(STARBOARD)
} else if (!read_if_ready_callback_.is_null()) {
#else
} else {
DCHECK(!read_if_ready_callback_.is_null());
#endif
ReadCompleted();
}
}
Expand Down Expand Up @@ -450,7 +483,11 @@ void SocketPosix::AcceptCompleted() {
if (rv == ERR_IO_PENDING)
return;

#if defined(STARBOARD)
bool ok = ClearWatcherIfOperationsNotPending();
#else
bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
accept_socket_ = nullptr;
std::move(accept_callback_).Run(rv);
Expand All @@ -477,14 +514,22 @@ void SocketPosix::ConnectCompleted() {
if (rv == ERR_IO_PENDING)
return;

#if defined(STARBOARD)
bool ok = socket_watcher_.StopWatchingFileDescriptor();
#else
bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
waiting_connect_ = false;
std::move(write_callback_).Run(rv);
}

int SocketPosix::DoRead(IOBuffer* buf, int buf_len) {
#if defined(STARBOARD)
int rv = HANDLE_EINTR(recv(socket_fd_, buf->data(), buf_len, 0));
#else
int rv = HANDLE_EINTR(read(socket_fd_, buf->data(), buf_len, 0));
#endif
return rv >= 0 ? rv : MapSystemError(errno);
}

Expand All @@ -508,7 +553,11 @@ void SocketPosix::RetryRead(int rv) {
void SocketPosix::ReadCompleted() {
DCHECK(read_if_ready_callback_);

#if defined(STARBOARD)
bool ok = socket_watcher_.StopWatchingFileDescriptor();
#else
bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
std::move(read_if_ready_callback_).Run(OK);
}
Expand All @@ -528,8 +577,7 @@ int SocketPosix::DoWrite(IOBuffer* buf, int buf_len) {
int rv = HANDLE_EINTR(send(socket_fd_, buf->data(), buf_len, kSendFlags));
#else
int rv = HANDLE_EINTR(write(socket_fd_, buf->data(), buf_len));
#endif // BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS) || BUILDFLAG(IS_ANDROID) ||\
// defined(STARBOARD)
#endif
return rv >= 0 ? rv : MapSystemError(errno);
}

Expand All @@ -538,24 +586,39 @@ void SocketPosix::WriteCompleted() {
if (rv == ERR_IO_PENDING)
return;

#if defined(STARBOARD)
bool ok = ClearWatcherIfOperationsNotPending();
#else
bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
write_buf_.reset();
write_buf_len_ = 0;
std::move(write_callback_).Run(rv);
}

#if defined(STARBOARD)
bool SocketPosix::ClearWatcherIfOperationsNotPending() {
bool ok = true;
if (!read_pending() && !write_pending() && !accept_pending()) {
ok = socket_watcher_.StopWatchingFileDescriptor();
}
return ok;
}
#endif

void SocketPosix::StopWatchingAndCleanUp(bool close_socket) {
#if defined(STARBOARD)
bool ok = socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
#else
bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
ok = read_socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
ok = write_socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
#endif

// These needs to be done after the StopWatchingFileDescriptor() calls, but
// before deleting the write buffer.
Expand Down
20 changes: 19 additions & 1 deletion net/socket/socket_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ struct SockaddrStorage;
// Socket class to provide asynchronous read/write operations on top of the
// posix socket api. It supports AF_INET, AF_INET6, and AF_UNIX addresses.
class NET_EXPORT_PRIVATE SocketPosix
#if defined(STARBOARD)
: public base::MessagePumpForIO::Watcher {
#else
: public base::MessagePumpForIO::FdWatcher {
#endif
public:
SocketPosix();

Expand Down Expand Up @@ -107,7 +111,7 @@ class NET_EXPORT_PRIVATE SocketPosix
SocketDescriptor socket_fd() const { return socket_fd_; }

private:
// base::MessagePumpForIO::Watcher methods.
// base::MessagePumpForIO::FdWatcher methods.
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;

Expand All @@ -124,22 +128,32 @@ class NET_EXPORT_PRIVATE SocketPosix
int DoWrite(IOBuffer* buf, int buf_len);
void WriteCompleted();

#if defined(STARBOARD)
bool read_pending() const { return !read_if_ready_callback_.is_null(); }
bool write_pending() const {
return !write_callback_.is_null() && !waiting_connect_;
}
bool accept_pending() const { return !accept_callback_.is_null(); }

bool ClearWatcherIfOperationsNotPending();
#endif

// |close_socket| indicates whether the socket should also be closed.
void StopWatchingAndCleanUp(bool close_socket);

SocketDescriptor socket_fd_;

#if !defined(STARBOARD)
base::MessagePumpForIO::FdWatchController accept_socket_watcher_;
#endif

raw_ptr<std::unique_ptr<SocketPosix>> accept_socket_;
CompletionOnceCallback accept_callback_;

#if !defined(STARBOARD)
base::MessagePumpForIO::FdWatchController read_socket_watcher_;
#endif

// Non-null when a Read() is in progress.
scoped_refptr<IOBuffer> read_buf_;
int read_buf_len_ = 0;
Expand All @@ -148,7 +162,11 @@ class NET_EXPORT_PRIVATE SocketPosix
// Non-null when a ReadIfReady() is in progress.
CompletionOnceCallback read_if_ready_callback_;

#if defined(STARBOARD)
base::MessagePumpForIO::SocketWatcher socket_watcher_;
#else
base::MessagePumpForIO::FdWatchController write_socket_watcher_;
#endif
scoped_refptr<IOBuffer> write_buf_;
int write_buf_len_ = 0;
// External callback; called when write or connect is complete.
Expand Down
6 changes: 6 additions & 0 deletions net/socket/tcp_socket_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ int TCPSocketPosix::Bind(const IPEndPoint& address) {
SockaddrStorage storage;
if (!address.ToSockAddr(storage.addr, &storage.addr_len))
return ERR_ADDRESS_INVALID;

return socket_->Bind(storage);
}

Expand All @@ -250,11 +251,16 @@ int TCPSocketPosix::Accept(std::unique_ptr<TCPSocketPosix>* tcp_socket,
CompletionOnceCallback callback) {
DCHECK(tcp_socket);
DCHECK(!callback.is_null());
#if !defined(STARBOARD)
DCHECK(socket_);
#endif
DCHECK(!accept_socket_);

#if defined(STARBOARD)
if ((!socket_)) {
return MapSystemError(errno);
}
#endif

net_log_.BeginEvent(NetLogEventType::TCP_ACCEPT);

Expand Down
32 changes: 22 additions & 10 deletions net/socket/udp_socket_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -573,13 +573,13 @@ int UDPSocketPosix::SetDoNotFragment() {
}

void UDPSocketPosix::SetMsgConfirm(bool confirm) {
#if !BUILDFLAG(IS_APPLE) && defined(MSG_CONFIRM)
#if !BUILDFLAG(IS_APPLE)
if (confirm) {
sendto_flags_ |= MSG_CONFIRM;
} else {
sendto_flags_ &= ~MSG_CONFIRM;
}
#endif // !BUILDFLAG(IS_APPLE) && defined(MSG_CONFIRM)
#endif // !BUILDFLAG(IS_APPLE)
}

int UDPSocketPosix::AllowAddressReuse() {
Expand Down Expand Up @@ -777,23 +777,31 @@ int UDPSocketPosix::InternalRecvFromNonConnectedSocket(IOBuffer* buf,
int buf_len,
IPEndPoint* address) {
SockaddrStorage storage;
int result = -1;
int bytes_transferred = -1;
bytes_transferred = HANDLE_EINTR(recvfrom(socket_, buf->data(), static_cast<size_t>(buf_len),
0, storage.addr, &storage.addr_len));
struct iovec iov = {
.iov_base = buf->data(),
.iov_len = static_cast<size_t>(buf_len),
};
struct msghdr msg = {
.msg_name = storage.addr,
.msg_namelen = storage.addr_len,
.msg_iov = &iov,
.msg_iovlen = 1,
};
int result;
int bytes_transferred = HANDLE_EINTR(recvmsg(socket_, &msg, 0));
if (bytes_transferred < 0) {
result = MapSystemError(errno);
if (result == ERR_IO_PENDING) {
return result;
}
} else {
if (bytes_transferred == buf_len) {
storage.addr_len = msg.msg_namelen;
if (msg.msg_flags & MSG_TRUNC) {
// NB: recvfrom(..., MSG_TRUNC, ...) would be a simpler way to do this on
// Linux, but isn't supported by POSIX.
// When received data size == buffer size, it means the buffer isn't big enough,
// i.e. truncated.
result = ERR_MSG_TOO_BIG;
} else if (address && !address->FromSockAddr(storage.addr, storage.addr_len)) {
} else if (address &&
!address->FromSockAddr(storage.addr, storage.addr_len)) {
result = ERR_ADDRESS_INVALID;
} else {
result = bytes_transferred;
Expand Down Expand Up @@ -844,10 +852,14 @@ int UDPSocketPosix::SetMulticastOptions() {
if (rv < 0)
return MapSystemError(errno);
}
#if defined(STARBOARD)
#if defined(IP_DEFAULT_MULTICAST_TTL)
if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
#elif defined(IP_MULTICAST_TTL)
if (multicast_time_to_live_ != IP_MULTICAST_TTL) {
#endif
#else
if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
#endif
int rv;
if (addr_family_ == AF_INET) {
Expand Down
Loading
Loading