diff --git a/modules/affile/logproto-file-writer.c b/modules/affile/logproto-file-writer.c index 8af0ed490..5662015f7 100644 --- a/modules/affile/logproto-file-writer.c +++ b/modules/affile/logproto-file-writer.c @@ -1,6 +1,8 @@ /* * Copyright (c) 2002-2012 Balabit + * Copyright (c) 2024 Axoflow * Copyright (c) 1998-2012 Balázs Scheidler + * Copyright (c) 2024 László Várady * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 as published @@ -38,11 +40,92 @@ typedef struct _LogProtoFileWriter gint buf_size; gint buf_count; gint fd; - gint sum_len; + gsize sum_len; gboolean fsync; struct iovec buffer[0]; } LogProtoFileWriter; +static inline gboolean +_flush_partial(LogProtoFileWriter *self, LogProtoStatus *status) +{ + /* there is still some data from the previous file writing process */ + + gint len = self->partial_len - self->partial_pos; + gssize rc = log_transport_write(self->super.transport, self->partial + self->partial_pos, len); + + if (rc > 0 && self->fsync) + fsync(self->fd); + + if (rc < 0) + { + if (errno == EINTR || errno == EAGAIN) + { + *status = LPS_SUCCESS; + return FALSE; + } + + log_proto_client_msg_rewind(&self->super); + msg_error("I/O error occurred while writing", + evt_tag_int("fd", self->super.transport->fd), + evt_tag_error(EVT_TAG_OSERROR)); + + *status = LPS_ERROR; + return FALSE; + } + + if (rc != len) + { + self->partial_pos += rc; + *status = LPS_PARTIAL; + return FALSE; + } + + log_proto_client_msg_ack(&self->super, self->partial_messages); + g_free(self->partial); + self->partial = NULL; + self->partial_messages = 0; + return TRUE; +} + +static inline void +_process_partial_write(LogProtoFileWriter *self, gsize written) +{ + /* partial success: not everything has been written out */ + + /* look for the first chunk that has been cut */ + gsize sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */ + gint i = 0; + while (written > sum) + sum += self->buffer[++i].iov_len; + + gsize first_non_written_msg_chunk_len = sum - written; + self->partial_len = first_non_written_msg_chunk_len; + gint first_non_written_chunk_index = i; + ++i; + + /* add the lengths of the following messages */ + while (i < self->buf_count) + self->partial_len += self->buffer[i++].iov_len; + + /* allocate and copy the remaining data */ + self->partial = (guchar *)g_malloc(self->partial_len); + gsize ofs = first_non_written_msg_chunk_len; + gsize pos = self->buffer[first_non_written_chunk_index].iov_len - ofs; + memcpy(self->partial, (guchar *) self->buffer[first_non_written_chunk_index].iov_base + pos, ofs); + i = first_non_written_chunk_index + 1; + while (i < self->buf_count) + { + memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len); + ofs += self->buffer[i].iov_len; + ++i; + } + + self->partial_pos = 0; + self->partial_messages = self->buf_count - first_non_written_chunk_index; + + log_proto_client_msg_ack(&self->super, self->buf_count - self->partial_messages); +} + /* * log_proto_file_writer_flush: * @@ -55,90 +138,28 @@ static LogProtoStatus log_proto_file_writer_flush(LogProtoClient *s) { LogProtoFileWriter *self = (LogProtoFileWriter *)s; - gint rc, i, i0, sum, ofs, pos; if (self->partial) { - /* there is still some data from the previous file writing process */ - gint len = self->partial_len - self->partial_pos; - - rc = log_transport_write(self->super.transport, self->partial + self->partial_pos, len); - if (rc > 0 && self->fsync) - fsync(self->fd); - if (rc < 0) - { - goto write_error; - } - else if (rc != len) - { - self->partial_pos += rc; - return LPS_PARTIAL; - } - else - { - log_proto_client_msg_ack(&self->super, self->partial_messages); - g_free(self->partial); - self->partial = NULL; - } + LogProtoStatus partial_flush_status; + if (!_flush_partial(self, &partial_flush_status)) + return partial_flush_status; } /* we might be called from log_writer_deinit() without having a buffer at all */ if (self->buf_count == 0) return LPS_SUCCESS; - rc = log_transport_writev(self->super.transport, self->buffer, self->buf_count); + gssize rc = log_transport_writev(self->super.transport, self->buffer, self->buf_count); + if (rc > 0 && self->fsync) fsync(self->fd); if (rc < 0) { - goto write_error; - } - else if (rc != self->sum_len) - { - /* partial success: not everything has been written out */ - /* look for the first chunk that has been cut */ - sum = self->buffer[0].iov_len; /* sum is the cumulated length of the already processed items */ - i = 0; - while (rc > sum) - sum += self->buffer[++i].iov_len; - self->partial_len = sum - rc; /* this is the length of the first non-written chunk */ - i0 = i; - ++i; - /* add the lengths of the following messages */ - while (i < self->buf_count) - self->partial_len += self->buffer[i++].iov_len; - /* allocate and copy the remaining data */ - self->partial = (guchar *)g_malloc(self->partial_len); - ofs = sum - rc; /* the length of the remaining (not processed) chunk in the first message */ - pos = self->buffer[i0].iov_len - ofs; - memcpy(self->partial, (guchar *) self->buffer[i0].iov_base + pos, ofs); - i = i0 + 1; - while (i < self->buf_count) - { - memcpy(self->partial + ofs, self->buffer[i].iov_base, self->buffer[i].iov_len); - ofs += self->buffer[i].iov_len; - ++i; - } - self->partial_pos = 0; - self->partial_messages = self->buf_count - i0; - } - else - { - log_proto_client_msg_ack(&self->super, self->buf_count); - } + if (errno == EINTR || errno == EAGAIN) + return LPS_SUCCESS; - /* free the previous message strings (the remaining part has been copied to the partial buffer) */ - for (i = 0; i < self->buf_count; ++i) - g_free(self->buffer[i].iov_base); - self->buf_count = 0; - self->sum_len = 0; - - return LPS_SUCCESS; - -write_error: - if (errno != EINTR && errno != EAGAIN) - { log_proto_client_msg_rewind(&self->super); msg_error("I/O error occurred while writing", evt_tag_int("fd", self->super.transport->fd), @@ -146,8 +167,18 @@ log_proto_file_writer_flush(LogProtoClient *s) return LPS_ERROR; } - return LPS_SUCCESS; + if (rc != self->sum_len) + _process_partial_write(self, rc); + else + log_proto_client_msg_ack(&self->super, self->buf_count); + + /* free the previous message strings (the remaining part has been copied to the partial buffer) */ + for (gint i = 0; i < self->buf_count; ++i) + g_free(self->buffer[i].iov_base); + self->buf_count = 0; + self->sum_len = 0; + return LPS_SUCCESS; } /* diff --git a/news/bugfix-303.md b/news/bugfix-303.md new file mode 100644 index 000000000..53ea8f9f6 --- /dev/null +++ b/news/bugfix-303.md @@ -0,0 +1,8 @@ +`file()`, `stdout()`: fix log sources getting stuck + +Due to an acknowledgment bug in the `file()` and `stdout()` destinations, +sources routed to those destinations may have gotten stuck as they were +flow-controlled incorrectly. + +This issue occured only in extremely rare cases with regular files, but it +occured frequently with `/dev/stderr` and other slow pseudo-devices.