feat: add support for skynet instances #140

Open
wants to merge 1 commit into base: master

4 changes: 2 additions & 2 deletions README.md
@@ -4,13 +4,13 @@

### High Level

An autoscaler for Jitsi instances (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`), which are deployed in one of the following ways:
An autoscaler for Jitsi instances (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`, `skynet`), which are deployed in one of the following ways:
* as a parameterized Nomad batch job
* as an Instance in Oracle Cloud
* as a Droplet in Digital Ocean
* custom deployment model

The autoscaler manages multiple `groups` of instances, each having a `type` (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`) and being deployed in a specific `cloud` (`oracle`, `digitalocean`, `custom`).
The autoscaler manages multiple `groups` of instances, each having a `type` (`jibri`, `sip-jibri`, `jigasi`, `JVB`, `nomad`, `skynet`) and being deployed in a specific `cloud` (`oracle`, `digitalocean`, `custom`).

The autoscaler knows the Jitsi instances status and communicates with them via the [jitsi-autoscaler-sidecar](https://github.com/jitsi/jitsi-autoscaler-sidecar),
which needs to be co-located on each Jitsi instance. The sidecar periodically checks in with the autoscaler via a REST call and sends its status.
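For orientation, here is a rough sketch of what a `skynet` group definition could look like once `metricsUrl` is supported. None of this comes from the PR or the repo's sample configs: the group name, URL and all numbers are invented, and only the fields relevant to this change are shown (the full shape is the `InstanceGroup` and `ScalingOptions` interfaces in `src/instance_group.ts` further down).

```typescript
// Hypothetical skynet group definition (illustration only, not from this PR).
const skynetGroup = {
    name: 'skynet-example-group',      // invented name
    type: 'skynet',                    // new instance type introduced by this PR
    cloud: 'oracle',                   // oracle | digitalocean | custom
    // When metricsUrl is set, the autoscaler polls this endpoint for a
    // group-level metric (queueSize for skynet) instead of relying only on
    // per-instance sidecar stats.
    metricsUrl: 'https://skynet.example.com/metrics', // invented URL
    scalingOptions: {
        minDesired: 1,
        maxDesired: 10,
        desiredCount: 1,
        scaleUpThreshold: 5,           // scale up while queueSize >= 5
        scaleDownThreshold: 1,         // scale down while queueSize < 1
        scalePeriod: 60,               // seconds per metric period
        scaleUpPeriodsCount: 2,
        scaleDownPeriodsCount: 4,
    },
};
```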
8 changes: 5 additions & 3 deletions src/app.ts
@@ -136,6 +136,7 @@ const lockManager: LockManager = new LockManager(logger, {
});

const instanceGroupManager = new InstanceGroupManager({
audit,
redisClient,
redisScanCount: config.RedisScanCount,
initialGroupList: config.GroupList,
@@ -216,6 +217,7 @@ const jobManager = new JobManager({
metricsLoop,
autoscalerProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
launcherProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
reportProcessingTimeoutMs: config.GroupProcessingTimeoutMs,
sanityLoopProcessingTimeoutMs: config.SanityProcessingTimoutMs,
});

@@ -556,13 +558,13 @@ app.put(
body('options.scaleUpPeriodsCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'),
body('options.scaleDownPeriodsCount').optional().isInt({ min: 0 }).withMessage('Value must be positive'),
body('instanceType').custom(async (value) => {
if (!(await validator.supportedInstanceType(value))) {
throw new Error('Instance type not supported. Use jvb, jigasi, nomad, jibri or sip-jibri instead');
if (!validator.supportedInstanceType(value)) {
throw new Error('Instance type not supported. Use jvb, jigasi, nomad, jibri, sip-jibri or skynet instead');
}
return true;
}),
body('direction').custom(async (value) => {
if (!(await validator.supportedScalingDirection(value))) {
if (!validator.supportedScalingDirection(value)) {
throw new Error('Scaling direction not supported. Use up or down instead');
}
return true;
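The two validator calls above lose their `await`, which suggests `supportedInstanceType` and `supportedScalingDirection` are plain synchronous checks. The validator module itself is not part of this diff, so the following is only a hedged sketch of what those checks presumably amount to with `skynet` added:

```typescript
// Hedged sketch of the validator checks; the real implementation may differ.
const supportedInstanceTypes = ['jibri', 'sip-jibri', 'jigasi', 'JVB', 'nomad', 'skynet'];

function supportedInstanceType(instanceType: string): boolean {
    return supportedInstanceTypes.includes(instanceType);
}

function supportedScalingDirection(direction: string): boolean {
    return ['up', 'down'].includes(direction);
}
```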
18 changes: 18 additions & 0 deletions src/audit.ts
@@ -15,6 +15,7 @@ export interface GroupAudit {
timestamp?: number | string;
autoScalerActionItem?: AutoScalerActionItem;
launcherActionItem?: LauncherActionItem;
groupMetricValue?: number;
}

export interface AutoScalerActionItem {
@@ -37,6 +38,7 @@ export interface LauncherActionItem {
export interface GroupAuditResponse {
lastLauncherRun: string;
lastAutoScalerRun: string;
lastGroupMetricValue: number;
lastReconfigureRequest: string;
lastScaleMetrics?: Array<number>;
autoScalerActionItems?: AutoScalerActionItem[];
@@ -222,6 +224,18 @@ export default class Audit {
return updateResponse;
}

async updateLastGroupMetricValue(ctx: Context, groupName: string, groupMetricValue: number): Promise<boolean> {
const value: GroupAudit = {
groupMetricValue,
groupName,
type: 'last-group-metric-value',
};
const updateResponse = this.setGroupValue(groupName, value);
ctx.logger.info(`Updated last group metric for group ${groupName}`);

return updateResponse;
}

async updateLastAutoScalerRun(ctx: Context, groupName: string, scaleMetrics: Array<number>): Promise<boolean> {
const updateLastAutoScalerStart = process.hrtime();

@@ -345,6 +359,7 @@ export default class Audit {
const groupAuditResponse: GroupAuditResponse = {
lastLauncherRun: 'unknown',
lastAutoScalerRun: 'unknown',
lastGroupMetricValue: null,
lastReconfigureRequest: 'unknown',
lastScaleMetrics: [],
};
@@ -356,6 +371,9 @@
case 'last-launcher-run':
groupAuditResponse.lastLauncherRun = new Date(groupAudit.timestamp).toISOString();
break;
case 'last-group-metric-value':
groupAuditResponse.lastGroupMetricValue = groupAudit.groupMetricValue;
break;
case 'last-autoScaler-run':
groupAuditResponse.lastScaleMetrics = groupAudit.autoScalerActionItem
? groupAudit.autoScalerActionItem.scaleMetrics
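To make the new audit field concrete: once a group-level metric has been recorded, the audit response for a skynet group might look like the sketch below. All values are invented; the point is only where `lastGroupMetricValue` surfaces (it is initialised to `null` and stays that way until a `last-group-metric-value` entry exists).

```typescript
// Illustration only: invented values in the shape of GroupAuditResponse.
// Partial<> is used because fields outside this diff are not shown here.
import { GroupAuditResponse } from './audit';

const exampleGroupAudit: Partial<GroupAuditResponse> = {
    lastLauncherRun: '2024-01-15T10:30:00.000Z',
    lastAutoScalerRun: '2024-01-15T10:30:05.000Z',
    lastGroupMetricValue: 7, // e.g. the most recent skynet queueSize reading
    lastReconfigureRequest: 'unknown',
    lastScaleMetrics: [7, 6],
};
```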
21 changes: 17 additions & 4 deletions src/autoscaler.ts
@@ -2,7 +2,7 @@ import { InstanceMetric, InstanceTracker } from './instance_tracker';
import CloudManager from './cloud_manager';
import Redlock from 'redlock';
import Redis from 'ioredis';
import InstanceGroupManager, { InstanceGroup } from './instance_group';
import InstanceGroupManager, { GroupMetric, InstanceGroup } from './instance_group';
import LockManager from './lock_manager';
import { Context } from './context';
import Audit from './audit';
@@ -75,13 +75,24 @@ export default class AutoscaleProcessor {
group.scalingOptions.scaleUpPeriodsCount,
group.scalingOptions.scaleDownPeriodsCount,
);
const metricInventoryPerPeriod: Array<Array<InstanceMetric>> =
await this.instanceTracker.getMetricInventoryPerPeriod(

let metricInventoryPerPeriod = [];

if (group.metricsUrl) {
metricInventoryPerPeriod = await this.instanceGroupManager.getGroupMetricInventoryPerPeriod(
ctx,
group.name,
maxPeriodCount,
group.scalingOptions.scalePeriod,
);
} else {
metricInventoryPerPeriod = await this.instanceTracker.getMetricInventoryPerPeriod(
ctx,
group.name,
maxPeriodCount,
group.scalingOptions.scalePeriod,
);
}

const scaleMetrics = await this.updateDesiredCountIfNeeded(ctx, group, count, metricInventoryPerPeriod);
await this.audit.updateLastAutoScalerRun(ctx, group.name, scaleMetrics);
@@ -96,7 +107,7 @@
ctx: Context,
group: InstanceGroup,
count: number,
metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
metricInventoryPerPeriod: Array<Array<InstanceMetric | GroupMetric>>,
): Promise<Array<number>> {
ctx.logger.debug(
`[AutoScaler] Begin desired count adjustments for group ${group.name} with ${count} instances and current desired count ${group.scalingOptions.desiredCount}`,
@@ -191,6 +202,7 @@
case 'jigasi':
case 'nomad':
case 'JVB':
case 'skynet':
// in the case of JVB scale up only if value (average stress level) is above or equal to threshold
return (
(count < group.scalingOptions.maxDesired && value >= group.scalingOptions.scaleUpThreshold) ||
@@ -209,6 +221,7 @@
case 'jigasi':
case 'nomad':
case 'JVB':
case 'skynet':
// in the case of JVB scale down only if value (average stress level) is below threshold
return count > group.scalingOptions.minDesired && value < group.scalingOptions.scaleDownThreshold;
}
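For the new `skynet` cases, `value` is the summarised group metric (the queueSize) for a period rather than a per-instance stress level. The diff shows only the per-period comparison, not how the periods are combined, so the sketch below assumes an all-periods-must-agree rule purely for illustration; the function name and the aggregation are invented.

```typescript
// Standalone illustration of the skynet scale-up comparison shown above.
// The `every` aggregation across periods is an assumption, not from the diff.
function shouldScaleUp(
    summaryPerPeriod: number[], // summarised queueSize per period, newest first
    scaleUpPeriodsCount: number,
    scaleUpThreshold: number,
    count: number,
    maxDesired: number,
): boolean {
    const recent = summaryPerPeriod.slice(0, scaleUpPeriodsCount);
    return count < maxDesired && recent.every((value) => value >= scaleUpThreshold);
}

console.log(shouldScaleUp([7, 6, 2], 2, 5, 3, 10)); // true: queue at/above threshold in both periods
console.log(shouldScaleUp([7, 3, 2], 2, 5, 3, 10)); // false: the spike was not sustained
```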
2 changes: 1 addition & 1 deletion src/group_report.ts
@@ -211,7 +211,7 @@ export default class GroupReportGenerator {
instanceReport.scaleStatus = 'IN USE';
}
if (
instanceState.status.jigasiStatus &&
instanceState.status.nomadStatus &&
!instanceState.status.nomadStatus.eligibleForScheduling
) {
instanceReport.scaleStatus = 'GRACEFUL SHUTDOWN';
85 changes: 85 additions & 0 deletions src/instance_group.ts
@@ -1,5 +1,7 @@
import Redis from 'ioredis';
import { Context } from './context';
import got from 'got';
import Audit from './audit';

export interface ScalingOptions {
minDesired: number;
@@ -18,6 +20,16 @@ export interface InstanceGroupTags {
[id: string]: string;
}

export interface GroupMetric {
groupName: string;
timestamp: number;
value: number;
}

export const GroupTypeToGroupMetricKey: { [id: string]: string } = {
skynet: 'queueSize',
};

export interface InstanceGroup {
id: string;
name: string;
@@ -31,6 +43,7 @@ export interface InstanceGroup {
enableScheduler: boolean;
enableUntrackedThrottle: boolean;
enableReconfiguration?: boolean;
metricsUrl: string;
gracePeriodTTLSec: number;
protectedTTLSec: number;
scalingOptions: ScalingOptions;
@@ -39,6 +52,7 @@
}

export interface InstanceGroupManagerOptions {
audit: Audit;
redisClient: Redis.Redis;
redisScanCount: number;
initialGroupList: Array<InstanceGroup>;
@@ -48,13 +62,15 @@

export default class InstanceGroupManager {
private readonly GROUPS_HASH_NAME = 'allgroups';
private readonly audit: Audit;
private redisClient: Redis.Redis;
private readonly redisScanCount: number;
private readonly initialGroupList: Array<InstanceGroup>;
private readonly processingIntervalSeconds: number;
private readonly sanityJobsIntervalSeconds: number;

constructor(options: InstanceGroupManagerOptions) {
this.audit = options.audit;
this.redisClient = options.redisClient;
this.redisScanCount = options.redisScanCount;
this.initialGroupList = options.initialGroupList;
@@ -83,6 +99,10 @@ export default class InstanceGroupManager {
return this.initialGroupList;
}

private getGroupMetricsKey(groupName: string): string {
return `gmetric:${groupName}`;
}

async existsAtLeastOneGroup(): Promise<boolean> {
let cursor = '0';
do {
@@ -274,6 +294,71 @@ export default class InstanceGroupManager {
return !(result !== null && result.length > 0);
}

async fetchGroupMetrics(ctx: Context, groupName: string): Promise<boolean> {
try {
const group = await this.getInstanceGroup(groupName);
if (!group) {
throw new Error(`Group ${groupName} not found, failed to report on group metrics`);
}

if (!group.metricsUrl) {
ctx.logger.debug(`Group ${groupName} no metrics url, skipping metrics fetching`);
return false;
}

const metrics: { [id: string]: number } = await got(group.metricsUrl).json();

const key: string = Object.keys(metrics).find((key) => {
return GroupTypeToGroupMetricKey[group.type] === key;
});

const metricsObject: GroupMetric = {
groupName,
timestamp: Date.now(),
value: metrics[key],
};

// store the group metrics
await this.redisClient.zadd(
this.getGroupMetricsKey(groupName),
metricsObject.timestamp,
JSON.stringify(metricsObject),
);

await this.audit.updateLastGroupMetricValue(ctx, groupName, metricsObject.value);
return true;
} catch (e) {
ctx.logger.error(`Failed to report group metrics for group ${groupName}`, e);
return false;
}
}

async getGroupMetricInventoryPerPeriod(
ctx: Context,
groupName: string,
periodsCount: number,
periodDurationSeconds: number,
): Promise<Array<Array<GroupMetric>>> {
const metricPoints: Array<Array<GroupMetric>> = [];
const metricsKey = this.getGroupMetricsKey(groupName);
const now = Date.now();
const items: string[] = await this.redisClient.zrange(metricsKey, 0, -1);

for (let periodIdx = 0; periodIdx < periodsCount; periodIdx++) {
metricPoints[periodIdx] = [];
}

items.forEach((item) => {
const itemJson: GroupMetric = JSON.parse(item);
const periodIdx = Math.floor((now - itemJson.timestamp) / (periodDurationSeconds * 1000));

if (periodIdx >= 0 && periodIdx < periodsCount) {
metricPoints[periodIdx].push(itemJson);
}
});

return metricPoints;
}

async setGroupJobsCreationGracePeriod(): Promise<boolean> {
return this.setValue(`groupJobsCreationGracePeriod`, this.processingIntervalSeconds);
}
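To summarise the flow that `fetchGroupMetrics` and `getGroupMetricInventoryPerPeriod` implement, the sketch below walks one sample through it. The payload, group name and timings are invented; the only assumption taken from the diff is that a skynet metrics endpoint returns JSON with a numeric `queueSize` field (per `GroupTypeToGroupMetricKey`).

```typescript
import { GroupMetric } from './instance_group';

// 1. Assumed response from GET group.metricsUrl for a skynet group.
const examplePayload: { [id: string]: number } = { queueSize: 7 };

// 2. The GroupMetric that fetchGroupMetrics builds and zadds under the
//    sorted-set key `gmetric:<groupName>`, scored by its timestamp.
const sample: GroupMetric = {
    groupName: 'skynet-example-group', // invented name
    timestamp: Date.now(),
    value: examplePayload['queueSize'],
};

// 3. Bucketing in getGroupMetricInventoryPerPeriod: with a 60-second
//    scalePeriod, a sample that is 90 seconds old lands in period index 1,
//    i.e. the second-newest period.
const periodDurationSeconds = 60;
const sampleAgeMs = 90 * 1000;
const periodIdx = Math.floor(sampleAgeMs / (periodDurationSeconds * 1000)); // 1
console.log(sample, periodIdx);
```

Keeping the samples in a per-group sorted set means the autoscaler can slice the same time windows whether the inventory comes from per-instance reports or from a group-level endpoint.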
37 changes: 33 additions & 4 deletions src/instance_tracker.ts
@@ -2,7 +2,7 @@ import { Context } from './context';
import Redis from 'ioredis';
import ShutdownManager from './shutdown_manager';
import Audit from './audit';
import { InstanceGroup } from './instance_group';
import { GroupMetric, InstanceGroup } from './instance_group';

/* eslint-disable */
function isEmpty(obj: any) {
@@ -82,6 +82,7 @@ export interface JigasiStatus {
// largest_conference: number;
graceful_shutdown: boolean;
}

export interface InstanceDetails {
instanceId: string;
instanceType: string;
@@ -352,21 +353,49 @@ export class InstanceTracker {
async getSummaryMetricPerPeriod(
ctx: Context,
group: InstanceGroup,
metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
metricInventoryPerPeriod: Array<Array<InstanceMetric | GroupMetric>>,
periodCount: number,
): Promise<Array<number>> {
switch (group.type) {
case 'jibri':
case 'sip-jibri':
return this.getAvailableMetricPerPeriod(ctx, metricInventoryPerPeriod, periodCount);
return this.getAvailableMetricPerPeriod(
ctx,
metricInventoryPerPeriod as Array<Array<InstanceMetric>>,
periodCount,
);
case 'nomad':
case 'jigasi':
case 'JVB':
return this.getAverageMetricPerPeriod(ctx, metricInventoryPerPeriod, periodCount);
return this.getAverageMetricPerPeriod(
ctx,
metricInventoryPerPeriod as Array<Array<InstanceMetric>>,
periodCount,
);
case 'skynet':
return this.getSkynetGroupMetricPerPeriod(
ctx,
metricInventoryPerPeriod as Array<Array<GroupMetric>>,
periodCount,
);
}
return;
}

async getSkynetGroupMetricPerPeriod(
ctx: Context,
metricInventoryPerPeriod: Array<Array<GroupMetric>>,
periodCount: number,
): Promise<Array<number>> {
ctx.logger.debug(`Getting skynet group metric per period for ${periodCount} periods`, {
metricInventoryPerPeriod,
});

return metricInventoryPerPeriod
.slice(0, periodCount)
.map((groupMetrics: Array<GroupMetric>) => groupMetrics[0]?.value ?? 0);
}

async getAvailableMetricPerPeriod(
ctx: Context,
metricInventoryPerPeriod: Array<Array<InstanceMetric>>,
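For the new `skynet` branch of `getSummaryMetricPerPeriod`, the helper reduces each period's bucket of `GroupMetric`s to a single number. A small example with invented data, mirroring the `groupMetrics[0]?.value ?? 0` mapping above:

```typescript
import { GroupMetric } from './instance_group';

// Invented inventory: one bucket per period, newest period first.
const inventory: Array<Array<GroupMetric>> = [
    [{ groupName: 'skynet-example-group', timestamp: 1700000120000, value: 7 }],
    [], // no sample recorded during this period
    [{ groupName: 'skynet-example-group', timestamp: 1700000000000, value: 3 }],
];

// First sample's value per bucket, defaulting to 0 for empty buckets.
const summary = inventory.slice(0, 3).map((bucket) => bucket[0]?.value ?? 0);
console.log(summary); // [7, 0, 3]
```

Empty buckets contribute 0, so a period with no reported metric is treated as an empty queue.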