From 5025d5c3aeee36d1a3eb4037443ac9e3d80d24f8 Mon Sep 17 00:00:00 2001 From: siddharth Date: Wed, 27 Sep 2017 13:55:07 -0700 Subject: [PATCH] ARROW-1618: Reduce Heap Usage (Phase 1) --- .../arrow/memory/AllocationManager.java | 81 +++++++++---------- 1 file changed, 37 insertions(+), 44 deletions(-) diff --git a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java index 70ca1dc32a1b3..8bce5e4b9d663 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/AllocationManager.java @@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta return existingLedger; } - final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator)); + final BufferLedger ledger = new BufferLedger(allocator); if (retain) { ledger.inc(); } @@ -151,54 +151,41 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta } } - /** * The way that a particular BufferLedger communicates back to the AllocationManager that it * now longer needs to hold * a reference to particular piece of memory. + * Can only be called when you already hold the writeLock. */ - private class ReleaseListener { - - private final BufferAllocator allocator; - - public ReleaseListener(BufferAllocator allocator) { - this.allocator = allocator; - } - - /** - * Can only be called when you already hold the writeLock. - */ - public void release() { - allocator.assertOpen(); + private void release(final BufferLedger ledger) { + final BaseAllocator allocator = ledger.getAllocator(); + allocator.assertOpen(); - final BufferLedger oldLedger = map.remove(allocator); - oldLedger.allocator.dissociateLedger(oldLedger); + final BufferLedger oldLedger = map.remove(allocator); + oldLedger.allocator.dissociateLedger(oldLedger); - if (oldLedger == owningLedger) { - if (map.isEmpty()) { - // no one else owns, lets release. - oldLedger.allocator.releaseBytes(size); - underlying.release(); - amDestructionTime = System.nanoTime(); - owningLedger = null; - } else { - // we need to change the owning allocator. we've been removed so we'll get whatever is - // top of list - BufferLedger newLedger = map.values().iterator().next(); - - // we'll forcefully transfer the ownership and not worry about whether we exceeded the - // limit - // since this consumer can't do anything with this. - oldLedger.transferBalance(newLedger); - } + if (oldLedger == owningLedger) { + if (map.isEmpty()) { + // no one else owns, lets release. + oldLedger.allocator.releaseBytes(size); + underlying.release(); + amDestructionTime = System.nanoTime(); + owningLedger = null; } else { - if (map.isEmpty()) { - throw new IllegalStateException("The final removal of a ledger should be connected to " + - "the owning ledger."); - } + // we need to change the owning allocator. we've been removed so we'll get whatever is + // top of list + BufferLedger newLedger = map.values().iterator().next(); + + // we'll forcefully transfer the ownership and not worry about whether we exceeded the + // limit + // since this consumer can't do anything with this. + oldLedger.transferBalance(newLedger); + } + } else { + if (map.isEmpty()) { + throw new IllegalStateException("The final removal of a ledger should be connected to " + + "the owning ledger."); } - - } } @@ -221,16 +208,22 @@ public class BufferLedger { // correctly private final long lCreationTime = System.nanoTime(); private final BaseAllocator allocator; - private final ReleaseListener listener; private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog (BaseAllocator.DEBUG_LOG_LENGTH, "BufferLedger[%d]", 1) : null; private volatile long lDestructionTime = 0; - private BufferLedger(BaseAllocator allocator, ReleaseListener listener) { + private BufferLedger(BaseAllocator allocator) { this.allocator = allocator; - this.listener = listener; + } + + /** + * Get the allocator for this ledger + * @return allocator + */ + private BaseAllocator getAllocator() { + return allocator; } /** @@ -339,7 +332,7 @@ public int decrement(int decrement) { outcome = bufRefCnt.addAndGet(-decrement); if (outcome == 0) { lDestructionTime = System.nanoTime(); - listener.release(); + release(this); } }