Create new transform to ingest markdown (.md) files and convert to parquet format #364

Open · wants to merge 1 commit into base: dev
80 changes: 65 additions & 15 deletions tools/ingest2parquet/src/ingest2parquet.py
@@ -74,52 +74,102 @@ def zip_to_table(data_access: DataAccess, file_path, detect_prog_lang: Any) -> pa.Table:
    return table


def md_to_table(data_access: DataAccess, file_path: str, detect_prog_lang: Any) -> pa.Table:
    """
    Extracts the contents of a Markdown (.md) file and converts them into a PyArrow table.

    :param data_access: DataAccess object for accessing data (currently unused; the file is read directly)
    :param file_path: Path to the MD file
    :param detect_prog_lang: Object for detecting programming language from file extension
    :return: PyArrow table containing the extracted data (empty if the file could not be read)
    """
    data = []
    md_name = os.path.basename(file_path)

    # Open the file and read its content
    with open(file_path, "r") as file:
        try:
            markdown_content = file.read()
            if len(markdown_content) > 0:
                ext = ".md"
                file_data = {
                    "title": md_name,
                    "document": md_name,
                    "contents": markdown_content,
                    "document_id": str(uuid.uuid4()),
                    "ext": ext,
                    "hash": TransformUtils.str_to_hash(markdown_content),
                    "size": TransformUtils.deep_get_size(markdown_content),
                    "date_acquired": datetime.now().isoformat(),
                }
                # Detect programming language if applicable
                if detect_prog_lang:
                    lang = detect_prog_lang.get_lang_from_ext(ext)
                    file_data["programming_language"] = lang

                data.append(file_data)
            else:
                raise Exception("No content in the file")
        except Exception as e:
            print(f"Skipping {md_name} due to error: {str(e)}")

    # Build a PyArrow table so the caller can treat .md files the same way as .zip files
    return pa.Table.from_pylist(data)

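# Illustrative only (not part of this change): a minimal sketch of exercising
# md_to_table in isolation. The file path "sample.md" and the helper name are
# hypothetical; data_access is passed as None because the function reads the
# file directly, and detect_prog_lang=None skips language detection.
def _example_md_to_table(path: str = "sample.md") -> None:
    table = md_to_table(data_access=None, file_path=path, detect_prog_lang=None)
    # Expected columns: title, document, contents, document_id, ext, hash, size, date_acquired
    print(table.column_names)
    print(f"rows: {table.num_rows}")  # 1 for a non-empty file, 0 if empty or unreadable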

def raw_to_parquet(
    data_access_factory: DataAccessFactory,
    file_path: str,
    detect_prog_lang: Any,
    snapshot: str,
    domain: str,
) -> Tuple[bool, Dict[str, Any]]:
"""
Converts raw data file (ZIP) to Parquet format and saves it.
Converts raw data file (ZIP or MD) to Parquet format and saves it.

:param data_access_factory: DataAccessFactory object for accessing data
:param data_access_factory: DataAccessFactory object for creating DataAccess
:param file_path: Path to the raw data file
:param detect_prog_lang: Object for detecting programming language from file extension
:param snapshot: Snapshot identifier to be added to the table
:param domain: Domain identifier to be added to the table
:return: Tuple indicating success (True/False) and additional metadata
"""

    try:
        # Create a DataAccess object for accessing data
        data_access = data_access_factory.create_data_access()

        # Get the file extension
        ext = TransformUtils.get_file_extension(file_path)[1]
        # Convert the file to a table based on its extension
        if ext == ".zip":
            table = zip_to_table(data_access, file_path, detect_prog_lang)
        elif ext == ".md":
            table = md_to_table(data_access, file_path, detect_prog_lang)
        else:
            raise Exception(f"File type '{ext}' is not supported")

        if table.num_rows > 0:
            # Add snapshot and domain columns to the table
            snapshot_column = [snapshot] * table.num_rows
            table = TransformUtils.add_column(table=table, name="snapshot", content=snapshot_column)
            domain_column = [domain] * table.num_rows
            table = TransformUtils.add_column(table=table, name="domain", content=domain_column)

        # Get the output file name for the Parquet file
        output_file_name = data_access.get_output_location(file_path).rsplit(".", 1)[0] + ".parquet"

        # Save the PyArrow table as a Parquet file and get metadata
        bytes_in_memory, metadata, _ = data_access.save_table(output_file_name, table)
        if metadata:
            return (
                True,
                {
                    "path": file_path,
                    "bytes_in_memory": bytes_in_memory,
                    "row_count": table.num_rows,
                },
            )
        else:
            raise Exception("Failed to upload metadata")

    except Exception as e:
        return False, {"path": file_path, "error": str(e)}
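For context, here is a minimal sketch of how raw_to_parquet could be driven over a folder of Markdown files. It is illustrative only: convert_markdown_folder is a hypothetical helper, the factory is assumed to be a DataAccessFactory already configured with input and output locations (as the real driver does from its command-line arguments), and detect_prog_lang=None simply skips language detection.

import glob
import os


def convert_markdown_folder(data_access_factory, input_dir: str, snapshot: str, domain: str) -> None:
    # Reuse one factory for all files; raw_to_parquet creates its own DataAccess per call.
    for path in sorted(glob.glob(os.path.join(input_dir, "*.md"))):
        ok, meta = raw_to_parquet(
            data_access_factory, path, detect_prog_lang=None, snapshot=snapshot, domain=domain
        )
        print("converted" if ok else "failed", meta)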