Skip to content

Commit

Permalink
RA-26080: remove dyanamically added property
Browse files Browse the repository at this point in the history
  • Loading branch information
nareshkharola123 committed Oct 30, 2020
1 parent 64bc0c3 commit cb775b1
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 19 deletions.
7 changes: 4 additions & 3 deletions newReindexAuditLogs.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const elasticClient = require('./elastic-config');
const redisClient = require('./redis');
const utils = require('./newUtils');

const MAX_DOC_COUNT = 50;
const MAX_DOC_COUNT = 20;


async function callBulkAPI(elindex) {
Expand All @@ -13,6 +13,8 @@ async function callBulkAPI(elindex) {
filterPath:'items.index._id,errors'
});
if(bulkResponse.errors) {
console.error('ERROR RESPONSE_____________________________');
console.error(JSON.stringify(bulkResponse));
throw new Error(JSON.stringify(bulkResponse));
}
return bulkResponse;
Expand All @@ -35,9 +37,8 @@ async function bulkIndexAuditLogs(rideId, auditLogs, rideRange) {
try {
await callBulkAPI(elindex);
} catch(err) {
console.error(err)
elasticClient.index({
index: 'elastic_cleanup_errors',
index: 'elastic_cleanup_audit_log_errors',
type: 'doc',
body: {
ride_id: rideId,
Expand Down
130 changes: 130 additions & 0 deletions newReindexRideOrderData.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@

const elasticClient = require('./elastic-config');
const redisClient = require('./redis');
const utils = require('./newUtils');

const MAX_DOC_COUNT = 20;

async function callBulkAPI(elindex) {
const bulkResponse = await elasticClient.bulk({
body: elindex,
timeout: '5m',
filterPath:'items.index._id,errors'
});
if(bulkResponse.errors) {
throw new Error(JSON.stringify(bulkResponse))
}
return bulkResponse;
}

async function bulkIndexLogs(logs, rideRange) {
const elindex = [];
let rideId;
logs.forEach((value, index) => {
const { _id: id, _source: source} = value;
if(index === logs.length - 1) {
rideId = id;
}
elindex.push({
index: {
_index: rideRange.index,
_type: 'doc',
_id: value._id,
},
});
elindex.push({...source, index_type: 'rideOrder'});
});
try {
await callBulkAPI(elindex);
} catch(err) {
console.error(err)
elasticClient.index({
index: 'elastic_cleanup_rideorder_errors',
type: 'doc',
body: {
ride_id: rideId,
body: err
},
});
}
console.error('rideId', rideId);
redisClient.set(`elasticCleanUp:rideorder:${rideRange.index}:rideId`, rideId);
}

async function reindexRidesData(rideRange) {
const rideDataQuery = {
index: 'rides',
size: MAX_DOC_COUNT,
scroll: '1m',
body: {
"query": {
"bool": {
"must": [
{
"match": {
"_type": "rideorder"
}
},
{
"range": {
"external_order_id": {
"gte": rideRange.startRideId,
"lte": rideRange.endRideId
}
}
}
]
}
},
"sort": [
{
"id": {
"order": "asc"
}
}
]
},
};
let { hits: { total: count, hits: logs }, _scroll_id: scrollId } = await elasticClient.search(rideDataQuery);
if(logs && logs.length) {
await bulkIndexLogs(logs, rideRange);
}
while(scrollId && count > MAX_DOC_COUNT) {
const { hits: { hits: logs }, _scroll_id: newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId});
scrollId = newScrollId;
if (logs && logs.length) {
await bulkIndexLogs(logs, rideRange);
} else {
break;
}
}
}

async function reindexJob() {
try {
const [ startRideId, endRideId ] = process.argv.slice(2);
const rangeIndex = 'rides_modified';
let lastProcessedRideId = await redisClient.get(`elasticCleanUp:rideorder:${rangeIndex}:rideId`);
const rideRangeEvent = {
index: rangeIndex,
startRideId: parseInt(startRideId, 10),
endRideId: parseInt(endRideId, 10)
};
if(parseInt(lastProcessedRideId, 10)) {
rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1;
}
const isIndexExists = await elasticClient.indices.exists({index: rangeIndex})
if(!isIndexExists) {
await utils.createIndex(rangeIndex, 'rides');
}
if(rideRangeEvent.startRideId < parseInt(rideRangeEvent.endRideId, 10)) {
await reindexRidesData(rideRangeEvent);
}
} catch(err) {
console.error(err);
}

console.log('::__reindexJob finished__::')
}

reindexJob();
40 changes: 24 additions & 16 deletions newUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ const createIndex = async function createIndex(indexName, fromIndex) {
}
});

removeProperty(properties);
filterObject(properties, 'legToFlatFeeMapping')
filterObject(properties, 'changed_legs')
filterObject(properties, 'fare_breakdown')

await elasticClient.indices.putMapping({
index: indexName, type: 'doc', body: { properties, dynamic: false },
Expand Down Expand Up @@ -127,7 +129,9 @@ const createLogstashIndex = async function createLogstashIndex(indexName, fromIn
}
});

removeProperty(properties);

filterObject(properties, 'fare_breakdown')

await elasticClient.indices.putMapping({
index: indexName, type: 'doc', body: { properties, dynamic: false },
});
Expand Down Expand Up @@ -158,22 +162,26 @@ function filterObject(obj, key) {
}


function removeProperty(obj){
if(obj?.meta?.data?.vendor_lists?.latest_flat_rate?.legToFlatFeeMapping){
delete obj.meta.data.vendor_lists.latest_flat_rate.legToFlatFeeMapping
}
// function removeProperty(obj){
// console.log('Ima here')

if(obj?.meta?.data?.orderUpdateDiff?.changed_legs){
delete obj.meta.data.orderUpdateDiff.changed_legs
}
// if(obj?.meta?.properties?.data?.properties?.vendor_lists?.properties?.latest_flat_rate?.properties?.legToFlatFeeMapping){
// delete obj.meta.properties.data.properties.vendor_lists.properties.latest_flat_rate.properties.legToFlatFeeMapping
// }

if(obj?.meta?.billingWriteBack?.legs?.fare_breakdown){
delete obj.meta.billingWriteBack.legs.fare_breakdown
}

if(obj?.receipt?.fare_breakdown){
delete obj.receipt.fare_breakdown
}
}
// if(obj?.meta?.properties?.data?.properties?.orderUpdateDiff?.properties?.changed_legs){
// delete obj.meta.properties.data.properties.orderUpdateDiff.properties.changed_legs
// }

// if(obj?.meta?.properties?.billingWriteBack?.properties?.legs?.properties?.fare_breakdown){
// delete obj.meta.properties.billingWriteBack.properties.legs.properties.fare_breakdown
// }

// if(obj?.properties?.receipt?.properties?.fare_breakdown){
// console.log('hello')
// delete obj.properties.receipt.properties.fare_breakdown
// }
// }

module.exports = {createIndex, createLogstashIndex, filterObject};

0 comments on commit cb775b1

Please sign in to comment.