From 8c588b25e52de73e5bccf648da5c0220be4b90f6 Mon Sep 17 00:00:00 2001 From: Ege Korkan Date: Tue, 8 Aug 2023 23:45:18 +0200 Subject: [PATCH 1/6] mqtt: add strict type check to config --- packages/binding-mqtt/tsconfig.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/binding-mqtt/tsconfig.json b/packages/binding-mqtt/tsconfig.json index e9fa7c062..df9cd1d90 100644 --- a/packages/binding-mqtt/tsconfig.json +++ b/packages/binding-mqtt/tsconfig.json @@ -2,7 +2,8 @@ "extends": "../../tsconfig.json", "compilerOptions": { "outDir": "dist", - "rootDir": "src" + "rootDir": "src", + "strict": true }, "include": ["src/**/*"], "references": [{ "path": "../td-tools" }, { "path": "../core" }] From d7664ab80f935bae48299b437f1c8e0fedaf4113 Mon Sep 17 00:00:00 2001 From: Ege Korkan Date: Tue, 8 Aug 2023 23:45:45 +0200 Subject: [PATCH 2/6] mqtt: fix typo --- packages/binding-mqtt/src/mqtt.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/binding-mqtt/src/mqtt.ts b/packages/binding-mqtt/src/mqtt.ts index b5e5418a4..459557c34 100644 --- a/packages/binding-mqtt/src/mqtt.ts +++ b/packages/binding-mqtt/src/mqtt.ts @@ -47,7 +47,7 @@ export class MqttForm extends Form { } export interface MqttClientConfig { - // username & password are redundated here (also find them in MqttClientSecurityParameters) + // username & password are redundant here (also find them in MqttClientSecurityParameters) // because MqttClient.setSecurity() method can inject authentication credentials into this interface // which will be then passed to mqtt.connect() once for all username?: string; From b55df3755f0a7911b87413ba9be0c15f6b22e302 Mon Sep 17 00:00:00 2001 From: Ege Korkan Date: Tue, 8 Aug 2023 23:48:13 +0200 Subject: [PATCH 3/6] mqtt: start handling undefined types --- packages/binding-mqtt/src/mqtt-client.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/binding-mqtt/src/mqtt-client.ts b/packages/binding-mqtt/src/mqtt-client.ts index 2b1f2d15a..fce985d95 100644 --- a/packages/binding-mqtt/src/mqtt-client.ts +++ b/packages/binding-mqtt/src/mqtt-client.ts @@ -40,7 +40,7 @@ export default class MqttClient implements ProtocolClient { this.scheme = "mqtt" + (secure ? "s" : ""); } - private client: mqtt.MqttClient = undefined; + private client: mqtt.MqttClient | undefined = undefined; public subscribeResource( form: MqttForm, @@ -166,8 +166,14 @@ export default class MqttClient implements ProtocolClient { const security: TD.SecurityScheme = metadata[0]; if (security.scheme === "basic") { - this.config.username = credentials.username; - this.config.password = credentials.password; + if (credentials === undefined) { + // FIXME: This error message should be reworded and adapt to logging convention + throw new Error("binding-mqtt: security wants to be basic but you have provided no credentials"); + } else { + this.config.username = credentials.username; + this.config.password = credentials.password; + } + } return true; } From e5a8d0a9251c41115df1e0ea0883e88edb0a297f Mon Sep 17 00:00:00 2001 From: Ege Korkan Date: Wed, 9 Aug 2023 00:00:12 +0200 Subject: [PATCH 4/6] chore: run format --- packages/binding-mqtt/src/mqtt-client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/binding-mqtt/src/mqtt-client.ts b/packages/binding-mqtt/src/mqtt-client.ts index fce985d95..d21ae798f 100644 --- a/packages/binding-mqtt/src/mqtt-client.ts +++ b/packages/binding-mqtt/src/mqtt-client.ts @@ -173,7 +173,6 @@ export default class MqttClient implements ProtocolClient { this.config.username = credentials.username; this.config.password = credentials.password; } - } return true; } From 7ff47a613492bdc0f45ae067457ae082a8def1d2 Mon Sep 17 00:00:00 2001 From: Cristiano Aguzzi Date: Wed, 9 Aug 2023 09:32:13 +0200 Subject: [PATCH 5/6] style(binding-mqtt/mqtt-client): use ? instead of | undefined Co-authored-by: Jan Romann --- packages/binding-mqtt/src/mqtt-client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/binding-mqtt/src/mqtt-client.ts b/packages/binding-mqtt/src/mqtt-client.ts index d21ae798f..bbb9d1cf5 100644 --- a/packages/binding-mqtt/src/mqtt-client.ts +++ b/packages/binding-mqtt/src/mqtt-client.ts @@ -40,7 +40,7 @@ export default class MqttClient implements ProtocolClient { this.scheme = "mqtt" + (secure ? "s" : ""); } - private client: mqtt.MqttClient | undefined = undefined; + private client?: mqtt.MqttClient; public subscribeResource( form: MqttForm, From 28c0815b31d00179f8fd6e7f5ebcc0b0c34f3059 Mon Sep 17 00:00:00 2001 From: reluc Date: Fri, 11 Aug 2023 14:37:28 +0200 Subject: [PATCH 6/6] chore(binding-mqtt): use ts strict checking. --- .../binding-mqtt/src/mqtt-broker-server.ts | 82 ++++++++++++++----- packages/binding-mqtt/src/mqtt-client.ts | 20 ++++- .../mqtt-client-subscribe-test.integration.ts | 8 ++ .../test/mqtt-client-subscribe-test.unit.ts | 4 +- 4 files changed, 87 insertions(+), 27 deletions(-) diff --git a/packages/binding-mqtt/src/mqtt-broker-server.ts b/packages/binding-mqtt/src/mqtt-broker-server.ts index c7a6d0376..7522ecc6f 100644 --- a/packages/binding-mqtt/src/mqtt-broker-server.ts +++ b/packages/binding-mqtt/src/mqtt-broker-server.ts @@ -41,6 +41,14 @@ import { Readable } from "stream"; const { info, debug, error, warn } = createLoggers("binding-mqtt", "mqtt-broker-server"); export default class MqttBrokerServer implements ProtocolServer { + private static brokerIsInitialized(broker?: mqtt.MqttClient): asserts broker is mqtt.MqttClient { + if (broker === undefined) { + throw new Error( + `Broker not initialized. You need to start the ${MqttBrokerServer.name} before you can expose things.` + ); + } + } + readonly scheme: string = "mqtt"; private readonly ACTION_SEGMENT_LENGTH = 3; @@ -51,30 +59,33 @@ export default class MqttBrokerServer implements ProtocolServer { private readonly INTERACTION_NAME_SEGMENT_INDEX = 2; private readonly INTERACTION_EXT_SEGMENT_INDEX = 3; + private readonly defaults: MqttBrokerServerConfig = { uri: "mqtt://localhost:1883" }; + private port = -1; - private address: string = undefined; + private address?: string = undefined; - private brokerURI: string = undefined; + private brokerURI: string; private readonly things: Map = new Map(); private readonly config: MqttBrokerServerConfig; - private broker: mqtt.MqttClient; + private broker?: mqtt.MqttClient; - private hostedServer: Aedes; - private hostedBroker: net.Server; + private hostedServer?: Aedes; + private hostedBroker?: net.Server; constructor(config: MqttBrokerServerConfig) { - this.config = config ?? { uri: "mqtt://localhost:1883" }; + this.config = config ?? this.defaults; + this.config.uri = this.config.uri ?? this.defaults.uri; - if (config.uri !== undefined) { - // if there is a MQTT protocol indicator missing, add this - if (config.uri.indexOf("://") === -1) { - config.uri = this.scheme + "://" + config.uri; - } - this.brokerURI = config.uri; + // if there is a MQTT protocol indicator missing, add this + if (config.uri.indexOf("://") === -1) { + config.uri = this.scheme + "://" + config.uri; } + + this.brokerURI = config.uri; + if (config.selfHost) { this.hostedServer = Server({}); let server; @@ -86,7 +97,7 @@ export default class MqttBrokerServer implements ProtocolServer { const parsed = new url.URL(this.brokerURI); const port = parseInt(parsed.port); this.port = port > 0 ? port : 1883; - this.hostedBroker = server.listen(port); + this.hostedBroker = server.listen(port, parsed.hostname); this.hostedServer.authenticate = this.selfHostAuthentication.bind(this); } } @@ -130,6 +141,7 @@ export default class MqttBrokerServer implements ProtocolServer { } private exposeProperty(name: string, propertyName: string, thing: ExposedThing) { + MqttBrokerServer.brokerIsInitialized(this.broker); const topic = encodeURIComponent(name) + "/properties/" + encodeURIComponent(propertyName); const property = thing.properties[propertyName]; @@ -143,6 +155,12 @@ export default class MqttBrokerServer implements ProtocolServer { const observeListener = async (content: Content) => { debug(`MqttBrokerServer at ${this.brokerURI} publishing to Property topic '${propertyName}' `); const buffer = await content.toBuffer(); + + if (this.broker === undefined) { + warn(`MqttBrokerServer at ${this.brokerURI} has no client to publish to. Probably it was closed.`); + return; + } + this.broker.publish(topic, buffer); }; thing.handleObserveProperty(propertyName, observeListener, { formIndex: property.forms.length - 1 }); @@ -158,6 +176,8 @@ export default class MqttBrokerServer implements ProtocolServer { } private exposeAction(name: string, actionName: string, thing: ExposedThing) { + MqttBrokerServer.brokerIsInitialized(this.broker); + const topic = encodeURIComponent(name) + "/actions/" + encodeURIComponent(actionName); this.broker.subscribe(topic); @@ -179,6 +199,11 @@ export default class MqttBrokerServer implements ProtocolServer { debug(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to Event '${eventName}'`); const eventListener = async (content: Content) => { + if (this.broker === undefined) { + warn(`MqttBrokerServer at ${this.brokerURI} has no client to publish to. Probably it was closed.`); + return; + } + if (!content) { warn(`MqttBrokerServer on port ${this.getPort()} cannot process data for Event ${eventName}`); thing.handleUnsubscribeEvent(eventName, eventListener, { formIndex: event.forms.length - 1 }); @@ -199,6 +224,9 @@ export default class MqttBrokerServer implements ProtocolServer { payload = rawPayload; } else if (typeof rawPayload === "string") { payload = Buffer.from(rawPayload); + } else { + warn(`MqttBrokerServer on port ${this.getPort()} received unexpected payload type`); + return; } if (segments.length === this.ACTION_SEGMENT_LENGTH) { @@ -308,7 +336,7 @@ export default class MqttBrokerServer implements ProtocolServer { error( `MqttBrokerServer at ${this.brokerURI} got error on writing to property '${ segments[this.INTERACTION_NAME_SEGMENT_INDEX] - }': ${err.message}` + }': ${err}` ); } } else { @@ -322,7 +350,8 @@ export default class MqttBrokerServer implements ProtocolServer { public async destroy(thingId: string): Promise { debug(`MqttBrokerServer on port ${this.getPort()} destroying thingId '${thingId}'`); - let removedThing: ExposedThing; + let removedThing: ExposedThing | undefined; + for (const name of Array.from(this.things.keys())) { const expThing = this.things.get(name); if (expThing != null && expThing.id != null && expThing.id === thingId) { @@ -330,6 +359,7 @@ export default class MqttBrokerServer implements ProtocolServer { removedThing = expThing; } } + if (removedThing) { info(`MqttBrokerServer succesfully destroyed '${removedThing.title}'`); } else { @@ -385,8 +415,12 @@ export default class MqttBrokerServer implements ProtocolServer { } if (this.hostedBroker !== undefined) { - await new Promise((resolve) => this.hostedServer.close(() => resolve())); - await new Promise((resolve) => this.hostedBroker.close(() => resolve())); + // When the broker is hosted, we need to close it. + // Both this.hostedBroker and this.hostedServer are defined at the same time. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + await new Promise((resolve) => this.hostedServer!.close(() => resolve())); + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + await new Promise((resolve) => this.hostedBroker!.close(() => resolve())); } } @@ -394,7 +428,11 @@ export default class MqttBrokerServer implements ProtocolServer { return this.port; } - public getAddress(): string { + /** + * + * @returns the address of the broker or undefined if the Server is not started. + */ + public getAddress(): string | undefined { return this.address; } @@ -408,15 +446,15 @@ export default class MqttBrokerServer implements ProtocolServer { for (let i = 0; i < this.config.selfHostAuthentication.length; i++) { if ( username === this.config.selfHostAuthentication[i].username && - password.equals(Buffer.from(this.config.selfHostAuthentication[i].password)) + password.equals(Buffer.from(this.config.selfHostAuthentication[i].password ?? "")) ) { - done(undefined, true); + done(null, true); return; } } - done(undefined, false); + done(null, false); return; } - done(undefined, true); + done(null, true); } } diff --git a/packages/binding-mqtt/src/mqtt-client.ts b/packages/binding-mqtt/src/mqtt-client.ts index bbb9d1cf5..3c279a4d9 100644 --- a/packages/binding-mqtt/src/mqtt-client.ts +++ b/packages/binding-mqtt/src/mqtt-client.ts @@ -17,7 +17,7 @@ * Protocol test suite to test protocol implementations */ -import { ProtocolClient, Content, DefaultContent, createLoggers } from "@node-wot/core"; +import { ProtocolClient, Content, DefaultContent, createLoggers, ContentSerdes } from "@node-wot/core"; import * as TD from "@node-wot/td-tools"; import * as mqtt from "mqtt"; import { MqttClientConfig, MqttForm, MqttQoS } from "./mqtt"; @@ -50,7 +50,7 @@ export default class MqttClient implements ProtocolClient { ): Promise { return new Promise((resolve, reject) => { // get MQTT-based metadata - const contentType = form.contentType; + const contentType = form.contentType ?? ContentSerdes.DEFAULT; const requestUri = new url.URL(form.href); const topic = requestUri.pathname.slice(1); const brokerUri: string = `${this.scheme}://` + requestUri.host; @@ -63,15 +63,29 @@ export default class MqttClient implements ProtocolClient { this.client.subscribe(topic); resolve( new Subscription(() => { + if (!this.client) { + warn( + `MQTT Client is undefined. This means that the client either failed to connect or was never initialized.` + ); + return; + } this.client.unsubscribe(topic); }) ); } this.client.on("connect", () => { - this.client.subscribe(topic); + // In this case, the client is definitely defined. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + this.client!.subscribe(topic); resolve( new Subscription(() => { + if (!this.client) { + warn( + `MQTT Client is undefined. This means that the client either failed to connect or was never initialized.` + ); + return; + } this.client.unsubscribe(topic); }) ); diff --git a/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts b/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts index eaf490f96..6a98cfa47 100644 --- a/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts +++ b/packages/binding-mqtt/test/mqtt-client-subscribe-test.integration.ts @@ -76,6 +76,10 @@ describe("MQTT client implementation", () => { if (!eventReceived) { eventReceived = true; } else { + if (!x.data) { + done(new Error("No data received")); + return; + } ProtocolHelpers.readStreamFully(ProtocolHelpers.toNodeStream(x.data)).then( (received) => { expect(JSON.parse(received.toString())).to.equal(++check); @@ -134,6 +138,10 @@ describe("MQTT client implementation", () => { if (!eventReceived) { eventReceived = true; } else { + if (!x.data) { + done(new Error("No data received")); + return; + } ProtocolHelpers.readStreamFully(ProtocolHelpers.toNodeStream(x.data)).then( (received) => { expect(JSON.parse(received.toString())).to.equal(++check); diff --git a/packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts b/packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts index a734bd8fd..e889869a2 100644 --- a/packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts +++ b/packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts @@ -120,10 +120,10 @@ describe("MQTT client implementation", () => { beforeEach(() => { aedes.authenticate = function (_client, username: Readonly, password: Readonly, done) { if (username !== undefined) { - done(undefined, username === "user" && password.equals(Buffer.from("pass"))); + done(null, username === "user" && password.equals(Buffer.from("pass"))); return; } - done(undefined, true); + done(null, true); }; const server = net.createServer(aedes.handle); hostedBroker = server.listen(brokerPort);