From ef11d4dc367c622699fd6f842893456316e1ef5c Mon Sep 17 00:00:00 2001 From: skadefro Date: Mon, 5 Jun 2023 11:16:33 +0200 Subject: [PATCH 1/4] Add custom grpc settings --- OpenFlow/src/Config.ts | 48 ++++++++++++++++++++++++++++++++++++++- OpenFlow/src/WebServer.ts | 12 ++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/OpenFlow/src/Config.ts b/OpenFlow/src/Config.ts index b19830aa..0b03deed 100644 --- a/OpenFlow/src/Config.ts +++ b/OpenFlow/src/Config.ts @@ -95,6 +95,18 @@ export class dbConfig extends Base { public otel_trace_mongodb_update_per_users: boolean; public otel_trace_mongodb_delete_per_users: boolean; + public grpc_keepalive_time_ms: number; + public grpc_keepalive_timeout_ms: number; + public grpc_http2_min_ping_interval_without_data_ms: number; + public grpc_max_connection_idle_ms: number; + public grpc_max_connection_age_ms: number; + public grpc_max_connection_age_grace_ms: number; + public grpc_http2_max_pings_without_data: number; + public grpc_keepalive_permit_without_calls: number; + public grpc_max_receive_message_length: number; + public grpc_max_send_message_length: number; + + public cache_workitem_queues: boolean; public agent_images: NoderedImage[] @@ -213,6 +225,19 @@ export class dbConfig extends Base { Config.otel_trace_mongodb_update_per_users = Config.parseBoolean(!NoderedUtil.IsNullEmpty(conf.otel_trace_mongodb_update_per_users) ? conf.otel_trace_mongodb_update_per_users : Config.getEnv("otel_trace_mongodb_update_per_users", "false")); Config.otel_trace_mongodb_delete_per_users = Config.parseBoolean(!NoderedUtil.IsNullEmpty(conf.otel_trace_mongodb_delete_per_users) ? conf.otel_trace_mongodb_delete_per_users : Config.getEnv("otel_trace_mongodb_delete_per_users", "false")); + Config.grpc_keepalive_time_ms = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_keepalive_time_ms) ? conf.grpc_keepalive_time_ms.toString() : Config.getEnv("grpc_keepalive_time_ms", (-1).toString())) + Config.grpc_keepalive_timeout_ms = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_keepalive_timeout_ms) ? conf.grpc_keepalive_timeout_ms.toString() : Config.getEnv("grpc_keepalive_timeout_ms", (-1).toString())) + Config.grpc_http2_min_ping_interval_without_data_ms = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_http2_min_ping_interval_without_data_ms) ? conf.grpc_http2_min_ping_interval_without_data_ms.toString() : Config.getEnv("grpc_http2_min_ping_interval_without_data_ms", (-1).toString())) + Config.grpc_max_connection_idle_ms = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_max_connection_idle_ms) ? conf.grpc_max_connection_idle_ms.toString() : Config.getEnv("grpc_max_connection_idle_ms", (-1).toString())) + Config.grpc_max_connection_age_ms = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_max_connection_age_ms) ? conf.grpc_max_connection_age_ms.toString() : Config.getEnv("grpc_max_connection_age_ms", (-1).toString())) + Config.grpc_max_connection_age_grace_ms = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_max_connection_age_grace_ms) ? conf.grpc_max_connection_age_grace_ms.toString() : Config.getEnv("grpc_max_connection_age_grace_ms", (-1).toString())) + Config.grpc_http2_max_pings_without_data = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_http2_max_pings_without_data) ? conf.grpc_http2_max_pings_without_data.toString() : Config.getEnv("grpc_http2_max_pings_without_data", (-1).toString())) + Config.grpc_keepalive_permit_without_calls = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_keepalive_permit_without_calls) ? conf.grpc_keepalive_permit_without_calls.toString() : Config.getEnv("grpc_keepalive_permit_without_calls", (-1).toString())) + Config.grpc_max_receive_message_length = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_max_receive_message_length) ? conf.grpc_max_receive_message_length.toString() : Config.getEnv("grpc_max_receive_message_length", (-1).toString())) + Config.grpc_max_send_message_length = parseInt(!NoderedUtil.IsNullEmpty(conf.grpc_max_send_message_length) ? conf.grpc_max_send_message_length.toString() : Config.getEnv("grpc_max_send_message_length", (-1).toString())) + + + Config.cache_workitem_queues = Config.parseBoolean(!NoderedUtil.IsNullEmpty(conf.cache_workitem_queues) ? conf.cache_workitem_queues : Config.getEnv("cache_workitem_queues", "false")); @@ -503,7 +528,17 @@ export class Config { Config.otel_trace_mongodb_update_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_update_per_users", "false")); Config.otel_trace_mongodb_delete_per_users = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_delete_per_users", "false")); - + Config.grpc_keepalive_time_ms = parseInt(Config.getEnv("grpc_keepalive_time_ms", "-1")); + Config.grpc_keepalive_timeout_ms = parseInt(Config.getEnv("grpc_keepalive_timeout_ms", "-1")); + Config.grpc_http2_min_ping_interval_without_data_ms = parseInt(Config.getEnv("grpc_http2_min_ping_interval_without_data_ms", "-1")); + Config.grpc_max_connection_idle_ms = parseInt(Config.getEnv("grpc_max_connection_idle_ms", "-1")); + Config.grpc_max_connection_age_ms = parseInt(Config.getEnv("grpc_max_connection_age_ms", "-1")); + Config.grpc_max_connection_age_grace_ms = parseInt(Config.getEnv("grpc_max_connection_age_grace_ms", "-1")); + Config.grpc_http2_max_pings_without_data = parseInt(Config.getEnv("grpc_http2_max_pings_without_data", "-1")); + Config.grpc_keepalive_permit_without_calls = parseInt(Config.getEnv("grpc_keepalive_permit_without_calls", "-1")); + Config.grpc_max_receive_message_length = parseInt(Config.getEnv("grpc_max_receive_message_length", "-1")); + Config.grpc_max_send_message_length = parseInt(Config.getEnv("grpc_max_send_message_length", "-1")); + Config.validate_user_form = Config.getEnv("validate_user_form", ""); } @@ -764,6 +799,17 @@ export class Config { public static otel_trace_mongodb_update_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_update_per_users", "false")); public static otel_trace_mongodb_delete_per_users: boolean = Config.parseBoolean(Config.getEnv("otel_trace_mongodb_delete_per_users", "false")); + public static grpc_keepalive_time_ms = parseInt(Config.getEnv("grpc_keepalive_time_ms", "-1")); + public static grpc_keepalive_timeout_ms = parseInt(Config.getEnv("grpc_keepalive_timeout_ms", "-1")); + public static grpc_http2_min_ping_interval_without_data_ms = parseInt(Config.getEnv("grpc_http2_min_ping_interval_without_data_ms", "-1")); + public static grpc_max_connection_idle_ms = parseInt(Config.getEnv("grpc_max_connection_idle_ms", "-1")); + public static grpc_max_connection_age_ms = parseInt(Config.getEnv("grpc_max_connection_age_ms", "-1")); + public static grpc_max_connection_age_grace_ms = parseInt(Config.getEnv("grpc_max_connection_age_grace_ms", "-1")); + public static grpc_http2_max_pings_without_data = parseInt(Config.getEnv("grpc_http2_max_pings_without_data", "-1")); + public static grpc_keepalive_permit_without_calls = parseInt(Config.getEnv("grpc_keepalive_permit_without_calls", "-1")); + public static grpc_max_receive_message_length = parseInt(Config.getEnv("grpc_max_receive_message_length", "-1")); + public static grpc_max_send_message_length = parseInt(Config.getEnv("grpc_max_send_message_length", "-1")); + public static validate_user_form: string = Config.getEnv("validate_user_form", ""); public static externalbaseurl(): string { diff --git a/OpenFlow/src/WebServer.ts b/OpenFlow/src/WebServer.ts index cc9be902..7ed324bc 100644 --- a/OpenFlow/src/WebServer.ts +++ b/OpenFlow/src/WebServer.ts @@ -294,6 +294,18 @@ export class WebServer { process.exit(404); } }); + if(Config.grpc_keepalive_time_ms > -1) protowrap.grpc_server_options['grpc.keepalive_time_ms'] = Config.grpc_keepalive_time_ms; + if(Config.grpc_keepalive_timeout_ms > -1) protowrap.grpc_server_options['grpc.keepalive_timeout_ms'] = Config.grpc_keepalive_timeout_ms; + if(Config.grpc_http2_min_ping_interval_without_data_ms > -1) protowrap.grpc_server_options['grpc.http2.min_ping_interval_without_data_ms'] = Config.grpc_http2_min_ping_interval_without_data_ms; + if(Config.grpc_max_connection_idle_ms > -1) protowrap.grpc_server_options['grpc.max_connection_idle_ms'] = Config.grpc_max_connection_idle_ms; + if(Config.grpc_max_connection_age_ms > -1) protowrap.grpc_server_options['grpc.max_connection_age_ms'] = Config.grpc_max_connection_age_ms; + if(Config.grpc_max_connection_age_grace_ms > -1) protowrap.grpc_server_options['grpc.max_connection_age_grace_ms'] = Config.grpc_max_connection_age_grace_ms; + if(Config.grpc_http2_max_pings_without_data > -1) protowrap.grpc_server_options['grpc.http2.max_pings_without_data'] = Config.grpc_http2_max_pings_without_data; + if(Config.grpc_keepalive_permit_without_calls > -1) protowrap.grpc_server_options['grpc.keepalive_permit_without_calls'] = Config.grpc_keepalive_permit_without_calls; + if(Config.grpc_max_receive_message_length > -1) protowrap.grpc_server_options['grpc.max_receive_message_length'] = Config.grpc_max_receive_message_length; + if(Config.grpc_max_send_message_length > -1) protowrap.grpc_server_options['grpc.max_send_message_length'] = Config.grpc_max_send_message_length; + + var servers = []; servers.push(protowrap.serve("pipe", this.onClientConnected, config.defaultsocketport, "testpipe", WebServer.wss, WebServer.app, WebServer.server, flowclient)); servers.push(protowrap.serve("socket", this.onClientConnected, config.defaultsocketport, null, WebServer.wss, WebServer.app, WebServer.server, flowclient)); From 115b03cb3f874184dcb1deb9495a87452d48968f Mon Sep 17 00:00:00 2001 From: skadefro Date: Mon, 5 Jun 2023 11:16:53 +0200 Subject: [PATCH 2/4] make agent queu premapped for agent --- OpenFlow/src/DBHelper.ts | 4 +++- OpenFlow/src/Messages/Message.ts | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/OpenFlow/src/DBHelper.ts b/OpenFlow/src/DBHelper.ts index da74eb61..b76ad3f7 100644 --- a/OpenFlow/src/DBHelper.ts +++ b/OpenFlow/src/DBHelper.ts @@ -343,10 +343,12 @@ export class DBHelper { } public FindAgentBySlugOrIdWrap(_id, jwt, span) { if (jwt === null || jwt == undefined || jwt == "") { jwt = Crypt.rootToken(); } + var agentslug = _id; + if(_id.endsWith("agent")) agentslug = _id.substring(0, _id.length - 5 ); Logger.instanse.debug("Add queue to cache : " + _id, span); return Config.db.GetOne({ query: {"_type": "agent", "$or": [ { _id }, - { slug: _id }]} , collectionname: "agents", jwt }, span); + { slug: _id }, { slug: agentslug } ]} , collectionname: "agents", jwt }, span); } public async FindAgentBySlugOrId(_id: string, jwt: string, parent: Span): Promise { diff --git a/OpenFlow/src/Messages/Message.ts b/OpenFlow/src/Messages/Message.ts index 42aee197..106fdc79 100644 --- a/OpenFlow/src/Messages/Message.ts +++ b/OpenFlow/src/Messages/Message.ts @@ -849,6 +849,9 @@ export class Message { } if (!allowed) { let mq = await Logger.DBHelper.FindQueueByName(msg.queuename, rootjwt, parent); + if (mq == null) { + mq = await Logger.DBHelper.FindAgentBySlugOrId(msg.queuename, rootjwt, span) as any; + } if (mq != null) { if (Config.amqp_force_sender_has_invoke) { if (!DatabaseConnection.hasAuthorization(tuser, mq, Rights.invoke)) { From 59c5c12082421fe2d99499a9c1e28bbfaa93bd4c Mon Sep 17 00:00:00 2001 From: skadefro Date: Wed, 7 Jun 2023 12:05:15 +0200 Subject: [PATCH 3/4] add null check on federationids --- OpenFlow/src/LoginProvider.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/OpenFlow/src/LoginProvider.ts b/OpenFlow/src/LoginProvider.ts index eb623a5e..d1d9dc8b 100644 --- a/OpenFlow/src/LoginProvider.ts +++ b/OpenFlow/src/LoginProvider.ts @@ -675,6 +675,7 @@ export class LoginProvider { if (!NoderedUtil.IsNullEmpty(profile["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/mobile"])) { (_user as any).mobile = profile["http://schemas.xmlsoap.org/ws/2005/05/identity/claims/mobile"]; } + if(_user.federationids == null) _user.federationids = []; var exists = _user.federationids.filter(x => x.id == username && x.issuer == issuer); if (exists.length == 0) { _user.federationids = _user.federationids.filter(x => x.issuer != issuer); @@ -759,6 +760,7 @@ export class LoginProvider { _user = await Logger.DBHelper.EnsureUser(jwt, _user.name, _user.username, null, null, extraoptions, span); } } else { + if(_user.federationids == null) _user.federationids = []; var exists = _user.federationids.filter(x => x.id == profile.id && x.issuer == issuer); if (exists.length == 0 || _user.emailvalidated == false) { _user.federationids = _user.federationids.filter(x => x.issuer != issuer); @@ -822,6 +824,7 @@ export class LoginProvider { _user = await Logger.DBHelper.EnsureUser(jwt, _user.name, _user.username, null, null, extraoptions, span); } } else { + if(_user.federationids == null) _user.federationids = []; var exists = _user.federationids.filter(x => x.id == username && x.issuer == issuer); if (exists.length == 0 || _user.emailvalidated == false) { _user.federationids = _user.federationids.filter(x => x.issuer != issuer); From 992caaa3d7d06416ed56af1036d5fbab16db4c66 Mon Sep 17 00:00:00 2001 From: skadefro Date: Wed, 7 Jun 2023 12:06:16 +0200 Subject: [PATCH 4/4] fix doc --- docs/README.md | 5 ----- docs/{npmnodered.md => npmnodered.old.md} | 0 docs/{npmopenflow.md => npmopenflow.old.md} | 0 3 files changed, 5 deletions(-) rename docs/{npmnodered.md => npmnodered.old.md} (100%) rename docs/{npmopenflow.md => npmopenflow.old.md} (100%) diff --git a/docs/README.md b/docs/README.md index a1e8a44b..5bbe23e5 100644 --- a/docs/README.md +++ b/docs/README.md @@ -37,11 +37,6 @@ Installing on [kubernetes](kubernetes) using our [helm-charts](https://github.com/open-rpa/helm-charts/) -#### How to install and manage OpenFlow using npm packages -Installing using [npm packages](npmopenflow) - -Installing remote/local nodereds using [npm packages](npmnodered) - #### How to build and run from source build [from source](buildsource) diff --git a/docs/npmnodered.md b/docs/npmnodered.old.md similarity index 100% rename from docs/npmnodered.md rename to docs/npmnodered.old.md diff --git a/docs/npmopenflow.md b/docs/npmopenflow.old.md similarity index 100% rename from docs/npmopenflow.md rename to docs/npmopenflow.old.md