Skip to content

Commit

Permalink
Add serialize_writes option
Browse files Browse the repository at this point in the history
  • Loading branch information
edfletcher committed Jun 17, 2023
1 parent a609363 commit 46214c6
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 4 deletions.
3 changes: 2 additions & 1 deletion src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ module.exports = class IrcClient extends EventEmitter {
message_max_length: 350,
sasl_disconnect_on_fail: false,
transport: default_transport,
websocket_protocol: 'text.ircv3.net'
websocket_protocol: 'text.ircv3.net',
serialize_writes: false
};

const props = Object.keys(defaults);
Expand Down
35 changes: 32 additions & 3 deletions src/transports/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,31 @@ module.exports = class Connection extends EventEmitter {
this.socket_events = [];

this.encoding = 'utf8';

this.write_queue = null;
this.write_queue_servicer = () => {};
}

isConnected() {
return this.state === SOCK_CONNECTED;
}

_writeLineConnected(line, cb) {
if (this.encoding !== 'utf8') {
this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb);
} else {
this.socket.write(line + '\r\n', cb);
}
}

writeLine(line, cb) {
if (this.socket && this.isConnected()) {
if (this.encoding !== 'utf8') {
this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb);
if (this.options.serialize_writes && this.write_queue) {
if (this.write_queue.push({ line, cb }) === 1) {
this.write_queue_servicer();
}
} else {
this.socket.write(line + '\r\n', cb);
this._writeLineConnected(line, cb);
}
} else {
this.debugOut('writeLine() called when not connected');
Expand Down Expand Up @@ -77,6 +90,22 @@ module.exports = class Connection extends EventEmitter {
this.requested_disconnect = false;
this.incoming_buffer = Buffer.from('');

if (options.serialize_writes) {
this.write_queue = [];
this.write_queue_servicer = () => {
if (this.write_queue.length) {
this._writeLineConnected(this.write_queue[0].line, () => {
if (this.write_queue[0].cb) {
this.write_queue[0].cb();
}

this.write_queue = this.write_queue.slice(1);
process.nextTick(this.write_queue_servicer);
});
}
};
}

// Include server name (SNI) if provided host is not an IP address
if (!this.getAddressFamily(ircd_host)) {
sni = ircd_host;
Expand Down
93 changes: 93 additions & 0 deletions test/rawWriteOrdering.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
'use strict';

/* globals describe, it */

const net = require('net');
const Connection = require('../src/transports/net');

const chai = require('chai');

chai.use(require('chai-subset'));

async function runTest(serialize_writes) {
const numLines = 12;
const timeSlice = 100;

return new Promise((resolve) => {
let conn;
let server; // eslint-disable-line prefer-const
let wroteLines = [];
const bufferedLines = [];

const clientHandler = (client) => {
client.on('data', (data) => {
const dataStr = data.toString('utf8');
bufferedLines.push(dataStr);

if (wroteLines.length && wroteLines.length === bufferedLines.length) {
conn.close();
server.close();
resolve({ wroteLines, bufferedLines });
}
});
};

server = net.createServer(clientHandler);
server.listen(0, '0.0.0.0', () => {
conn = new Connection({
host: server.address().address,
port: server.address().port,
tls: false,
serialize_writes,
});

wroteLines = Array.from({ length: numLines }).map((_, i) => i).map(String);
let delay = wroteLines.length / timeSlice;
const rudeHandler = {
get(target, prop) {
if (prop === 'write') {
return (data, cb) => {
setTimeout(() => target[prop](data, cb), delay * 1000);
delay -= 1 / timeSlice;
};
} else {
return target[prop];
}
}
};

conn.on('open', () => {
conn.socket = new Proxy(conn.socket, rudeHandler);
wroteLines.forEach((line) => conn.writeLine(line));
});

conn.connect();
});
});
}

function compareLines(wroteLines, bufferedLines) {
return bufferedLines.map((l) => l.trim()).every((line, index) => line === wroteLines[index]);
}

describe('src/transports/net.js', function() {
it('should recieve messages in reverse of the order sent when serialize_writes is false', function(done) {
runTest(false).then(({ wroteLines, bufferedLines }) => {
let error = null;
if (compareLines(wroteLines, bufferedLines) === true) {
error = new Error('Line order matches when it should not!');
}
done(error);
});
});

it('should recieve messages in the order sent when serialize_writes is true', function(done) {
runTest(true).then(({ wroteLines, bufferedLines }) => {
let error = null;
if (compareLines(wroteLines, bufferedLines) === false) {
error = new Error('Line order does not match when it should!');
}
done(error);
});
});
});

0 comments on commit 46214c6

Please sign in to comment.