Commit
Added error logging for logstash logs
shubhisood committed Aug 17, 2020
1 parent 97cc15c commit 8f3a197
Showing 3 changed files with 140 additions and 113 deletions.
2 changes: 2 additions & 0 deletions reindexAuditLogs.js
@@ -87,7 +87,9 @@ async function reindexJob() {
   };
   const isIndexExists = await elasticClient.indices.exists({index: rangeIndex})
   if(!isIndexExists) {
+    console.error('Creating Index');
     await utils.createIndex(rangeIndex, 'rides');
+    console.error('Index Created');
   }
   if(parseInt(lastProcessedRideId, 10)) {
     rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1;
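For reference, the guard above follows the usual exists-then-create pattern. A minimal sketch, assuming the legacy elasticsearch JS client used elsewhere in this repo (the ensureIndex name is illustrative, not from the commit):

// Create an index only if it is missing; indices.exists resolves to a boolean,
// so the guard avoids an index_already_exists_exception on reruns.
async function ensureIndex(elasticClient, indexName) {
  const exists = await elasticClient.indices.exists({ index: indexName });
  if (!exists) {
    console.error('Creating Index'); // the commit reports progress on stderr
    await elasticClient.indices.create({ index: indexName });
    console.error('Index Created');
  }
}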
174 changes: 67 additions & 107 deletions reindexLogstashLogs.js
@@ -1,19 +1,26 @@
 const moment = require('moment');
 const elasticClient = require('./elastic-config');
 const redisClient = require('./redis');
+const utils = require('./utils');
 
 const INDEX_NAME = 'logstash';
-const MAX_DOC_COUNT = 20;
+const MAX_DOC_COUNT = 50;
 async function callBulkAPI(elindex) {
   if (elindex.length) {
     const bulkResponse = await elasticClient.bulk({
       body: elindex,
       timeout: '5m',
     });
+    if(bulkResponse.errors) {
+      console.error('ERROR RESPONSE_____________________________');
+      console.error(JSON.stringify(bulkResponse));
+      throw new Error(JSON.stringify(bulkResponse));
+    }
     return bulkResponse;
   }
   return null;
 }
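The bulkResponse.errors check matters because the bulk API reports partial failures inside an HTTP 200 response. A sketch of pulling out the per-item failures, assuming the ES 6.x response shape (collectBulkErrors is an illustrative helper, not part of the commit):

function collectBulkErrors(bulkResponse) {
  if (!bulkResponse || !bulkResponse.errors) return [];
  // Each items entry is keyed by its operation type (index, create, update, delete).
  return bulkResponse.items
    .map((item) => item.index || item.create || item.update || item.delete)
    .filter((op) => op && op.error)
    .map((op) => ({ id: op._id, status: op.status, reason: op.error.reason }));
}

Throwing on errors, as the new code does, turns a silent partial failure into a failed batch that the caller's catch block can record.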

 async function bulkIndexLogstashLogs(logstashLogs, indexName, startMoment) {
   const startDate = startMoment.format('YYYY-MM-DD HH:mm:ss.SSSZ');
   const { _source: { createdAt : latestLogCreatedAt }} = logstashLogs.reduce((prevVal, currentVal) => {
@@ -59,137 +66,90 @@ async function bulkIndexLogstashLogs(logstashLogs, indexName, startMoment) {
     }
     return latestLogCreatedAt;
   } catch (err) {
+    console.error('handled error logged in elastic for this batch');
+    if(logstashLogs.length) {
+      logstashLogs.forEach((value, key) => {
+        elasticClient.index({
+          index: 'elastic_cleanup_logstash_errors',
+          type: 'doc',
+          body: {
+            logstash_id: value._id,
+            meta: JSON.stringify(err),
+          },
+        });
+      });
+    } else {
+      elasticClient.index({
+        index: 'elastic_cleanup_logstash_errors',
+        type: 'doc',
+        body: {
+          meta: JSON.stringify(err),
+        },
+      });
+    }
     throw err;
   }
 }
-async function createIndex(indexName) {
-  console.log('info', `Creating index with name ${indexName}`);
-  await elasticClient.indices.create({
-    index: indexName,
-    body: {
-      settings: {
-        index: {
-          'mapping.total_fields.limit': 70000,
-          number_of_shards: 1,
-          number_of_replicas: 0,
-          refresh_interval: -1,
-        },
-      },
-    },
-  });
-  const result = await elasticClient.indices.getMapping({ index: INDEX_NAME });
-  console.log('info', 'got result', result);
-  const { mappings } = result[INDEX_NAME];
-  console.log('info', 'mappings', mappings);
-  const properties = {};
-  properties.index_type = { type: 'keyword' };
-  Object.entries(mappings).forEach(([key, value]) => {
-    if (value.properties) {
-      Object.entries(value.properties).forEach(([propKey, propValue]) => {
-        if (properties[propKey]) {
-          const prevProps = properties[propKey];
-          const newProps = { properties: {} };
-          if (propValue && (!propValue.type || propValue.type === 'nested') && prevProps.properties && propValue.properties) {
-            newProps.properties = { ...prevProps.properties, ...propValue.properties };
-            properties[propKey] = newProps;
-          } else {
-            properties[propKey] = { ...prevProps, ...propValue };
-          }
-        } else {
-          properties[propKey] = propValue;
-        }
-      });
-    }
-  });
-  await elasticClient.indices.putMapping({
-    index: indexName, type: 'doc', body: { properties, dynamic: false },
-  });
-  console.log('info', 'After putting mappings');
-}
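The catch block above is a dead-letter pattern: every document in the failed batch is written to elastic_cleanup_logstash_errors before the error is rethrown, so a crashed run leaves an audit trail. One per-document index call per failure is simple but chatty; a bulk variant would look roughly like this (logFailedDocs is an illustrative name; the index name and fields mirror the commit):

// Record all failed source docs in one round trip instead of N index calls.
function logFailedDocs(elasticClient, docs, err) {
  const body = [];
  docs.forEach((doc) => {
    body.push({ index: { _index: 'elastic_cleanup_logstash_errors', _type: 'doc' } });
    body.push({ logstash_id: doc._id, meta: JSON.stringify(err) });
  });
  return elasticClient.bulk({ body });
}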

 const reindexLogstashLogs = async function reindexLogstash(year, month) {
-  const startDateInRedis = await redisClient.get(`elasticCleanUp:logstash${month}${year}`);
+  const startDateInRedis = await redisClient.get(`elasticCleanUp:logstashLastProcessedDate${month}${year}`);
   let startMoment = moment(`${year}-${month}`,'YYYY-MM').startOf('month');
   const endMoment = moment(`${year}-${month}`,'YYYY-MM').endOf('month');
   const monthName = moment().month(month - 1).format('MMMM').toLowerCase();
   const indexName = `${INDEX_NAME}_${monthName.toString()}_${year}`;
   // Check for index existence otherwise create one.
+  let logstashLogs = [];
   try {
     const indexExists = await elasticClient.indices.exists({ index: indexName });
     if(!indexExists) {
-      await createIndex(indexName);
+      await utils.createLogstashIndex(indexName, 'logstash');
     }
     console.error(startDateInRedis);
     if(startDateInRedis) {
       startMoment = moment(startDateInRedis);
     }
-    try {
-      const logstashQuery = {
+    const logstashQuery = {
       index: INDEX_NAME,
       scroll:'1m',
       body: {
         size: MAX_DOC_COUNT,
         query: {
           range: {
             createdAt: {
               gte: startMoment.format('YYYY-MM-DDTHH:mm:ss.SSSZ'),
               lte: endMoment.format('YYYY-MM-DDTHH:mm:ss.SSSZ'),
               format: `yyyy-MM-dd'T'HH:mm:ss.SSSZ`,
             },
           },
         },
         sort: [
           {
             createdAt: 'asc',
           },
         ],
       },
     };
     const res = await elasticClient.search(logstashQuery);
     let { hits: { hits: logstashLogs, total: count }, _scroll_id : scrollId } = res;
     if (logstashLogs && logstashLogs.length) {
       let lastProcessedDate = await bulkIndexLogstashLogs(logstashLogs, indexName, startMoment);
       await redisClient.set(`elasticCleanUp:logstashLastProcessedDate${month}${year}`, lastProcessedDate);
       if(count > MAX_DOC_COUNT) {
         while(scrollId && logstashLogs && logstashLogs.length) {
-          ({ hits: { hits: logstashLogs }, _scroll_id : newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId }));
+          const { hits: { hits: logs }, _scroll_id : newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId });
           scrollId = newScrollId;
-          if (logstashLogs && logstashLogs.length) {
-            lastProcessedDate = await bulkIndexLogstashLogs(logstashLogs, indexName, startMoment);
+          if (logs && logs.length) {
+            lastProcessedDate = await bulkIndexLogstashLogs(logs, indexName, startMoment);
           } else {
             break;
           }
           await redisClient.set(`elasticCleanUp:logstashLastProcessedDate${month}${year}`, lastProcessedDate);
         }
       }
       console.error('Job completed');
       return;
     }
   } catch(error) {
     console.error(error);
+    if(logstashLogs.length) {
+      logstashLogs.forEach((value, key) => {
+        elasticClient.index({
+          index: 'elastic_cleanup_logstash_errors',
+          type: 'doc',
+          body: {
+            logstash_id: value._id,
+            meta: JSON.stringify(error),
+          },
+        });
+      });
+    } else {
+      elasticClient.index({
+        index: 'elastic_cleanup_logstash_errors',
+        type: 'doc',
+        body: {
+          meta: JSON.stringify(error),
+        },
+      });
+    }
     throw error;
   }
 }

 async function reindexJob() {
   const [year, month ] = process.argv.slice(2);
   await reindexLogstashLogs(year, month);
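Stripped to its skeleton, reindexLogstashLogs is a scroll-and-checkpoint loop: page through a date-bounded query sorted by createdAt ascending, and persist the last processed timestamp to redis after each batch so a crashed run resumes where it stopped (the ascending sort is what makes the checkpoint safe). A minimal sketch, with illustrative function and key names; the client calls match the legacy elasticsearch package:

async function scrollWithCheckpoint(elasticClient, redisClient, query, checkpointKey, processBatch) {
  // First page plus scroll cursor.
  let { hits: { hits }, _scroll_id: scrollId } = await elasticClient.search(query);
  while (hits && hits.length) {
    const lastProcessedDate = await processBatch(hits);
    await redisClient.set(checkpointKey, lastProcessedDate); // checkpoint after each batch
    ({ hits: { hits }, _scroll_id: scrollId } = await elasticClient.scroll({ scroll: '1m', scrollId }));
  }
}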
77 changes: 71 additions & 6 deletions utils.js
@@ -8,7 +8,20 @@ const exceptionMappings = [
   'ignoreWarning',
   'override_cost_token',
   'ignoreAddressWarning'
-]
+];
+
+const logstashExceptionMappings = [
+  'updated_to',
+  'book_now',
+  'provider_request_id',
+  'flag',
+  'ignoreWarning',
+  'override_cost_token',
+  'ignoreAddressWarning',
+  'statusCode',
+  'Lyft fare'
+];
 
 const createIndex = async function createIndex(indexName, fromIndex) {
   await elasticClient.indices.create({
@@ -35,7 +48,7 @@ const createIndex = async function createIndex(indexName, fromIndex) {
         const prevProps = properties[propKey];
         const newProps = { properties: {} };
         if (propValue && (!propValue.type || propValue.type === 'nested') && prevProps.properties && propValue.properties) {
-          updateMappingType(propValue.properties);
+          updateMappingType(propValue.properties, exceptionMappings);
           newProps.properties = { ...prevProps.properties, ...propValue.properties };
           properties[propKey] = newProps;
         } else {
@@ -49,7 +62,7 @@ const createIndex = async function createIndex(indexName, fromIndex) {
           propValue.type = 'text';
         }
         if(!propValue.type || propValue.type === 'nested') {
-          updateMappingType(propValue.properties)
+          updateMappingType(propValue.properties, exceptionMappings)
         }
         properties[propKey] = propValue;
       }
@@ -61,16 +74,68 @@ const createIndex = async function createIndex(indexName, fromIndex) {
     index: indexName, type: 'doc', body: { properties, dynamic: false },
   });
 }
 
+const createLogstashIndex = async function createLogstashIndex(indexName, fromIndex) {
+  await elasticClient.indices.create({
+    index: indexName,
+    body: {
+      settings: {
+        index: {
+          'mapping.total_fields.limit': 70000,
+          number_of_shards: 1,
+          number_of_replicas: 0,
+          refresh_interval: -1,
+        },
+      },
+    },
+  });
+  const result = await elasticClient.indices.getMapping({ index: fromIndex });
+  const { mappings } = result[fromIndex];
+  const properties = {};
+  properties.index_type = { type: 'keyword' };
+  Object.entries(mappings).forEach(([key, value]) => {
+    if (value.properties) {
+      Object.entries(value.properties).forEach(([propKey, propValue]) => {
+        if (properties[propKey]) {
+          const prevProps = properties[propKey];
+          const newProps = { properties: {} };
+          if (propValue && (!propValue.type || propValue.type === 'nested') && prevProps.properties && propValue.properties) {
+            updateMappingType(propValue.properties, logstashExceptionMappings);
+            newProps.properties = { ...prevProps.properties, ...propValue.properties };
+            properties[propKey] = newProps;
+          } else {
+            if(logstashExceptionMappings.includes(propKey)) {
+              propValue.type = 'text';
+            }
+            properties[propKey] = propValue;
+          }
+        } else {
+          if(logstashExceptionMappings.includes(propKey)) {
+            propValue.type = 'text';
+          }
+          if(!propValue.type || propValue.type === 'nested') {
+            updateMappingType(propValue.properties, logstashExceptionMappings)
+          }
+          properties[propKey] = propValue;
+        }
+      });
+    }
+  });
+  await elasticClient.indices.putMapping({
+    index: indexName, type: 'doc', body: { properties, dynamic: false },
+  });
+}
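createLogstashIndex clones the live logstash mapping onto the archive index, with dynamic: false so unmapped fields are ignored, and with every field in logstashExceptionMappings coerced to text, presumably so values that arrive with mixed types (a numeric and a string statusCode, say) don't fail the reindex with a mapping conflict. Illustrative usage (the index name here is assumed, following the logstash_<month>_<year> convention above):

await createLogstashIndex('logstash_august_2020', 'logstash');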

-function updateMappingType(obj) {
+function updateMappingType(obj, exceptionMappings) {
   for (let mappingKey in obj) {
     if (!obj.hasOwnProperty(mappingKey)) continue;
     if (typeof obj[mappingKey] == 'object' && obj[mappingKey].properties) {
-      updateMappingType(obj[mappingKey].properties)
+      updateMappingType(obj[mappingKey].properties, exceptionMappings)
     }
     if (exceptionMappings.includes(mappingKey)) {
       obj[mappingKey] = {type: 'text'};
     }
   }
 }
-module.exports = {createIndex};
+module.exports = {createIndex, createLogstashIndex};
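A small worked example of what updateMappingType does to a nested mapping fragment (sample data invented for illustration):

const sample = {
  request: {
    properties: {
      flag: { type: 'long' },
      nested_obj: { properties: { statusCode: { type: 'long' } } },
    },
  },
};
updateMappingType(sample, ['flag', 'statusCode']);
// sample.request.properties.flag is now { type: 'text' }, and the recursion
// has rewritten nested_obj.properties.statusCode the same way.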
