From 34b13e49851d4a90c74e875c174b1254e18d2b2b Mon Sep 17 00:00:00 2001 From: Masaharu TASHIRO Date: Tue, 14 Feb 2023 19:56:21 +0900 Subject: [PATCH] propagate error between steams --- .../repositories/parsers/parseCsv/index.ts | 11 ++++++---- src/record/import/utils/file.ts | 20 ++++++++++++++----- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/src/record/import/repositories/parsers/parseCsv/index.ts b/src/record/import/repositories/parsers/parseCsv/index.ts index b45bf69bdb..980c2b7fac 100644 --- a/src/record/import/repositories/parsers/parseCsv/index.ts +++ b/src/record/import/repositories/parsers/parseCsv/index.ts @@ -1,8 +1,5 @@ -import type { CsvRow } from "../../../../../kintone/types"; -import type { LocalRecord } from "../../../types/record"; import type { RecordSchema } from "../../../types/schema"; -// import csvParse from "csv-parse/lib/sync"; import csvParse from "csv-parse"; import { convertRecord, recordReader } from "./record"; @@ -17,6 +14,7 @@ export async function* csvReader( schema: RecordSchema ): ReturnType { try { + const sourceStream = source(); const csvStream = source().pipe( csvParse({ columns: true, @@ -24,6 +22,9 @@ export async function* csvReader( delimiter: SEPARATOR, }) ); + sourceStream.on("error", (e) => { + csvStream.destroy(e); + }); for await (const recordRows of recordReader(csvStream)) { yield convertRecord(recordRows, schema); @@ -45,6 +46,9 @@ export const countRecordsFromCsv = async ( delimiter: SEPARATOR, }) ); + source.on("error", (e) => { + csvStream.destroy(e); + }); let count = 0; for await (const recordRows of recordReader(csvStream)) { @@ -52,7 +56,6 @@ export const countRecordsFromCsv = async ( } return count; } catch (e) { - console.error(e); throw new ParserError(e); } }; diff --git a/src/record/import/utils/file.ts b/src/record/import/utils/file.ts index 9833f9424b..890569062d 100644 --- a/src/record/import/utils/file.ts +++ b/src/record/import/utils/file.ts @@ -1,13 +1,14 @@ import fs from "fs"; import path from "path"; import iconv from "iconv-lite"; +import { Transform } from "stream"; export type SupportedImportEncoding = "utf8" | "sjis"; export const openFsStreamWithEncode: ( filePath: string, encoding?: SupportedImportEncoding -) => { stream: NodeJS.ReadWriteStream; format: string } = ( +) => { stream: NodeJS.ReadableStream; format: string } = ( filePath, encoding = "utf8" ) => { @@ -15,10 +16,19 @@ export const openFsStreamWithEncode: ( if (format === "json" && encoding !== "utf8") { throw new Error("source file is JSON and JSON MUST be encoded with UTF-8"); } - const stream = fs - .createReadStream(filePath) - .pipe(iconv.decodeStream(encoding)); - return { stream, format }; + const stream = fs.createReadStream(filePath); + + const decodedStream = stream.pipe( + Transform.from(iconv.decodeStream(encoding)) + ); + stream.on("error", (e) => { + decodedStream.destroy(e); + }); + + decodedStream.on("error", () => { + /* noop. but this is necessary :) */ + }); + return { stream: decodedStream, format }; }; export const readFile: (