Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RA-25033:Added reindex script for rides index #1

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# package directories
node_modules
jspm_packages
.vscode
.env
event.json
17 changes: 17 additions & 0 deletions elastic-config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
var elastic = require("elasticsearch");
var Bluebird = require('bluebird');

const dotenv = require('dotenv');

dotenv.config();

// config @ https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html

var host = {
host: process.env.elasticHost,
defer: function () {
return Bluebird.defer();
}
};
var elasticClient = new elastic.Client(host);
module.exports = elasticClient;
174 changes: 174 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"name": "elastic-reindex-scripts",
"version": "1.0.0",
"description": "",
"main": "elastic-config.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"debug_old": "node ./reindexAuditLogs.js rides_6 500001 600000",
"debug": "node ./reindexAuditLogs.js rides_4 318000 400000",
"debug:ride": "node ./reindexRidesData.js rides_1 1 100000",
"debug:logstash": "node ./reindexLogstashLogs.js 2020 01"
},
"author": "",
"license": "ISC",
"dependencies": {
"bluebird": "^3.7.2",
"dotenv": "^8.2.0",
"elasticsearch": "^16.7.1",
"ioredis": "^4.17.3",
"moment": "^2.27.0"
}
}
54 changes: 54 additions & 0 deletions redis.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
const bluebird = require('bluebird');
const Redis = require('ioredis');

Redis.Promise = bluebird;

const dotenv = require('dotenv');

dotenv.config();

const redisObj = {
get: () => {
console.log('Creating new redis instance...');
console.log('Creating new redis instance...', process.env.redisHost);
let rclient;
const commonSettings = {
showFriendlyErrorStack: false,
port: 6379,
host: process.env.redisHost,
db: '0',
password: process.env.redisAuth,

reconnectOnError: (err) => {
const targetError = 'READONLY';
if (err.message.slice(0, targetError.length) === targetError) {
// Only reconnect when the error starts with "READONLY"
return true; // or `return 1;`
}
return false;
}
};
rclient = new Redis({ ...commonSettings });
rclient.instance_status = true;
rclient.on('error', (err) => {
const errorList = ['ECONNREFUSED', 'READONLY'];
if (new RegExp(errorList.join("|")).test(err.message)) {
rclient.instance_status = false;
console.log(err);
}
return err;
});
rclient.on('connect', () => {
rclient.instance_status = true;
});

rclient.checkStatus = () => rclient.instance_status;

console.log('created new instance')

return rclient;
},
};
var rclient = redisObj.get();

module.exports = rclient;
105 changes: 105 additions & 0 deletions reindexAuditLogs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@

const elasticClient = require('./elastic-config');
const redisClient = require('./redis');
const utils = require('./utils');

const MAX_DOC_COUNT = 50;


async function callBulkAPI(elindex) {
const bulkResponse = await elasticClient.bulk({
body: elindex,
timeout: '5m',
filterPath:'items.index._id,errors'
});
if(bulkResponse.errors) {
throw new Error(JSON.stringify(bulkResponse));
}
return bulkResponse;
}

async function bulkIndexAuditLogs(rideId, auditLogs, rideRange) {
const elindex = [];
auditLogs.forEach((value) => {
elindex.push({
index: {
_index: rideRange.index,
_type: 'doc',
_id: value._id,
},
});
const source = {...value._source, index_type: 'audit_log' }
elindex.push(source);
});
try {
await callBulkAPI(elindex);
} catch(err) {
console.error(err)
elasticClient.index({
index: 'elastic_cleanup_errors',
type: 'doc',
body: {
ride_id: rideId,
err: err
},
});
}
}

async function reindexAuditLogs(rideRange) {
for (let rideIndex = rideRange.startRideId; rideIndex <= rideRange.endRideId; rideIndex++) {
const auditLogQuery = {
index: 'rides',
size: MAX_DOC_COUNT,
scroll: '1m',
body: {
query: {
match: {
'ride_details_id': rideIndex,
},
},
},
};
let { hits: { total: count, hits: auditLogs }, _scroll_id: scrollId } = await elasticClient.search(auditLogQuery);
if(auditLogs && auditLogs.length) {
await bulkIndexAuditLogs(rideIndex, auditLogs, rideRange);
}
while(scrollId && count > MAX_DOC_COUNT) {
const { hits: { hits: auditLogs }, _scroll_id: newScrollId } = await elasticClient.scroll({scroll: '1m', scrollId});
scrollId = newScrollId;
if (auditLogs && auditLogs.length) {
await bulkIndexAuditLogs(rideIndex, auditLogs, rideRange);
} else {
break;
}
}
redisClient.set(`elasticCleanUp:${rideRange.index}:rideId`, rideIndex);
}
}

async function reindexJob() {
try {
const [ rangeIndex, startRideId, endRideId ] = process.argv.slice(2);
let lastProcessedRideId = await redisClient.get(`elasticCleanUp:${rangeIndex}:rideId`);
const rideRangeEvent = {
index: rangeIndex,
startRideId: parseInt(startRideId, 10),
endRideId: parseInt(endRideId, 10)
};
const isIndexExists = await elasticClient.indices.exists({index: rangeIndex})
if(!isIndexExists) {
console.error('Creating Index');
await utils.createIndex(rangeIndex, 'rides');
console.error('Index Created');
}
if(parseInt(lastProcessedRideId, 10)) {
rideRangeEvent.startRideId = parseInt(lastProcessedRideId, 10) + 1;
}
if(rideRangeEvent.startRideId <= parseInt(rideRangeEvent.endRideId, 10)) {
await reindexAuditLogs(rideRangeEvent);
}
} catch(err) {
console.error(err);
}
}
reindexJob();
Loading