Skip to content

Commit

Permalink
[backend] Change Telemetry manager locking behavior (#6292)
Browse files Browse the repository at this point in the history
- Locking need to be kept on one instance only
  • Loading branch information
richard-julien committed May 6, 2024
1 parent 7a404a5 commit 34aceed
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
35 changes: 22 additions & 13 deletions opencti-platform/opencti-graphql/src/manager/managerModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,16 @@ import { SYSTEM_USER } from '../utils/access';
import { utcDate } from '../utils/format';
import { wait } from '../database/utils';

export interface HandlerInput {
shutdown?: () => Promise<void>
}

export interface ManagerCronScheduler {
handler: (input?: any) => void;
interval: number;
lockKey: string;
createHandlerInput?: () => Promise<any>;
handler: (input?: any) => Promise<void>
interval: number
lockKey: string
handlerInfinite?: boolean
handlerInitializer?: () => Promise<HandlerInput>
}

export interface ManagerStreamScheduler {
Expand Down Expand Up @@ -44,19 +49,25 @@ const initManager = (manager: ManagerDefinition) => {
let running = false;
let shutdown = false;

const cronHandler = async (cronInput?: any) => {
const cronHandler = async (cronInputFn?: () => Promise<HandlerInput>) => {
if (manager.cronSchedulerHandler) {
let lock;
let cronInput;
const startDate = utcDate();
try {
// date
// Lock the manager
lock = await lockResource([manager.cronSchedulerHandler.lockKey], { retryCount: 0 });
running = true;
if (cronInput) {
await manager.cronSchedulerHandler.handler(cronInput);
cronInput = cronInputFn ? await cronInputFn() : undefined;
if (manager.cronSchedulerHandler.handlerInfinite) {
logApp.info(`[OPENCTI-MODULE] Running ${manager.label} infinite cron handler`);
while (!shutdown) {
await manager.cronSchedulerHandler.handler(cronInput);
await wait(manager.cronSchedulerHandler.interval);
}
} else {
await manager.cronSchedulerHandler.handler();
await manager.cronSchedulerHandler.handler(cronInput);
}
} catch (e: any) {
if (e.name === TYPE_LOCK_ERROR) {
Expand All @@ -67,6 +78,7 @@ const initManager = (manager: ManagerDefinition) => {
} finally {
running = false;
if (lock) await lock.unlock();
if (cronInput && cronInput.shutdown) await cronInput.shutdown();
if (startDate) {
const duration = moment.duration(utcDate().diff(startDate)).asMilliseconds();
logApp.debug(`[OPENCTI-MODULE] ${manager.label} done in ${duration}ms`);
Expand Down Expand Up @@ -108,13 +120,10 @@ const initManager = (manager: ManagerDefinition) => {
manager,
start: async () => {
logApp.info(`[OPENCTI-MODULE] Starting ${manager.label}`);
let cronInput: any;
if (manager.cronSchedulerHandler?.createHandlerInput) {
cronInput = await manager.cronSchedulerHandler.createHandlerInput();
}
if (manager.cronSchedulerHandler) {
const { handlerInitializer } = manager.cronSchedulerHandler;
scheduler = setIntervalAsync(async () => {
await cronHandler(cronInput);
await cronHandler(handlerInitializer);
}, manager.cronSchedulerHandler.interval);
}
if (manager.streamSchedulerHandler) {
Expand Down
13 changes: 7 additions & 6 deletions opencti-platform/opencti-graphql/src/manager/telemetryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { Settings } from '../generated/graphql';
import { getClusterInformation, getSettings } from '../domain/settings';
import { usersWithActiveSession } from '../database/session';
import { TELEMETRY_SERVICE_NAME, TelemetryMeterManager } from '../telemetry/TelemetryMeterManager';
import type { ManagerDefinition } from './managerModule';
import type { HandlerInput, ManagerDefinition } from './managerModule';
import { registerManager } from './managerModule';
import { MetricFileExporter } from '../telemetry/MetricFileExporter';
import { getEntitiesListFromCache, getEntityFromCache } from '../database/cache';
Expand All @@ -21,23 +21,23 @@ import type { BasicStoreSettings } from '../types/settings';
import { getHttpClient } from '../utils/http-client';
import type { StoreConnector } from '../types/connector';

const TELEMETRY_DEV_DEBUG = false;
const TELEMETRY_DEV_DEBUG = conf.get('telemetry_manager:debug') ?? false;
const TELEMETRY_MANAGER_KEY = conf.get('telemetry_manager:lock_key');
const FILIGRAN_OTLP_TELEMETRY = DEV_MODE
? 'https://telemetry.staging.filigran.io/v1/metrics' : 'https://telemetry.filigran.io/v1/metrics';

const ONE_MINUTE = 60 * 1000;
const FIVE_MINUTE = 5 * ONE_MINUTE;
const TWO_MINUTE = 2 * ONE_MINUTE;
const ONE_HOUR = 60 * ONE_MINUTE;
const SIX_HOUR = 6 * ONE_HOUR;
// Collect data period, corresponds to data point collection
const TELEMETRY_COLLECT_INTERVAL = DEV_MODE ? ONE_MINUTE : ONE_HOUR;
// Export data period, sending information to files, console and otlp
const TELEMETRY_EXPORT_INTERVAL = DEV_MODE ? FIVE_MINUTE : SIX_HOUR;
const TELEMETRY_EXPORT_INTERVAL = DEV_MODE ? TWO_MINUTE : SIX_HOUR;
// Manager schedule, data point generation
const SCHEDULE_TIME = DEV_MODE ? ONE_MINUTE / 2 : ONE_HOUR / 2;

const telemetryHandler = async () => {
const telemetryInitializer = async (): Promise<HandlerInput> => {
// Build readers
const filigranMetricReaders = [];
// region File exporter
Expand Down Expand Up @@ -140,9 +140,10 @@ const TELEMETRY_MANAGER_DEFINITION: ManagerDefinition = {
executionContext: 'telemetry_manager',
cronSchedulerHandler: {
handler: fetchTelemetryData,
handlerInitializer: telemetryInitializer, // Init meter manager is required
handlerInfinite: true, // Lock needs to be kept, inner scheduler will be done.
interval: SCHEDULE_TIME,
lockKey: TELEMETRY_MANAGER_KEY,
createHandlerInput: telemetryHandler,
},
enabledByConfig: true,
enabledToStart(): boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export class TelemetryMeterManager {
this.meterProvider = meterProvider;
}

async shutdown() {
return this.meterProvider.shutdown();
}

setIsEEActivated(EE: number) {
this.isEEActivated = EE;
}
Expand Down

0 comments on commit 34aceed

Please sign in to comment.