From 8530ad6127115ef5fe6f3fe5e3906c84527b5926 Mon Sep 17 00:00:00 2001
From: Stanislas Polu
Date: Wed, 3 Jan 2024 19:26:34 +0100
Subject: [PATCH] tar processing

---
 connectors/package-lock.json                  |  27 +++
 connectors/package.json                       |   2 +
 connectors/src/admin/cli.ts                   |  52 +++++
 .../src/connectors/github/lib/github_api.ts   | 177 ++++++++++++++++--
 4 files changed, 245 insertions(+), 13 deletions(-)

diff --git a/connectors/package-lock.json b/connectors/package-lock.json
index 8ae8944c7df0..ef8a9ac6f482 100644
--- a/connectors/package-lock.json
+++ b/connectors/package-lock.json
@@ -22,6 +22,7 @@
         "@types/minimist": "^1.2.2",
         "@types/uuid": "^9.0.2",
         "axios": "^1.5.1",
+        "blake3": "^2.1.7",
         "body-parser": "^1.20.2",
         "dd-trace": "^3.16.0",
         "eventsource-parser": "^1.0.0",
@@ -57,6 +58,7 @@
         "@types/fast-levenshtein": "^0.0.2",
         "@types/node": "^18.15.5",
         "@types/p-queue": "^3.2.1",
+        "@types/tar": "^6.1.10",
         "@typescript-eslint/eslint-plugin": "^5.56.0",
         "@typescript-eslint/parser": "^5.56.0",
         "eslint": "^8.36.0",
@@ -1841,6 +1843,25 @@
         "@types/node": "*"
       }
     },
+    "node_modules/@types/tar": {
+      "version": "6.1.10",
+      "resolved": "https://registry.npmjs.org/@types/tar/-/tar-6.1.10.tgz",
+      "integrity": "sha512-60ZO+W0tRKJ3ggdzJKp75xKVlNogKYMqGvr2bMH/+k3T0BagfYTnbmVDFMJB1BFttz6yRgP5MDGP27eh7brrqw==",
+      "dev": true,
+      "dependencies": {
+        "@types/node": "*",
+        "minipass": "^4.0.0"
+      }
+    },
+    "node_modules/@types/tar/node_modules/minipass": {
+      "version": "4.2.8",
+      "resolved": "https://registry.npmjs.org/minipass/-/minipass-4.2.8.tgz",
+      "integrity": "sha512-fNzuVyifolSLFL4NzpF+wEF4qrgqaaKX0haXPQEdQ7NKAN+WecoKMHV09YcuL/DHxrUsYQOK3MiuDf7Ip2OXfQ==",
+      "dev": true,
+      "engines": {
+        "node": ">=8"
+      }
+    },
     "node_modules/@types/unist": {
       "version": "3.0.2",
       "resolved": "https://registry.npmjs.org/@types/unist/-/unist-3.0.2.tgz",
@@ -2424,6 +2445,12 @@
         "file-uri-to-path": "1.0.0"
       }
     },
+    "node_modules/blake3": {
+      "version": "2.1.7",
+      "resolved": "https://registry.npmjs.org/blake3/-/blake3-2.1.7.tgz",
+      "integrity": "sha512-5d+TdKJvju96IyEaGJ0eO6CHbckWi+NBrCezGYM/WsnI3R03aLL2TWfsuZSh1rs0fTv/L3ps/r0vykjYurcIwA==",
+      "hasInstallScript": true
+    },
     "node_modules/body-parser": {
       "version": "1.20.2",
       "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.20.2.tgz",
diff --git a/connectors/package.json b/connectors/package.json
index c3155004f356..39d3b2629d41 100644
--- a/connectors/package.json
+++ b/connectors/package.json
@@ -28,6 +28,7 @@
     "@types/uuid": "^9.0.2",
     "axios": "^1.5.1",
     "body-parser": "^1.20.2",
+    "blake3": "^2.1.7",
     "dd-trace": "^3.16.0",
     "eventsource-parser": "^1.0.0",
     "express": "^4.18.2",
@@ -62,6 +63,7 @@
     "@types/fast-levenshtein": "^0.0.2",
     "@types/node": "^18.15.5",
     "@types/p-queue": "^3.2.1",
+    "@types/tar": "^6.1.10",
     "@typescript-eslint/eslint-plugin": "^5.56.0",
     "@typescript-eslint/parser": "^5.56.0",
     "eslint": "^8.36.0",
diff --git a/connectors/src/admin/cli.ts b/connectors/src/admin/cli.ts
index 65b59f51d503..e9f33a5b9393 100644
--- a/connectors/src/admin/cli.ts
+++ b/connectors/src/admin/cli.ts
@@ -28,6 +28,10 @@
 import { SlackConfiguration } from "@connectors/lib/models/slack";
 import { nango_client } from "@connectors/lib/nango_client";
 import { Result } from "@connectors/lib/result";
+import {
+  cleanUpProcessRepository,
+  processRepository,
+} from "@connectors/connectors/github/lib/github_api";
 
 const { NANGO_SLACK_CONNECTOR_ID } = process.env;
 
@@ -99,6 +103,51 @@
   }
 };
 
