Skip to content

Commit

Permalink
RA-26080: updated new script as will not change the name of ES index
Browse files Browse the repository at this point in the history
  • Loading branch information
nareshkharola123 committed Oct 22, 2020
1 parent 6efcc94 commit 23b289e
Show file tree
Hide file tree
Showing 4 changed files with 558 additions and 0 deletions.
111 changes: 111 additions & 0 deletions newReindexAuditLogs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@

const elasticClient = require('./elastic-config');
const redisClient = require('./redis');
const utils = require('./newUtils');

const MAX_DOC_COUNT = 50;


async function callBulkAPI(elindex) {
const bulkResponse = await elasticClient.bulk({
body: elindex,
timeout: '5m',
filterPath:'items.index._id,errors'
});
if(bulkResponse.errors) {
throw new Error(JSON.stringify(bulkResponse));
}
return bulkResponse;
}

async function bulkIndexAuditLogs(rideId, auditLogs, rideRange) {
const elindex = [];
auditLogs.forEach((value) => {
utils.filterObject(value, 'legToFlatFeeMapping ');
utils.filterObject(value, 'changed_legs');

elindex.push({
index: {
_index: rideRange.index,
_type: 'doc',
_id: value._id,
},
});
const source = {...value._source, index_type: 'audit_log' }
elindex.push(source);
});
try {
await callBulkAPI(elindex);
} catch(err) {
console.error(err)
elasticClient.index({
index: 'elastic_cleanup_errors',
type: 'doc',
body: {
ride_id: rideId,
err: err
},
});
}
}

async function reindexAuditLogs(rideRange) {
for (let rideIndex = rideRange.startRideId; rideIndex <= rideRange.endRideId; rideIndex++) {
const auditLogQuery = {
index: 'rides',
size: MAX_DOC_COUNT,
scroll: '1m',
body: {
query: {
match: {
'ride_details_id': rideIndex,
},
},
},
};
let { hits: { total: count, hits: auditLogs }, _scroll_id: scrollId } = await elasticClient.search(auditLogQuery);
if(auditLogs && auditLogs.length) {
await bulkIndexAuditLogs(rideIndex, auditLogs, rideRange);
}
while(scrollId && count > MAX_DOC_COUNT) {
const { hits: { hits: auditLogs }, _scroll_id: newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId});
scrollId = newScrollId;
if (auditLogs && auditLogs.length) {
await bulkIndexAuditLogs(rideIndex, auditLogs, rideRange);
} else {
break;
}
}
redisClient.set(`elasticCleanUp:${rideRange.index}:rideId`, rideIndex);
}
}

async function reindexJob() {
try {
const [ startRideId, endRideId ] = process.argv.slice(2);
const rangeIndex = 'rides_modified';
let lastProcessedRideId = await redisClient.get(`elasticCleanUp:${rangeIndex}:rideId`);
const rideRangeEvent = {
index: rangeIndex,
startRideId: parseInt(startRideId, 10),
endRideId: parseInt(endRideId, 10)
};
const isIndexExists = await elasticClient.indices.exists({index: rangeIndex})
if(!isIndexExists) {
console.error('Creating Index');
await utils.createIndex(rangeIndex, 'rides');
console.error('Index Created');
}
if(parseInt(lastProcessedRideId, 10)) {
rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1;
}
if(rideRangeEvent.startRideId <= parseInt(rideRangeEvent.endRideId, 10)) {
await reindexAuditLogs(rideRangeEvent);
}
} catch(err) {
console.error(err);
}

console.log('::__reIndexJob finished__::')
}
reindexJob();
159 changes: 159 additions & 0 deletions newReindexLogstashLogs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
const moment = require('moment');
const elasticClient = require('./elastic-config');
const redisClient = require('./redis');
const utils = require('./newUtils');

const INDEX_NAME = 'logstash';
const MAX_DOC_COUNT = 50;
async function callBulkAPI(elindex) {
if (elindex.length) {
const bulkResponse = await elasticClient.bulk({
body: elindex,
timeout: '5m',
filterPath:'items.index._id,errors'
});
if(bulkResponse.errors) {
console.error('ERROR RESPONSE_____________________________');
console.error(JSON.stringify(bulkResponse));
throw new Error(JSON.stringify(bulkResponse));
}
return bulkResponse;
}
return null;
}

