diff --git a/src/run_metrics_on_file.ts b/src/run_metrics_on_file.ts
index cfbca47..9d336e1 100644
--- a/src/run_metrics_on_file.ts
+++ b/src/run_metrics_on_file.ts
@@ -40,8 +40,8 @@ export async function main(path: string) {
 
   // Send the task to the queue server
   client.write(JSON.stringify({ results: fileResults, analysisKey }));
-  console.log('results sent');
+
   client.end();
 }
 
 /**
@@ -89,5 +89,5 @@ async function gameIterator(path: string) {
 
 // for use with zst_decompresser.js
 if (require.main === module) {
-  main(process.argv[2]).then((results) => {});
+  main(process.argv[2]);
 }
diff --git a/src/zst_decompressor.ts b/src/zst_decompressor.ts
index d3847e0..1b5117a 100644
--- a/src/zst_decompressor.ts
+++ b/src/zst_decompressor.ts
@@ -175,8 +175,6 @@ const decompressAndAnalyze = async (file, start = 0) => {
 
     let fileLength = 0;
     let batch_files_total_decompressed_size = 0;
-    let analysisPromises = [];
-    let filesBeingAnalyzed = new Set();
 
     result.on('error', (err) => {
       return reject(err);
@@ -228,7 +226,6 @@ const decompressAndAnalyze = async (file, start = 0) => {
       // Stop decompression if the size of the combined decompressed files exceeds the decompressed total combined files size limit
       if (batch_files_total_decompressed_size >= decompressedSizeLimit) {
         console.log(`Decompression limit met. Ending decompression...`);
-        console.log(`Temp files being analyzed: ${filesBeingAnalyzed}`);
         result.removeAllListeners('data');
         result.removeAllListeners('error');
         result.end();
@@ -237,29 +234,39 @@ const decompressAndAnalyze = async (file, start = 0) => {
     });
 
     result.on('end', async () => {
+      // create chunks of size 10 from filesProduced
+      const filesProducedArray = Array.from(filesProduced);
+      const fileGroups: string[][] = [];
+      for (let i = 0; i < filesProducedArray.length; i += 10) {
+        fileGroups.push(filesProducedArray.slice(i, i + 10));
+      }
+      console.log(`file groups: ${fileGroups.length}`);
+
       // When all data is decompressed, run the analysis on the produced files concurrently
-      for (const file of Array.from(filesProduced).slice(0, 10)) {
+      for (const group of fileGroups) {
+        console.log(
+          `Running analysis on ${group.length} files... in group`
+        ); // TODO: This is debug code, I believe
         // TODO: this won't work out of the box for a large number of files as there is no max concurrency. But the sample only produces 4 decompressed files
         // I'm slicing to test this with a smaller number of files
-        analysisPromises.push(runAnalysis(file));
-        filesBeingAnalyzed.add(newFilePath);
-      }
-
-      // When all analyses are done, delete the files from the set
-      Promise.allSettled(analysisPromises)
-        .then(() => {
+        const analysisPromises = group.map((file) => runAnalysis(file));
+        // When all analyses are done, delete the files from the set
+        try {
+          console.log('Awaiting all analyses to complete...');
+          await Promise.all(analysisPromises);
           console.log('All analyses completed');
-          for (const file of filesBeingAnalyzed) {
+          for (const file of group) {
             if (fs.existsSync(file)) {
               fs.unlinkSync(file);
               console.log(`File ${file} has been deleted.`);
             }
           }
-          filesBeingAnalyzed.clear();
-        })
-        .catch(console.error);
+        } catch (error) {
+          console.error(`Error running analysis: ${error.message}`);
+        }
+      }
 
       resolve();
     });
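
For context, the batching change in the result.on('end', ...) handler boils down to the pattern below. This is a minimal sketch, assuming only that runAnalysis(file) returns a promise, as it appears to in zst_decompressor.ts; the helper name analyzeInBatches and its signature are illustrative, not part of the codebase.

// Sketch of the chunk-and-await pattern from the patch. analyzeInBatches is
// a hypothetical helper; runAnalysis is injected so the sketch stands alone.
import * as fs from 'fs';

async function analyzeInBatches(
  files: string[],
  runAnalysis: (file: string) => Promise<void>,
  batchSize = 10
): Promise<void> {
  for (let i = 0; i < files.length; i += batchSize) {
    const group = files.slice(i, i + batchSize);
    // At most batchSize analyses run concurrently; the next group does not
    // start until every analysis in this group has finished.
    await Promise.all(group.map((file) => runAnalysis(file)));
    // Delete each temp file only after its whole group has been analyzed.
    for (const file of group) {
      if (fs.existsSync(file)) {
        fs.unlinkSync(file);
      }
    }
  }
}

Note that because Promise.all rejects as soon as one analysis fails, a single failure skips cleanup for the rest of its group; the patch behaves the same way. Fixed groups also let concurrency drain toward zero at the tail of each group, so a worker pool that keeps batchSize analyses in flight would address the remaining TODO about max concurrency more directly.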