Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backend] update or index internalFile to avoid duplicates (#8235) #8299

Merged
merged 8 commits into from
Sep 16, 2024
8 changes: 6 additions & 2 deletions opencti-platform/opencti-graphql/src/database/engine.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export const ES_MAX_PAGINATION = conf.get('elasticsearch:max_pagination_result')
export const MAX_BULK_OPERATIONS = conf.get('elasticsearch:max_bulk_operations') || 5000;
export const MAX_RUNTIME_RESOLUTION_SIZE = conf.get('elasticsearch:max_runtime_resolutions') || 5000;
export const MAX_RELATED_CONTAINER_RESOLUTION = conf.get('elasticsearch:max_container_resolutions') || 1000;
const ES_INDEX_PATTERN_SUFFIX = conf.get('elasticsearch:index_creation_pattern');
export const ES_INDEX_PATTERN_SUFFIX = conf.get('elasticsearch:index_creation_pattern');
const ES_MAX_RESULT_WINDOW = conf.get('elasticsearch:max_result_window') || 100000;
const ES_INDEX_SHARD_NUMBER = conf.get('elasticsearch:number_of_shards');
const ES_INDEX_REPLICA_NUMBER = conf.get('elasticsearch:number_of_replicas');
Expand Down Expand Up @@ -556,7 +556,11 @@ const buildUserMemberAccessFilter = (user, opts) => {

export const elIndexExists = async (indexName) => {
const existIndex = await engine.indices.exists({ index: indexName });
return oebp(existIndex) === true;
return existIndex === true || oebp(existIndex) === true || existIndex.body === true;
};
/**
 * Look up the alias definitions attached to an index (or alias) name.
 *
 * The raw answer of the engine client's `indices.getAlias` call is passed through
 * `oebp` and returned as-is.
 * NOTE(review): presumably an object keyed by concrete index name, each entry holding
 * an `aliases` map — confirm against the client version in use.
 *
 * @param indexName index or alias name to resolve
 * @returns the `oebp`-processed response of the alias lookup
 */
export const elIndexGetAlias = async (indexName) => oebp(await engine.indices.getAlias({ index: indexName }));
export const elPlatformIndices = async () => {
const listIndices = await engine.cat.indices({ index: `${ES_INDEX_PREFIX}*`, format: 'JSON' });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ export const moveAllFilesFromEntityToAnother = async (context: AuthContext, user
const sourceFileS3Id = `${sourcePath}/${sourceFileDocument.name}`;
const targetFileS3Id = `${targetPath}/${sourceFileDocument.name}`;
logApp.info(`[FILE STORAGE] Moving from ${sourceFileS3Id} to: ${targetFileS3Id}`);
const newFile = await copyFile(sourceFileS3Id, targetFileS3Id, sourceFileDocument, targetEntity.internal_id);
const copyProps = { sourceId: sourceFileS3Id, targetId: targetFileS3Id, sourceDocument: sourceFileDocument, targetEntityId: targetEntity.internal_id };
const newFile = await copyFile(context, copyProps);
if (newFile) {
const newFileForEntity = storeFileConverter(user, newFile);
updatedXOpenctiFiles.push(newFileForEntity);
Expand Down
16 changes: 8 additions & 8 deletions opencti-platform/opencti-graphql/src/database/file-storage.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ export const downloadFile = async (id) => {
/**
* - Copy file from a place to another in S3
* - Store file in documents
* @param sourceId
* @param targetId
* @param sourceDocument
* @param targetEntityId
* @returns {Promise<null|void>} the document entity on success, null on errors.
* @param context execution context, forwarded to indexFileToDocument
* @param {{sourceId: string, targetId: string, sourceDocument: BasicStoreEntityDocument, targetEntityId: string}} copyProps
* @returns {Promise<null|File>} the document entity on success, null on errors.
*/
export const copyFile = async (sourceId, targetId, sourceDocument, targetEntityId) => {
export const copyFile = async (context, copyProps) => {
const { sourceId, targetId, sourceDocument, targetEntityId } = copyProps;
try {
const input = {
Bucket: bucketName,
Expand All @@ -198,7 +198,7 @@ export const copyFile = async (sourceId, targetId, sourceDocument, targetEntityI
metaData: targetMetadata,
uploadStatus: 'complete',
};
await indexFileToDocument(file);
await indexFileToDocument(context, file);
logApp.info('[FILE STORAGE] Copy file to S3 in success', { document: file, sourceId, targetId });
return file;
} catch (err) {
Expand Down Expand Up @@ -494,7 +494,7 @@ export const upload = async (context, user, filePath, fileUpload, opts) => {
metaData: { ...fullMetadata, messages: [], errors: [], file_markings },
uploadStatus: 'complete',
};
await indexFileToDocument(file);
await indexFileToDocument(context, file);

const isFilePathForImportEnrichment = filePath.startsWith('import/') && !filePath.startsWith('import/pending');
if (!noTriggerImport && isFilePathForImportEnrichment) {
Expand Down
4 changes: 4 additions & 0 deletions opencti-platform/opencti-graphql/src/database/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ export const INDEX_INFERRED_RELATIONSHIPS = `${ES_INDEX_PREFIX}_inferred_relatio
export const READ_INDEX_INFERRED_RELATIONSHIPS = `${INDEX_INFERRED_RELATIONSHIPS}*`;
export const isInferredIndex = (index) => index.startsWith(INDEX_INFERRED_ENTITIES) || index.startsWith(INDEX_INFERRED_RELATIONSHIPS);

// Indices that are only used for read access and are no longer created on new platforms.
export const DEPRECATED_INDICES = [
INDEX_STIX_CYBER_OBSERVABLE_RELATIONSHIPS,
];
export const WRITE_PLATFORM_INDICES = [
INDEX_DELETED_OBJECTS,
INDEX_FILES,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { groupBy } from 'ramda';
import { logApp } from '../config/conf';
import { listAllEntities } from '../database/middleware-loader';
import { ENTITY_TYPE_INTERNAL_FILE } from '../schema/internalObject';
import { elDeleteInstances, elIndexGetAlias } from '../database/engine';
import { executionContext, SYSTEM_USER } from '../utils/access';
import { READ_INDEX_INTERNAL_OBJECTS } from '../database/utils';

// Prefix used for the progress log lines emitted by this migration.
const message = '[MIGRATION] Delete potential files duplicates after index rollover';

/**
 * Migration: delete duplicated internal file documents after an index rollover.
 *
 * The cleanup only runs when the internal objects alias resolves to more than one
 * index. All internal files are then listed and grouped by `internal_id`; for every id
 * found more than once, the entry with the most recent `lastModified` is kept and the
 * other entries are handed to `elDeleteInstances` together with their `_index`.
 *
 * @param next callback invoked once the migration has finished
 */
export const up = async (next) => {
  logApp.info(`${message} > started`);
  const context = executionContext('migration');
  // test if there are multiple indices for internal objects
  const internalObjectsIndexAlias = await elIndexGetAlias(READ_INDEX_INTERNAL_OBJECTS);
  if (internalObjectsIndexAlias && Object.keys(internalObjectsIndexAlias).length > 1) {
    logApp.info(`${message} > multiple indices found for internal objects, running migration`);
    const allFiles = await listAllEntities(
      context,
      SYSTEM_USER,
      [ENTITY_TYPE_INTERNAL_FILE],
      { indices: [READ_INDEX_INTERNAL_OBJECTS], baseData: true, baseFields: ['internal_id', '_index', 'lastModified'] }
    );
    // A missing or unparsable date is ranked as the oldest possible value, so the
    // comparator never returns NaN and an undated copy is never preferred over a dated one.
    const modifiedAt = (file) => {
      const time = new Date(file.lastModified).getTime();
      return Number.isNaN(time) ? Number.NEGATIVE_INFINITY : time;
    };
    const byMostRecentFirst = (a, b) => {
      const timeA = modifiedAt(a);
      const timeB = modifiedAt(b);
      if (timeA === timeB) {
        return 0;
      }
      return timeA < timeB ? 1 : -1;
    };
    const finalFilesToDelete = Object.values(groupBy((f) => f.internal_id, allFiles))
      .filter((filesList) => filesList.length > 1) // if a duplicate exists
      // sort a copy (no in-place mutation) and drop the head: the most recent entry is kept
      .flatMap((filesList) => [...filesList].sort(byMostRecentFirst).slice(1))
      .map((h) => ({ _index: h._index, internal_id: h.internal_id }));
    logApp.info(`Deleting ${finalFilesToDelete.length} files that have duplicates.`);
    // delete the files (skipped entirely when no duplicate was found)
    if (finalFilesToDelete.length > 0) {
      await elDeleteInstances(finalFilesToDelete);
    }
  } else {
    logApp.info(`${message} > no multiple indices found for internal objects, no need to run migration`);
  }
  logApp.info(`${message} > done`);
  next();
};

/**
 * Rollback hook of the migration: nothing is undone, it only signals completion.
 *
 * @param next callback invoked to report that the rollback step is finished
 */
export async function down(next) {
  next();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import moment from 'moment';
import { generateFileIndexId } from '../../../schema/identifier';
import { ENTITY_TYPE_INTERNAL_FILE } from '../../../schema/internalObject';
import { elAggregationCount, elCount, elDeleteInstances, elIndex } from '../../../database/engine';
import { elAggregationCount, elCount, elDeleteInstances, elIndex, elUpdate } from '../../../database/engine';
import { INDEX_INTERNAL_OBJECTS, isEmptyField, isNotEmptyField, READ_INDEX_INTERNAL_OBJECTS } from '../../../database/utils';
import { type EntityOptions, type FilterGroupWithNested, internalLoadById, listAllEntities, listEntitiesPaginated, storeLoadById } from '../../../database/middleware-loader';
import type { AuthContext, AuthUser } from '../../../types/user';
Expand Down Expand Up @@ -48,9 +48,14 @@
};
};

export const indexFileToDocument = async (file: any) => {
export const indexFileToDocument = async (context: AuthContext, file: any) => {
const data = buildFileDataForIndexing(file);
await elIndex(INDEX_INTERNAL_OBJECTS, data);
const internalFile = await storeLoadById(context, SYSTEM_USER, data.internal_id, ENTITY_TYPE_INTERNAL_FILE);
if (internalFile) {
// update existing internalFile (if file has been saved in another index)
return elUpdate(internalFile._index, internalFile.internal_id, data);
}

Check warning on line 57 in opencti-platform/opencti-graphql/src/modules/internal/document/document-domain.ts

View check run for this annotation

Codecov / codecov/patch

opencti-platform/opencti-graphql/src/modules/internal/document/document-domain.ts#L56-L57

Added lines #L56 - L57 were not covered by tests
return elIndex(INDEX_INTERNAL_OBJECTS, data);
};

export const deleteDocumentIndex = async (context: AuthContext, user: AuthUser, id: string) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,24 @@ import {
elIndex,
elIndexElements,
elIndexExists,
elIndexGetAlias,
elLoadById,
elPaginate,
elRebuildRelation,
ES_INDEX_PATTERN_SUFFIX,
ES_MAX_PAGINATION,
searchEngineInit
} from '../../../src/database/engine';
import {
DEPRECATED_INDICES,
ES_INDEX_PREFIX,
INDEX_INTERNAL_OBJECTS,
READ_DATA_INDICES,
READ_ENTITIES_INDICES,
READ_INDEX_INTERNAL_OBJECTS,
READ_INDEX_INTERNAL_RELATIONSHIPS,
READ_INDEX_STIX_CORE_RELATIONSHIPS,
READ_INDEX_STIX_CYBER_OBSERVABLE_RELATIONSHIPS,
READ_INDEX_STIX_CYBER_OBSERVABLES,
READ_INDEX_STIX_DOMAIN_OBJECTS,
READ_INDEX_STIX_META_OBJECTS,
READ_INDEX_STIX_META_RELATIONSHIPS,
READ_INDEX_STIX_SIGHTING_RELATIONSHIPS,
READ_RELATIONSHIPS_INDICES
READ_RELATIONSHIPS_INDICES,
WRITE_PLATFORM_INDICES
} from '../../../src/database/utils';
import { utcDate } from '../../../src/utils/format';
import { ADMIN_USER, buildStandardUser, testContext, TESTING_GROUPS, TESTING_ROLES, TESTING_USERS } from '../../utils/testQuery';
Expand All @@ -50,17 +48,27 @@ const elWhiteUser = async () => {
};

describe('Elasticsearch configuration test', () => {
it('should configuration correct', () => {
it('should configuration correct', async () => {
expect(searchEngineInit()).resolves.toBeTruthy();
expect(elIndexExists(READ_INDEX_INTERNAL_OBJECTS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_SIGHTING_RELATIONSHIPS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_CORE_RELATIONSHIPS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_DOMAIN_OBJECTS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_META_OBJECTS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_META_RELATIONSHIPS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_CYBER_OBSERVABLE_RELATIONSHIPS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_INTERNAL_RELATIONSHIPS)).toBeTruthy();
expect(elIndexExists(READ_INDEX_STIX_CYBER_OBSERVABLES)).toBeTruthy();
// check all WRITE_PLATFORM_INDICES creation
for (let i = 0; i < WRITE_PLATFORM_INDICES.length; i += 1) {
const indexName = WRITE_PLATFORM_INDICES[i];
const indexExists = await elIndexExists(indexName);
expect(indexExists).toBeTruthy();
}
for (let i = 0; i < DEPRECATED_INDICES.length; i += 1) {
const indexName = DEPRECATED_INDICES[i];
expect(await elIndexExists(indexName)).toBeFalsy();
}
});
it('should get internal object index with alias', async () => {
const internalObjectsIndexAlias = await elIndexGetAlias(READ_INDEX_INTERNAL_OBJECTS);
// internalObjectsIndexAlias = {"test_internal_objects-000001":{"aliases":{"test_internal_objects":{}}}}
const numberOfIndices = Object.keys(internalObjectsIndexAlias).length;
expect(internalObjectsIndexAlias).toBeDefined();
expect(numberOfIndices).toEqual(1);
expect(Object.entries(internalObjectsIndexAlias)[0][0]).toEqual(`${INDEX_INTERNAL_OBJECTS}${ES_INDEX_PATTERN_SUFFIX}`);
expect(Object.entries(internalObjectsIndexAlias)[0][1].aliases).toEqual({ [INDEX_INTERNAL_OBJECTS]: {} });
});
});

Expand Down