-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[not-fixup] - Reduce memory consumption for Buffered File Writer #2377
Changes from 29 commits
5e5a57a
3b21ea3
9aaf0c8
f7f890a
1edc0a3
d530798
ea8bc7f
3bc4482
510ce03
f6f1f44
7659980
b0dbbbf
96ef60a
a07c39b
e41cced
8e3d56c
3e67392
d8e5589
3894900
999d662
54c8a65
57a2305
faa423b
c8b4b02
8190669
c1cf67c
2e74a4c
a2d23ef
10532d5
f1bd25d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,12 +13,60 @@ import ( | |
"github.com/trufflesecurity/trufflehog/v3/pkg/context" | ||
) | ||
|
||
// bufferPool is used to store buffers for reuse. | ||
var bufferPool = sync.Pool{ | ||
// TODO: Consider growing the buffer before returning it if we can find an optimal size. | ||
// Ideally the size would cover the majority of cases without being too large. | ||
// This would avoid the need to grow the buffer when writing to it, reducing allocations. | ||
New: func() any { return new(bytes.Buffer) }, | ||
type bufPoolOpt func(pool *bufferPool) | ||
|
||
type bufferPool struct { | ||
bufferSize uint32 | ||
*sync.Pool | ||
} | ||
|
||
const defaultBufferSize = 2 << 10 // 2KB | ||
func newBufferPool(opts ...bufPoolOpt) *bufferPool { | ||
pool := &bufferPool{bufferSize: defaultBufferSize} | ||
|
||
for _, opt := range opts { | ||
opt(pool) | ||
} | ||
pool.Pool = &sync.Pool{ | ||
New: func() any { | ||
buf := new(bytes.Buffer) | ||
buf.Grow(int(pool.bufferSize)) | ||
return buf | ||
}, | ||
} | ||
|
||
return pool | ||
} | ||
|
||
// sharedBufferPool is the shared buffer pool used by all BufferedFileWriters. | ||
// This allows for efficient reuse of buffers across multiple writers. | ||
var sharedBufferPool *bufferPool | ||
|
||
func init() { sharedBufferPool = newBufferPool() } | ||
|
||
func (bp *bufferPool) get(ctx context.Context) *bytes.Buffer { | ||
buf, ok := bp.Pool.Get().(*bytes.Buffer) | ||
if !ok { | ||
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") | ||
buf = bytes.NewBuffer(make([]byte, 0, bp.bufferSize)) | ||
} | ||
|
||
return buf | ||
} | ||
|
||
func (bp *bufferPool) put(buf *bytes.Buffer) { | ||
// If the buffer is more than twice the default size, replace it with a new, smaller one. | ||
// This prevents us from returning very large buffers to the pool. | ||
const maxAllowedCapacity = 2 * defaultBufferSize | ||
if buf.Cap() > maxAllowedCapacity { | ||
// Replace the buffer with a new, smaller one. No need to copy data since we're resetting it. | ||
buf = bytes.NewBuffer(make([]byte, 0, defaultBufferSize)) | ||
ahrav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} else { | ||
// Reset the buffer to clear any existing data. | ||
buf.Reset() | ||
} | ||
|
||
bp.Put(buf) | ||
} | ||
|
||
// state represents the current mode of BufferedFileWriter. | ||
|
@@ -39,7 +87,8 @@ type BufferedFileWriter struct { | |
|
||
state state // Current state of the writer. (writeOnly or readOnly) | ||
|
||
buf bytes.Buffer // Buffer for storing data under the threshold in memory. | ||
bufPool *bufferPool // Pool for storing buffers for reuse. | ||
buf *bytes.Buffer // Buffer for storing data under the threshold in memory. | ||
filename string // Name of the temporary file. | ||
file io.WriteCloser // File for storing data over the threshold. | ||
} | ||
|
@@ -55,7 +104,11 @@ func WithThreshold(threshold uint64) Option { | |
// New creates a new BufferedFileWriter with the given options. | ||
func New(opts ...Option) *BufferedFileWriter { | ||
const defaultThreshold = 10 * 1024 * 1024 // 10MB | ||
w := &BufferedFileWriter{threshold: defaultThreshold, state: writeOnly} | ||
w := &BufferedFileWriter{ | ||
threshold: defaultThreshold, | ||
state: writeOnly, | ||
bufPool: sharedBufferPool, | ||
} | ||
for _, opt := range opts { | ||
opt(w) | ||
} | ||
|
@@ -78,17 +131,16 @@ func (w *BufferedFileWriter) String() (string, error) { | |
} | ||
defer file.Close() | ||
|
||
// Create a buffer large enough to hold file data and additional buffer data, if any. | ||
fileSize := w.size | ||
buf := bytes.NewBuffer(make([]byte, 0, fileSize)) | ||
|
||
var buf bytes.Buffer | ||
// Read the file contents into the buffer. | ||
if _, err := io.Copy(buf, file); err != nil { | ||
if _, err := io.CopyBuffer(&buf, file, nil); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the effect of this change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should be identical exepct we use
|
||
return "", fmt.Errorf("failed to read file contents: %w", err) | ||
} | ||
|
||
// Append buffer data, if any, to the end of the file contents. | ||
buf.Write(w.buf.Bytes()) | ||
if _, err := w.buf.WriteTo(&buf); err != nil { | ||
return "", err | ||
} | ||
|
||
return buf.String(), nil | ||
} | ||
|
@@ -100,33 +152,44 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error | |
} | ||
|
||
size := uint64(len(data)) | ||
|
||
if w.buf == nil || w.buf.Len() == 0 { | ||
w.buf = w.bufPool.get(ctx) | ||
} | ||
|
||
bufferLength := w.buf.Len() | ||
|
||
defer func() { | ||
w.size += size | ||
ctx.Logger().V(4).Info( | ||
"write complete", | ||
"data_size", size, | ||
"content_size", w.buf.Len(), | ||
"content_size", bufferLength, | ||
"total_size", w.size, | ||
) | ||
}() | ||
|
||
if w.buf.Len() == 0 { | ||
bufPtr, ok := bufferPool.Get().(*bytes.Buffer) | ||
if !ok { | ||
ctx.Logger().Error(fmt.Errorf("buffer pool returned unexpected type"), "using new buffer") | ||
bufPtr = new(bytes.Buffer) | ||
} | ||
bufPtr.Reset() // Reset the buffer to clear any existing data | ||
w.buf = *bufPtr | ||
} | ||
|
||
if uint64(w.buf.Len())+size <= w.threshold { | ||
totalSizeNeeded := uint64(bufferLength) + uint64(len(data)) | ||
if totalSizeNeeded <= w.threshold { | ||
// If the total size is within the threshold, write to the buffer. | ||
ctx.Logger().V(4).Info( | ||
"writing to buffer", | ||
"data_size", size, | ||
"content_size", w.buf.Len(), | ||
"content_size", bufferLength, | ||
) | ||
|
||
availableSpace := w.buf.Cap() - bufferLength | ||
growSize := int(totalSizeNeeded) - bufferLength | ||
Comment on lines
+180
to
+181
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That seems wrong, or I'm misunderstanding what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had to read this part like four times as well. I was going to complain until I noticed that that the old code was equally confusing so I decided to just blame the buffer interface. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea I am ashamed to say I spent WAY too long trying to fully grok the buffer interface... there are definitely some gotchas that took me way way too long to comprehend. 😞 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't be ashamed! Intuitive API design is difficult and sometimes impossible. Some stuff is just hard to understand. |
||
if growSize > availableSpace { | ||
ctx.Logger().V(4).Info( | ||
"buffer size exceeded, growing buffer", | ||
"current_size", bufferLength, | ||
"new_size", totalSizeNeeded, | ||
"available_space", availableSpace, | ||
"grow_size", growSize, | ||
) | ||
} | ||
|
||
return w.buf.Write(data) | ||
} | ||
|
||
|
@@ -143,14 +206,12 @@ func (w *BufferedFileWriter) Write(ctx context.Context, data []byte) (int, error | |
|
||
// Transfer existing data in buffer to the file, then clear the buffer. | ||
ahrav marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// This ensures all the data is in one place - either entirely in the buffer or the file. | ||
if w.buf.Len() > 0 { | ||
ctx.Logger().V(4).Info("writing buffer to file", "content_size", w.buf.Len()) | ||
if _, err := w.file.Write(w.buf.Bytes()); err != nil { | ||
if bufferLength > 0 { | ||
ctx.Logger().V(4).Info("writing buffer to file", "content_size", bufferLength) | ||
if _, err := w.buf.WriteTo(w.file); err != nil { | ||
return 0, err | ||
} | ||
// Reset the buffer to clear any existing data and return it to the pool. | ||
w.buf.Reset() | ||
bufferPool.Put(&w.buf) | ||
w.bufPool.put(w.buf) | ||
} | ||
} | ||
ctx.Logger().V(4).Info("writing to file", "data_size", size) | ||
|
@@ -167,7 +228,7 @@ func (w *BufferedFileWriter) CloseForWriting() error { | |
} | ||
|
||
if w.buf.Len() > 0 { | ||
_, err := w.file.Write(w.buf.Bytes()) | ||
_, err := w.buf.WriteTo(w.file) | ||
rosecodym marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return err | ||
} | ||
|
@@ -199,7 +260,7 @@ func (w *BufferedFileWriter) ReadCloser() (io.ReadCloser, error) { | |
// Data is in memory. | ||
return &bufferReadCloser{ | ||
Reader: bytes.NewReader(w.buf.Bytes()), | ||
onClose: func() { bufferPool.Put(&w.buf) }, | ||
onClose: func() { w.bufPool.put(w.buf) }, | ||
}, nil | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small cleanup here.