Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ts strict checking for binding mqtt #1053

Merged
merged 7 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 60 additions & 22 deletions packages/binding-mqtt/src/mqtt-broker-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@
const { info, debug, error, warn } = createLoggers("binding-mqtt", "mqtt-broker-server");

export default class MqttBrokerServer implements ProtocolServer {
private static brokerIsInitialized(broker?: mqtt.MqttClient): asserts broker is mqtt.MqttClient {
if (broker === undefined) {
throw new Error(
`Broker not initialized. You need to start the ${MqttBrokerServer.name} before you can expose things.`
);
}

Check warning on line 49 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L46-L49

Added lines #L46 - L49 were not covered by tests
}

readonly scheme: string = "mqtt";

private readonly ACTION_SEGMENT_LENGTH = 3;
Expand All @@ -51,30 +59,33 @@
private readonly INTERACTION_NAME_SEGMENT_INDEX = 2;
private readonly INTERACTION_EXT_SEGMENT_INDEX = 3;

private readonly defaults: MqttBrokerServerConfig = { uri: "mqtt://localhost:1883" };

private port = -1;
private address: string = undefined;
private address?: string = undefined;

private brokerURI: string = undefined;
private brokerURI: string;

private readonly things: Map<string, ExposedThing> = new Map();

private readonly config: MqttBrokerServerConfig;

private broker: mqtt.MqttClient;
private broker?: mqtt.MqttClient;

private hostedServer: Aedes;
private hostedBroker: net.Server;
private hostedServer?: Aedes;
private hostedBroker?: net.Server;

constructor(config: MqttBrokerServerConfig) {
this.config = config ?? { uri: "mqtt://localhost:1883" };
this.config = config ?? this.defaults;
this.config.uri = this.config.uri ?? this.defaults.uri;

if (config.uri !== undefined) {
// if there is a MQTT protocol indicator missing, add this
if (config.uri.indexOf("://") === -1) {
config.uri = this.scheme + "://" + config.uri;
}
this.brokerURI = config.uri;
// if there is a MQTT protocol indicator missing, add this
if (config.uri.indexOf("://") === -1) {
config.uri = this.scheme + "://" + config.uri;

Check warning on line 84 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L84

Added line #L84 was not covered by tests
}

this.brokerURI = config.uri;

if (config.selfHost) {
this.hostedServer = Server({});
let server;
Expand All @@ -86,7 +97,7 @@
const parsed = new url.URL(this.brokerURI);
const port = parseInt(parsed.port);
this.port = port > 0 ? port : 1883;
this.hostedBroker = server.listen(port);
this.hostedBroker = server.listen(port, parsed.hostname);
this.hostedServer.authenticate = this.selfHostAuthentication.bind(this);
}
}
Expand Down Expand Up @@ -130,6 +141,7 @@
}

private exposeProperty(name: string, propertyName: string, thing: ExposedThing) {
MqttBrokerServer.brokerIsInitialized(this.broker);
const topic = encodeURIComponent(name) + "/properties/" + encodeURIComponent(propertyName);
const property = thing.properties[propertyName];

Expand All @@ -143,6 +155,12 @@
const observeListener = async (content: Content) => {
debug(`MqttBrokerServer at ${this.brokerURI} publishing to Property topic '${propertyName}' `);
const buffer = await content.toBuffer();

if (this.broker === undefined) {
warn(`MqttBrokerServer at ${this.brokerURI} has no client to publish to. Probably it was closed.`);
return;
}

Check warning on line 163 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L158-L163

Added lines #L158 - L163 were not covered by tests
this.broker.publish(topic, buffer);
};
thing.handleObserveProperty(propertyName, observeListener, { formIndex: property.forms.length - 1 });
Expand All @@ -158,6 +176,8 @@
}

private exposeAction(name: string, actionName: string, thing: ExposedThing) {
MqttBrokerServer.brokerIsInitialized(this.broker);

const topic = encodeURIComponent(name) + "/actions/" + encodeURIComponent(actionName);
this.broker.subscribe(topic);

Expand All @@ -179,6 +199,11 @@
debug(`MqttBrokerServer at ${this.brokerURI} assigns '${href}' to Event '${eventName}'`);

const eventListener = async (content: Content) => {
if (this.broker === undefined) {
warn(`MqttBrokerServer at ${this.brokerURI} has no client to publish to. Probably it was closed.`);
return;
}

Check warning on line 205 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L203-L205

Added lines #L203 - L205 were not covered by tests

if (!content) {
warn(`MqttBrokerServer on port ${this.getPort()} cannot process data for Event ${eventName}`);
thing.handleUnsubscribeEvent(eventName, eventListener, { formIndex: event.forms.length - 1 });
Expand All @@ -199,6 +224,9 @@
payload = rawPayload;
} else if (typeof rawPayload === "string") {
payload = Buffer.from(rawPayload);
} else {
warn(`MqttBrokerServer on port ${this.getPort()} received unexpected payload type`);
return;

Check warning on line 229 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L227-L229

Added lines #L227 - L229 were not covered by tests
}

if (segments.length === this.ACTION_SEGMENT_LENGTH) {
Expand Down Expand Up @@ -308,7 +336,7 @@
error(
`MqttBrokerServer at ${this.brokerURI} got error on writing to property '${
segments[this.INTERACTION_NAME_SEGMENT_INDEX]
}': ${err.message}`
}': ${err}`

Check warning on line 339 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L339

Added line #L339 was not covered by tests
);
}
} else {
Expand All @@ -322,14 +350,16 @@

public async destroy(thingId: string): Promise<boolean> {
debug(`MqttBrokerServer on port ${this.getPort()} destroying thingId '${thingId}'`);
let removedThing: ExposedThing;
let removedThing: ExposedThing | undefined;

for (const name of Array.from(this.things.keys())) {
const expThing = this.things.get(name);
if (expThing != null && expThing.id != null && expThing.id === thingId) {
this.things.delete(name);
removedThing = expThing;
}
}

if (removedThing) {
info(`MqttBrokerServer succesfully destroyed '${removedThing.title}'`);
} else {
Expand Down Expand Up @@ -385,16 +415,24 @@
}

if (this.hostedBroker !== undefined) {
await new Promise<void>((resolve) => this.hostedServer.close(() => resolve()));
await new Promise<void>((resolve) => this.hostedBroker.close(() => resolve()));
// When the broker is hosted, we need to close it.
// Both this.hostedBroker and this.hostedServer are defined at the same time.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await new Promise<void>((resolve) => this.hostedServer!.close(() => resolve()));
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await new Promise<void>((resolve) => this.hostedBroker!.close(() => resolve()));
}
}

public getPort(): number {
return this.port;
}

public getAddress(): string {
/**
*
* @returns the address of the broker or undefined if the Server is not started.
*/
public getAddress(): string | undefined {
return this.address;
}

Expand All @@ -408,15 +446,15 @@
for (let i = 0; i < this.config.selfHostAuthentication.length; i++) {
if (
username === this.config.selfHostAuthentication[i].username &&
password.equals(Buffer.from(this.config.selfHostAuthentication[i].password))
password.equals(Buffer.from(this.config.selfHostAuthentication[i].password ?? ""))

Check warning on line 449 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L449

Added line #L449 was not covered by tests
) {
done(undefined, true);
done(null, true);

Check warning on line 451 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L451

Added line #L451 was not covered by tests
return;
}
}
done(undefined, false);
done(null, false);

Check warning on line 455 in packages/binding-mqtt/src/mqtt-broker-server.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-broker-server.ts#L455

Added line #L455 was not covered by tests
return;
}
done(undefined, true);
done(null, true);
}
}
31 changes: 25 additions & 6 deletions packages/binding-mqtt/src/mqtt-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* Protocol test suite to test protocol implementations
*/

