Skip to content

Commit

Permalink
ARROW-1618: Reduce Heap Usage (Phase 1)
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthteotia committed Sep 28, 2017
1 parent 1b8cabd commit 5025d5c
Showing 1 changed file with 37 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
return existingLedger;
}

final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
final BufferLedger ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
Expand All @@ -151,54 +151,41 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
}
}


/**
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* now longer needs to hold
* a reference to particular piece of memory.
* Can only be called when you already hold the writeLock.
*/
private class ReleaseListener {

private final BufferAllocator allocator;

public ReleaseListener(BufferAllocator allocator) {
this.allocator = allocator;
}

/**
* Can only be called when you already hold the writeLock.
*/
public void release() {
allocator.assertOpen();
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
allocator.assertOpen();

final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);
final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);

if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}


}
}

Expand All @@ -221,16 +208,22 @@ public class BufferLedger {
// correctly
private final long lCreationTime = System.nanoTime();
private final BaseAllocator allocator;
private final ReleaseListener listener;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1)
: null;
private volatile long lDestructionTime = 0;

private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
private BufferLedger(BaseAllocator allocator) {
this.allocator = allocator;
this.listener = listener;
}

/**
* Get the allocator for this ledger
* @return allocator
*/
private BaseAllocator getAllocator() {
return allocator;
}

/**
Expand Down Expand Up @@ -339,7 +332,7 @@ public int decrement(int decrement) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
listener.release();
release(this);
}
}

Expand Down

0 comments on commit 5025d5c

Please sign in to comment.