From bb995a378944923d748d12947dc7419920deb57b Mon Sep 17 00:00:00 2001 From: Naresh Singh Date: Tue, 27 Oct 2020 23:57:30 +0530 Subject: [PATCH] RA-26080: remove dyanamically added property --- newReindexAuditLogs.js | 7 +- newReindexRideOrderData.js | 130 +++++++++++++++++++++++++++++++++++++ newUtils.js | 40 +++++++----- 3 files changed, 158 insertions(+), 19 deletions(-) create mode 100644 newReindexRideOrderData.js diff --git a/newReindexAuditLogs.js b/newReindexAuditLogs.js index aa92be7..c13e16c 100644 --- a/newReindexAuditLogs.js +++ b/newReindexAuditLogs.js @@ -3,7 +3,7 @@ const elasticClient = require('./elastic-config'); const redisClient = require('./redis'); const utils = require('./newUtils'); -const MAX_DOC_COUNT = 50; +const MAX_DOC_COUNT = 20; async function callBulkAPI(elindex) { @@ -13,6 +13,8 @@ async function callBulkAPI(elindex) { filterPath:'items.index._id,errors' }); if(bulkResponse.errors) { + console.error('ERROR RESPONSE_____________________________'); + console.error(JSON.stringify(bulkResponse)); throw new Error(JSON.stringify(bulkResponse)); } return bulkResponse; @@ -35,9 +37,8 @@ async function bulkIndexAuditLogs(rideId, auditLogs, rideRange) { try { await callBulkAPI(elindex); } catch(err) { - console.error(err) elasticClient.index({ - index: 'elastic_cleanup_errors', + index: 'elastic_cleanup_audit_log_errors', type: 'doc', body: { ride_id: rideId, diff --git a/newReindexRideOrderData.js b/newReindexRideOrderData.js new file mode 100644 index 0000000..05b1a3d --- /dev/null +++ b/newReindexRideOrderData.js @@ -0,0 +1,130 @@ + +const elasticClient = require('./elastic-config'); +const redisClient = require('./redis'); +const utils = require('./newUtils'); + +const MAX_DOC_COUNT = 20; + +async function callBulkAPI(elindex) { + const bulkResponse = await elasticClient.bulk({ + body: elindex, + timeout: '5m', + filterPath:'items.index._id,errors' + }); + if(bulkResponse.errors) { + throw new Error(JSON.stringify(bulkResponse)) + } + return bulkResponse; +} + +async function bulkIndexLogs(logs, rideRange) { + const elindex = []; + let rideId; + logs.forEach((value, index) => { + const { _id: id, _source: source} = value; + if(index === logs.length - 1) { + rideId = id; + } + elindex.push({ + index: { + _index: rideRange.index, + _type: 'doc', + _id: value._id, + }, + }); + elindex.push({...source, index_type: 'rideorder'}); + }); + try { + await callBulkAPI(elindex); + } catch(err) { + console.error(err) + elasticClient.index({ + index: 'elastic_cleanup_rideorder_errors', + type: 'doc', + body: { + ride_id: rideId, + body: err + }, + }); + } + console.error('rideId', rideRange.startRideId); + redisClient.set(`elasticCleanUp:rideorder:${rideRange.index}:rideId`, rideRange.startRideId); +} + +async function reindexRidesData(rideRange) { + const rideDataQuery = { + index: 'rides', + size: MAX_DOC_COUNT, + scroll: '1m', + body: { + "query": { + "bool": { + "must": [ + { + "match": { + "_type": "rideorder" + } + }, + { + "range": { + "external_order_id": { + "gte": rideRange.startRideId, + "lte": rideRange.endRideId + } + } + } + ] + } + }, + "sort": [ + { + "external_order_id": { + "order": "asc" + } + } + ] + }, + }; + let { hits: { total: count, hits: logs }, _scroll_id: scrollId } = await elasticClient.search(rideDataQuery); + if(logs && logs.length) { + await bulkIndexLogs(logs, rideRange); + } + while(scrollId && count > MAX_DOC_COUNT) { + const { hits: { hits: logs }, _scroll_id: newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId}); + scrollId = newScrollId; + if (logs && logs.length) { + await bulkIndexLogs(logs, rideRange); + } else { + break; + } + } +} + +async function reindexJob() { + try { + const [ startRideId, endRideId ] = process.argv.slice(2); + const rangeIndex = 'rides_modified'; + let lastProcessedRideId = await redisClient.get(`elasticCleanUp:rideorder:${rangeIndex}:rideId`); + const rideRangeEvent = { + index: rangeIndex, + startRideId: parseInt(startRideId, 10), + endRideId: parseInt(endRideId, 10) + }; + if(parseInt(lastProcessedRideId, 10)) { + rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1; + } + const isIndexExists = await elasticClient.indices.exists({index: rangeIndex}) + if(!isIndexExists) { + await utils.createIndex(rangeIndex, 'rides'); + } + if(rideRangeEvent.startRideId < parseInt(rideRangeEvent.endRideId, 10)) { + await reindexRidesData(rideRangeEvent); + } + } catch(err) { + console.error(err); + } + + console.log('::__reindexJob finished__::') +} + +reindexJob(); \ No newline at end of file diff --git a/newUtils.js b/newUtils.js index c80d878..5d0edd8 100644 --- a/newUtils.js +++ b/newUtils.js @@ -72,7 +72,9 @@ const createIndex = async function createIndex(indexName, fromIndex) { } }); - removeProperty(properties); + filterObject(properties, 'legToFlatFeeMapping') + filterObject(properties, 'changed_legs') + filterObject(properties, 'fare_breakdown') await elasticClient.indices.putMapping({ index: indexName, type: 'doc', body: { properties, dynamic: false }, @@ -127,7 +129,9 @@ const createLogstashIndex = async function createLogstashIndex(indexName, fromIn } }); - removeProperty(properties); + + filterObject(properties, 'fare_breakdown') + await elasticClient.indices.putMapping({ index: indexName, type: 'doc', body: { properties, dynamic: false }, }); @@ -158,22 +162,26 @@ function filterObject(obj, key) { } -function removeProperty(obj){ - if(obj?.meta?.data?.vendor_lists?.latest_flat_rate?.legToFlatFeeMapping){ - delete obj.meta.data.vendor_lists.latest_flat_rate.legToFlatFeeMapping - } +// function removeProperty(obj){ +// console.log('Ima here') - if(obj?.meta?.data?.orderUpdateDiff?.changed_legs){ - delete obj.meta.data.orderUpdateDiff.changed_legs - } +// if(obj?.meta?.properties?.data?.properties?.vendor_lists?.properties?.latest_flat_rate?.properties?.legToFlatFeeMapping){ +// delete obj.meta.properties.data.properties.vendor_lists.properties.latest_flat_rate.properties.legToFlatFeeMapping +// } - if(obj?.meta?.billingWriteBack?.legs?.fare_breakdown){ - delete obj.meta.billingWriteBack.legs.fare_breakdown - } - if(obj?.receipt?.fare_breakdown){ - delete obj.receipt.fare_breakdown - } -} +// if(obj?.meta?.properties?.data?.properties?.orderUpdateDiff?.properties?.changed_legs){ +// delete obj.meta.properties.data.properties.orderUpdateDiff.properties.changed_legs +// } + +// if(obj?.meta?.properties?.billingWriteBack?.properties?.legs?.properties?.fare_breakdown){ +// delete obj.meta.properties.billingWriteBack.properties.legs.properties.fare_breakdown +// } + +// if(obj?.properties?.receipt?.properties?.fare_breakdown){ +// console.log('hello') +// delete obj.properties.receipt.properties.fare_breakdown +// } +// } module.exports = {createIndex, createLogstashIndex, filterObject}; \ No newline at end of file