import { ProtocolClient, Content, DefaultContent, createLoggers } from "@node-wot/core";
import { ProtocolClient, Content, DefaultContent, createLoggers, ContentSerdes } from "@node-wot/core";
import * as TD from "@node-wot/td-tools";
import * as mqtt from "mqtt";
import { MqttClientConfig, MqttForm, MqttQoS } from "./mqtt";
Expand All @@ -40,7 +40,7 @@
this.scheme = "mqtt" + (secure ? "s" : "");
}

private client: mqtt.MqttClient = undefined;
private client?: mqtt.MqttClient;

public subscribeResource(
form: MqttForm,
Expand All @@ -50,7 +50,7 @@
): Promise<Subscription> {
return new Promise<Subscription>((resolve, reject) => {
// get MQTT-based metadata
const contentType = form.contentType;
const contentType = form.contentType ?? ContentSerdes.DEFAULT;
const requestUri = new url.URL(form.href);
const topic = requestUri.pathname.slice(1);
const brokerUri: string = `${this.scheme}://` + requestUri.host;
Expand All @@ -63,15 +63,29 @@
this.client.subscribe(topic);
resolve(
new Subscription(() => {
if (!this.client) {
warn(
`MQTT Client is undefined. This means that the client either failed to connect or was never initialized.`
);
return;
}

Check warning on line 71 in packages/binding-mqtt/src/mqtt-client.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-client.ts#L67-L71

Added lines #L67 - L71 were not covered by tests
this.client.unsubscribe(topic);
})
);
}

this.client.on("connect", () => {
this.client.subscribe(topic);
// In this case, the client is definitely defined.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.client!.subscribe(topic);
resolve(
new Subscription(() => {
if (!this.client) {
warn(
`MQTT Client is undefined. This means that the client either failed to connect or was never initialized.`
);
return;
}

Check warning on line 88 in packages/binding-mqtt/src/mqtt-client.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-client.ts#L84-L88

Added lines #L84 - L88 were not covered by tests
this.client.unsubscribe(topic);
})
);
Expand Down Expand Up @@ -166,8 +180,13 @@
const security: TD.SecurityScheme = metadata[0];

if (security.scheme === "basic") {
this.config.username = credentials.username;
this.config.password = credentials.password;
if (credentials === undefined) {
// FIXME: This error message should be reworded and adapt to logging convention
throw new Error("binding-mqtt: security wants to be basic but you have provided no credentials");

Check warning on line 185 in packages/binding-mqtt/src/mqtt-client.ts

View check run for this annotation

Codecov / codecov/patch

packages/binding-mqtt/src/mqtt-client.ts#L184-L185

Added lines #L184 - L185 were not covered by tests
} else {
this.config.username = credentials.username;
this.config.password = credentials.password;
}
}
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/binding-mqtt/src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class MqttForm extends Form {
}

export interface MqttClientConfig {
// username & password are redundated here (also find them in MqttClientSecurityParameters)
// username & password are redundant here (also find them in MqttClientSecurityParameters)
// because MqttClient.setSecurity() method can inject authentication credentials into this interface
// which will be then passed to mqtt.connect() once for all
username?: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ describe("MQTT client implementation", () => {
if (!eventReceived) {
eventReceived = true;
} else {
if (!x.data) {
done(new Error("No data received"));
return;
}
ProtocolHelpers.readStreamFully(ProtocolHelpers.toNodeStream(x.data)).then(
(received) => {
expect(JSON.parse(received.toString())).to.equal(++check);
Expand Down Expand Up @@ -134,6 +138,10 @@ describe("MQTT client implementation", () => {
if (!eventReceived) {
eventReceived = true;
} else {
if (!x.data) {
done(new Error("No data received"));
return;
}
ProtocolHelpers.readStreamFully(ProtocolHelpers.toNodeStream(x.data)).then(
(received) => {
expect(JSON.parse(received.toString())).to.equal(++check);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,10 @@ describe("MQTT client implementation", () => {
beforeEach(() => {
aedes.authenticate = function (_client, username: Readonly<string>, password: Readonly<Buffer>, done) {
if (username !== undefined) {
done(undefined, username === "user" && password.equals(Buffer.from("pass")));
done(null, username === "user" && password.equals(Buffer.from("pass")));
return;
}
done(undefined, true);
done(null, true);
};
const server = net.createServer(aedes.handle);
hostedBroker = server.listen(brokerPort);
Expand Down
3 changes: 2 additions & 1 deletion packages/binding-mqtt/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
"rootDir": "src",
"strict": true
},
"include": ["src/**/*"],
"references": [{ "path": "../td-tools" }, { "path": "../core" }]
Expand Down
Loading