Skip to content

Commit

Permalink
feat(sdk): implement cloud.Bucket inflight method signedUrl for `…
Browse files Browse the repository at this point in the history
…sim` target (#7137)

Co-authored-by: wingbot <[email protected]>
Co-authored-by: monada-bot[bot] <[email protected]>
  • Loading branch information
3 people authored Sep 17, 2024
1 parent 9d96625 commit 2f2665e
Show file tree
Hide file tree
Showing 9 changed files with 426 additions and 137 deletions.
47 changes: 5 additions & 42 deletions packages/@winglang/sdk/src/target-sim/api.inflight.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as fs from "fs";
import { Server } from "http";
import { AddressInfo, Socket } from "net";
import { join } from "path";
import express from "express";
import { IEventPublisher } from "./event-mapping";
Expand All @@ -10,7 +9,7 @@ import {
ApiRoute,
EventSubscription,
} from "./schema-resources";
import { exists } from "./util";
import { exists, isPortAvailable, listenExpress } from "./util";
import {
API_FQN,
ApiRequest,
Expand All @@ -28,8 +27,6 @@ import {
} from "../simulator/simulator";
import { LogLevel, Json, TraceType } from "../std";

const LOCALHOST_ADDRESS = "127.0.0.1";

const STATE_FILENAME = "state.json";

/**
Expand Down Expand Up @@ -114,21 +111,10 @@ export class Api
}
}

// `server.address()` returns `null` until the server is listening
// on a port. We use a promise to wait for the server to start
// listening before returning the URL.
const addrInfo: AddressInfo = await new Promise((resolve, reject) => {
this.server = this.app.listen(lastPort ?? 0, LOCALHOST_ADDRESS, () => {
const addr = this.server?.address();
if (addr && typeof addr === "object" && (addr as AddressInfo).port) {
resolve(addr);
} else {
reject(new Error("No address found"));
}
});
});
this.port = addrInfo.port;
this.url = `http://${addrInfo.address}:${addrInfo.port}`;
const { server, address } = await listenExpress(this.app, lastPort);
this.server = server;
this.port = address.port;
this.url = `http://${address.address}:${address.port}`;

this.addTrace(`Server listening on ${this.url}`, LogLevel.VERBOSE);

Expand Down Expand Up @@ -343,26 +329,3 @@ function asyncMiddleware(
Promise.resolve(fn(req, res, next)).catch(next);
};
}

async function isPortAvailable(port: number): Promise<boolean> {
return new Promise((resolve, _reject) => {
const s = new Socket();
s.once("error", (err) => {
s.destroy();
if ((err as any).code !== "ECONNREFUSED") {
resolve(false);
} else {
// connection refused means the port is not used
resolve(true);
}
});

s.once("connect", () => {
s.destroy();
// connection successful means the port is used
resolve(false);
});

s.connect(port, LOCALHOST_ADDRESS);
});
}
223 changes: 212 additions & 11 deletions packages/@winglang/sdk/src/target-sim/bucket.inflight.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import * as crypto from "crypto";
import * as fs from "fs";
import { Server } from "http";
import { dirname, join } from "path";
import * as url from "url";
import { pathToFileURL } from "url";
import express from "express";
import mime from "mime-types";
import { BucketAttributes, BucketSchema } from "./schema-resources";
import { exists } from "./util";
import { exists, isPortAvailable, listenExpress } from "./util";
import {
ITopicClient,
BucketSignedUrlOptions,
Expand All @@ -16,6 +18,8 @@ import {
BucketGetOptions,
BucketTryGetOptions,
BUCKET_FQN,
BucketSignedUrlAction,
CorsHeaders,
} from "../cloud";
import { deserialize, serialize } from "../simulator/serialization";
import {
Expand All @@ -27,19 +31,133 @@ import { Datetime, Json, LogLevel, TraceType } from "../std";

export const METADATA_FILENAME = "metadata.json";

const STATE_FILENAME = "state.json";

/**
* Contents of the state file for this resource.
*/
interface StateFileContents {
/**
* The last port used by the API server on a previous simulator run.
*/
readonly lastPort?: number;
}

export class Bucket implements IBucketClient, ISimulatorResourceInstance {
private _fileDir!: string;
private _context: ISimulatorContext | undefined;
private readonly initialObjects: Record<string, string>;
private readonly _public: boolean;
private readonly topicHandlers: Partial<Record<BucketEventType, string>>;
private _metadata: Map<string, ObjectMetadata>;
private readonly app: express.Application;
private server: Server | undefined;
private url: string | undefined;
private port: number | undefined;

public constructor(props: BucketSchema) {
this.initialObjects = props.initialObjects ?? {};
this._public = props.public ?? false;
this.topicHandlers = props.topics;
this._metadata = new Map();

this.app = express();

// Enable cors for all requests.
this.app.use((req, res, next) => {
const corsHeaders: CorsHeaders = {
defaultResponse: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT",
"Access-Control-Allow-Headers": "*",
},
optionsResponse: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET, PUT",
"Access-Control-Allow-Headers": "*",
"Access-Control-Max-Age": "86400",
},
};
const method =
req.method && req.method.toUpperCase && req.method.toUpperCase();

if (method === "OPTIONS") {
for (const [key, value] of Object.entries(
corsHeaders.optionsResponse
)) {
res.setHeader(key, value);
}
res.status(204).send();
} else {
for (const [key, value] of Object.entries(
corsHeaders.defaultResponse
)) {
res.setHeader(key, value);
}
next();
}
});

// Handle signed URL uploads.
this.app.put("*", (req, res) => {
const action = req.query.action;
if (action !== BucketSignedUrlAction.UPLOAD) {
return res.status(403).send("Operation not allowed");
}

const validUntil = req.query.validUntil?.toString();
if (!validUntil || Date.now() > parseInt(validUntil)) {
return res.status(403).send("Signed URL has expired");
}

const key = req.path.slice(1); // remove leading slash
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);

const actionType: BucketEventType = this._metadata.has(key)
? BucketEventType.UPDATE
: BucketEventType.CREATE;

const contentType = req.header("content-type");
if (contentType?.startsWith("multipart/form-data")) {
return res.status(400).send("Multipart uploads not supported");
}

const fileStream = fs.createWriteStream(filename);
req.pipe(fileStream);

fileStream.on("error", () => {
res.status(500).send("Failed to save the file.");
});

fileStream.on("finish", () => {
void this.updateMetadataAndNotify(key, actionType, contentType).then(
() => {
res.status(200).send();
}
);
});

return;
});

// Handle signed URL downloads.
this.app.get("*", (req, res) => {
const action = req.query.action;
if (action !== BucketSignedUrlAction.DOWNLOAD) {
return res.status(403).send("Operation not allowed");
}

const validUntil = req.query.validUntil?.toString();
if (!validUntil || Date.now() > parseInt(validUntil)) {
return res.status(403).send("Signed URL has expired");
}

const key = req.path.slice(1); // remove leading slash
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);
return res.download(filename);
});
}