async function bulkIndexLogstashLogs(logstashLogs, indexName, startMoment) {
const startDate = startMoment.format('YYYY-MM-DD HH:mm:ss.SSSZ');
const { _source: { createdAt : latestLogCreatedAt }} = logstashLogs.reduce((prevVal, currentVal) => {
const { _source: { createdAt: prevCreatedAt }} = prevVal;
const { _source: { createdAt: currentCreatedAt }} = currentVal
const diffValue = moment(prevCreatedAt).diff(moment(currentCreatedAt));
if(diffValue < 0) {
return currentVal;
}
return prevVal;
}, { _source: { createdAt: startDate }});
try {
const elindex = [];
logstashLogs.forEach((value) => {
const {_source: source, _type: type, _id: id} = value;
elindex.push({
index: {
_index: indexName,
_type: 'doc',
_id: id
},
});
const logValue = JSON.parse(JSON.stringify(source));
logValue.index_type = type;
elindex.push(logValue);
});
const retValue = await callBulkAPI(elindex);
if(retValue) {
for(let bulkResIndex = 0; bulkResIndex < retValue.items.length; bulkResIndex+=1) {
if (retValue.items[bulkResIndex].errors) {
const { index: {_id: logstashId } } = retValue.items[bulkResIndex]
elasticClient.index({
index: 'elastic_cleanup_logstash_errors',
type: 'doc',
body: {
logstash_id: logstashId,
meta: JSON.stringify(retValue.items[bulkResIndex]),
},
});
}
}
}
return latestLogCreatedAt;
} catch (err) {
if(logstashLogs.length) {
logstashLogs.forEach((value, key) => {
elasticClient.index({
index: 'elastic_cleanup_logstash_errors',
type: 'doc',
body: {
logstash_id: value._id,
meta: JSON.stringify(err),
},
});
});
} else {
elasticClient.index({
index: 'elastic_cleanup_logstash_errors',
type: 'doc',
body: {
meta: JSON.stringify(error),
},
});
}
}
}

const reindexLogstashLogs = async function reindexLogstash(year, month) {
const startDateInRedis = await redisClient.get(`elasticCleanUp:logstashLastProcessedDate${month}${year}`);
let startMoment = moment(`${year}-${month}`,'YYYY-MM').startOf('month');
const endMoment = moment(`${year}-${month}`,'YYYY-MM').endOf('month');
const monthName = moment().month(month - 1).format('MMMM').toLowerCase();
const indexName = 'logstash_modified';
// Check for index existence otherwise create one.
try {
const indexExists = await elasticClient.indices.exists({ index: indexName });
if(!indexExists) {
await utils.createLogstashIndex(indexName, 'logstash');
}
console.error(startDateInRedis);
if(startDateInRedis) {
startMoment = moment(startDateInRedis);
}
const logstashQuery = {
index: INDEX_NAME,
scroll:'1m',
body: {
size: MAX_DOC_COUNT,
query: {
range: {
createdAt: {
gte: startMoment.format('YYYY-MM-DDTHH:mm:ss.SSSZ'),
lte: endMoment.format('YYYY-MM-DDTHH:mm:ss.SSSZ'),
format: `yyyy-MM-dd'T'HH:mm:ss.SSSZ`,
},
},
},
sort: [
{
createdAt: 'asc',
},
],
},
};
const res = await elasticClient.search(logstashQuery);
let { hits: { hits: logstashLogs, total: count }, _scroll_id : scrollId } = res;
if (logstashLogs && logstashLogs.length) {
let lastProcessedDate = await bulkIndexLogstashLogs(logstashLogs, indexName, startMoment);
await redisClient.set(`elasticCleanUp:logstashLastProcessedDate${month}${year}`, lastProcessedDate);
if(count > MAX_DOC_COUNT) {
while(scrollId && logstashLogs && logstashLogs.length) {
const { hits: { hits: logs }, _scroll_id : newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId });
scrollId = newScrollId;
if (logs && logs.length) {
lastProcessedDate = await bulkIndexLogstashLogs(logs, indexName, startMoment);
} else {
break;
}
await redisClient.set(`elasticCleanUp:logstashLastProcessedDate${month}${year}`, lastProcessedDate);
}
}
}
} catch(error) {
console.error(error);
throw error;
}

console.log('::__reindexLogstashLogs finished__::')
}

async function reindexJob() {
const [year, month ] = process.argv.slice(2);
await reindexLogstashLogs(year, month);
}
reindexJob();
Loading

0 comments on commit 23b289e

Please sign in to comment.