Skip to content

Commit

Permalink
Restore recvmsg() call
Browse files Browse the repository at this point in the history
Change (PR3904) replaced recvmsg() with recvfrom unnecessarily. This change to restore the recvmsg().

b/302741384
  • Loading branch information
maxz-lab committed Oct 18, 2024
1 parent 0a21943 commit 1e5af93
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 16 deletions.
1 change: 0 additions & 1 deletion net/base/address_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ NET_EXPORT SbSocketAddressType ConvertAddressFamily(
NET_EXPORT int ConvertAddressFamily(AddressFamily address_family);
#endif


// Maps AF_INET, AF_INET6 or AF_UNSPEC to an AddressFamily.
NET_EXPORT AddressFamily ToAddressFamily(int family);

Expand Down
72 changes: 68 additions & 4 deletions net/socket/socket_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
namespace net {

#if SB_API_VERSION >= 16 && !defined(_MSC_VER)

namespace {

int MapAcceptError(int os_error) {
Expand Down Expand Up @@ -72,7 +71,13 @@ int MapConnectError(int os_error) {

SocketPosix::SocketPosix()
: socket_fd_(kInvalidSocket),
#if defined(STARBOARD)
socket_watcher_(FROM_HERE) {}
#else
accept_socket_watcher_(FROM_HERE),
read_socket_watcher_(FROM_HERE),
write_socket_watcher_(FROM_HERE) {}
#endif

SocketPosix::~SocketPosix() {
Close();
Expand Down Expand Up @@ -179,7 +184,11 @@ int SocketPosix::Accept(std::unique_ptr<SocketPosix>* socket,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_READ,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&accept_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on accept";
return MapSystemError(errno);
}
Expand All @@ -204,7 +213,11 @@ int SocketPosix::Connect(const SockaddrStorage& address,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_WRITE,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&write_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on connect";
return MapSystemError(errno);
}
Expand All @@ -224,7 +237,11 @@ int SocketPosix::Connect(const SockaddrStorage& address,

rv = MapConnectError(errno);
if (rv != OK && rv != ERR_IO_PENDING) {
#if defined(STARBOARD)
ClearWatcherIfOperationsNotPending();
#else
write_socket_watcher_.StopWatchingFileDescriptor();
#endif
return rv;
}

Expand Down Expand Up @@ -300,7 +317,11 @@ int SocketPosix::ReadIfReady(IOBuffer* buf,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_READ,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&read_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on read";
return MapSystemError(errno);
}
Expand All @@ -312,7 +333,11 @@ int SocketPosix::ReadIfReady(IOBuffer* buf,
int SocketPosix::CancelReadIfReady() {
DCHECK(read_if_ready_callback_);

#if defined(STARBOARD)
bool ok = ClearWatcherIfOperationsNotPending();
#else
bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);

read_if_ready_callback_.Reset();
Expand Down Expand Up @@ -350,7 +375,11 @@ int SocketPosix::WaitForWrite(IOBuffer* buf,

if (!base::CurrentIOThread::Get()->WatchFileDescriptor(
socket_fd_, true, base::MessagePumpForIO::WATCH_WRITE,
#if defined(STARBOARD)
&socket_watcher_, this)) {
#else
&write_socket_watcher_, this)) {
#endif
PLOG(ERROR) << "WatchFileDescriptor failed on write";
return MapSystemError(errno);
}
Expand Down Expand Up @@ -414,7 +443,12 @@ void SocketPosix::OnFileCanReadWithoutBlocking(int fd) {
"SocketPosix::OnFileCanReadWithoutBlocking");
if (!accept_callback_.is_null()) {
AcceptCompleted();
} else if (!read_if_ready_callback_.is_null()){
#if defined(STARBOARD)
} else if (!read_if_ready_callback_.is_null()) {
#else
} else {
DCHECK(!read_if_ready_callback_.is_null());
#endif
ReadCompleted();
}
}
Expand Down Expand Up @@ -451,7 +485,11 @@ void SocketPosix::AcceptCompleted() {
if (rv == ERR_IO_PENDING)
return;

#if defined(STARBOARD)
bool ok = ClearWatcherIfOperationsNotPending();
#else
bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
accept_socket_ = nullptr;
std::move(accept_callback_).Run(rv);
Expand All @@ -478,14 +516,22 @@ void SocketPosix::ConnectCompleted() {
if (rv == ERR_IO_PENDING)
return;

#if defined(STARBOARD)
bool ok = socket_watcher_.StopWatchingFileDescriptor();
#else
bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
waiting_connect_ = false;
std::move(write_callback_).Run(rv);
}

int SocketPosix::DoRead(IOBuffer* buf, int buf_len) {
#if defined(STARBOARD)
int rv = HANDLE_EINTR(recv(socket_fd_, buf->data(), buf_len, 0));
#else
int rv = HANDLE_EINTR(read(socket_fd_, buf->data(), buf_len, 0));
#endif
return rv >= 0 ? rv : MapSystemError(errno);
}

Expand All @@ -509,7 +555,11 @@ void SocketPosix::RetryRead(int rv) {
void SocketPosix::ReadCompleted() {
DCHECK(read_if_ready_callback_);

#if defined(STARBOARD)
bool ok = socket_watcher_.StopWatchingFileDescriptor();
#else
bool ok = read_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
std::move(read_if_ready_callback_).Run(OK);
}
Expand All @@ -529,8 +579,7 @@ int SocketPosix::DoWrite(IOBuffer* buf, int buf_len) {
int rv = HANDLE_EINTR(send(socket_fd_, buf->data(), buf_len, kSendFlags));
#else
int rv = HANDLE_EINTR(write(socket_fd_, buf->data(), buf_len));
#endif // BUILDFLAG(IS_LINUX) || BUILDFLAG(IS_CHROMEOS) || BUILDFLAG(IS_ANDROID) ||\
// defined(STARBOARD)
#endif
return rv >= 0 ? rv : MapSystemError(errno);
}

Expand All @@ -539,24 +588,39 @@ void SocketPosix::WriteCompleted() {
if (rv == ERR_IO_PENDING)
return;

#if defined(STARBOARD)
bool ok = ClearWatcherIfOperationsNotPending();
#else
bool ok = write_socket_watcher_.StopWatchingFileDescriptor();
#endif
DCHECK(ok);
write_buf_.reset();
write_buf_len_ = 0;
std::move(write_callback_).Run(rv);
}

#if defined(STARBOARD)
bool SocketPosix::ClearWatcherIfOperationsNotPending() {
bool ok = true;
if (!read_pending() && !write_pending() && !accept_pending()) {
ok = socket_watcher_.StopWatchingFileDescriptor();
}
return ok;
}
#endif

void SocketPosix::StopWatchingAndCleanUp(bool close_socket) {
#if defined(STARBOARD)
bool ok = socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
#else
bool ok = accept_socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
ok = read_socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
ok = write_socket_watcher_.StopWatchingFileDescriptor();
DCHECK(ok);
#endif

// These needs to be done after the StopWatchingFileDescriptor() calls, but
// before deleting the write buffer.
Expand Down
20 changes: 19 additions & 1 deletion net/socket/socket_posix.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ struct SockaddrStorage;
// Socket class to provide asynchronous read/write operations on top of the
// posix socket api. It supports AF_INET, AF_INET6, and AF_UNIX addresses.
class NET_EXPORT_PRIVATE SocketPosix
#if defined(STARBOARD)
: public base::MessagePumpForIO::Watcher {
#else
: public base::MessagePumpForIO::FdWatcher {
#endif
public:
SocketPosix();

Expand Down Expand Up @@ -109,7 +113,7 @@ class NET_EXPORT_PRIVATE SocketPosix
SocketDescriptor socket_fd() const { return socket_fd_; }

private:
// base::MessagePumpForIO::Watcher methods.
// base::MessagePumpForIO::FdWatcher methods.
void OnFileCanReadWithoutBlocking(int fd) override;
void OnFileCanWriteWithoutBlocking(int fd) override;

Expand All @@ -126,22 +130,32 @@ class NET_EXPORT_PRIVATE SocketPosix
int DoWrite(IOBuffer* buf, int buf_len);
void WriteCompleted();

#if defined(STARBOARD)
bool read_pending() const { return !read_if_ready_callback_.is_null(); }
bool write_pending() const {
return !write_callback_.is_null() && !waiting_connect_;
}
bool accept_pending() const { return !accept_callback_.is_null(); }

bool ClearWatcherIfOperationsNotPending();
#endif

// |close_socket| indicates whether the socket should also be closed.
void StopWatchingAndCleanUp(bool close_socket);

SocketDescriptor socket_fd_;

#if !defined(STARBOARD)
base::MessagePumpForIO::FdWatchController accept_socket_watcher_;
#endif

raw_ptr<std::unique_ptr<SocketPosix>> accept_socket_;
CompletionOnceCallback accept_callback_;

#if !defined(STARBOARD)
base::MessagePumpForIO::FdWatchController read_socket_watcher_;
#endif

// Non-null when a Read() is in progress.
scoped_refptr<IOBuffer> read_buf_;
int read_buf_len_ = 0;
Expand All @@ -150,7 +164,11 @@ class NET_EXPORT_PRIVATE SocketPosix
// Non-null when a ReadIfReady() is in progress.
CompletionOnceCallback read_if_ready_callback_;

#if defined(STARBOARD)
base::MessagePumpForIO::SocketWatcher socket_watcher_;
#else
base::MessagePumpForIO::FdWatchController write_socket_watcher_;
#endif
scoped_refptr<IOBuffer> write_buf_;
int write_buf_len_ = 0;
// External callback; called when write or connect is complete.
Expand Down
5 changes: 5 additions & 0 deletions net/socket/tcp_socket_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,16 @@ int TCPSocketPosix::Accept(std::unique_ptr<TCPSocketPosix>* tcp_socket,
CompletionOnceCallback callback) {
DCHECK(tcp_socket);
DCHECK(!callback.is_null());
#if !defined(STARBOARD)
DCHECK(socket_);
#endif
DCHECK(!accept_socket_);

#if defined(STARBOARD)
if ((!socket_)) {
return MapSystemError(errno);
}
#endif

net_log_.BeginEvent(NetLogEventType::TCP_ACCEPT);

Expand Down
32 changes: 22 additions & 10 deletions net/socket/udp_socket_posix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -575,13 +575,13 @@ int UDPSocketPosix::SetDoNotFragment() {
}

void UDPSocketPosix::SetMsgConfirm(bool confirm) {
#if !BUILDFLAG(IS_APPLE) && defined(MSG_CONFIRM)
#if !BUILDFLAG(IS_APPLE)
if (confirm) {
sendto_flags_ |= MSG_CONFIRM;
} else {
sendto_flags_ &= ~MSG_CONFIRM;
}
#endif // !BUILDFLAG(IS_APPLE) && defined(MSG_CONFIRM)
#endif // !BUILDFLAG(IS_APPLE)
}

int UDPSocketPosix::AllowAddressReuse() {
Expand Down Expand Up @@ -779,23 +779,31 @@ int UDPSocketPosix::InternalRecvFromNonConnectedSocket(IOBuffer* buf,
int buf_len,
IPEndPoint* address) {
SockaddrStorage storage;
int result = -1;
int bytes_transferred = -1;
bytes_transferred = HANDLE_EINTR(recvfrom(socket_, buf->data(), static_cast<size_t>(buf_len),
0, storage.addr, &storage.addr_len));
struct iovec iov = {
.iov_base = buf->data(),
.iov_len = static_cast<size_t>(buf_len),
};
struct msghdr msg = {
.msg_name = storage.addr,
.msg_namelen = storage.addr_len,
.msg_iov = &iov,
.msg_iovlen = 1,
};
int result;
int bytes_transferred = HANDLE_EINTR(recvmsg(socket_, &msg, 0));
if (bytes_transferred < 0) {
result = MapSystemError(errno);
if (result == ERR_IO_PENDING) {
return result;
}
} else {
if (bytes_transferred == buf_len) {
storage.addr_len = msg.msg_namelen;
if (msg.msg_flags & MSG_TRUNC) {
// NB: recvfrom(..., MSG_TRUNC, ...) would be a simpler way to do this on
// Linux, but isn't supported by POSIX.
// When received data size == buffer size, it means the buffer isn't big enough,
// i.e. truncated.
result = ERR_MSG_TOO_BIG;
} else if (address && !address->FromSockAddr(storage.addr, storage.addr_len)) {
} else if (address &&
!address->FromSockAddr(storage.addr, storage.addr_len)) {
result = ERR_ADDRESS_INVALID;
} else {
result = bytes_transferred;
Expand Down Expand Up @@ -846,10 +854,14 @@ int UDPSocketPosix::SetMulticastOptions() {
if (rv < 0)
return MapSystemError(errno);
}
#if defined(STARBOARD)
#if defined(IP_DEFAULT_MULTICAST_TTL)
if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
#elif defined(IP_MULTICAST_TTL)
if (multicast_time_to_live_ != IP_MULTICAST_TTL) {
#endif
#else
if (multicast_time_to_live_ != IP_DEFAULT_MULTICAST_TTL) {
#endif
int rv;
if (addr_family_ == AF_INET) {
Expand Down
Loading

0 comments on commit 1e5af93

Please sign in to comment.