Skip to content

Commit

Permalink
[backend] Improve CSV feed rolling max results (#4639)
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-julien authored Dec 4, 2023
1 parent 6cbc3e1 commit a1f2ceb
Show file tree
Hide file tree
Showing 30 changed files with 100 additions and 115 deletions.
25 changes: 14 additions & 11 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -2286,17 +2286,17 @@ export const elPaginate = async (context, user, indexName, options = {}) => {
}
);
};
export const elList = async (context, user, indexName, options = {}, noFiltersChecking = false) => {
const convertedFilters = (noFiltersChecking || !options.filters)
? options.filters
: checkAndConvertFilters(options.filters);
const { first = MAX_SEARCH_SIZE, infinite = false } = options;
export const elList = async (context, user, indexName, opts = {}) => {
const { first = MAX_SEARCH_SIZE, maxSize = undefined } = opts;
const { filters, noFiltersChecking = false } = opts;
const convertedFilters = noFiltersChecking ? filters : checkAndConvertFilters(filters);
let emitSize = 0;
let hasNextPage = true;
let continueProcess = true;
let searchAfter = options.after;
let searchAfter = opts.after;
const listing = [];
const publish = async (elements) => {
const { callback } = options;
const { callback } = opts;
if (callback) {
const callbackResult = await callback(elements);
continueProcess = callbackResult === true || callbackResult === undefined;
Expand All @@ -2306,15 +2306,18 @@ export const elList = async (context, user, indexName, options = {}, noFiltersCh
};
while (continueProcess && hasNextPage) {
// Force connectionFormat to false (plain element arrays) and pass the search-after cursor for the next page
const opts = { ...options, filters: convertedFilters, first, after: searchAfter, connectionFormat: false };
const elements = await elPaginate(context, user, indexName, opts);
if (!infinite && (elements.length === 0 || elements.length < first)) {
const paginateOpts = { ...opts, filters: convertedFilters, first, after: searchAfter, connectionFormat: false };
const elements = await elPaginate(context, user, indexName, paginateOpts);
emitSize += elements.length;
const noMoreElements = elements.length === 0 || elements.length < (first ?? MAX_SEARCH_SIZE);
const moreThanMax = maxSize ? emitSize >= maxSize : false;
if (noMoreElements || moreThanMax) {
if (elements.length > 0) {
await publish(elements);
}
hasNextPage = false;
} else if (elements.length > 0) {
const { sort } = R.last(elements);
const { sort } = elements[elements.length - 1];
searchAfter = offsetToCursor(sort);
await publish(elements);
}
Expand Down
58 changes: 11 additions & 47 deletions opencti-platform/opencti-graphql/src/database/middleware-loader.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import * as R from 'ramda';
import {
buildPagination,
offsetToCursor,
READ_DATA_INDICES,
READ_DATA_INDICES_WITHOUT_INFERRED,
READ_ENTITIES_INDICES,
READ_RELATIONSHIPS_INDICES,
} from './utils';
import { elAggregationsList, elCount, elFindByIds, elLoadById, elPaginate } from './engine';
import { elAggregationsList, elCount, elFindByIds, elList, elLoadById, elPaginate } from './engine';
import { buildRefRelationKey } from '../schema/general';
import type { AuthContext, AuthUser } from '../types/user';
import type {
Expand All @@ -33,8 +32,6 @@ import {
} from '../schema/stixRefRelationship';
import { ENTITY_TYPE_WORKSPACE } from '../modules/workspace/workspace-types';

const MAX_SEARCH_SIZE = 5000;

export interface FiltersWithNested extends Filter {
nested?: Array<{
key: string; // nested filters handle special cases for elastic, it's an internal format
Expand All @@ -55,7 +52,6 @@ export interface ListFilter<T extends BasicStoreCommon> {
useWildcardPrefix?: boolean;
useWildcardSuffix?: boolean;
first?: number | null;
infinite?: boolean;
after?: string | undefined | null;
orderBy?: any,
orderMode?: InputMaybe<OrderingMode>;
Expand All @@ -65,7 +61,7 @@ export interface ListFilter<T extends BasicStoreCommon> {
}

type InternalListEntities = <T extends BasicStoreCommon>
(context: AuthContext, user: AuthUser, entityTypes: Array<string>, args: EntityOptions<T>, noFiltersChecking?: boolean) => Promise<Array<T>>;
(context: AuthContext, user: AuthUser, entityTypes: Array<string>, args: EntityOptions<T>) => Promise<Array<T>>;

// entities
interface EntityFilters<T extends BasicStoreCommon> extends ListFilter<T> {
Expand All @@ -85,44 +81,12 @@ interface EntityFilters<T extends BasicStoreCommon> extends ListFilter<T> {
}

export interface EntityOptions<T extends BasicStoreCommon> extends EntityFilters<T> {
ids?: Array<string>;
indices?: Array<string>;
ids?: Array<string>
indices?: Array<string>
noFiltersChecking?: boolean
includeAuthorities?: boolean | null
}

export const elList = async <T extends BasicStoreCommon>(context: AuthContext, user: AuthUser, indices: Array<string>, options: ListFilter<T> = {}): Promise<Array<T>> => {
const { first = MAX_SEARCH_SIZE, infinite = false } = options;
let hasNextPage = true;
let continueProcess = true;
let searchAfter = options.after;
const listing: Array<T> = [];
const publish = async (elements: Array<T>) => {
const { callback } = options;
if (callback) {
const callbackResult = await callback(elements);
continueProcess = callbackResult || callbackResult === undefined;
} else {
listing.push(...elements);
}
};
while (continueProcess && hasNextPage) {
// Force options to prevent connection format and manage search after
const opts = { ...options, first, after: searchAfter, connectionFormat: false };
const elements = await elPaginate(context, user, indices, opts);
if (!infinite && (elements.length === 0 || elements.length < (first ?? MAX_SEARCH_SIZE))) {
if (elements.length > 0) {
await publish(elements);
}
hasNextPage = false;
} else if (elements.length > 0) {
const { sort } = elements[elements.length - 1];
searchAfter = offsetToCursor(sort);
await publish(elements);
}
}
return listing;
};

// relations
interface RelationFilters<T extends BasicStoreCommon> extends ListFilter<T> {
connectionFormat?: boolean;
Expand Down Expand Up @@ -456,10 +420,10 @@ export const listAllEntitiesForFilter = async (context: AuthContext, user: AuthU
return buildPagination(0, null, nodeElements, nodeElements.length);
};

export const listEntities: InternalListEntities = async (context, user, entityTypes, args = {}, noFiltersChecking = false) => {
export const listEntities: InternalListEntities = async (context, user, entityTypes, args = {}) => {
const { indices = READ_ENTITIES_INDICES } = args;
const { filters } = args;
const convertedFilters = (noFiltersChecking || !filters) ? filters : checkAndConvertFilters(filters);
const { filters, noFiltersChecking } = args;
const convertedFilters = noFiltersChecking ? filters : checkAndConvertFilters(filters);
// TODO Reactivate this test after global migration to typescript
// if (connectionFormat !== false) {
// throw UnsupportedError('List connection require connectionFormat option to false');
Expand All @@ -468,9 +432,9 @@ export const listEntities: InternalListEntities = async (context, user, entityTy
return elPaginate(context, user, indices, paginateArgs);
};
export const listAllEntities = async <T extends BasicStoreEntity>(context: AuthContext, user: AuthUser, entityTypes: Array<string>,
args: EntityOptions<T> = {}, noFiltersChecking = false): Promise<Array<T>> => {
const { indices = READ_ENTITIES_INDICES } = args;
const filters = (noFiltersChecking || !args.filters) ? args.filters : checkAndConvertFilters(args.filters);
args: EntityOptions<T> = {}): Promise<Array<T>> => {
const { indices = READ_ENTITIES_INDICES, noFiltersChecking = false } = args;
const filters = noFiltersChecking ? args.filters : checkAndConvertFilters(args.filters);
const paginateArgs = buildEntityFilters({ entityTypes, filters, ...args });
return elList(context, user, indices, paginateArgs);
};
Expand Down
23 changes: 13 additions & 10 deletions opencti-platform/opencti-graphql/src/database/middleware.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,9 @@ const batchListThrough = async (context, user, sources, sourceSide, relationType
filters,
types: [relationType],
connectionFormat: false,
noFiltersChecking: true,
search,
}, true);
});
// For each relation resolved the target entity
// Filter on element id if necessary
let targetIds = R.uniq(relations.map((s) => s[`${opposite}Id`]));
Expand Down Expand Up @@ -391,11 +392,9 @@ export const listThings = async (context, user, thingsTypes, args = {}) => {
const paginateArgs = buildEntityFilters({ types: thingsTypes, ...args, filters: convertedFilters });
return elPaginate(context, user, indices, paginateArgs);
};
export const listAllThings = async (context, user, thingsTypes, args = {}, noFiltersChecking = false) => {
const { indices = READ_DATA_INDICES } = args;
const convertedFilters = (noFiltersChecking || !args.filters)
? args.filters
: checkAndConvertFilters(args.filters);
export const listAllThings = async (context, user, thingsTypes, args = {}) => {
const { indices = READ_DATA_INDICES, noFiltersChecking = false } = args;
const convertedFilters = noFiltersChecking ? args.filters : checkAndConvertFilters(args.filters);
const paginateArgs = buildEntityFilters({ types: thingsTypes, ...args, filters: convertedFilters });
return elList(context, user, indices, paginateArgs);
};
Expand Down Expand Up @@ -1055,8 +1054,9 @@ const listEntitiesByHashes = async (context, user, type, hashes) => {
filters: [{ key: 'hashes.*', values: searchHashes, operator: 'wildcard' }],
filterGroups: [],
},
noFiltersChecking: true,
connectionFormat: false,
}, true);
});
};
export const hashMergeValidation = (instances) => {
// region Specific check for observables with hashes
Expand Down Expand Up @@ -1879,9 +1879,10 @@ export const updateAttributeMetaResolved = async (context, user, initial, inputs
filters: [{ key: 'user_email', values: [initial.contact_information] }],
filterGroups: [],
},
noFiltersChecking: true,
connectionFormat: false
};
const users = await listEntities(context, SYSTEM_USER, [ENTITY_TYPE_USER], args, true);
const users = await listEntities(context, SYSTEM_USER, [ENTITY_TYPE_USER], args);
if (users.length > 0) {
throw FunctionalError('Cannot update an individual corresponding to a user');
}
Expand Down Expand Up @@ -2108,9 +2109,10 @@ export const updateAttributeMetaResolved = async (context, user, initial, inputs
filters: [{ key: 'contact_information', values: [updatedInstance.user_email] }],
filterGroups: [],
},
noFiltersChecking: true,
connectionFormat: false
};
const individuals = await listEntities(context, user, [ENTITY_TYPE_IDENTITY_INDIVIDUAL], args, true);
const individuals = await listEntities(context, user, [ENTITY_TYPE_IDENTITY_INDIVIDUAL], args);
if (individuals.length > 0) {
const individualId = R.head(individuals).id;
const patch = {
Expand Down Expand Up @@ -3388,9 +3390,10 @@ export const internalDeleteElementById = async (context, user, id, opts = {}) =>
filters: [{ key: 'user_email', values: [element.contact_information] }],
filterGroups: [],
},
noFiltersChecking: true,
connectionFormat: false
};
const users = await listEntities(context, SYSTEM_USER, [ENTITY_TYPE_USER], args, true);
const users = await listEntities(context, SYSTEM_USER, [ENTITY_TYPE_USER], args);
if (users.length > 0) {
throw FunctionalError('Cannot delete an individual corresponding to a user');
}
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/domain/backgroundTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ export const findById = async (context, user, taskId) => {
return storeLoadById(context, user, taskId, ENTITY_TYPE_BACKGROUND_TASK);
};

export const findAll = (context, user, args, noFiltersChecking = false) => {
return listEntities(context, user, [ENTITY_TYPE_BACKGROUND_TASK], args, noFiltersChecking);
export const findAll = (context, user, args) => {
return listEntities(context, user, [ENTITY_TYPE_BACKGROUND_TASK], args);
};

const buildQueryFiltersContent = (adaptedFiltersGroup) => {
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/domain/user.js
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,8 @@ export const buildCompleteUser = async (context, client) => {
filters: [{ key: 'contact_information', values: [client.user_email] }],
filterGroups: [],
};
const args = { filters: contactInformationFilter, connectionFormat: false };
const individualsPromise = listEntities(context, SYSTEM_USER, [ENTITY_TYPE_IDENTITY_INDIVIDUAL], args, true);
const args = { filters: contactInformationFilter, connectionFormat: false, noFiltersChecking: true };
const individualsPromise = listEntities(context, SYSTEM_USER, [ENTITY_TYPE_IDENTITY_INDIVIDUAL], args);
const organizationsPromise = batchOrganizations(context, SYSTEM_USER, client.id, { ...batchOpts, withInferences: false });
const userGroupsPromise = listThroughGetTo(context, SYSTEM_USER, client.id, RELATION_MEMBER_OF, ENTITY_TYPE_GROUP);
const settings = await getEntityFromCache(context, SYSTEM_USER, ENTITY_TYPE_SETTINGS);
Expand Down
7 changes: 4 additions & 3 deletions opencti-platform/opencti-graphql/src/http/httpRollingFeed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { ForbiddenAccess } from '../config/errors';
import { BYPASS, executionContext, SYSTEM_USER } from '../utils/access';
import { findById as findFeed } from '../domain/feed';
import type { AuthUser } from '../types/user';
import { listThings } from '../database/middleware';
import { listAllThings } from '../database/middleware';
import { minutesAgo } from '../utils/format';
import { isNotEmptyField } from '../database/utils';
import { convertFiltersToQueryOptions } from '../utils/filtering/filtering-resolution';
Expand Down Expand Up @@ -69,8 +69,9 @@ const initHttpRollingFeeds = (app: Express.Application) => {
const field = feed.feed_date_attribute ?? 'created_at';
const extraOptions = { defaultTypes: feed.feed_types, field, orderMode: 'desc', after: fromDate };
const options = await convertFiltersToQueryOptions(context, user, filters, extraOptions);
const args = { connectionFormat: false, first: SIZE_LIMIT, ...options };
const elements = await listThings(context, user, feed.feed_types, args);
const args = { connectionFormat: false, maxSize: SIZE_LIMIT, ...options };
const paginateElements = await listAllThings(context, user, feed.feed_types, args);
const elements = R.take(SIZE_LIMIT, paginateElements); // Whole pages are fetched, so the result count can slightly exceed SIZE_LIMIT; trim to the limit
if (feed.include_header) {
res.write(`${feed.feed_attributes.map((a) => a.attribute).join(',')}\r\n`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ const initActivityManager = () => {
filters: [{ key: ['event_access'], values: ['EXISTS'] }],
filterGroups: [],
},
}, true);
noFiltersChecking: true
});
let lastEventId = '0-0';
if (histoElements.length > 0) {
const histoDate = histoElements[0].timestamp;
Expand Down
4 changes: 2 additions & 2 deletions opencti-platform/opencti-graphql/src/manager/cacheManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ const platformRunningPlaybooks = (context: AuthContext) => {
filters: [{ key: ['playbook_running'], values: ['true'] }],
filterGroups: [],
};
const opts = { filters, connectionFormat: false };
return findAllPlaybooks(context, SYSTEM_USER, opts, true);
const opts = { filters, noFiltersChecking: true, connectionFormat: false };
return findAllPlaybooks(context, SYSTEM_USER, opts);
};
return { values: null, fn: reloadPlaybooks };
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import { lockResource, redisDeleteWorks, redisGetConnectorStatus, redisGetWork }
import conf, { booleanConf, logApp } from '../config/conf';
import { TYPE_LOCK_ERROR } from '../config/errors';
import { connectors } from '../database/repository';
import { elDeleteInstances, elUpdate } from '../database/engine';
import { elList } from '../database/middleware-loader';
import { elDeleteInstances, elList, elUpdate } from '../database/engine';
import { executionContext, SYSTEM_USER } from '../utils/access';
import { INDEX_HISTORY } from '../database/utils';
import { now, sinceNowInDays } from '../utils/format';
Expand Down Expand Up @@ -66,6 +65,7 @@ const closeOldWorks = async (context, connector) => {
};
await elList(context, SYSTEM_USER, [INDEX_HISTORY], {
filters,
noFiltersChecking: true,
types: ['Work'],
connectionFormat: false,
callback: queryCallback
Expand Down Expand Up @@ -93,6 +93,7 @@ const deleteCompletedWorks = async (context, connector) => {
await elList(context, SYSTEM_USER, [INDEX_HISTORY], {
filters,
types: ['Work'],
noFiltersChecking: true,
connectionFormat: false,
callback: queryCallback
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ const revokedInstances = async (context) => {
],
filterGroups: [],
};
const opts = { filters, connectionFormat: false, callback };
const opts = { filters, noFiltersChecking: true, connectionFormat: false, callback };
await elList(context, SYSTEM_USER, READ_DATA_INDICES, opts);
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ const initHistoryManager = () => {
filters: [{ key: ['event_access'], values: [], operator: FilterOperator.Nil }],
filterGroups: [],
},
}, true);
noFiltersChecking: true
});
let lastEventId = '0-0';
if (histoElements.length > 0) {
const histoDate = histoElements[0].timestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
filters: [{ key: 'ingestion_running', values: [true] }],
filterGroups: [],
};
const ingestions = await findAllRssIngestions(context, SYSTEM_USER, { filters, connectionFormat: false }, true);
const opts = { filters, connectionFormat: false, noFiltersChecking: true };
const ingestions = await findAllRssIngestions(context, SYSTEM_USER, opts);
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
Expand Down Expand Up @@ -249,7 +250,8 @@ const taxiiExecutor = async (context: AuthContext) => {
filters: [{ key: 'ingestion_running', values: [true] }],
filterGroups: [],
};
const ingestions = await findAllTaxiiIngestions(context, SYSTEM_USER, { filters, connectionFormat: false }, true);
const opts = { filters, connectionFormat: false, noFiltersChecking: true };
const ingestions = await findAllTaxiiIngestions(context, SYSTEM_USER, opts);
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ export const buildTargetEvents = async (
const { data: { data }, event: eventType } = streamEvent;
const { event_types, notifiers, instance_trigger, filters } = trigger;
let finalFilters = filters ? JSON.parse(filters) : null;
if (useSideEventMatching) { // modifiy filters to look for instance trigger side events
if (useSideEventMatching) { // modify filters to look for instance trigger side events
finalFilters = replaceFilterKey(JSON.parse(trigger.filters), CONNECTED_TO_INSTANCE_FILTER, CONNECTED_TO_INSTANCE_SIDE_EVENTS_FILTER);
}
let triggerEventTypes = event_types;
Expand Down
Loading

0 comments on commit a1f2ceb

Please sign in to comment.