+const github = async (command: string, args: parseArgs.ParsedArgs) => {
+  switch (command) {
+    case "test-repo": {
+      if (!args.wId) {
+        throw new Error("Missing --wId argument");
+      }
+      if (!args.dataSourceName) {
+        throw new Error("Missing --dataSourceName argument");
+      }
+      if (!args.owner) {
+        throw new Error("Missing --owner argument");
+      }
+      if (!args.repo) {
+        throw new Error("Missing --repo argument");
+      }
+
+      const connector = await Connector.findOne({
+        where: {
+          type: "github",
+          workspaceId: args.wId,
+          dataSourceName: args.dataSourceName,
+        },
+      });
+
+      if (!connector) {
+        throw new Error(
+          `Could not find connector for workspace ${args.wId}, data source ${args.dataSourceName}`
+        );
+      }
+
+      const installationId = connector.connectionId;
+      const { tempDir, files } = await processRepository(
+        installationId,
+        args.owner,
+        args.repo,
+        "999"
+      );
+
+      console.log(files);
+
+      await cleanUpProcessRepository(tempDir);
+    }
+  }
+};
+
 const notion = async (command: string, args: parseArgs.ParsedArgs) => {
   switch (command) {
     case "restart-all": {
@@ -568,6 +617,9 @@
     case "notion":
       await notion(command, argv);
       return;
+    case "github":
+      await github(command, argv);
+      return;
     case "google":
       await google(command, argv);
       return;
diff --git a/connectors/src/connectors/github/lib/github_api.ts b/connectors/src/connectors/github/lib/github_api.ts
index 1b01e19f2de4..569266f00af7 100644
--- a/connectors/src/connectors/github/lib/github_api.ts
+++ b/connectors/src/connectors/github/lib/github_api.ts
@@ -1,13 +1,15 @@
 import { createAppAuth } from "@octokit/auth-app";
+import { hash as blake3 } from "blake3";
 import { isLeft } from "fp-ts/lib/Either";
 import { createWriteStream } from "fs";
-import { mkdtemp, readdir, rmdir } from "fs/promises";
+import { mkdtemp, readdir, rm } from "fs/promises";
 import fs from "fs-extra";
 import * as reporter from "io-ts-reporters";
 import { Octokit } from "octokit";
 import { tmpdir } from "os";
 import { join, resolve } from "path";
 import { pipeline } from "stream";
+import { Readable } from "stream";
 import { extract } from "tar";
 import { promisify } from "util";
 
@@ -534,20 +536,94 @@
   });
 }
 
+// Repository processing
+
 const asyncPipeline = promisify(pipeline);
 
