Skip to content

Commit

Permalink
feat(containers): forwarding (#239)
Browse files Browse the repository at this point in the history
Introduce an API that allows forwarding various events to containers. Currently only implemented for `sim`.

The `workload.forward()` method returns an `IForward` object with a `fromXxx()` method for each supported handler type.

For example, this is how you can forward `cloud.Api` requests:

```js
let work = new containers.Workload(...);
let api = new cloud.Api();
api.get("/my_request", work.forward().fromApi());
```

You can pass an optional `route` and `method` to `forward()` in order to customize the behavior:

```js
work.forward(route: "/your_request", method: cloud.HttpMethod.PUT);
```
  • Loading branch information
eladb authored May 18, 2024
1 parent c09cb43 commit e0acf87
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 10 deletions.
19 changes: 19 additions & 0 deletions containers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,25 @@ new containers.Workload(
);
```

## Forwarding

The `workload.forward()` method returns an `IForward` object with a `fromXxx()` method for each
supported handler type.

For example, this is how you can forward `cloud.Api` requests:

```js
let work = new containers.Workload(...);
let api = new cloud.Api();
api.get("/my_request", work.forward().fromApi());
```

You can pass an optional `route` and `method` to `forward()` in order to customize the behavior:

```js
work.forward(route: "/your_request", method: cloud.HttpMethod.PUT);
```

## `sim`

When executed in the Wing Simulator, the workload is started within a local Docker container.
Expand Down
18 changes: 18 additions & 0 deletions containers/api.w
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
bring cloud;

pub struct ContainerOpts {
name: str;
Expand Down Expand Up @@ -27,3 +28,20 @@ pub struct ContainerOpts {
pub struct WorkloadProps extends ContainerOpts {

}

pub struct ForwardOptions {
route: str?;
method: cloud.HttpMethod?;
}

pub interface IWorkload {
forward(opts: ForwardOptions?): IForward;
}

pub interface IForward {
fromApi(): cloud.IApiEndpointHandler;
fromQueue(): cloud.IQueueSetConsumerHandler;
fromTopic(): cloud.ITopicOnMessageHandler;
fromSchedule(): cloud.IScheduleOnTickHandler;
fromBucketEvent(): cloud.IBucketEventHandler;
}
99 changes: 99 additions & 0 deletions containers/forwarders.test.w
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
bring cloud;
bring util;
bring http;
bring expect;
bring "./workload.w" as w;

let workload = new w.Workload(
image: "./test/forwarders",
name: "forwarders",
port: 3000,
public: true,
);

let requests = inflight (): Array<Json> => {
let response = http.get("{workload.publicUrl!}/requests");
assert(response.ok);
return Json.values(Json.parse(response.body));
};

let api = new cloud.Api();
api.get("/get-api", workload.forward().fromApi());
api.post("/post-api", workload.forward().fromApi());

api.get("/foof", workload.forward(route: "/foo").fromApi());

test "api forwarding" {
http.get("{api.url}/get-api?hello=world");

expect.equal(requests(), [
{ method: "GET", url: "/get-api" }
]);

http.post("{api.url}/post-api", body: "hello, body!");

expect.equal(requests(), [
{ method: "GET", url: "/get-api" },
{ method: "POST", url: "/post-api", body: "hello, body!" }
]);
}

let queue1 = new cloud.Queue() as "queue1";
let queue2 = new cloud.Queue() as "queue2";
queue1.setConsumer(workload.forward(route: "/queue_message", method: cloud.HttpMethod.PUT).fromQueue());
queue2.setConsumer(workload.forward().fromQueue());

test "queue forwarding" {
queue1.push("message1");
util.waitUntil(() => { return requests().length == 1; });

queue1.push("message2");
util.waitUntil(() => { return requests().length == 2; });

expect.equal(requests(), [
{ method: "PUT", url: "/queue_message", body: "message1" },
{ method: "PUT", url: "/queue_message", body: "message2" },
]);

queue2.push("message3");
util.waitUntil(() => { return requests().length == 3; });

expect.equal(requests(), [
{ method: "PUT", url: "/queue_message", body: "message1" },
{ method: "PUT", url: "/queue_message", body: "message2" },
{ method: "POST", url: "/", body: "message3" },
]);
}

let topic = new cloud.Topic();
topic.onMessage(workload.forward(route: "/my_topic").fromTopic());

test "subscribe to topic" {
topic.publish("message from topic!");
util.waitUntil(() => { return requests().length == 1; });

expect.equal(requests(), [
{ method: "POST", url: "/my_topic", body: "message from topic!" },
]);
}

let bucket = new cloud.Bucket();
bucket.onCreate(workload.forward(route: "/object-created").fromBucketEvent());

test "forward bucket events" {
bucket.put("object1", "content1");
util.waitUntil(() => { return requests().length == 1; });

expect.equal(requests(), [
{ method: "POST", url: "/object-created", body: Json.stringify({"key":"object1","type":"create"}), },
]);
}

let schedule = new cloud.Schedule(rate: 1m);
schedule.onTick(workload.forward(route: "/tick", method: cloud.HttpMethod.GET).fromSchedule());

test "forward schedule events" {
util.waitUntil(() => { return requests().length >= 1; });

expect.equal(requests()[0], { method: "GET", url: "/tick" });
}
10 changes: 5 additions & 5 deletions containers/helm.extern.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export interface MetadataEntry {
export class Node {
/** Add an ordering dependency on another construct.
An `IDependable` */
readonly addDependency: (deps?: ((readonly (IDependable)[])) | undefined) => void;
readonly addDependency: (deps: (readonly (IDependable)[])) => void;
/** Adds a metadata entry to this construct.
Entries are arbitrary values and will also include a stack trace to allow tracing back to
the code location for when the entry was added. It can be used, for example, to include source
Expand Down Expand Up @@ -202,7 +202,7 @@ export class ApiObjectMetadataDefinition {
/** Add an annotation. */
readonly addAnnotation: (key: string, value: string) => void;
/** Add one or more finalizers. */
readonly addFinalizers: (finalizers?: ((readonly (string)[])) | undefined) => void;
readonly addFinalizers: (finalizers: (readonly (string)[])) => void;
/** Add a label. */
readonly addLabel: (key: string, value: string) => void;
/** Add an owner. */
Expand All @@ -224,10 +224,10 @@ export class ApiObjectMetadataDefinition {
export class ApiObject extends Construct {
/** Create a dependency between this ApiObject and other constructs.
These can be other ApiObjects, Charts, or custom. */
readonly addDependency: (dependencies?: ((readonly (IConstruct)[])) | undefined) => void;
readonly addDependency: (dependencies: (readonly (IConstruct)[])) => void;
/** Applies a set of RFC-6902 JSON-Patch operations to the manifest synthesized for this API object.
kubePod.addJsonPatch(JsonPatch.replace('/spec/enableServiceLinks', true)); */
readonly addJsonPatch: (ops?: ((readonly (JsonPatch)[])) | undefined) => void;
readonly addJsonPatch: (ops: (readonly (JsonPatch)[])) => void;
/** The group portion of the API version (e.g. `authorization.k8s.io`). */
readonly apiGroup: string;
/** The object's API version (e.g. `authorization.k8s.io/v1`). */
Expand All @@ -252,7 +252,7 @@ export class ApiObject extends Construct {
export class Chart extends Construct {
/** Create a dependency between this Chart and other constructs.
These can be other ApiObjects, Charts, or custom. */
readonly addDependency: (dependencies?: ((readonly (IConstruct)[])) | undefined) => void;
readonly addDependency: (dependencies: (readonly (IConstruct)[])) => void;
/** Returns all the included API objects. */
readonly apiObjects: (readonly (ApiObject)[]);
/** Generates a app-unique name for an object given it's construct node path.
Expand Down
6 changes: 5 additions & 1 deletion containers/helm.w
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ bring "./api.w" as api;
bring "cdk8s-plus-27" as plus;
bring "cdk8s" as cdk8s;

pub class Chart extends cdk8s.Chart {
pub class Chart extends cdk8s.Chart impl api.IWorkload {
name: str;

new(props: api.WorkloadProps) {
Expand Down Expand Up @@ -83,5 +83,9 @@ pub class Chart extends cdk8s.Chart {
return Chart.toHelmChart(workdir, this);
}

pub forward(opts: api.ForwardOptions?): api.IForward {
throw "Not implemented";
}

extern "./helm.js" pub static toHelmChart(wingdir: str, chart: cdk8s.Chart): str;
}
2 changes: 1 addition & 1 deletion containers/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@winglibs/containers",
"version": "0.1.0",
"version": "0.1.1",
"description": "Container support for Wing",
"repository": {
"type": "git",
Expand Down
4 changes: 4 additions & 0 deletions containers/test/forwarders/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM node:20.8.0-alpine
EXPOSE 3000
ADD index.js /app/index.js
ENTRYPOINT [ "/app/index.js" ]
42 changes: 42 additions & 0 deletions containers/test/forwarders/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env node
const http = require('http');

const requests = [];

process.on('SIGTERM', () => {
console.info("Interrupted")
process.exit(0)
});

const server = http.createServer((req, res) => {

if (req.url === '/requests') {
res.writeHead(200, { 'Content-Type': 'application/json' });
return res.end(JSON.stringify(requests));
}

console.log(`request received: ${req.method} ${req.url}`);

const body = [];
req.on("data", (data) => {
body.push(data);
});

req.on("end", () => {
let s = Buffer.concat(body).toString();
if (s.length === 0) {
s = undefined;
}

requests.push({
method: req.method,
url: req.url,
body: s,
});

res.end('OK');
});
});

console.log('listening on port 3000');
server.listen(3000);
102 changes: 101 additions & 1 deletion containers/workload.sim.w
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ bring sim;
bring ui;
bring "./api.w" as api;

pub class Workload_sim {
pub class Workload_sim impl api.IWorkload {
pub publicUrl: str?;
pub internalUrl: str?;

Expand Down Expand Up @@ -64,6 +64,14 @@ pub class Workload_sim {
}
}

pub forward(opts: api.ForwardOptions?): api.IForward {
if this.publicUrl == nil {
throw "Cannot forward requests to a non-public container";
}

return new Forward(this.publicUrl!, opts) as "forward_{util.nanoid()}";
}

toEnv(input: Map<str?>?): Map<str> {
let env = MutMap<str>{};
let i = input ?? {};
Expand All @@ -76,3 +84,95 @@ pub class Workload_sim {
return env.copy();
}
}

class Forward impl api.IForward {
containerUrl: str;
route: str?;
method: cloud.HttpMethod?;

new(containerUrl: str, opts: api.ForwardOptions?) {
this.containerUrl = containerUrl;
this.route = opts?.route;
this.method = opts?.method;

if let r = this.route {
if !r.startsWith("/") {
throw "Route must start with '/'";
}
}
}

pub fromApi(): cloud.IApiEndpointHandler {
return inflight (request) => {
let var body = request.body;
if request.method == cloud.HttpMethod.GET || request.method == cloud.HttpMethod.HEAD {
body = nil;
}

let response = http.fetch("{this.containerUrl}{request.path}", {
body: body,
headers: request.headers,
method: request.method,
});

return {
body: response.body,
status: response.status,
headers: response.headers
};
};
}

pub fromQueue(): cloud.IQueueSetConsumerHandler {
return inflight (message) => {
let route = this.route ?? "/";
let method = this.method ?? cloud.HttpMethod.POST;
http.fetch("{this.containerUrl}{route}", {
body: message,
method: method,
});
};
}

pub fromTopic(): cloud.ITopicOnMessageHandler {
return inflight (message) => {
let route = this.route ?? "/";
let method = this.method ?? cloud.HttpMethod.POST;
http.fetch("{this.containerUrl}{route}", {
body: message,
method: method,
});
};
}

pub fromSchedule(): cloud.IScheduleOnTickHandler {
return inflight () => {
let route = this.route ?? "/";
let method = this.method ?? cloud.HttpMethod.GET;

http.fetch("{this.containerUrl}{route}", {
method: method,
});
};
}

pub fromBucketEvent(): cloud.IBucketEventHandler {
return inflight (key, type) => {
let route = this.route ?? "/";
let method = this.method ?? cloud.HttpMethod.POST;
let stype = () => {
if type == cloud.BucketEventType.CREATE { return "create"; }
if type == cloud.BucketEventType.UPDATE { return "update"; }
if type == cloud.BucketEventType.DELETE { return "delete"; }
}();

http.fetch("{this.containerUrl}{route}", {
method: method,
body: Json.stringify({
key: key,
type: stype
})
});
};
}
}
Loading

0 comments on commit e0acf87

Please sign in to comment.