Commit

call client.end
EllAchE committed Apr 1, 2024
1 parent 01a7635 commit a08b963
Showing 2 changed files with 24 additions and 17 deletions.
4 changes: 2 additions & 2 deletions src/run_metrics_on_file.ts
@@ -40,8 +40,8 @@ export async function main(path: string) {

   // Send the task to the queue server
   client.write(JSON.stringify({ results: fileResults, analysisKey }));

-  console.log('results sent');
+  client.end();
 }

/**
@@ -89,5 +89,5 @@ async function gameIterator(path: string) {

 // for use with zst_decompresser.js
 if (require.main === module) {
-  main(process.argv[2]).then((results) => {});
+  main(process.argv[2]);
 }
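
Taken together, the two hunks above fix a connection-lifecycle problem: the results were written to the queue server but the socket was never closed, and the caller attached a no-op .then() to the returned promise. A minimal sketch of the resulting pattern, assuming client is a Node net.Socket (the sendResults wrapper and the host/port values are hypothetical placeholders, not from this repository):

import * as net from 'net';

// Sketch only: write one JSON payload to the queue server, then close.
function sendResults(results: unknown, analysisKey: string): Promise<void> {
  return new Promise((resolve, reject) => {
    const client = net.createConnection({ host: 'localhost', port: 8080 }, () => {
      client.write(JSON.stringify({ results, analysisKey }));
      // end() flushes the pending write and half-closes the socket; without
      // it the open handle keeps the Node event loop (and the process) alive.
      client.end();
    });
    client.on('close', () => resolve());
    client.on('error', reject);
  });
}

The second hunk makes the same point from the caller's side: the empty .then((results) => {}) did no work, so invoking main(process.argv[2]) bare is equivalent and no longer implies that the continuation matters.
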
37 changes: 22 additions & 15 deletions src/zst_decompressor.ts
@@ -175,8 +175,6 @@ const decompressAndAnalyze = async (file, start = 0) => {

   let fileLength = 0;
   let batch_files_total_decompressed_size = 0;
-  let analysisPromises = [];
-  let filesBeingAnalyzed = new Set<fs.PathLike>();

   result.on('error', (err) => {
     return reject(err);
@@ -228,7 +226,6 @@ const decompressAndAnalyze = async (file, start = 0) => {
       // Stop decompression if the size of the combined decompressed files exceeds the decompressed total combined files size limit
       if (batch_files_total_decompressed_size >= decompressedSizeLimit) {
         console.log(`Decompression limit met. Ending decompression...`);
-        console.log(`Temp files being analyzed: ${filesBeingAnalyzed}`);
         result.removeAllListeners('data');
         result.removeAllListeners('error');
         result.end();
@@ -237,29 +234,39 @@
       });

       result.on('end', async () => {
+        // create chunks of size 10 from filesProduced
+        const filesProducedArray = Array.from(filesProduced);
+        const fileGroups: string[][] = [];
+        for (let i = 0; i < filesProducedArray.length; i += 10) {
+          fileGroups.push(filesProducedArray.slice(i, i + 10));
+        }
+        console.log(`file groups: ${fileGroups.length}`);
+
         // When all data is decompressed, run the analysis on the produced files concurrently
-        for (const file of Array.from(filesProduced).slice(0, 10)) {
+        for (const group of fileGroups) {
+          console.log(
+            `Running analysis on ${group.length} files... in group`
+          );
           // TODO: This is debug code, I believe
           // TODO: this won't work out of the box for a large number of files as there is no max concurrency. But the sample only produces 4 decompressed files
           // I'm slicing to test this with a smaller number of files

-          analysisPromises.push(runAnalysis(file));
-          filesBeingAnalyzed.add(newFilePath);
-        }
-
-        // When all analyses are done, delete the files from the set
-        Promise.allSettled(analysisPromises)
-          .then(() => {
+          const analysisPromises = group.map((file) => runAnalysis(file));
+          // When all analyses are done, delete the files from the set
+          try {
+            console.log('Awaiting all analyses to complete...');
+            await Promise.all(analysisPromises);
+            console.log('All analyses completed');
-            for (const file of filesBeingAnalyzed) {
+            for (const file of group) {
               if (fs.existsSync(file)) {
                 fs.unlinkSync(file);
                 console.log(`File ${file} has been deleted.`);
               }
             }
-            filesBeingAnalyzed.clear();
-          })
-          .catch(console.error);
+          } catch (error) {
+            console.error(`Error running analysis: ${error.message}`);
+          }
+        }

         resolve();
       });
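
This last hunk replaces the debug-era, fire-and-forget pipeline (push every analysis into a shared array, Promise.allSettled, clean up a shared set) with sequential groups of ten: each group is awaited with Promise.all before its temp files are unlinked. As a standalone sketch of that pattern (the processInBatches name and its parameters are illustrative, not part of the commit):

// Generic form of the group loop above; the commit uses batchSize = 10.
async function processInBatches<T>(
  items: T[],
  batchSize: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  for (let i = 0; i < items.length; i += batchSize) {
    const batch = items.slice(i, i + batchSize);
    // Fail-fast: if any item in the batch rejects, Promise.all rejects
    // immediately, unlike the Promise.allSettled call this commit removes.
    await Promise.all(batch.map(worker));
  }
}

Note the semantic shift: with Promise.all, one failed analysis sends its whole group to the catch block, so that group's temp files are never deleted, whereas the old allSettled path always ran cleanup regardless of failures.
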
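The TODO about unbounded concurrency survives the commit; it could be addressed with a small worker pool instead of fixed groups, so one slow file does not stall an entire batch. A sketch under that assumption (not something this commit implements; runWithConcurrency is a hypothetical helper):

// Hypothetical follow-up to the TODO: `limit` workers drain a shared queue,
// bounding concurrency without the idle tail at the end of each batch.
async function runWithConcurrency<T>(
  items: T[],
  limit: number,
  worker: (item: T) => Promise<void>
): Promise<void> {
  const queue = [...items];
  const workers = Array.from({ length: limit }, async () => {
    for (let next = queue.shift(); next !== undefined; next = queue.shift()) {
      await worker(next);
    }
  });
  await Promise.all(workers);
}
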