-export async function downloadRepository(
+const EXTENSION_WHITELIST = [
+  ".js",
+  ".ts",
+  ".tsx",
+  ".jsx",
+  ".rb",
+  ".py",
+  ".rs",
+  ".go",
+  ".swift",
+  ".css",
+  ".html",
+  ".less",
+  ".sass",
+  ".scss",
+  ".php",
+  ".java",
+  ".yaml",
+  ".yml",
+  ".md",
+];
+
+const FILENAME_WHITELIST = [
+  "README",
+  "Dockerfile",
+  "package.json",
+  "Cargo.toml",
+];
+
+const DIRECTORY_BLACKLIST = [
+  "node_modules",
+  "vendor",
+  "dist",
+  "build",
+  "coverage",
+  "pkg",
+  "bundle",
+  "built",
+  "eggs",
+  "downloads",
+  "env",
+  "venv",
+  "tmp",
+  "temp",
+  "debug",
+  "target",
+];
+
+async function* getFiles(dir: string): AsyncGenerator<string> {
+  const dirents = await readdir(dir, { withFileTypes: true });
+  for (const dirent of dirents) {
+    const res = resolve(dir, dirent.name);
+    if (dirent.isDirectory()) {
+      // Skip blacklisted directories (dependency/build output trees).
+      if (DIRECTORY_BLACKLIST.includes(dirent.name)) {
+        continue;
+      }
+      yield* getFiles(res);
+    } else {
+      yield res;
+    }
+  }
+}
+
+export async function processRepository(
   installationId: string,
   login: string,
-  repoName: string
+  repoName: string,
+  repoId: string
 ) {
   const octokit = await getOctokit(installationId);
 
-  const { data: tarballStream } = await octokit.request(
-    "GET /repos/{owner}/{repo}/tarball",
+  const { data } = await octokit.rest.repos.get({
+    owner: login,
+    repo: repoName,
+  });
+  const defaultBranch = data.default_branch;
+
+  let { data: tarballStream } = await octokit.request(
+    "GET /repos/{owner}/{repo}/tarball/{ref}",
     {
       owner: login,
       repo: repoName,
+      ref: defaultBranch,
     }
   );
 
@@ -555,27 +631,102 @@
 
   const tempDir = await mkdtemp(join(tmpdir(), "repo-"));
   const tarPath = resolve(tempDir, "repo.tar.gz");
 
+  // Convert ArrayBuffer to stream if necessary
+  if (tarballStream instanceof ArrayBuffer) {
+    // Wrap ArrayBuffer with a stream
+    const stream = new Readable();
+    stream.push(Buffer.from(tarballStream));
+    stream.push(null); // Signal the end of the stream
+    tarballStream = stream;
+  }
+
   // Save the tarball to the temp directory.
-  await asyncPipeline(tarballStream, createWriteStream(tarPath));
-  console.log("Downloaded: ", tarPath);
+  await asyncPipeline(tarballStream as Readable, createWriteStream(tarPath));
 
   // Extract the tarball.
   await extract({
     file: tarPath,
     cwd: tempDir,
   });
-  console.log("Extracted: ", tarPath);
 
   // Delete the tarball.
   await fs.unlink(tarPath);
 
+  const files: {
+    fileName: string;
+    filePath: string[];
+    sourceUrl: string;
+    sizeBytes: number;
+    documentId: string;
+    parentInternalId: string | null;
+    parents: string[];
+    localFilePath: string;
+  }[] = [];
+
   // Iterate over the files in the temp directory.
-  const files = await readdir(tempDir);
-  for (const file of files) {
-    console.log("FILE: ", file);
+  for await (const file of getFiles(tempDir)) {
+    console.log(file);
+    // Extension is the text after the last "." of the full path.
+    const ext = file.split(".").pop();
+    // File size in bytes (used to cap what we index).
+    const { size } = await fs.stat(file);
+
+    const isWhitelisted =
+      EXTENSION_WHITELIST.includes(`.${ext}`) ||
+      FILENAME_WHITELIST.includes(file.split("/").pop() || "");
+
+    const isUnderLimit = size < 1024 * 1024;
+
+    if (isWhitelisted && isUnderLimit) {
+      const path = file
+        .substring(tempDir.length + 1)
+        .split("/")
+        .slice(1, -1);
+
+      const pathInternalIds = [];
+      for (let i = 0; i < path.length; i++) {
+        const p = `github-code-${repoId}-dir-${path.slice(0, i + 1).join("/")}`;
+        pathInternalIds.push(
+          `github-code-${repoId}-dir-${blake3(p)
+            .toString("hex")
+            .substring(0, 16)}`
+        );
+      }
+
+      const fileName = file.split("/").pop() || "";
+
+      const documentId = `github-code-${repoId}-file-${blake3(
+        `github-code-${repoId}-file-${path.join("/")}/${fileName}`
+      )
+        .toString("hex")
+        .substring(0, 16)}`;
+
+      const parentInternalId =
+        pathInternalIds.length === 0
+          ? null
+          : (pathInternalIds[pathInternalIds.length - 1] as string);
+
+      files.push({
+        fileName,
+        filePath: path,
+        sourceUrl: `https://github.com/${login}/${repoName}/blob/${defaultBranch}/${path
+          .concat(fileName)
+          .join("/")}`,
+        sizeBytes: size,
+        documentId,
+        parentInternalId,
+        parents: pathInternalIds,
+        localFilePath: file,
+      });
+    }
   }
+  return {
+    tempDir,
+    files,
+  };
+}
+
+export async function cleanUpProcessRepository(tempDir: string) {
   // Delete the temp directory.
-  await rmdir(tempDir, { recursive: true });
-  console.log("Cleaned up: ", tempDir);
+  await rm(tempDir, { recursive: true });
 }