Skip to content

Commit

Permalink
get rid of queue in sync and detect reorgs correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrick Nagurny committed Jun 20, 2017
1 parent 46134e1 commit 93779e6
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 188 deletions.
7 changes: 4 additions & 3 deletions lib/services/bitcoind/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ Bitcoin.prototype._wrapRPCError = function(errObj) {
};

Bitcoin.prototype._getGenesisBlock = function(callback) {

var self = this;

self.client.getBlockHash(0, function(err, response) {
Expand Down Expand Up @@ -290,7 +289,6 @@ Bitcoin.prototype._connectProcess = function(config) {
};

Bitcoin.prototype.start = function(callback) {

var self = this;

if (!self.options.connect) {
Expand All @@ -303,7 +301,10 @@ Bitcoin.prototype.start = function(callback) {
throw new Error('Could not connect to any servers in connect array.');
}

self._initChain(function() {
self._initChain(function(err) {
if(err) {
return callback(err);
}

log.info('Bitcoin Daemon Ready');
callback();
Expand Down
44 changes: 9 additions & 35 deletions lib/services/db/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ DB.prototype.start = function(callback) {
log.info('Bitcoin network tip is currently: ' + self.bitcoind.tiphash + ' at height: ' + self.bitcoind.height);
self._sync.sync();

self._sync.once('synced', function() {
self.emit('synced');
});

});

});
Expand Down Expand Up @@ -239,43 +243,13 @@ DB.prototype.createKeyStream = function(op) {
return stream;
};

DB.prototype.detectReorg = function(blocks) {

var self = this;

if (!blocks || blocks.length === 0) {
return;
}

var tipHash = self.reorgTipHash || self.tip.hash;
var chainMembers = [];

var loopIndex = 0;
var overallCounter = 0;

while(overallCounter < blocks.length) {

if (loopIndex >= blocks.length) {
overallCounter++;
loopIndex = 0;
}

var prevHash = BufferUtil.reverse(blocks[loopIndex].header.prevHash).toString('hex');
if (prevHash === tipHash) {
tipHash = blocks[loopIndex].hash;
chainMembers.push(blocks[loopIndex]);
}
loopIndex++;

}

for(var i = 0; i < blocks.length; i++) {
if (chainMembers.indexOf(blocks[i]) === -1) {
return blocks[i];
}
self.reorgTipHash = blocks[i].hash;
DB.prototype.detectReorg = function(block) {
var prevHash = BufferUtil.reverse(block.header.prevHash).toString('hex');
if(this.tip.hash !== prevHash) {
return true;
}

return false;
};

DB.prototype.handleReorg = function(forkBlock, callback) {
Expand Down
102 changes: 16 additions & 86 deletions lib/services/db/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ function BlockStream(highWaterMark, db, sync) {
this.db = db;
this.dbTip = this.db.tip;
this.lastReadHeight = this.dbTip.__height;
this.lastEmittedHash = this.dbTip.hash;
this.queue = [];
this.lastReadHash = this.dbTip.hash;
this.processing = false;
this.bitcoind = this.db.bitcoind;
}
Expand Down Expand Up @@ -101,18 +100,9 @@ Sync.prototype.sync = function() {
};

Sync.prototype._onFinish = function() {

var self = this;
self.syncing = false;

if (self.forkBlock) {
self.db.handleReorg(self.forkBlock, function() {
self.forkBlock = null;
self.sync();
});
return;
}

self._startSubscriptions();
self.emit('synced');

Expand Down Expand Up @@ -147,91 +137,27 @@ Sync.prototype._handleErrors = function(stream) {


BlockStream.prototype._read = function() {
var self = this;

if (this.lastEmittedHash === this.bitcoind.tiphash) {
if(this.lastReadHash === this.bitcoind.tiphash) {
return this.push(null);
}

this.queue.push(++this.lastReadHeight);
this._process();
};

BlockStream.prototype._process = function() {
var self = this;

if(self.processing) {
return;
if(this.lastReadHeight >= this.bitcoind.height) {
return this.push(null);
}

this.processing = true;

async.whilst(
function() {
return self.queue.length;
}, function(next) {

var blockArgs = self.queue.slice(0, Math.min(5, self.queue.length));
self.queue = self.queue.slice(blockArgs.length);
self._getBlocks(blockArgs, next);

}, function(err) {
if(err) {
return self.emit('error', err);
}
self.processing = false;
}
);
};


BlockStream.prototype._getBlocks = function(heights, callback) {

var self = this;
async.map(heights, function(height, next) {

if (height === 0) {
var block = new Block(self.bitcoind.genesisBuffer);
block.__height = 0;
return next(null, block);
}

self.bitcoind.getBlock(height, function(err, block) {

if(err) {
return next(err);
}

block.__height = height;
next(null, block);
});


}, function(err, blocks) {

self.bitcoind.getBlock(self.lastReadHeight + 1, function(err, block) {
if(err) {
return callback(err);
// add new stack lines to err
return self.emit('error', new Error(err));
}

//at this point, we know that all blocks we've sent down the pipe
//have not been reorg'ed, but the new batch here might have been
self.sync.forkBlock = self.db.detectReorg(blocks);

if (!self.sync.forkBlock) {

for(var i = 0; i < blocks.length; i++) {

self.lastEmittedHash = blocks[i].hash;
self.push(blocks[i]);

}

return callback();

}

self.push(null);
callback();
self.lastReadHeight++;
self.lastReadHash = block.hash;

block.__height = self.lastReadHeight;
self.push(block);
});
};

Expand Down Expand Up @@ -266,6 +192,10 @@ ProcessSerial.prototype._write = function(block, enc, callback) {
ProcessSerial.prototype._process = function(block, callback) {
var self = this;

if(self.db.detectReorg(block)) {
return self.db.handleReorg(block, callback);
}

self.db.getSerialBlockOperations(block, true, function(err, operations) {
if(err) {
return callback(err);
Expand Down
3 changes: 2 additions & 1 deletion test/data/blocks.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"genesis": "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f20020000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000",
"block1a": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f69965a91e7fc9ccccbe4051b74d086114741b96678f5e491b5609b18962252fd2d12f858ffff7f20040000000102000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0200f2052a0100000023210372bfaa748e546ba784a4d1395f5cedf673f9f5a8160effbe0f595fe905fb3e59ac0000000000000000266a24aa21a9ede2f61c3f71d1defd3fa999dfa36953755c690689799962b48bebd836974e8cf900000000",
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000"
"block1b": "0000002006226e46111a0b59caaf126043eb5bbf28c34f3a5e332a1fc7b2b73cf188910f36f612e7b9a88a90fdd781e8885ae425ee9124e17c22d6eb4253094d0e6f6ae6dfede058ffff7f20010000000101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff03510101ffffffff0100f2052a01000000232103caa96df67b8a5ce37cca53ddf394a093fab6922830fad79fc5f0d8369200121cac00000000",
"block2b": "00000020d459cd53b1e7fb9c9b7bdd7c48b571da9b0df08ff68b13cb6784b5b18761512e206072ebec8302cf43f859245d61dc304eaced26703e5d2fa035ae8068c6196ae0ede058ffff7f20010000000101000000000100f2052a010000001976a9142a48bf892a5461dffe8c68fe209be16a84289ca488ac00000000"
}
56 changes: 0 additions & 56 deletions test/services/db/index.unit.js

This file was deleted.

Loading

0 comments on commit 93779e6

Please sign in to comment.