Skip to content

Commit

Permalink
[SYSTEMDS-3597] Fix race conditions in double buffering output streams
Browse files Browse the repository at this point in the history
This patch fixes race conditions in the new double buffering, where
the lambda function is not immediately executed leading to false
orderings of array references. We now pass dedicated write tasks to
the thread pool.
  • Loading branch information
mboehm7 committed Jul 21, 2023
1 parent 791d7f1 commit 0cc90b2
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.commons.lang3.concurrent.ConcurrentUtils;

public class DoubleBufferingOutputStream extends FilterOutputStream
{
ExecutorService _pool = CommonThreadPool.get(1); //no outrun
Future<?>[] _locks;
protected ExecutorService _pool = CommonThreadPool.get(1);
protected Future<?>[] _locks;
protected byte[][] _buff;
private int _pos;

Expand Down Expand Up @@ -68,7 +69,7 @@ public void write(byte[] b, int off, int len)
System.arraycopy(b, off, _buff[_pos], 0, len);

//submit write request
_locks[_pos] = _pool.submit(() -> writeBuffer(_buff[_pos], 0, len));
_locks[_pos] = _pool.submit(new WriteTask(_buff[_pos], len));
_pos = (_pos+1) % _buff.length;
}
}
Expand Down Expand Up @@ -105,4 +106,20 @@ public void close() throws IOException {
_pool.shutdown();
super.close();
}

private class WriteTask implements Callable<Object> {
private final byte[] _b;
private final int _len;

protected WriteTask(byte[] buff, int len) {
_b = buff;
_len = len;
}

@Override
public Object call() {
writeBuffer(_b, 0, _len);
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void writeDoubleArray(int len, double[] varr)
long tmp = Double.doubleToRawLongBits(varr[i+j]);
longToBa(tmp, _buff, _count);
_count += 8;
}
}

//flush buffer for current block
flushBuffer(); //based on count
Expand Down Expand Up @@ -258,7 +258,7 @@ public void writeSparseRows(int rlen, SparseBlock rows)
if( alen2 < _bufflen )
{
if (_count+alen2 > _bufflen)
flushBuffer();
flushBuffer();

for( int j=apos; j<apos+alen; j++ )
{
Expand All @@ -274,7 +274,7 @@ public void writeSparseRows(int rlen, SparseBlock rows)
for( int j=apos; j<apos+alen; j++ )
{
if (_count+12 > _bufflen)
flushBuffer();
flushBuffer();

long tmp2 = Double.doubleToRawLongBits(avals[j]);
intToBa(aix[j], _buff, _count);
Expand Down

0 comments on commit 0cc90b2

Please sign in to comment.