Skip to content

Commit

Permalink
refactor: BaseChunkInputStreamGenerator can be ReferenceCounted (#5889)
Browse files Browse the repository at this point in the history
Reduces some duplication of code, by sharing the ReferenceCounted logic
for tracking when a CISG type should close its chunks or other managed
resources.

Partial #188
Co-authored-by: Ryan Caudy <[email protected]>
  • Loading branch information
niloc132 committed Aug 7, 2024
1 parent bf5bfef commit 3bd5587
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,32 @@
//
package io.deephaven.extensions.barrage.chunk;

import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.RowSequenceFactory;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.Chunk;
import io.deephaven.util.referencecounting.ReferenceCounted;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public abstract class BaseChunkInputStreamGenerator<T extends Chunk<Values>> implements ChunkInputStreamGenerator {
public abstract class BaseChunkInputStreamGenerator<T extends Chunk<Values>>
extends ReferenceCounted
implements ChunkInputStreamGenerator {

public static final byte[] PADDING_BUFFER = new byte[8];
public static final int REMAINDER_MOD_8_MASK = 0x7;

// Ensure that we clean up chunk only after all copies of the update are released.
private volatile int refCount = 1;

// Field updater for refCount, so we can avoid creating an {@link java.util.concurrent.atomic.AtomicInteger} for
// each instance.
@SuppressWarnings("rawtypes")
protected static final AtomicIntegerFieldUpdater<BaseChunkInputStreamGenerator> REFERENCE_COUNT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(BaseChunkInputStreamGenerator.class, "refCount");

protected final T chunk;
protected final int elementSize;

private final long rowOffset;

BaseChunkInputStreamGenerator(final T chunk, final int elementSize, final long rowOffset) {
super(1);
this.chunk = chunk;
this.elementSize = elementSize;
this.rowOffset = rowOffset;
Expand All @@ -51,10 +46,13 @@ public long getLastRowOffset() {

@Override
public void close() {
if (REFERENCE_COUNT_UPDATER.decrementAndGet(this) == 0) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
decrementReferenceCount();
}

@Override
protected void onReferenceCountAtZero() {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
}

Expand Down Expand Up @@ -87,7 +85,7 @@ abstract class BaseChunkInputStream extends DrainableColumn {
this.options = options;
this.subset = chunk.size() == 0 ? RowSequenceFactory.EMPTY
: subset != null ? subset.copy() : RowSequenceFactory.forRange(0, chunk.size() - 1);
REFERENCE_COUNT_UPDATER.incrementAndGet(BaseChunkInputStreamGenerator.this);
BaseChunkInputStreamGenerator.this.incrementReferenceCount();
// ignore the empty chunk as these are intentionally empty generators that should work for any subset
if (chunk.size() > 0 && this.subset.lastRowKey() >= chunk.size()) {
throw new IllegalStateException(
Expand All @@ -97,7 +95,7 @@ abstract class BaseChunkInputStream extends DrainableColumn {

@Override
public void close() throws IOException {
BaseChunkInputStreamGenerator.this.close();
BaseChunkInputStreamGenerator.this.decrementReferenceCount();
subset.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.deephaven.extensions.barrage.util.StreamReaderOptions;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.datastructures.LongSizedDataStructure;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.util.mutable.MutableInt;
import io.deephaven.util.mutable.MutableLong;
Expand Down Expand Up @@ -214,14 +213,10 @@ private synchronized void computePayload() throws IOException {
}

@Override
public void close() {
if (REFERENCE_COUNT_UPDATER.decrementAndGet(this) == 0) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
if (byteStorage != null) {
byteStorage.close();
}
protected void onReferenceCountAtZero() {
super.onReferenceCountAtZero();
if (byteStorage != null) {
byteStorage.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderSequential;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -60,17 +59,13 @@ private synchronized void computePayload() {
}

@Override
public void close() {
if (REFERENCE_COUNT_UPDATER.decrementAndGet(this) == 0) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
if (offsets != null) {
offsets.close();
}
if (innerGenerator != null) {
innerGenerator.close();
}
protected void onReferenceCountAtZero() {
super.onReferenceCountAtZero();
if (offsets != null) {
offsets.close();
}
if (innerGenerator != null) {
innerGenerator.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.chunk.attributes.ChunkPositions;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.PoolableChunk;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetBuilderSequential;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -60,17 +59,13 @@ private synchronized void computePayload() {
}

@Override
public void close() {
if (REFERENCE_COUNT_UPDATER.decrementAndGet(this) == 0) {
if (chunk instanceof PoolableChunk) {
((PoolableChunk) chunk).close();
}
if (offsets != null) {
offsets.close();
}
if (innerGenerator != null) {
innerGenerator.close();
}
protected void onReferenceCountAtZero() {
super.onReferenceCountAtZero();
if (offsets != null) {
offsets.close();
}
if (innerGenerator != null) {
innerGenerator.close();
}
}

Expand Down

0 comments on commit 3bd5587

Please sign in to comment.