Skip to content

Commit

Permalink
Merge pull request #1053 from egekorkan/strict-mqtt
Browse files Browse the repository at this point in the history
Use ts strict checking for binding mqtt
  • Loading branch information
relu91 authored Aug 11, 2023
2 parents e0e3fc4 + 28c0815 commit 0e6e117
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 32 deletions.
82 changes: 60 additions & 22 deletions packages/binding-mqtt/src/mqtt-broker-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ import { Readable } from "stream";
const { info, debug, error, warn } = createLoggers("binding-mqtt", "mqtt-broker-server");

export default class MqttBrokerServer implements ProtocolServer {
private static brokerIsInitialized(broker?: mqtt.MqttClient): asserts broker is mqtt.MqttClient {
if (broker === undefined) {
throw new Error(
`Broker not initialized. You need to start the ${MqttBrokerServer.name} before you can expose things.`
);
}
}

readonly scheme: string = "mqtt";

private readonly ACTION_SEGMENT_LENGTH = 3;
Expand All @@ -51,30 +59,33 @@ export default class MqttBrokerServer implements ProtocolServer {
private readonly INTERACTION_NAME_SEGMENT_INDEX = 2;
private readonly INTERACTION_EXT_SEGMENT_INDEX = 3;

private readonly defaults: MqttBrokerServerConfig = { uri: "mqtt://localhost:1883" };

private port = -1;
private address: string = undefined;
private address?: string = undefined;

private brokerURI: string = undefined;
private brokerURI: string;

private readonly things: Map<string, ExposedThing> = new Map();

private readonly config: MqttBrokerServerConfig;

private broker: mqtt.MqttClient;
private broker?: mqtt.MqttClient;

private hostedServer: Aedes;
private hostedBroker: net.Server;
private hostedServer?: Aedes;
private hostedBroker?: net.Server;

constructor(config: MqttBrokerServerConfig) {
this.config = config ?? { uri: "mqtt://localhost:1883" };
this.config = config ?? this.defaults;
this.config.uri = this.config.uri ?? this.defaults.uri;

if (config.uri !== undefined) {
// if there is a MQTT protocol indicator missing, add this
if (config.uri.indexOf("://") === -1) {
config.uri = this.scheme + "://" + config.uri;
}
this.brokerURI = config.uri;
// if there is a MQTT protocol indicator missing, add this
if (config.uri.indexOf("://") === -1) {
config.uri = this.scheme + "://" + config.uri;
}

this.brokerURI = config.uri;

if (config.selfHost) {
this.hostedServer = Server({});
let server;
Expand All @@ -86,7 +97,7 @@ export default class MqttBrokerServer implements ProtocolServer {
const parsed = new url.URL(this.brokerURI);
const port = parseInt(parsed.port);
this.port = port > 0 ? port : 1883;
this.hostedBroker = server.listen(port);
this.hostedBroker = server.listen(port, parsed.hostname);
this.hostedServer.authenticate = this.selfHostAuthentication.bind(this);
}
}
Expand Down Expand Up @@ -130,6 +141,7 @@ export default class MqttBrokerServer implements ProtocolServer {
}

private exposeProperty(name: string, propertyName: string, thing: ExposedThing) {
MqttBrokerServer.brokerIsInitialized(this.broker);
const topic = encodeURIComponent(name) + "/properties/" + encodeURIComponent(propertyName);
const property = thing.properties[propertyName];

Expand All @@ -143,6 +155,12 @@ export default class MqttBrokerServer implements ProtocolServer {
const observeListener = async (content: Content) => {
debug(`MqttBrokerServer at ${this.brokerURI} publishing to Property topic '${propertyName}' `);
const buffer = await content.toBuffer();

if (this.broker === undefined) {
warn(`MqttBrokerServer at ${this.brokerURI} has no client to publish to. Probably it was closed.`);
return;
}

this.broker.publish(topic, buffer);
};
thing.handleObserveProperty(propertyName, observeListener, { formIndex: property.forms.length - 1 });
Expand All @@ -158,6 +176,8 @@ export default class MqttBrokerServer implements ProtocolServer {
}

private exposeAction(name: string, actionName: string, thing: ExposedThing) {
MqttBrokerServer.brokerIsInitialized(this.broker);

const topic = encodeURIComponent(name) + "/actions/" + encodeURIComponent(actionName);
this.broker.subscribe(topic);

Expand All @@ -179,6 +199,11 @@ export default class MqttBrokerServer implements ProtocolServer {
debug(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to Event '${eventName}'`);

const eventListener = async (content: Content) => {
if (this.broker === undefined) {
warn(`MqttBrokerServer at ${this.brokerURI} has no client to publish to. Probably it was closed.`);
return;
}

if (!content) {
warn(`MqttBrokerServer on port ${this.getPort()} cannot process data for Event ${eventName}`);
thing.handleUnsubscribeEvent(eventName, eventListener, { formIndex: event.forms.length - 1 });
Expand All @@ -199,6 +224,9 @@ export default class MqttBrokerServer implements ProtocolServer {
payload = rawPayload;
} else if (typeof rawPayload === "string") {
payload = Buffer.from(rawPayload);
} else {
warn(`MqttBrokerServer on port ${this.getPort()} received unexpected payload type`);
return;
}

if (segments.length === this.ACTION_SEGMENT_LENGTH) {
Expand Down Expand Up @@ -308,7 +336,7 @@ export default class MqttBrokerServer implements ProtocolServer {
error(
`MqttBrokerServer at ${this.brokerURI} got error on writing to property '${
segments[this.INTERACTION_NAME_SEGMENT_INDEX]
}': ${err.message}`
}': ${err}`
);
}
} else {
Expand All @@ -322,14 +350,16 @@ export default class MqttBrokerServer implements ProtocolServer {

public async destroy(thingId: string): Promise<boolean> {
debug(`MqttBrokerServer on port ${this.getPort()} destroying thingId '${thingId}'`);
let removedThing: ExposedThing;
let removedThing: ExposedThing | undefined;

for (const name of Array.from(this.things.keys())) {
const expThing = this.things.get(name);
if (expThing != null && expThing.id != null && expThing.id === thingId) {
this.things.delete(name);
removedThing = expThing;
}
}

if (removedThing) {
info(`MqttBrokerServer succesfully destroyed '${removedThing.title}'`);
} else {
Expand Down Expand Up @@ -385,16 +415,24 @@ export default class MqttBrokerServer implements ProtocolServer {
}

if (this.hostedBroker !== undefined) {
await new Promise<void>((resolve) => this.hostedServer.close(() => resolve()));
await new Promise<void>((resolve) => this.hostedBroker.close(() => resolve()));
// When the broker is hosted, we need to close it.
// Both this.hostedBroker and this.hostedServer are defined at the same time.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await new Promise<void>((resolve) => this.hostedServer!.close(() => resolve()));
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await new Promise<void>((resolve) => this.hostedBroker!.close(() => resolve()));
}
}

public getPort(): number {
return this.port;
}

public getAddress(): string {
/**
*
* @returns the address of the broker or undefined if the Server is not started.
*/
public getAddress(): string | undefined {
return this.address;
}

Expand All @@ -408,15 +446,15 @@ export default class MqttBrokerServer implements ProtocolServer {
for (let i = 0; i < this.config.selfHostAuthentication.length; i++) {
if (
username === this.config.selfHostAuthentication[i].username &&
password.equals(Buffer.from(this.config.selfHostAuthentication[i].password))
password.equals(Buffer.from(this.config.selfHostAuthentication[i].password ?? ""))
) {
done(undefined, true);
done(null, true);
return;
}
}
done(undefined, false);
done(null, false);
return;
}
done(undefined, true);
done(null, true);
}
}
31 changes: 25 additions & 6 deletions packages/binding-mqtt/src/mqtt-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Protocol test suite to test protocol implementations
*/

import { ProtocolClient, Content, DefaultContent, createLoggers } from "@node-wot/core";
import { ProtocolClient, Content, DefaultContent, createLoggers, ContentSerdes } from "@node-wot/core";
import * as TD from "@node-wot/td-tools";
import * as mqtt from "mqtt";
import { MqttClientConfig, MqttForm, MqttQoS } from "./mqtt";
Expand All @@ -40,7 +40,7 @@ export default class MqttClient implements ProtocolClient {
this.scheme = "mqtt" + (secure ? "s" : "");
}

private client: mqtt.MqttClient = undefined;
private client?: mqtt.MqttClient;

public subscribeResource(
form: MqttForm,
Expand All @@ -50,7 +50,7 @@ export default class MqttClient implements ProtocolClient {
): Promise<Subscription> {
return new Promise<Subscription>((resolve, reject) => {
// get MQTT-based metadata
const contentType = form.contentType;
const contentType = form.contentType ?? ContentSerdes.DEFAULT;
const requestUri = new url.URL(form.href);
const topic = requestUri.pathname.slice(1);
const brokerUri: string = `${this.scheme}://` + requestUri.host;
Expand All @@ -63,15 +63,29 @@ export default class MqttClient implements ProtocolClient {
this.client.subscribe(topic);
resolve(
new Subscription(() => {
if (!this.client) {
warn(
`MQTT Client is undefined. This means that the client either failed to connect or was never initialized.`
);
return;
}
this.client.unsubscribe(topic);
})
);
}

this.client.on("connect", () => {
this.client.subscribe(topic);
// In this case, the client is definitely defined.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.client!.subscribe(topic);
resolve(
new Subscription(() => {
if (!this.client) {
warn(
`MQTT Client is undefined. This means that the client either failed to connect or was never initialized.`
);
return;
}
this.client.unsubscribe(topic);
})
);
Expand Down Expand Up @@ -166,8 +180,13 @@ export default class MqttClient implements ProtocolClient {
const security: TD.SecurityScheme = metadata[0];

if (security.scheme === "basic") {
this.config.username = credentials.username;
this.config.password = credentials.password;
if (credentials === undefined) {
// FIXME: This error message should be reworded and adapt to logging convention
throw new Error("binding-mqtt: security wants to be basic but you have provided no credentials");
} else {
this.config.username = credentials.username;
this.config.password = credentials.password;
}
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/binding-mqtt/src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class MqttForm extends Form {
}

export interface MqttClientConfig {
// username & password are redundated here (also find them in MqttClientSecurityParameters)
// username & password are redundant here (also find them in MqttClientSecurityParameters)
// because MqttClient.setSecurity() method can inject authentication credentials into this interface
// which will be then passed to mqtt.connect() once for all
username?: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ describe("MQTT client implementation", () => {
if (!eventReceived) {
eventReceived = true;
} else {
if (!x.data) {
done(new Error("No data received"));
return;
}
ProtocolHelpers.readStreamFully(ProtocolHelpers.toNodeStream(x.data)).then(
(received) => {
expect(JSON.parse(received.toString())).to.equal(++check);
Expand Down Expand Up @@ -134,6 +138,10 @@ describe("MQTT client implementation", () => {
if (!eventReceived) {
eventReceived = true;
} else {
if (!x.data) {
done(new Error("No data received"));
return;
}
ProtocolHelpers.readStreamFully(ProtocolHelpers.toNodeStream(x.data)).then(
(received) => {
expect(JSON.parse(received.toString())).to.equal(++check);
Expand Down
4 changes: 2 additions & 2 deletions packages/binding-mqtt/test/mqtt-client-subscribe-test.unit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ describe("MQTT client implementation", () => {
beforeEach(() => {
aedes.authenticate = function (_client, username: Readonly<string>, password: Readonly<Buffer>, done) {
if (username !== undefined) {
done(undefined, username === "user" && password.equals(Buffer.from("pass")));
done(null, username === "user" && password.equals(Buffer.from("pass")));
return;
}
done(undefined, true);
done(null, true);
};
const server = net.createServer(aedes.handle);
hostedBroker = server.listen(brokerPort);
Expand Down
3 changes: 2 additions & 1 deletion packages/binding-mqtt/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
"rootDir": "src",
"strict": true
},
"include": ["src/**/*"],
"references": [{ "path": "../td-tools" }, { "path": "../core" }]
Expand Down

0 comments on commit 0e6e117

Please sign in to comment.