private get context(): ISimulatorContext {
Expand Down Expand Up @@ -86,22 +204,79 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
});
}

return {};
// Check for a previous state file to see if there was a port that was previously being used
// if so, try to use it out of convenience
let lastPort: number | undefined;
const state: StateFileContents = await this.loadState();
if (state.lastPort) {
const portAvailable = await isPortAvailable(state.lastPort);
if (portAvailable) {
lastPort = state.lastPort;
}
}

const { server, address } = await listenExpress(this.app, lastPort);
this.server = server;
this.port = address.port;
this.url = `http://${address.address}:${address.port}`;

this.addTrace(`Server listening on ${this.url}`, LogLevel.VERBOSE);

return {
url: this.url,
};
}

public async cleanup(): Promise<void> {}
public async cleanup(): Promise<void> {
this.addTrace(`Closing server on ${this.url}`, LogLevel.VERBOSE);

return new Promise((resolve, reject) => {
this.server?.close((err) => {
if (err) {
return reject(err);
}

this.server?.closeAllConnections();
return resolve();
});
});
}

public async plan() {
return UpdatePlan.AUTO;
}

private async loadState(): Promise<StateFileContents> {
const stateFileExists = await exists(
join(this.context.statedir, STATE_FILENAME)
);
if (stateFileExists) {
const stateFileContents = await fs.promises.readFile(
join(this.context.statedir, STATE_FILENAME),
"utf-8"
);
return JSON.parse(stateFileContents);
} else {
return {};
}
}

private async saveState(state: StateFileContents): Promise<void> {
fs.writeFileSync(
join(this.context.statedir, STATE_FILENAME),
JSON.stringify(state)
);
}

public async save(): Promise<void> {
// no need to save individual files, since they are already persisted in the state dir
// during the bucket's lifecycle
fs.writeFileSync(
join(this.context.statedir, METADATA_FILENAME),
serialize(Array.from(this._metadata.entries())) // metadata contains Datetime values, so we need to serialize it
);

await this.saveState({ lastPort: this.port });
}

private async notifyListeners(
Expand Down Expand Up @@ -291,19 +466,36 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
);
}

return url.pathToFileURL(filePath).href;
return pathToFileURL(filePath).href;
},
});
}

public async signedUrl(key: string, options?: BucketSignedUrlOptions) {
options;
return this.context.withTrace({
message: `Signed URL (key=${key})`,
message: `Signed URL (key=${key}).`,
activity: async () => {
throw new Error(
`signedUrl is not implemented yet for the simulator (key=${key})`
const action = options?.action ?? BucketSignedUrlAction.DOWNLOAD;
// BUG: The `options?.duration` is supposed to be an instance of `Duration` but it is not. It's just
// a POJO with seconds, but TypeScript thinks otherwise.
const duration = options?.duration?.seconds ?? 900;

if (
action === BucketSignedUrlAction.DOWNLOAD &&
!(await this.exists(key))
) {
throw new Error(
`Cannot provide signed url for a non-existent key (key=${key})`
);
}

const url = new URL(key, this.url);
url.searchParams.set("action", action);
url.searchParams.set(
"validUntil",
String(Date.now() + duration * 1000)
);
return url.toString();
},
});
}
Expand Down Expand Up @@ -370,10 +562,19 @@ export class Bucket implements IBucketClient, ISimulatorResourceInstance {
await fs.promises.mkdir(dirName, { recursive: true });
await fs.promises.writeFile(filename, value);

await this.updateMetadataAndNotify(key, actionType, contentType);
}

private async updateMetadataAndNotify(
key: string,
actionType: BucketEventType,
contentType?: string
): Promise<void> {
const hash = this.hashKey(key);
const filename = join(this._fileDir, hash);
const filestat = await fs.promises.stat(filename);
const determinedContentType =
(contentType ?? mime.lookup(key)) || "application/octet-stream";

contentType ?? (mime.lookup(key) || "application/octet-stream");
this._metadata.set(key, {
size: filestat.size,
lastModified: Datetime.fromDate(filestat.mtime),
Expand Down
Loading

0 comments on commit 2f2665e

Please sign in to comment.