Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
skyrpex committed Dec 11, 2023
1 parent 62688ad commit 757031f
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 12 deletions.
16 changes: 8 additions & 8 deletions dynamodb-table/lib.w
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ struct TableProps {

pub class Table {
host: Host;
var usesStreams: bool;
// var usesStreams: bool;

tableName: str;

new(props: TableProps) {
this.host = Host.of(this);
this.usesStreams = false;
// this.usesStreams = false;

let tableName = this.node.addr;
let state = new sim.State();
Expand Down Expand Up @@ -266,14 +266,14 @@ pub class Table {
});
}

pub onStream(handler: inflight (StreamRecord): void) {
if this.usesStreams {
throw "Table.onStream can only be called once";
}
this.usesStreams = true;
pub setStreamConsumer(handler: inflight (StreamRecord): void) {
// if this.usesStreams {
// throw "Table.onStream can only be called once";
// }
// this.usesStreams = true;
new cloud.Service(inflight () => {
Util.processRecordsAsync(this.host.endpoint, this.tableName, handler);
}) as "OnStreamHandler";
}) as "StreamConsumer";
}

inflight client: DocumentClient;
Expand Down
4 changes: 2 additions & 2 deletions dynamodb-table/main.w
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ let table = new lib.Table(
],
);

table.onStream(inflight (record) => {
table.setStreamConsumer(inflight (record) => {
log("record processed = {Json.stringify(record)}");
});

Expand All @@ -32,7 +32,7 @@ table.onStream(inflight (record) => {

let queue = new queues.FIFOQueue();

queue.onMessage(inflight (message) => {
queue.setConsumer(inflight (message) => {
// log("message = {Json.stringify(message)}");
table.put(
item: {
Expand Down
4 changes: 2 additions & 2 deletions dynamodb-table/queues.w
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ pub class FIFOQueue {
);
}

pub onMessage(handler: inflight (FIFOMessage): void) {
this.table.onStream(inflight (record) => {
pub setConsumer(handler: inflight (FIFOMessage): void) {
this.table.setStreamConsumer(inflight (record) => {
if let item = record.dynamodb.NewImage {
if item.get("pk").get("S").asStr().startsWith("GROUP_ID#") {
handler(
Expand Down

0 comments on commit 757031f

Please sign in to comment.