Skip to content

Commit

Permalink
Feat: Add support for rendering records as LLM documents (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Feb 27, 2024
1 parent 1964a8b commit 8ebc28f
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 163 deletions.
100 changes: 100 additions & 0 deletions airbyte/_util/document_rendering.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Methods for converting Airbyte records into documents."""
from __future__ import annotations

from typing import TYPE_CHECKING, Any

import yaml
from pydantic import BaseModel

from airbyte.documents import Document


if TYPE_CHECKING:
from collections.abc import Iterable


def _to_title_case(name: str, /) -> str:
"""Convert a string to title case.
Unlike Python's built-in `str.title` method, this function doesn't lowercase the rest of the
string. This is useful for converting "snake_case" to "Title Case" without negatively affecting
strings that are already in title case or camel case.
"""
return " ".join(word[0].upper() + word[1:] for word in name.split("_"))


class CustomRenderingInstructions(BaseModel):
"""Instructions for rendering a stream's records as documents."""

title_property: str | None
content_properties: list[str]
frontmatter_properties: list[str]
metadata_properties: list[str]


class DocumentRenderer(BaseModel):
"""Instructions for rendering a stream's records as documents."""

title_property: str | None
content_properties: list[str] | None
metadata_properties: list[str] | None
render_metadata: bool = False

# TODO: Add primary key and cursor key support:
# primary_key_properties: list[str]
# cursor_property: str | None

def render_document(self, record: dict[str, Any]) -> Document:
"""Render a record as a document.
The document will be rendered as a markdown document, with content, frontmatter, and an
optional title. If there are multiple properties to render as content, they will be rendered
beneath H2 section headers. If there is only one property to render as content, it will be
rendered without a section header. If a title property is specified, it will be rendered as
an H1 header at the top of the document.
Returns:
A tuple of (content: str, metadata: dict).
"""
content = ""
if not self.metadata_properties:
self.metadata_properties = [
key
for key in record
if key not in (self.content_properties or []) and key != self.title_property
]
if self.title_property:
content += f"# {record[self.title_property]}\n\n"
if self.render_metadata or not self.content_properties:
content += "```yaml\n"
content += yaml.dump({key: record[key] for key in self.metadata_properties})
content += "```\n"

# TODO: Add support for primary key and doc ID generation:
# doc_id: str = (
# "-".join(str(record[key]) for key in self.primary_key_properties)
# if self.primary_key_properties
# else str(hash(record))
# )

if not self.content_properties:
pass
elif len(self.content_properties) == 1:
# Only one property to render as content; no need for section headers.
content += str(record[self.content_properties[0]])
else:
# Multiple properties to render as content; use H2 section headers.
content += "\n".join(
f"## {_to_title_case(key)}\n\n{record[key]}\n\n" for key in self.content_properties
)

return Document(
# id=doc_id, # TODD: Add support for primary key and doc ID generation.
content=content,
metadata={key: record[key] for key in self.metadata_properties},
)

def render_documents(self, records: Iterable[dict[str, Any]]) -> Iterable[Document]:
"""Render an iterable of records as documents."""
yield from (self.render_document(record=record) for record in records)
41 changes: 38 additions & 3 deletions airbyte/datasets/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections.abc import Iterator, Mapping
from typing import Any, cast
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any, cast

from pandas import DataFrame

from airbyte._util.document_rendering import DocumentRenderer


if TYPE_CHECKING:
from airbyte_protocol.models import ConfiguredAirbyteStream

from airbyte.documents import Document


class DatasetBase(ABC):
"""Base implementation for all datasets."""

def __init__(self, stream_metadata: ConfiguredAirbyteStream) -> None:
self._stream_metadata = stream_metadata

@abstractmethod
def __iter__(self) -> Iterator[Mapping[str, Any]]:
def __iter__(self) -> Iterator[dict[str, Any]]:
"""Return the iterator of records."""
raise NotImplementedError

Expand All @@ -25,3 +36,27 @@ def to_pandas(self) -> DataFrame:
# expects an iterator of dict objects. This cast is safe because we know
# duck typing is correct for this use case.
return DataFrame(cast(Iterator[dict[str, Any]], self))

def to_documents(
self,
title_property: str | None = None,
content_properties: list[str] | None = None,
metadata_properties: list[str] | None = None,
*,
render_metadata: bool = False,
) -> Iterable[Document]:
"""Return the iterator of documents.
If metadata_properties is not set, all properties that are not content will be added to
the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the
the main content. Otherwise, metadata will be attached to the document but not rendered.
"""
renderer = DocumentRenderer(
title_property=title_property,
content_properties=content_properties,
metadata_properties=metadata_properties,
render_metadata=render_metadata,
)
yield from renderer.render_documents(self)
13 changes: 9 additions & 4 deletions airbyte/datasets/_lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,24 @@
if TYPE_CHECKING:
from collections.abc import Iterator, Mapping

from airbyte_protocol.models import ConfiguredAirbyteStream


class LazyDataset(DatasetBase):
"""A dataset that is loaded incrementally from a source or a SQL query."""

def __init__(
self,
iterator: Iterator[Mapping[str, Any]],
iterator: Iterator[dict[str, Any]],
stream_metadata: ConfiguredAirbyteStream,
) -> None:
self._iterator: Iterator[Mapping[str, Any]] = iterator
super().__init__()
self._iterator: Iterator[dict[str, Any]] = iterator
super().__init__(
stream_metadata=stream_metadata,
)

@overrides
def __iter__(self) -> Iterator[Mapping[str, Any]]:
def __iter__(self) -> Iterator[dict[str, Any]]:
return self._iterator

def __next__(self) -> Mapping[str, Any]:
Expand Down
17 changes: 12 additions & 5 deletions airbyte/datasets/_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from __future__ import annotations

from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, cast

from overrides import overrides
Expand Down Expand Up @@ -39,18 +38,22 @@ def __init__(
self._cache: CacheBase = cache
self._stream_name: str = stream_name
self._query_statement: Selectable = query_statement
super().__init__()
super().__init__(
stream_metadata=cache.processor._get_stream_config( # noqa: SLF001 # Member is private until we have a public API for it.
stream_name=stream_name
),
)

@property
def stream_name(self) -> str:
return self._stream_name

def __iter__(self) -> Iterator[Mapping[str, Any]]:
def __iter__(self) -> Iterator[dict[str, Any]]:
with self._cache.processor.get_sql_connection() as conn:
for row in conn.execute(self._query_statement):
# Access to private member required because SQLAlchemy doesn't expose a public API.
# https://pydoc.dev/sqlalchemy/latest/sqlalchemy.engine.row.RowMapping.html
yield cast(Mapping[str, Any], row._mapping) # noqa: SLF001
yield cast(dict[str, Any], row._mapping) # noqa: SLF001

def __len__(self) -> int:
"""Return the number of records in the dataset.
Expand Down Expand Up @@ -100,7 +103,11 @@ class CachedDataset(SQLDataset):
underlying table as a SQLAlchemy Table object.
"""

def __init__(self, cache: CacheBase, stream_name: str) -> None:
def __init__(
self,
cache: CacheBase,
stream_name: str,
) -> None:
"""We construct the query statement by selecting all columns from the table.
This prevents the need to scan the table schema to construct the query statement.
Expand Down
48 changes: 48 additions & 0 deletions airbyte/documents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Methods for converting Airbyte records into documents."""
from __future__ import annotations

from typing import TYPE_CHECKING, Any

from pydantic import BaseModel


if TYPE_CHECKING:
import datetime


MAX_SINGLE_LINE_LENGTH = 60
AIRBYTE_DOCUMENT_RENDERING = "airbyte_document_rendering"
TITLE_PROPERTY = "title_property"
CONTENT_PROPS = "content_properties"
METADATA_PROPERTIES = "metadata_properties"


class Document(BaseModel):
"""A PyAirbyte document is a specific projection on top of a record.
Documents have the following structure:
- id (str): A unique string identifier for the document.
- content (str): A string representing the record when rendered as a document.
- metadata (dict[str, Any]): Associated metadata about the document, such as the record's IDs
and/or URLs.
This class is duck-typed to be compatible with LangChain project's `Document` class.
"""

id: str | None = None
content: str
metadata: dict[str, Any]
last_modified: datetime.datetime | None = None

def __str__(self) -> str:
return self.content

@property
def page_content(self) -> str:
"""Return the content of the document.
This is an alias for the `content` property, and is provided for duck-type compatibility
with the LangChain project's `Document` class.
"""
return self.content
30 changes: 29 additions & 1 deletion airbyte/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

from airbyte._executor import Executor
from airbyte.caches import CacheBase
from airbyte.documents import Document


@contextmanager
Expand Down Expand Up @@ -351,7 +352,34 @@ def _with_missing_columns(records: Iterable[dict[str, Any]]) -> Iterator[dict[st
),
)
)
return LazyDataset(iterator)
return LazyDataset(
iterator,
stream_metadata=configured_stream,
)

def get_documents(
self,
stream: str,
title_property: str | None = None,
content_properties: list[str] | None = None,
metadata_properties: list[str] | None = None,
*,
render_metadata: bool = False,
) -> Iterable[Document]:
"""Read a stream from the connector and return the records as documents.
If metadata_properties is not set, all properties that are not content will be added to
the metadata.
If render_metadata is True, metadata will be rendered in the document, as well as the
the main content.
"""
return self.get_records(stream).to_documents(
title_property=title_property,
content_properties=content_properties,
metadata_properties=metadata_properties,
render_metadata=render_metadata,
)

def check(self) -> None:
"""Call check on the connector.
Expand Down
32 changes: 32 additions & 0 deletions examples/run_get_documents_from_github.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""This examples script demonstrates how to render documents from a source."""
from __future__ import annotations

import rich

import airbyte as ab


def main() -> None:
read_result = ab.get_source(
"source-github",
config={
"repositories": ["airbytehq/quickstarts"],
"credentials": {"personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN")},
},
streams=["issues"],
).read()

for doc in read_result["issues"].to_documents(
title_property="title",
content_properties=["body"],
metadata_properties=["state", "url", "number"],
# primary_key_properties=["id"],
# cursor_property="updated_at",
render_metadata=True,
):
rich.print(rich.markdown.Markdown(str(doc) + "\n\n" + str("-" * 40)))


if __name__ == "__main__":
main()
Loading

0 comments on commit 8ebc28f

Please sign in to comment.