Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-1627: New class to handle collection of BufferLedger(s) within … #4

Open
wants to merge 10 commits into
base: dremio
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class AllocationManager {
private final long allocatorManagerId = MANAGER_ID_GENERATOR.incrementAndGet();
private final int size;
private final UnsafeDirectLittleEndian underlying;
private final IdentityHashMap<BufferAllocator, BufferLedger> map = new IdentityHashMap<>();
private final LowCostIdentityHasMap<BufferLedger> map = new LowCostIdentityHasMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final AutoCloseableLock readLock = new AutoCloseableLock(lock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(lock.writeLock());
Expand Down Expand Up @@ -140,65 +140,52 @@ private BufferLedger associate(final BaseAllocator allocator, final boolean reta
return existingLedger;
}

final BufferLedger ledger = new BufferLedger(allocator, new ReleaseListener(allocator));
final BufferLedger ledger = new BufferLedger(allocator);
if (retain) {
ledger.inc();
}
BufferLedger oldLedger = map.put(allocator, ledger);
BufferLedger oldLedger = map.put(ledger);
Preconditions.checkArgument(oldLedger == null);
allocator.associateLedger(ledger);
return ledger;
}
}


/**
* The way that a particular BufferLedger communicates back to the AllocationManager that it
* now longer needs to hold
* a reference to particular piece of memory.
* Can only be called when you already hold the writeLock.
*/
private class ReleaseListener {

private final BufferAllocator allocator;

public ReleaseListener(BufferAllocator allocator) {
this.allocator = allocator;
}

/**
* Can only be called when you already hold the writeLock.
*/
public void release() {
allocator.assertOpen();
private void release(final BufferLedger ledger) {
final BaseAllocator allocator = ledger.getAllocator();
allocator.assertOpen();

final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);
final BufferLedger oldLedger = map.remove(allocator);
oldLedger.allocator.dissociateLedger(oldLedger);

if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.values().iterator().next();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
if (oldLedger == owningLedger) {
if (map.isEmpty()) {
// no one else owns, lets release.
oldLedger.allocator.releaseBytes(size);
underlying.release();
amDestructionTime = System.nanoTime();
owningLedger = null;
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}
// we need to change the owning allocator. we've been removed so we'll get whatever is
// top of list
BufferLedger newLedger = map.getNextValue();

// we'll forcefully transfer the ownership and not worry about whether we exceeded the
// limit
// since this consumer can't do anything with this.
oldLedger.transferBalance(newLedger);
}
} else {
if (map.isEmpty()) {
throw new IllegalStateException("The final removal of a ledger should be connected to " +
"the owning ledger.");
}


}
}

Expand All @@ -209,7 +196,7 @@ public void release() {
* As with AllocationManager, the only reason this is public is due to ArrowBuf being in io
* .netty.buffer package.
*/
public class BufferLedger {
public class BufferLedger implements ValueWithKeyIncluded {

private final IdentityHashMap<ArrowBuf, Object> buffers =
BaseAllocator.DEBUG ? new IdentityHashMap<ArrowBuf, Object>() : null;
Expand All @@ -221,16 +208,27 @@ public class BufferLedger {
// correctly
private final long lCreationTime = System.nanoTime();
private final BaseAllocator allocator;
private final ReleaseListener listener;
private final HistoricalLog historicalLog = BaseAllocator.DEBUG ? new HistoricalLog
(BaseAllocator.DEBUG_LOG_LENGTH,
"BufferLedger[%d]", 1)
: null;
private volatile long lDestructionTime = 0;

private BufferLedger(BaseAllocator allocator, ReleaseListener listener) {
private BufferLedger(BaseAllocator allocator) {
this.allocator = allocator;
this.listener = listener;
}

/**
* Get the allocator for this ledger
* @return allocator
*/
private BaseAllocator getAllocator() {
return allocator;
}

@Override
public Object getKey() {
return allocator;
}

/**
Expand Down Expand Up @@ -339,7 +337,7 @@ public int decrement(int decrement) {
outcome = bufRefCnt.addAndGet(-decrement);
if (outcome == 0) {
lDestructionTime = System.nanoTime();
listener.release();
release(this);
}
}

Expand Down Expand Up @@ -463,4 +461,4 @@ boolean isOwningLedger() {

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ private static String createErrorMsg(final BufferAllocator allocator, final int
* @param val An integer value.
* @return The closest power of two of that value.
*/
static int nextPowerOfTwo(int val) {
public static int nextPowerOfTwo(int val) {
int highestBit = Integer.highestOneBit(val);
if (highestBit == val) {
return val;
Expand All @@ -142,6 +142,21 @@ static int nextPowerOfTwo(int val) {
}
}

/**
* Rounds up the provided value to the nearest power of two.
*
* @param val A long value.
* @return The closest power of two of that value.
*/
public static long nextPowerOfTwo(long val) {
long highestBit = Long.highestOneBit(val);
if (highestBit == val) {
return val;
} else {
return highestBit << 1;
}
}

public static StringBuilder indent(StringBuilder sb, int indent) {
final char[] indentation = new char[indent * 2];
Arrays.fill(indentation, ' ');
Expand Down
Loading