Skip to content

Commit

Permalink
utils.controls.Digest (#162)
Browse files Browse the repository at this point in the history
* utils.controls.Digest

* fix linting errors

* increase minor version of the controls bundle, add changelog entry with Digest component

* fix output for array output type

* add index and count to outputs

* fix typo

* add lock to prevent "too fast" processing and therefore prevent duplicate output

* add option to drain by webhook
  • Loading branch information
DavidDurman authored Oct 2, 2024
1 parent 3ec64e3 commit 0572b37
Show file tree
Hide file tree
Showing 8 changed files with 756 additions and 1 deletion.
153 changes: 153 additions & 0 deletions src/appmixer/utils/controls/Digest/Digest.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
'use strict';

const parser = require('cron-parser');
const moment = require('moment');

module.exports = {

async receive(context) {

const { threshold, generateOutputPortOptions, getWebhookUrl, outputType = 'array' } = context.properties;

if (getWebhookUrl) {
return context.sendJson({
inputs: {
webhookUrl: {
defaultValue: context.getWebhookUrl()
}
}
}, 'out');
}

if (generateOutputPortOptions) {
return this.getOutputPortOptions(context, outputType);
}

let lock;
try {
lock = await context.lock(context.componentId);

const entries = await context.stateGet('entries') || [];

if (context.messages.webhook) {
// Manually drained by webhook.
await this.sendEntries(context, entries, outputType);
await context.stateUnset('entries');
return context.response();
}

if (context.messages.timeout) {
if (!threshold || (threshold && entries.length >= threshold)) {
if (entries.length > 0) {
await this.sendEntries(context, entries, outputType);
await context.stateUnset('entries');
}
}
const previousDate = context.messages.timeout.content.previousDate;
return this.scheduleDrain(context, { previousDate });
}

const { entry } = context.messages.in.content;
entries.push(entry);
await context.stateSet('entries', entries);

if (threshold) {
if (entries.length >= threshold) {
await this.sendEntries(context, entries, outputType);
await context.stateUnset('entries');
}
}
} finally {
if (lock) {
lock.unlock();
}
}
},

async start(context) {

const { minute, hour, dayMonth, dayWeek } = context.properties;

if (minute || hour || dayMonth || dayWeek) {
return this.scheduleDrain(context, { previousDate: null });
}
},

async sendEntries(context, entries = [], outputType) {

if (outputType === 'first') {
if (entries.length) {
await context.sendJson({ entry: entries[0], index: 0, count: entries.length }, 'out');
}
} else if (outputType === 'object') {
for (let index = 0; index < entries.length; index++) {
const entry = entries[index];
await context.sendJson({ entry, index, count: entries.length }, 'out');
}
} else if (outputType === 'array') {
return context.sendJson({ entries, count: entries.length }, 'out');
} else if (outputType === 'file') {
if (entries.length) {
// Stored into CSV file.
const headers = Object.keys(entries[0] || {});
let csvRows = [];
csvRows.push(headers.join(','));
for (const entry of entries) {
const values = headers.map(header => {
const val = entry[header];
return `"${val}"`;
});
csvRows.push(values.join(','));
}
const csvString = csvRows.join('\n');
let buffer = Buffer.from(csvString, 'utf8');
const fileName = `utils-controls-Digest-${(new Date).toISOString()}.csv`;
const savedFile = await context.saveFileStream(fileName, buffer);
await context.sendJson({ fileId: savedFile.fileId, count: entries.length }, 'out');
}
}
},

async scheduleDrain(context, { previousDate = null }) {

const { timezone, minute, hour, dayMonth, dayWeek } = context.properties;
if (timezone && !moment.tz.zone(timezone)) {
throw new context.CancelError('Invalid timezone');
}

const expression = `${minute} ${hour} ${dayMonth} * ${dayWeek}`;
const options = timezone ? { tz: timezone } : {};
const interval = parser.parseExpression(expression, options);
if (!interval.hasNext()) {
throw new context.CancelError('Next scheduled date doesn\'t exist');
}

const now = moment().toISOString();
const nextDate = interval.next().toISOString();
previousDate = previousDate ? moment(previousDate).toISOString() : null;

const diff = moment(nextDate).diff(now);
await context.setTimeout({ previousDate: now }, diff);
},

getOutputPortOptions(context, outputType) {

if (outputType === 'object' || outputType === 'first') {
return context.sendJson([
{ label: 'Current Entry Index', value: 'index', schema: { type: 'integer' } },
{ label: 'Entries Count', value: 'count', schema: { type: 'integer' } },
{ label: 'Entry', value: 'entry' }
], 'out');
} else if (outputType === 'array') {
return context.sendJson([
{ label: 'Entries Count', value: 'count', schema: { type: 'integer' } },
{ label: 'Entries', value: 'entries', schema: { type: 'array' } }
], 'out');
} else if (outputType === 'file') {
return context.sendJson([
{ label: 'Entries Count', value: 'count', schema: { type: 'integer' } },
{ label: 'File ID', value: 'fileId', schema: { type: 'string', format: 'appmixer-file-id' } }
], 'out');
}
}
};
152 changes: 152 additions & 0 deletions src/appmixer/utils/controls/Digest/component.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
{
"name": "appmixer.utils.controls.Digest",
"author": "Appmixer <[email protected]>",
"description": "Compile data in a single batch and send it at regular intervals or when a certain number of entries is reached.",
"version": "1.0.0",
"properties": {
"schema": {
"properties": {
"threshold": { "type": "number" },
"getWebhookUrl": { "type": "boolean" },
"webhookUrl": { "type": "string" },
"minute": { "type": "string" },
"hour": { "type": "string" },
"dayMonth": { "type": "string" },
"dayWeek": { "type": "string" },
"timezone": { "type": "string" },
"outputType": { "type": "string" }
}
},
"inspector": {
"groups": {
"threshold": {
"label": "Drain by Threshold",
"index": 1,
"open": true
},
"webhook": {
"label": "Drain by Webhook",
"index": 2,
"open": false
},
"schedule": {
"label": "Drain by Schedule",
"index": 3,
"open": false
}
},
"inputs": {
"threshold": {
"group": "threshold",
"type": "number",
"index": 1,
"label": "Threshold",
"tooltip": "Enter the number of entries that will trigger the output. If both the threshold and the interval (configuration below) are set, the output will be triggered in regular intervals but only if the threshold is reached, i.e. both conditions must be met. If you only want to trigger the output when the threshold is reached, leave the interval configuration below empty. If you only want to trigger the output at regular intervals, leave the threshold empty."
},
"webhookUrl": {
"group": "webhook",
"type": "text",
"index": 2,
"label": "Webhook URL to Drain Entries",
"tooltip": "Optionally, you can send a POST request to this URL to manually drain the entries at any time (e.g., even when the threshold is not reached). In other words, sending a POST request to the URL releases all the collected entries and triggers an output.",
"readonly": true,
"source": {
"url": "/component/appmixer/utils/controls/Digest?outPort=out",
"data": {
"properties": { "getWebhookUrl": true }
}
}
},
"minute": {
"group": "schedule",
"type": "text",
"index": 3,
"label": "Minute",
"tooltip": "Allowed characters are *, -, /, 0-59. Specify the minute of the hour when the digest will be sent. If the minute is set to *, the digest will be sent every minute. Use the - character to specify range of values, e.g. 1-5 means all 1st, 2nd, 3rd, 4th and 5th minute of the hour. Use the / character to specify a step value, e.g. 0-20/2 means every second minute from 0 through 20 minutes of the hour. Use the , character to specify a list of values, e.g. 1,5,10 means the 1st, 5th and 10th minute of the hour."
},
"hour": {
"group": "schedule",
"type": "text",
"label": "Hour",
"index": 4,
"tooltip": "Allowed values are *, -, /, 0-23. Specify the hour of the day when the digest will be sent. If the hour is set to *, the digest will be sent every hour. Use the - character to specify range of values, e.g. 1-5 means all 1st, 2nd, 3rd, 4th and 5th hour of the day. Use the / character to specify a step value, e.g. 0-20/2 means every second hour from 0 through 20 hours of the day. Use the , character to specify a list of values, e.g. 1,5,10 means the 1st, 5th and 10th hour of the day."
},
"dayMonth": {
"group": "schedule",
"type": "text",
"index": 5,
"label": "Day of the Month",
"tooltip": "Allowed values are *, -, /, 1-31. Specify the day of the month when the digest will be sent. If the day is set to *, the digest will be sent every day. Use the - character to specify range of values, e.g. 1-5 means all 1st, 2nd, 3rd, 4th and 5th day of the month. Use the / character to specify a step value, e.g. 0-20/2 means every second day from 0 through 20 days of the month. Use the , character to specify a list of values, e.g. 1,5,10 means the 1st, 5th and 10th day of the month."
},
"dayWeek": {
"group": "schedule",
"type": "text",
"index": 6,
"label": "Day of the Week",
"tooltip": "Allowed values are *, -, /, 0-6, SUN-SAT. Specify the day of the week when the digest will be sent. If the day is set to *, the digest will be sent every day. Use the - character to specify range of values, e.g. 1-3 means all Monday, Tuesday and Wednesday. Use the / character to specify a step value, e.g. 1-5/2 means every second day of the week from Monday through Friday. Use the , character to specify a list of values, e.g. 1,2 means on Monday and Tuesday of the week."
},
"timezone": {
"group": "schedule",
"type": "text",
"index": 7,
"label": "Timezone",
"tooltip": "Specify the timezone for scheduling (e.g., 'Europe/Prague'). GMT is used by default.",
"source": {
"url": "/component/appmixer/utils/controls/ListTimeZones?outPort=out",
"data": {
"properties": { "sendWholeArray": true },
"transform": "./ListTimeZones#timezonesToSelectArray"
}
}
},
"outputType": {
"type": "select",
"label": "Output Type",
"index": 8,
"defaultValue": "array",
"tooltip": "Choose whether you want to receive the entries as one complete list, or one entry at a time (as soon as the threshold or interval conditions are met, one entry right after the another, at the same time) or a CSV file with all items.",
"options": [
{ "label": "First Entry", "value": "first" },
{ "label": "All entries at once", "value": "array" },
{ "label": "One entry at a time", "value": "object" },
{ "label": "CSV file with all entries", "value": "file" }
]
}
}
}
},
"inPorts": [
{
"name": "in",
"schema": {
"properties": {
"entry": {}
}
},
"inspector": {
"inputs": {
"entry": {
"type": "text",
"index": 1,
"label": "Entry",
"tooltip": "Enter the data that will be added to the digest."
}
}
}
}
],
"outPorts": [{
"name": "out",
"source": {
"url": "/component/appmixer/utils/controls/Digest?outPort=out",
"data": {
"properties": {
"generateOutputPortOptions": true,
"outputType": "properties/outputType"
}
}
}
}],
"icon": ""
}

63 changes: 63 additions & 0 deletions src/appmixer/utils/controls/Digest/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions src/appmixer/utils/controls/Digest/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"name": "appmixer.utils.controls.Digest",
"version": "1.0.0",
"dependencies": {
"cron-parser": "4.9.0",
"moment": "2.30.1"
}
}
18 changes: 18 additions & 0 deletions src/appmixer/utils/controls/ListTimeZones/ListTimeZones.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict';
const timezones = require('./timezones.json');

module.exports = {

async receive(context) {

await context.sendJson({ timezones }, 'out');

},
timezonesToSelectArray({ timezones }) {

return timezones.map(timezone => {
return { label: `${timezone.name} (${timezone.timezone})`, value: timezone.timezone };
});
}
};

Loading

0 comments on commit 0572b37

Please sign in to comment.