diff --git a/newReindexAuditLogs.js b/newReindexAuditLogs.js
new file mode 100644
index 0000000..c1fb5a2
--- /dev/null
+++ b/newReindexAuditLogs.js
@@ -0,0 +1,111 @@
+
+const elasticClient = require('./elastic-config');
+const redisClient = require('./redis');
+const utils = require('./newUtils');
+
+const MAX_DOC_COUNT = 50;
+
+
+async function callBulkAPI(elindex) {
+  const bulkResponse = await elasticClient.bulk({
+    body: elindex,
+    timeout: '5m',
+    filterPath: 'items.index._id,errors',
+  });
+  if (bulkResponse.errors) {
+    throw new Error(JSON.stringify(bulkResponse));
+  }
+  return bulkResponse;
+}
+
+async function bulkIndexAuditLogs(rideId, auditLogs, rideRange) {
+  const elindex = [];
+  auditLogs.forEach((value) => {
+    utils.filterObject(value, 'legToFlatFeeMapping');
+    utils.filterObject(value, 'changed_legs');
+
+    elindex.push({
+      index: {
+        _index: rideRange.index,
+        _type: 'doc',
+        _id: value._id,
+      },
+    });
+    const source = { ...value._source, index_type: 'audit_log' };
+    elindex.push(source);
+  });
+  try {
+    await callBulkAPI(elindex);
+  } catch (err) {
+    console.error(err);
+    await elasticClient.index({
+      index: 'elastic_cleanup_errors',
+      type: 'doc',
+      body: {
+        ride_id: rideId,
+        err,
+      },
+    });
+  }
+}
+
+async function reindexAuditLogs(rideRange) {
+  for (let rideIndex = rideRange.startRideId; rideIndex <= rideRange.endRideId; rideIndex += 1) {
+    const auditLogQuery = {
+      index: 'rides',
+      size: MAX_DOC_COUNT,
+      scroll: '1m',
+      body: {
+        query: {
+          match: {
+            ride_details_id: rideIndex,
+          },
+        },
+      },
+    };
+    let { hits: { total: count, hits: auditLogs }, _scroll_id: scrollId } = await elasticClient.search(auditLogQuery);
+    if (auditLogs && auditLogs.length) {
+      await bulkIndexAuditLogs(rideIndex, auditLogs, rideRange);
+    }
+    while (scrollId && count > MAX_DOC_COUNT) {
+      const { hits: { hits: auditLogs }, _scroll_id: newScrollId } = await elasticClient.scroll({ scroll: '1m', scrollId });
+      scrollId = newScrollId;
+      if (auditLogs && auditLogs.length) {
+        await bulkIndexAuditLogs(rideIndex, auditLogs, rideRange);
+      } else {
+        break;
+      }
+    }
+    await redisClient.set(`elasticCleanUp:${rideRange.index}:rideId`, rideIndex);
+  }
+}
+
+async function reindexJob() {
+  try {
+    const [startRideId, endRideId] = process.argv.slice(2);
+    const rangeIndex = 'rides_modified';
+    const lastProcessedRideId = await redisClient.get(`elasticCleanUp:${rangeIndex}:rideId`);
+    const rideRangeEvent = {
+      index: rangeIndex,
+      startRideId: parseInt(startRideId, 10),
+      endRideId: parseInt(endRideId, 10),
+    };
+    const indexExists = await elasticClient.indices.exists({ index: rangeIndex });
+    if (!indexExists) {
+      console.error('Creating Index');
+      await utils.createIndex(rangeIndex, 'rides');
+      console.error('Index Created');
+    }
+    if (parseInt(lastProcessedRideId, 10)) {
+      rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1;
+    }
+    if (rideRangeEvent.startRideId <= rideRangeEvent.endRideId) {
+      await reindexAuditLogs(rideRangeEvent);
+    }
+  } catch (err) {
+    console.error(err);
+  }
+
+  console.log('::__reIndexJob finished__::');
+}
+reindexJob();
\ No newline at end of file
diff --git a/newReindexLogstashLogs.js b/newReindexLogstashLogs.js
new file mode 100644
index 0000000..a091e0d
--- /dev/null
+++ b/newReindexLogstashLogs.js
@@ -0,0 +1,159 @@
+const moment = require('moment');
+const elasticClient = require('./elastic-config');
+const redisClient = require('./redis');
+const utils = require('./newUtils');
+
+const INDEX_NAME = 'logstash';
+const MAX_DOC_COUNT = 50;
+
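+// `elindex` is the flat action/source sequence the Elasticsearch bulk API
+// expects: one `{ index: { _index, _type, _id } }` action entry immediately
+// followed by the document body. Illustrative example (values are made up):
+//   [
+//     { index: { _index: 'logstash_modified', _type: 'doc', _id: '42' } },
+//     { createdAt: '2019-01-01T00:00:00.000Z', index_type: 'doc' },
+//   ]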
+async function callBulkAPI(elindex) {
+  if (elindex.length) {
+    const bulkResponse = await elasticClient.bulk({
+      body: elindex,
+      timeout: '5m',
+      filterPath: 'items.index._id,items.index.error,errors',
+    });
+    if (bulkResponse.errors) {
+      console.error('ERROR RESPONSE_____________________________');
+      console.error(JSON.stringify(bulkResponse));
+      throw new Error(JSON.stringify(bulkResponse));
+    }
+    return bulkResponse;
+  }
+  return null;
+}
+
+async function bulkIndexLogstashLogs(logstashLogs, indexName, startMoment) {
+  const startDate = startMoment.format('YYYY-MM-DD HH:mm:ss.SSSZ');
+  // Track the newest createdAt in this batch so the caller can checkpoint it.
+  const { _source: { createdAt: latestLogCreatedAt } } = logstashLogs.reduce((prevVal, currentVal) => {
+    const { _source: { createdAt: prevCreatedAt } } = prevVal;
+    const { _source: { createdAt: currentCreatedAt } } = currentVal;
+    const diffValue = moment(prevCreatedAt).diff(moment(currentCreatedAt));
+    if (diffValue < 0) {
+      return currentVal;
+    }
+    return prevVal;
+  }, { _source: { createdAt: startDate } });
+  try {
+    const elindex = [];
+    logstashLogs.forEach((value) => {
+      const { _source: source, _type: type, _id: id } = value;
+      elindex.push({
+        index: {
+          _index: indexName,
+          _type: 'doc',
+          _id: id,
+        },
+      });
+      const logValue = JSON.parse(JSON.stringify(source));
+      logValue.index_type = type;
+      elindex.push(logValue);
+    });
+    const retValue = await callBulkAPI(elindex);
+    if (retValue) {
+      for (let bulkResIndex = 0; bulkResIndex < retValue.items.length; bulkResIndex += 1) {
+        if (retValue.items[bulkResIndex].index.error) {
+          const { index: { _id: logstashId } } = retValue.items[bulkResIndex];
+          await elasticClient.index({
+            index: 'elastic_cleanup_logstash_errors',
+            type: 'doc',
+            body: {
+              logstash_id: logstashId,
+              meta: JSON.stringify(retValue.items[bulkResIndex]),
+            },
+          });
+        }
+      }
+    }
+    return latestLogCreatedAt;
+  } catch (err) {
+    if (logstashLogs.length) {
+      logstashLogs.forEach((value) => {
+        elasticClient.index({
+          index: 'elastic_cleanup_logstash_errors',
+          type: 'doc',
+          body: {
+            logstash_id: value._id,
+            meta: JSON.stringify(err),
+          },
+        });
+      });
+    } else {
+      elasticClient.index({
+        index: 'elastic_cleanup_logstash_errors',
+        type: 'doc',
+        body: {
+          meta: JSON.stringify(err),
+        },
+      });
+    }
+  }
+}
+
+const reindexLogstashLogs = async function reindexLogstash(year, month) {
+  const startDateInRedis = await redisClient.get(`elasticCleanUp:logstashLastProcessedDate${month}${year}`);
+  let startMoment = moment(`${year}-${month}`, 'YYYY-MM').startOf('month');
+  const endMoment = moment(`${year}-${month}`, 'YYYY-MM').endOf('month');
+  const monthName = moment().month(month - 1).format('MMMM').toLowerCase();
+  const indexName = 'logstash_modified';
+  // Check for index existence, otherwise create one.
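+  // The run below resumes from the last createdAt checkpoint stored in redis
+  // (if any), then pages through the month in ascending createdAt order via
+  // the scroll API, advancing the checkpoint after each indexed page so the
+  // job can be re-run safely after a crash.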
+  try {
+    const indexExists = await elasticClient.indices.exists({ index: indexName });
+    if (!indexExists) {
+      await utils.createLogstashIndex(indexName, 'logstash');
+    }
+    console.error(startDateInRedis);
+    if (startDateInRedis) {
+      startMoment = moment(startDateInRedis);
+    }
+    const logstashQuery = {
+      index: INDEX_NAME,
+      scroll: '1m',
+      body: {
+        size: MAX_DOC_COUNT,
+        query: {
+          range: {
+            createdAt: {
+              gte: startMoment.format('YYYY-MM-DDTHH:mm:ss.SSSZ'),
+              lte: endMoment.format('YYYY-MM-DDTHH:mm:ss.SSSZ'),
+              format: `yyyy-MM-dd'T'HH:mm:ss.SSSZ`,
+            },
+          },
+        },
+        sort: [
+          {
+            createdAt: 'asc',
+          },
+        ],
+      },
+    };
+    const res = await elasticClient.search(logstashQuery);
+    let { hits: { hits: logstashLogs, total: count }, _scroll_id: scrollId } = res;
+    if (logstashLogs && logstashLogs.length) {
+      let lastProcessedDate = await bulkIndexLogstashLogs(logstashLogs, indexName, startMoment);
+      await redisClient.set(`elasticCleanUp:logstashLastProcessedDate${month}${year}`, lastProcessedDate);
+      if (count > MAX_DOC_COUNT) {
+        while (scrollId && logstashLogs && logstashLogs.length) {
+          const { hits: { hits: logs }, _scroll_id: newScrollId } = await elasticClient.scroll({ scroll: '1m', scrollId });
+          scrollId = newScrollId;
+          if (logs && logs.length) {
+            lastProcessedDate = await bulkIndexLogstashLogs(logs, indexName, startMoment);
+          } else {
+            break;
+          }
+          await redisClient.set(`elasticCleanUp:logstashLastProcessedDate${month}${year}`, lastProcessedDate);
+        }
+      }
+    }
+  } catch (error) {
+    console.error(error);
+    throw error;
+  }
+
+  console.log('::__reindexLogstashLogs finished__::');
+};
+
+async function reindexJob() {
+  const [year, month] = process.argv.slice(2);
+  await reindexLogstashLogs(year, month);
+}
+reindexJob();
diff --git a/newReindexRidesData.js b/newReindexRidesData.js
new file mode 100644
index 0000000..1839252
--- /dev/null
+++ b/newReindexRidesData.js
@@ -0,0 +1,130 @@
+
+const elasticClient = require('./elastic-config');
+const redisClient = require('./redis');
+const utils = require('./newUtils');
+
+const MAX_DOC_COUNT = 20;
+
+async function callBulkAPI(elindex) {
+  const bulkResponse = await elasticClient.bulk({
+    body: elindex,
+    timeout: '5m',
+    filterPath: 'items.index._id,errors',
+  });
+  if (bulkResponse.errors) {
+    throw new Error(JSON.stringify(bulkResponse));
+  }
+  return bulkResponse;
+}
+
+async function bulkIndexLogs(logs, rideRange) {
+  const elindex = [];
+  let rideId;
+  logs.forEach((value, index) => {
+    const { _id: id, _source: source } = value;
+    if (index === logs.length - 1) {
+      rideId = id;
+    }
+    elindex.push({
+      index: {
+        _index: rideRange.index,
+        _type: 'doc',
+        _id: id,
+      },
+    });
+    elindex.push({ ...source, index_type: 'ride_details' });
+  });
+  try {
+    await callBulkAPI(elindex);
+  } catch (err) {
+    console.error(err);
+    await elasticClient.index({
+      index: 'elastic_cleanup_errors',
+      type: 'doc',
+      body: {
+        ride_id: rideId,
+        body: err,
+      },
+    });
+  }
+  console.error('rideId', rideId);
+  await redisClient.set(`elasticCleanUp:ridedetails:${rideRange.index}:rideId`, rideId);
+}
+
+async function reindexRidesData(rideRange) {
+  const rideDataQuery = {
+    index: 'rides',
+    size: MAX_DOC_COUNT,
+    scroll: '1m',
+    body: {
+      query: {
+        bool: {
+          must: [
+            {
+              match: {
+                _type: 'ride_details',
+              },
+            },
+            {
+              range: {
+                id: {
+                  gte: rideRange.startRideId,
+                  lte: rideRange.endRideId,
+                },
+              },
+            },
+          ],
+        },
+      },
+      sort: [
+        {
+          id: {
+            order: 'asc',
+          },
+        },
+      ],
+    },
+  };
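+  // The initial search returns the first page of hits plus a scroll id; later
+  // pages come from the scroll API. bulkIndexLogs persists the _id of the last
+  // document in each page to redis, so an interrupted run resumes from the
+  // next ride (see reindexJob below).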
+  let { hits: { total: count, hits: logs }, _scroll_id: scrollId } = await elasticClient.search(rideDataQuery);
+  if (logs && logs.length) {
+    await bulkIndexLogs(logs, rideRange);
+  }
+  while (scrollId && count > MAX_DOC_COUNT) {
+    const { hits: { hits: logs }, _scroll_id: newScrollId } = await elasticClient.scroll({ scroll: '1m', scrollId });
+    scrollId = newScrollId;
+    if (logs && logs.length) {
+      await bulkIndexLogs(logs, rideRange);
+    } else {
+      break;
+    }
+  }
+}
+
+async function reindexJob() {
+  try {
+    const [startRideId, endRideId] = process.argv.slice(2);
+    const rangeIndex = 'rides_modified';
+    const lastProcessedRideId = await redisClient.get(`elasticCleanUp:ridedetails:${rangeIndex}:rideId`);
+    const rideRangeEvent = {
+      index: rangeIndex,
+      startRideId: parseInt(startRideId, 10),
+      endRideId: parseInt(endRideId, 10),
+    };
+    if (parseInt(lastProcessedRideId, 10)) {
+      rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1;
+    }
+    const indexExists = await elasticClient.indices.exists({ index: rangeIndex });
+    if (!indexExists) {
+      await utils.createIndex(rangeIndex, 'rides');
+    }
+    if (rideRangeEvent.startRideId <= rideRangeEvent.endRideId) {
+      await reindexRidesData(rideRangeEvent);
+    }
+  } catch (err) {
+    console.error(err);
+  }
+
+  console.log('::__reindexJob finished__::');
+}
+
+reindexJob();
\ No newline at end of file
diff --git a/newUtils.js b/newUtils.js
new file mode 100644
index 0000000..46d1382
--- /dev/null
+++ b/newUtils.js
@@ -0,0 +1,158 @@
+const elasticClient = require('./elastic-config');
+
+
+const exceptionMappings = [
+  'book_now',
+  'provider_request_id',
+  'flag',
+  'ignoreWarning',
+  'override_cost_token',
+  'ignoreAddressWarning',
+  'previousMessageByPatient',
+];
+
+const logstashExceptionMappings = [
+  'updated_to',
+  'book_now',
+  'provider_request_id',
+  'flag',
+  'ignoreWarning',
+  'override_cost_token',
+  'ignoreAddressWarning',
+  'statusCode',
+  'Lyft fare',
+];
+
+
+const createIndex = async function createIndex(indexName, fromIndex) {
+  await elasticClient.indices.create({
+    index: indexName,
+    body: {
+      settings: {
+        index: {
+          'mapping.total_fields.limit': 70000,
+          number_of_shards: 1,
+          number_of_replicas: 0,
+          refresh_interval: -1,
+        },
+      },
+    },
+  });
+  const result = await elasticClient.indices.getMapping({ index: fromIndex });
+  const { mappings } = result[fromIndex];
+  const properties = {};
+  properties.index_type = { type: 'keyword' };
+  Object.entries(mappings).forEach(([, value]) => {
+    if (value.properties) {
+      Object.entries(value.properties).forEach(([propKey, propValue]) => {
+        if (properties[propKey]) {
+          const prevProps = properties[propKey];
+          const newProps = { properties: {} };
+          if (propValue && (!propValue.type || propValue.type === 'nested') && prevProps.properties && propValue.properties) {
+            updateMappingType(propValue.properties, exceptionMappings);
+            newProps.properties = { ...prevProps.properties, ...propValue.properties };
+            properties[propKey] = newProps;
+          } else {
+            if (exceptionMappings.includes(propKey)) {
+              propValue.type = 'text';
+            }
+            properties[propKey] = propValue;
+          }
+        } else {
+          if (exceptionMappings.includes(propKey)) {
+            propValue.type = 'text';
+          }
+          if (!propValue.type || propValue.type === 'nested') {
+            updateMappingType(propValue.properties, exceptionMappings);
+          }
+          properties[propKey] = propValue;
+        }
+      });
+    }
+  });
+
+  filterObject(properties, 'legToFlatFeeMapping');
+  filterObject(properties, 'changed_legs');
+
+  await elasticClient.indices.putMapping({
+    index: indexName, type: 'doc', body: { properties, dynamic: false },
+  });
+};
+
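+// createLogstashIndex mirrors createIndex with a few differences: it uses the
+// logstash-specific exception mappings, keeps the refresh interval at '1s'
+// instead of disabling it, and does not strip the legToFlatFeeMapping /
+// changed_legs subtrees. Typical call (as in newReindexLogstashLogs.js):
+//   await createLogstashIndex('logstash_modified', 'logstash');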
+const createLogstashIndex = async function createLogstashIndex(indexName, fromIndex) {
+  await elasticClient.indices.create({
+    index: indexName,
+    body: {
+      settings: {
+        index: {
+          'mapping.total_fields.limit': 70000,
+          number_of_shards: 1,
+          number_of_replicas: 0,
+          refresh_interval: '1s',
+        },
+      },
+    },
+  });
+  const result = await elasticClient.indices.getMapping({ index: fromIndex });
+  const { mappings } = result[fromIndex];
+  const properties = {};
+  properties.index_type = { type: 'keyword' };
+  Object.entries(mappings).forEach(([, value]) => {
+    if (value.properties) {
+      Object.entries(value.properties).forEach(([propKey, propValue]) => {
+        if (properties[propKey]) {
+          const prevProps = properties[propKey];
+          const newProps = { properties: {} };
+          if (propValue && (!propValue.type || propValue.type === 'nested') && prevProps.properties && propValue.properties) {
+            updateMappingType(propValue.properties, logstashExceptionMappings);
+            newProps.properties = { ...prevProps.properties, ...propValue.properties };
+            properties[propKey] = newProps;
+          } else {
+            if (logstashExceptionMappings.includes(propKey)) {
+              propValue.type = 'text';
+            }
+            properties[propKey] = propValue;
+          }
+        } else {
+          if (logstashExceptionMappings.includes(propKey)) {
+            propValue.type = 'text';
+          }
+          if (!propValue.type || propValue.type === 'nested') {
+            updateMappingType(propValue.properties, logstashExceptionMappings);
+          }
+          properties[propKey] = propValue;
+        }
+      });
+    }
+  });
+  await elasticClient.indices.putMapping({
+    index: indexName, type: 'doc', body: { properties, dynamic: false },
+  });
+};
+
+// Recursively force exception-mapped fields to the 'text' type.
+function updateMappingType(obj, exceptionMappings) {
+  for (const mappingKey in obj) {
+    if (!obj.hasOwnProperty(mappingKey)) continue;
+    if (typeof obj[mappingKey] === 'object' && obj[mappingKey].properties) {
+      updateMappingType(obj[mappingKey].properties, exceptionMappings);
+    }
+    if (exceptionMappings.includes(mappingKey)) {
+      obj[mappingKey] = { type: 'text' };
+    }
+  }
+}
+
+// Recursively delete every occurrence of `key` from a nested object.
+function filterObject(obj, key) {
+  for (const i in obj) {
+    if (!obj.hasOwnProperty(i)) continue;
+    if (i === key) {
+      delete obj[key];
+    } else if (typeof obj[i] === 'object') {
+      filterObject(obj[i], key);
+    }
+  }
+}
+
+module.exports = { createIndex, createLogstashIndex, filterObject };
\ No newline at end of file