diff --git a/packages/binding-coap/src/coap-server.ts b/packages/binding-coap/src/coap-server.ts index 091bda57a..a9543ce5d 100644 --- a/packages/binding-coap/src/coap-server.ts +++ b/packages/binding-coap/src/coap-server.ts @@ -32,11 +32,15 @@ import { Server, createServer, registerFormat, IncomingMessage, OutgoingMessage import slugify from "slugify"; import { Readable } from "stream"; import { MdnsIntroducer } from "./mdns-introducer"; +import { PropertyElement, DataSchema } from "wot-thing-description-types"; const { debug, warn, info, error } = createLoggers("binding-coap", "coap-server"); type CoreLinkFormatParameters = Map; +// TODO: Move to core? +type AugmentedInteractionOptions = WoT.InteractionOptions & { formIndex: number }; + const thingDescriptionParameters: CoreLinkFormatParameters = new Map( Object.entries({ rt: ["wot.thing"], @@ -56,8 +60,8 @@ export default class CoapServer implements ProtocolServer { private readonly ACTION_DIR = "actions"; private readonly EVENT_DIR = "events"; - private readonly port: number = 5683; - private readonly address?: string = undefined; + private readonly port: number; + private readonly address?: string; private mdnsIntroducer: MdnsIntroducer; @@ -73,12 +77,8 @@ export default class CoapServer implements ProtocolServer { private readonly coreResources = new Map(); constructor(port?: number, address?: string) { - if (port !== undefined) { - this.port = port; - } - if (address !== undefined) { - this.address = address; - } + this.port = port ?? 5683; + this.address = address; // WoT-specific content formats registerFormat(ContentSerdes.JSON_LD, 2100); @@ -307,15 +307,13 @@ export default class CoapServer implements ProtocolServer { } private handleWellKnownCore(req: IncomingMessage, res: OutgoingMessage) { - if (req.method === "GET") { - res.setOption("Content-Format", "application/link-format"); - res.code = "2.05"; - const payload = this.formatCoreLinkFormatResources(); - res.end(payload); - } else { - res.code = "4.05"; - res.end("Method Not Allowed"); + if (req.method !== "GET") { + this.sendMethodNotAllowedResponse(res); + return; } + + const payload = this.formatCoreLinkFormatResources(); + this.sendContentResponse(res, payload, "application/link-format"); } /** @@ -337,8 +335,7 @@ export default class CoapServer implements ProtocolServer { */ private async handleTdRequest(req: IncomingMessage, res: OutgoingMessage, thing: ExposedThing) { if (req.method !== "GET") { - res.code = "4.05"; - res.end("Method Not Allowed"); + this.sendMethodNotAllowedResponse(res); return; } @@ -346,21 +343,22 @@ export default class CoapServer implements ProtocolServer { const contentSerdes = ContentSerdes.get(); - if (accept == null || (typeof accept === "string" && contentSerdes.isSupported(accept))) { - debug(`Received an available or no Content-Format (${accept}) in Accept option.`); - const contentFormat = (accept as string) ?? ContentSerdes.TD; - res.setHeader("Content-Format", contentFormat); - res.code = "2.05"; + const isUnsupportedAcceptValue = typeof accept === "string" && !contentSerdes.isSupported(accept); - const content = contentSerdes.valueToContent(thing.getThingDescription(), undefined, contentFormat); - const payload = await ProtocolHelpers.readStreamFully(content.body); - debug(`Sending CoAP response for TD with Content-Format ${contentFormat}.`); - res.end(payload); - } else { + if (isUnsupportedAcceptValue) { debug(`Request contained an accept option with value ${accept} which is not supported.`); - res.code = "4.06"; - res.end(`Content-Format ${accept} is not supported by this resource.`); + this.sendErrorResponse(res, "4.06", `Content-Format ${accept} is not supported by this resource.`); + return; } + + debug(`Received an available or no Content-Format (${accept}) in Accept option.`); + const contentFormat = (accept as string) ?? ContentSerdes.TD; + + const content = contentSerdes.valueToContent(thing.getThingDescription(), undefined, contentFormat); + const payload = await ProtocolHelpers.readStreamFully(content.body); + + debug(`Sending CoAP response for TD with Content-Format ${contentFormat}.`); + this.sendContentResponse(res, payload, contentFormat); } private async handleRequest(req: IncomingMessage, res: OutgoingMessage) { @@ -377,341 +375,419 @@ export default class CoapServer implements ProtocolServer { ); }); - const requestUri = req.url; - let contentType = req.headers["Content-Format"] as string; + const contentType = this.getContentTypeFromRequest(req); + const method = req.method; - if (req.method === "PUT" || req.method === "POST") { - if (!contentType && req.payload) { - warn( - `CoapServer on port ${this.getPort()} received no Content-Format from ${Helpers.toUriLiteral( - req.rsinfo.address - )}:${req.rsinfo.port}` - ); - contentType = ContentSerdes.DEFAULT; - } else if ( - ContentSerdes.get().getSupportedMediaTypes().indexOf(ContentSerdes.getMediaType(contentType)) < 0 - ) { - res.code = "4.15"; - res.end("Unsupported Media Type"); - return; - } + if (!this.checkContentTypeSupportForInput(method, contentType)) { + this.sendErrorResponse(res, "4.15", "Unsupported Media Type"); + return; } - // route request - let parsedRequestUri = requestUri; - if (parsedRequestUri.indexOf("?") !== -1) { - parsedRequestUri = parsedRequestUri.substring(0, parsedRequestUri.indexOf("?")); - } - const segments = decodeURI(parsedRequestUri).split("/"); - - if (segments[1] === "") { - // no path -> list all Things - if (req.method === "GET") { - res.setHeader("Content-Format", ContentSerdes.DEFAULT); - res.code = "2.05"; - const list = []; - for (const address of Helpers.getAddresses()) { - // FIXME are Iterables really such a non-feature that I need array? - for (const name of Array.from(this.things.keys())) { - list.push( - this.scheme + - "://" + - Helpers.toUriLiteral(address) + - ":" + - this.getPort() + - "/" + - encodeURIComponent(name) - ); - } + const requestUri = this.processRequestUri(req); + + if (requestUri === "/") { + this.handleThingsRequest(method, res); + return; + } + + if (requestUri === "/.well-known/core") { + this.handleWellKnownCore(req, res); + return; + } + + const { thingKey, affordanceType, affordanceKey } = this.parseUriSegments(requestUri); + const thing = this.things.get(thingKey); + + if (thing == null) { + this.sendNotFoundResponse(res); + return; + } + + // TODO: Remove support for trailing slashes (or rather: empty URI segments) + if (affordanceType == null || affordanceType === "") { + await this.handleTdRequest(req, res, thing); + return; + } + + switch (affordanceType) { + case this.PROPERTY_DIR: + this.handlePropertyRequest(thing, affordanceKey, req, res, contentType); + break; + case this.ACTION_DIR: + this.handleActionRequest(thing, affordanceKey, req, res, contentType); + break; + case this.EVENT_DIR: + this.handleEventRequest(thing, affordanceKey, req, res, contentType); + break; + default: + this.sendNotFoundResponse(res); + } + } + + private processRequestUri(req: IncomingMessage) { + const uri = req.url; + + if (uri.includes("?")) { + return uri.substring(0, uri.indexOf("?")); + } + + return uri; + } + + private handleThingsRequest(method: string, res: OutgoingMessage) { + if (method !== "GET") { + this.sendMethodNotAllowedResponse(res); + return; + } + + const payload = JSON.stringify(this.getThingDescriptionPayload()); + this.sendContentResponse(res, payload, ContentSerdes.DEFAULT); + } + + private async handlePropertyRequest( + thing: ExposedThing, + affordanceKey: string, + req: IncomingMessage, + res: OutgoingMessage, + contentType?: string + ) { + const property = thing.properties[affordanceKey]; + + if (property == null) { + this.sendNotFoundResponse(res); + return; + } + + switch (req.method) { + case "GET": + if (req.headers.Observe == null) { + this.handleReadProperty(property, req, contentType, thing, res, affordanceKey); + } else { + this.handleObserveProperty(req, thing, res, affordanceKey); } - res.end(JSON.stringify(list)); + break; + case "PUT": + if (property.readOnly === true) { + this.sendErrorResponse(res, "4.00", "Property readOnly"); + return; + } + + this.handleWriteProperty(property, req, contentType, thing, res, affordanceKey); + break; + default: + this.sendMethodNotAllowedResponse(res); + } + } + + private async handleReadProperty( + property: PropertyElement, + req: IncomingMessage, + contentType: string, + thing: ExposedThing, + res: OutgoingMessage, + affordanceKey: string + ) { + try { + const interactionOptions = this.createInteractionOptions( + property.forms, + property.uriVariables, + thing, + req, + contentType + ); + const content = await thing.handleReadProperty(affordanceKey, interactionOptions); + this.streamContentResponse(res, content); + } catch (err) { + error(`CoapServer on port ${this.getPort()} got internal error on read '${req.url}': ${err.message}`); + this.sendErrorResponse(res, "5.00", err.message); + } + } + + private async handleObserveProperty( + req: IncomingMessage, + thing: ExposedThing, + res: OutgoingMessage, + affordanceKey: string + ) { + const listener = async (content: Content) => { + try { + this.streamContentResponse(res, content, { end: true }); + } catch (err) { + error(`CoapServer on port ${this.getPort()} got internal error on read '${req.url}': ${err.message}`); + this.sendErrorResponse(res, "5.00", err.message); + } + }; + + thing + .handleObserveProperty(affordanceKey, listener, null) + .then(() => res.end()) + .catch(() => res.end()); + + res.on("finish", (err: Error) => { + if (err) { + error(`CoapServer on port ${this.port} failed on observe with: ${err.message}`); + } + thing.handleUnobserveProperty(affordanceKey, listener, null); + }); + + setTimeout(() => thing.handleUnobserveProperty(affordanceKey, listener, null), 60 * 60 * 1000); + } + + private async handleWriteProperty( + property: PropertyElement, + req: IncomingMessage, + contentType: string, + thing: ExposedThing, + res: OutgoingMessage, + affordanceKey: string + ) { + try { + const interactionOptions = this.createInteractionOptions( + property.forms, + property.uriVariables, + thing, + req, + contentType + ); + await thing.handleWriteProperty( + affordanceKey, + new Content(contentType, Readable.from(req.payload)), + interactionOptions + ); + this.sendChangedResponse(res); + } catch (err) { + error(`CoapServer on port ${this.getPort()} got internal error on write '${req.url}': ${err.message}`); + this.sendErrorResponse(res, "5.00", err.message); + } + } + + private async handleActionRequest( + thing: ExposedThing, + affordanceKey: string, + req: IncomingMessage, + res: OutgoingMessage, + contentType?: string + ) { + const action = thing.actions[affordanceKey]; + + if (action == null) { + this.sendNotFoundResponse(res); + return; + } + + if (req.method !== "POST") { + this.sendMethodNotAllowedResponse(res); + return; + } + + const interactionOptions = this.createInteractionOptions( + action.forms, + action.uriVariables, + thing, + req, + contentType + ); + try { + const output = await thing.handleInvokeAction( + affordanceKey, + new Content(contentType, Readable.from(req.payload)), + interactionOptions + ); + if (output) { + this.streamContentResponse(res, output, { end: true }); } else { - res.code = "4.05"; - res.end("Method Not Allowed"); + this.sendChangedResponse(res); } - // resource found and response sent + } catch (err) { + error(`CoapServer on port ${this.getPort()} got internal error on invoke '${req.url}': ${err.message}`); + this.sendErrorResponse(res, "5.00", err.message); + } + } + + private createInteractionOptions( + forms: TD.Form[], + affordanceUriVariables: { [k: string]: DataSchema }, + thing: ExposedThing, + req: IncomingMessage, + contentType: string + ) { + const options: AugmentedInteractionOptions = { + formIndex: ProtocolHelpers.findRequestMatchingFormIndex(forms, this.scheme, req.url, contentType), + }; + const uriVariables = Helpers.parseUrlParameters(req.url, thing.uriVariables, affordanceUriVariables); + if (!this.isEmpty(uriVariables)) { + options.uriVariables = uriVariables; + } + + return options; + } + + private async handleEventRequest( + thing: ExposedThing, + affordanceKey: string, + req: IncomingMessage, + res: OutgoingMessage, + contentType?: string + ) { + const event = thing.events[affordanceKey]; + + if (event == null) { + this.sendNotFoundResponse(res); return; - } else if (parsedRequestUri === "/.well-known/core") { - this.handleWellKnownCore(req, res); + } + + if (req.method !== "GET") { + this.sendMethodNotAllowedResponse(res); return; - } else { - // path -> select Thing - const thing = this.things.get(segments[1]); - if (thing) { - if (segments.length === 2 || segments[2] === "") { - // Thing root -> send TD - await this.handleTdRequest(req, res, thing); - return; - } else if (segments[2] === this.PROPERTY_DIR) { - // sub-path -> select Property - const property = thing.properties[segments[3]]; - if (property) { - if (req.method === "GET") { - // readproperty - if (req.headers.Observe === undefined) { - try { - const options: WoT.InteractionOptions & { formIndex: number } = { - formIndex: ProtocolHelpers.findRequestMatchingFormIndex( - property.forms, - this.scheme, - req.url, - contentType - ), - }; - const uriVariables = Helpers.parseUrlParameters( - req.url, - thing.uriVariables, - property.uriVariables - ); - if (!this.isEmpty(uriVariables)) { - options.uriVariables = uriVariables; - } - const content = await thing.handleReadProperty(segments[3], options); - res.setOption("Content-Format", content.type); - res.code = "2.05"; - content.body.pipe(res); - } catch (err) { - error( - `CoapServer on port ${this.getPort()} got internal error on read '${requestUri}': ${ - err.message - }` - ); - res.code = "5.00"; - res.end(err.message); - } - // observeproperty - } else { - const listener = async (content: Content) => { - try { - res.setOption("Content-Format", content.type); - res.code = "2.05"; - // send event data - content.body.pipe(res, { end: true }); - } catch (err) { - error( - `CoapServer on port ${this.getPort()} got internal error on read '${requestUri}': ${ - err.message - }` - ); - res.code = "5.00"; - res.end(err.message); - } - }; - - thing - .handleObserveProperty(segments[3], listener, null) - .then(() => res.end()) - .catch(() => res.end()); - - res.on("finish", (err: Error) => { - if (err) { - error(`CoapServer on port ${this.port} failed on observe with: ${err.message}`); - } - thing.handleUnobserveProperty(segments[3], listener, null); - }); - - setTimeout( - () => thing.handleUnobserveProperty(segments[3], listener, null), - 60 * 60 * 1000 - ); - } - // writeproperty - } else if (req.method === "PUT") { - if (!property.readOnly) { - try { - const options: WoT.InteractionOptions & { formIndex: number } = { - formIndex: ProtocolHelpers.findRequestMatchingFormIndex( - property.forms, - this.scheme, - req.url, - contentType - ), - }; - await thing.handleWriteProperty( - segments[3], - new Content(contentType, Readable.from(req.payload)), - options - ); - res.code = "2.04"; - res.end("Changed"); - } catch (err) { - error( - `CoapServer on port ${this.getPort()} got internal error on write '${requestUri}': ${ - err.message - }` - ); - res.code = "5.00"; - res.end(err.message); - } - } else { - res.code = "4.00"; - res.end("Property readOnly"); - } - } else { - res.code = "4.05"; - res.end("Method Not Allowed"); - } - // resource found and response sent - return; - } // Property exists? - } else if (segments[2] === this.ACTION_DIR) { - // sub-path -> select Action - const action = thing.actions[segments[3]]; - if (action) { - // invokeaction - if (req.method === "POST") { - const options: WoT.InteractionOptions & { formIndex: number } = { - formIndex: ProtocolHelpers.findRequestMatchingFormIndex( - action.forms, - this.scheme, - req.url, - contentType - ), - }; - const uriVariables = Helpers.parseUrlParameters( - req.url, - thing.uriVariables, - action.uriVariables - ); - if (!this.isEmpty(uriVariables)) { - options.uriVariables = uriVariables; - } - try { - const output = await thing.handleInvokeAction( - segments[3], - new Content(contentType, Readable.from(req.payload)), - options - ); - if (output) { - res.setOption("Content-Format", output.type); - res.code = "2.05"; - output.body.pipe(res, { end: true }); - } else { - res.code = "2.04"; - res.end(); - } - } catch (err) { - error( - `CoapServer on port ${this.getPort()} got internal error on invoke '${requestUri}': ${ - err.message - }` - ); - res.code = "5.00"; - res.end(err.message); - } - } else { - res.code = "4.05"; - res.end("Method Not Allowed"); - } - // resource found and response sent - return; - } // Action exists? - } else if (segments[2] === this.EVENT_DIR) { - // sub-path -> select Event - const event = thing.events[segments[3]]; - if (event) { - // subscribeevent - if (req.method === "GET") { - if (req.headers.Observe === 0) { - // work-around to avoid duplicate requests (resend due to no response) - // (node-coap does not deduplicate when Observe is set) - const packet = res._packet; - packet.code = "0.00"; - packet.payload = Buffer.from(""); - packet.reset = false; - packet.ack = true; - packet.token = Buffer.alloc(0); - - res._send(res, packet); - - res._packet.confirmable = res._request.confirmable; - res._packet.token = res._request.token; - // end of work-around - - const options: WoT.InteractionOptions & { formIndex: number } = { - formIndex: ProtocolHelpers.findRequestMatchingFormIndex( - event.forms, - this.scheme, - req.url, - contentType - ), - }; - const uriVariables = Helpers.parseUrlParameters( - req.url, - thing.uriVariables, - event.uriVariables - ); - if (!this.isEmpty(uriVariables)) { - options.uriVariables = uriVariables; - } - const listener = async (value: Content) => { - try { - // send event data - debug( - `CoapServer on port ${this.getPort()} sends '${ - segments[3] - }' notification to ${Helpers.toUriLiteral(req.rsinfo.address)}:${ - req.rsinfo.port - }` - ); - res.setOption("Content-Format", value.type); - res.code = "2.05"; - value.body.pipe(res); - } catch (err) { - debug( - `CoapServer on port ${this.getPort()} failed '${segments[3]}' subscription` - ); - res.code = "5.00"; - res.end(); - } - }; - - thing - .handleSubscribeEvent(segments[3], listener, options) - .then(() => res.end()) - .catch(() => res.end()); - res.on("finish", () => { - debug( - `CoapServer on port ${this.getPort()} ends '${ - segments[3] - }' observation from ${Helpers.toUriLiteral(req.rsinfo.address)}:${ - req.rsinfo.port - }` - ); - thing.handleUnsubscribeEvent(segments[3], listener, options); - }); - } else if (req.headers.Observe > 0) { - debug( - `CoapServer on port ${this.getPort()} sends '${ - segments[3] - }' response to ${Helpers.toUriLiteral(req.rsinfo.address)}:${req.rsinfo.port}` - ); - // node-coap does not support GET cancellation - res.code = "5.01"; - res.end("node-coap issue: no GET cancellation, send RST"); - } else { - debug( - `CoapServer on port ${this.getPort()} rejects '${ - segments[3] - }' read from ${Helpers.toUriLiteral(req.rsinfo.address)}:${req.rsinfo.port}` - ); - res.code = "4.00"; - res.end("No Observe Option"); - } - } else { - res.code = "4.05"; - res.end("Method Not Allowed"); - } - // resource found and response sent - return; - } // Event exists? + } + + const observe = req.headers.Observe as number; + + if (observe == null) { + debug( + `CoapServer on port ${this.getPort()} rejects '${affordanceKey}' read from ${Helpers.toUriLiteral( + req.rsinfo.address + )}:${req.rsinfo.port}` + ); + this.sendErrorResponse(res, "4.00", "No Observe Option"); + return; + } + + if (observe === 0) { + // work-around to avoid duplicate requests (resend due to no response) + // (node-coap does not deduplicate when Observe is set) + const packet = res._packet; + packet.code = "0.00"; + packet.payload = Buffer.from(""); + packet.reset = false; + packet.ack = true; + packet.token = Buffer.alloc(0); + + res._send(res, packet); + + res._packet.confirmable = res._request.confirmable; + res._packet.token = res._request.token; + // end of work-around + + const options: AugmentedInteractionOptions = { + formIndex: ProtocolHelpers.findRequestMatchingFormIndex(event.forms, this.scheme, req.url, contentType), + }; + const uriVariables = Helpers.parseUrlParameters(req.url, thing.uriVariables, event.uriVariables); + if (!this.isEmpty(uriVariables)) { + options.uriVariables = uriVariables; + } + const listener = async (value: Content) => { + try { + debug( + `CoapServer on port ${this.getPort()} sends '${affordanceKey}' notification to ${Helpers.toUriLiteral( + req.rsinfo.address + )}:${req.rsinfo.port}` + ); + this.streamContentResponse(res, value); + } catch (err) { + debug(`CoapServer on port ${this.getPort()} failed '${affordanceKey}' subscription`); + this.sendErrorResponse(res, "5.00", `Subscription to event failed`); } - } // Thing exists? + }; + + thing + .handleSubscribeEvent(affordanceKey, listener, options) + .then(() => res.end()) + .catch(() => res.end()); + res.on("finish", () => { + debug( + `CoapServer on port ${this.getPort()} ends '${affordanceKey}' observation from ${Helpers.toUriLiteral( + req.rsinfo.address + )}:${req.rsinfo.port}` + ); + thing.handleUnsubscribeEvent(affordanceKey, listener, options); + }); + } else if (observe > 0) { + debug( + `CoapServer on port ${this.getPort()} sends '${affordanceKey}' response to ${Helpers.toUriLiteral( + req.rsinfo.address + )}:${req.rsinfo.port}` + ); + // TODO: Check if this has been fixed in the meantime + // node-coap does not support GET cancellation + this.sendErrorResponse(res, "5.01", "node-coap issue: no GET cancellation, send RST"); } + } + + private getContentTypeFromRequest(req: IncomingMessage): string { + const contentType = req.headers["Content-Format"] as string; + + if (contentType == null) { + warn( + `CoapServer on port ${this.getPort()} received no Content-Format from ${Helpers.toUriLiteral( + req.rsinfo.address + )}:${req.rsinfo.port}` + ); + } + + return contentType ?? ContentSerdes.DEFAULT; + } + + private checkContentTypeSupportForInput(method: string, contentType: string) { + const methodsWithPayload: string[] = ["PUT", "POST", "FETCH", "iPATCH", "PATCH"]; + const notAMethodWithPayload = !methodsWithPayload.includes(method); + + return ( + notAMethodWithPayload || + ContentSerdes.get().getSupportedMediaTypes().includes(ContentSerdes.getMediaType(contentType)) + ); + } + + private getThingDescriptionPayload() { + return Helpers.getAddresses().flatMap((address) => + Array.from(this.things.keys()).map( + (thingKey) => + `${this.scheme}://${Helpers.toUriLiteral(address)}:${this.getPort()}/${encodeURIComponent( + thingKey + )}` + ) + ); + } + + private parseUriSegments(requestUri: string) { + const segments = decodeURI(requestUri).split("/"); + + return { + thingKey: segments[1], + affordanceType: segments[2], + affordanceKey: segments[3], + }; + } + + private sendContentResponse(res: OutgoingMessage, payload: Buffer | string, contentType: string) { + res.setOption("Content-Format", contentType); + res.code = "2.05"; + res.end(payload); + } + + private sendChangedResponse(res: OutgoingMessage, payload?: Buffer | string) { + res.code = "2.04"; + res.end(payload); + } + + // TODO: The name of this method might not be ideal yet. + private streamContentResponse(res: OutgoingMessage, content: Content, options?: { end?: boolean | undefined }) { + res.setOption("Content-Format", content.type); + res.code = "2.05"; + content.body.pipe(res, options); + } + + private sendNotFoundResponse(res: OutgoingMessage) { + this.sendErrorResponse(res, "4.04", "Not Found"); + } + + private sendMethodNotAllowedResponse(res: OutgoingMessage) { + this.sendErrorResponse(res, "4.05", "Method Not Allowed"); + } - // resource not found - res.code = "4.04"; - res.end("Not Found"); + private sendErrorResponse(res: OutgoingMessage, errorCode: string, errorMessage: string) { + res.code = errorCode; + res.end(errorMessage); } private isEmpty(obj: Record) {