
Commit

Merge pull request #170 from airtai/169-move-faststream-gen-related-files-into-faststream-gen-repo-and-fix-docs-build-scripts

Move faststream gen related files into faststream gen repo
harishmohanraj authored Oct 9, 2023
2 parents a8cea81 + b474b01 commit 2dd261f
Showing 124 changed files with 2,793 additions and 9 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -2,6 +2,7 @@

# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
+exclude: ^search/examples/
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
  rev: "v4.4.0"
15 changes: 13 additions & 2 deletions .secrets.baseline
@@ -111,6 +111,17 @@
      "path": "detect_secrets.filters.heuristic.is_templated_secret"
    }
  ],
-  "results": {},
-  "generated_at": "2023-08-08T10:13:31Z"
+  "results": {
+    "search/docs/getting-started/config/index.md": [
+      {
+        "type": "Basic Auth Credentials",
+        "filename": "search/docs/getting-started/config/index.md",
+        "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
+        "is_verified": false,
+        "line_number": 95,
+        "is_secret": false
+      }
+    ]
+  },
+  "generated_at": "2023-10-09T11:12:30Z"
}
2 changes: 1 addition & 1 deletion faststream_gen/__init__.py
@@ -1 +1 @@
__version__ = "0.1.6"
__version__ = "0.1.7rc0"
2 changes: 1 addition & 1 deletion faststream_gen/_code_generator/chat.py
@@ -98,7 +98,7 @@ def _get_relevant_document(query: str) -> str:
        The content of the most relevant document as a string.
    """
    db_path = get_root_data_path() / "docs"
-    db = FAISS.load_local(db_path, OpenAIEmbeddings())  # type: ignore
+    db = FAISS.load_local(db_path, OpenAIEmbeddings())
    results = db.max_marginal_relevance_search(query, k=1, fetch_k=3)
    results_str = "\n".join([result.page_content for result in results])
    return results_str
2 changes: 1 addition & 1 deletion faststream_gen/_code_generator/helper.py
@@ -197,7 +197,7 @@ def get_relevant_prompt_examples(query: str) -> Dict[str, str]:
        The dictionary of the most relevant examples for each step.
    """
    db_path = get_root_data_path() / "examples"
-    db = FAISS.load_local(db_path, OpenAIEmbeddings())  # type: ignore
+    db = FAISS.load_local(db_path, OpenAIEmbeddings())
    results = db.similarity_search(query, k=3, fetch_k=5)
    results_page_content = [r.page_content for r in results]
    prompt_examples = _format_examples(results_page_content)
2 changes: 1 addition & 1 deletion nbs/Chat.ipynb
@@ -230,7 +230,7 @@
"        The content of the most relevant document as a string.\n",
"    \"\"\"\n",
"    db_path = get_root_data_path() / \"docs\"\n",
-"    db = FAISS.load_local(db_path, OpenAIEmbeddings())  # type: ignore\n",
+"    db = FAISS.load_local(db_path, OpenAIEmbeddings())\n",
"    results = db.max_marginal_relevance_search(query, k=1, fetch_k=3)\n",
"    results_str = \"\\n\".join([result.page_content for result in results])\n",
"    return results_str"
2 changes: 1 addition & 1 deletion nbs/Helper.ipynb
@@ -594,7 +594,7 @@
"        The dictionary of the most relevant examples for each step.\n",
"    \"\"\"\n",
"    db_path = get_root_data_path() / \"examples\"\n",
-"    db = FAISS.load_local(db_path, OpenAIEmbeddings())  # type: ignore\n",
+"    db = FAISS.load_local(db_path, OpenAIEmbeddings())\n",
"    results = db.similarity_search(query, k=3, fetch_k=5)\n",
"    results_page_content = [r.page_content for r in results]\n",
"    prompt_examples = _format_examples(results_page_content)\n",
Empty file added search/examples/__init__.py
Empty file.
55 changes: 55 additions & 0 deletions search/examples/example_add_and_publish_with_key/app.py
@@ -0,0 +1,55 @@
from datetime import datetime
from typing import List

from pydantic import BaseModel, Field

from faststream import Context, ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker


class Point(BaseModel):
    x: float = Field(
        ..., examples=[0.5], description="The X Coordinate in the coordinate system"
    )
    y: float = Field(
        ..., examples=[0.5], description="The Y Coordinate in the coordinate system"
    )
    time: datetime = Field(
        ...,
        examples=["2020-04-23 10:20:30.400000"],
        description="The timestamp of the record",
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


to_output_data = broker.publisher("output_data")


@app.on_startup
async def app_setup(context: ContextRepo):
    message_history: List[Point] = []
    context.set_global("message_history", message_history)


@broker.subscriber("input_data")
async def on_input_data(
    msg: Point,
    logger: Logger,
    message_history: List[Point] = Context(),
    key: bytes = Context("message.raw_message.key"),
) -> None:
    logger.info(f"{msg=}")

    message_history.append(msg)

    x_sum = 0
    y_sum = 0
    for msg in message_history:
        x_sum += msg.x
        y_sum += msg.y

    point_sum = Point(x=x_sum, y=y_sum, time=datetime.now())
    await to_output_data.publish(point_sum, key=key)
58 changes: 58 additions & 0 deletions search/examples/example_add_and_publish_with_key/app_skeleton.py
@@ -0,0 +1,58 @@
from datetime import datetime
from typing import List

from pydantic import BaseModel, Field

from faststream import Context, ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker


class Point(BaseModel):
    x: float = Field(
        ..., examples=[0.5], description="The X Coordinate in the coordinate system"
    )
    y: float = Field(
        ..., examples=[0.5], description="The Y Coordinate in the coordinate system"
    )
    time: datetime = Field(
        ...,
        examples=["2020-04-23 10:20:30.400000"],
        description="The timestamp of the record",
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


to_output_data = broker.publisher("output_data")


@app.on_startup
async def app_setup(context: ContextRepo):
    """
    Set all necessary global variables inside ContextRepo object:
        Set message_history for storing all input messages
    """
    raise NotImplementedError()


@broker.subscriber("input_data")
async def on_input_data(
    msg: Point,
    logger: Logger,
    message_history: List[Point] = Context(),
    key: bytes = Context("message.raw_message.key"),
) -> None:
    """
    Processes a message from the 'input_data' topic.
    Add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic.
    The same partition key should be used in the input_data and output_data topic.
    Instructions:
    1. Consume a message from 'input_data' topic.
    2. Create a new message object (do not directly modify the original).
    3. Add all x elements from the memory (x_sum) and all y from the memory (y_sum)
    4. Publish the message with x_sum and y_sum to the output_data topic. (The same partition key should be used in the input_data and output_data topic).
    """
    raise NotImplementedError()
12 changes: 12 additions & 0 deletions search/examples/example_add_and_publish_with_key/description.txt
@@ -0,0 +1,12 @@
Develop a FastStream application using localhost kafka broker.
The app should consume messages from the input_data topic.
The input message is a JSON encoded object including three attributes:
- x: float
- y: float
- time: datetime

input_data topic should use partition key.

Keep all the previous messages in the memory.
While consuming the message, add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic.
The same partition key should be used in the input_data and output_data topic.
33 changes: 33 additions & 0 deletions search/examples/example_add_and_publish_with_key/test_app.py
@@ -0,0 +1,33 @@
from datetime import datetime

import pytest
from freezegun import freeze_time

from faststream import Context, TestApp
from faststream._compat import model_to_jsonable
from faststream.kafka import TestKafkaBroker

from .app import Point, app, broker


@broker.subscriber("output_data")
async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.key")):
    pass


# Freeze time so the datetime always uses the same time
@freeze_time("2023-01-01")
@pytest.mark.asyncio
async def test_point_was_incremented():
    async with TestKafkaBroker(broker):
        async with TestApp(app):
            time = datetime.now()
            await broker.publish(
                Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key"
            )
            await broker.publish(
                Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key"
            )

            point_json = model_to_jsonable(Point(x=2.0, y=4.0, time=time))
            on_output_data.mock.assert_called_with(point_json)
54 changes: 54 additions & 0 deletions search/examples/example_add_and_publish_with_key2/app.py
@@ -0,0 +1,54 @@
from typing import List

from pydantic import BaseModel, Field

from faststream import Context, ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker


class Point(BaseModel):
    x: float = Field(
        ..., examples=[0.5], description="The X Coordinate in the coordinate system"
    )
    y: float = Field(
        ..., examples=[0.5], description="The Y Coordinate in the coordinate system"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


to_output_data = broker.publisher("output_data")


@app.on_startup
async def app_setup(context: ContextRepo):
    message_history: List[Point] = []
    context.set_global("message_history", message_history)


@broker.subscriber("input_data")
async def on_input_data(
    msg: Point,
    logger: Logger,
    message_history: List[Point] = Context(),
    key: bytes = Context("message.raw_message.key"),
) -> None:
    logger.info(f"{msg=}")

    message_history.append(msg)

    if len(message_history) > 100:
        message_history.pop(0)

    last_100_messages = message_history[-100:]

    x_sum = 0
    y_sum = 0
    for msg in last_100_messages:
        x_sum += msg.x
        y_sum += msg.y

    point_sum = Point(x=x_sum, y=y_sum)
    await to_output_data.publish(point_sum, key=key)
54 changes: 54 additions & 0 deletions search/examples/example_add_and_publish_with_key2/app_skeleton.py
@@ -0,0 +1,54 @@
from typing import List

from pydantic import BaseModel, Field

from faststream import Context, ContextRepo, FastStream, Logger
from faststream.kafka import KafkaBroker


class Point(BaseModel):
    x: float = Field(
        ..., examples=[0.5], description="The X Coordinate in the coordinate system"
    )
    y: float = Field(
        ..., examples=[0.5], description="The Y Coordinate in the coordinate system"
    )


broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


to_output_data = broker.publisher("output_data")


@app.on_startup
async def app_setup(context: ContextRepo):
    """
    Set all necessary global variables inside ContextRepo object:
        Set message_history for storing all input messages
    """
    raise NotImplementedError()


@broker.subscriber("input_data")
async def on_input_data(
    msg: Point,
    logger: Logger,
    message_history: List[Point] = Context(),
    key: bytes = Context("message.raw_message.key"),
) -> None:
    """
    Processes a message from the 'input_data' topic.
    Keep only the last 100 messages in memory.
    Add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic.
    The same partition key should be used in the input_data and output_data topic.
    Instructions:
    1. Consume a message from 'input_data' topic.
    2. Keep only the last 100 messages in memory.
    3. Create a new message object (do not directly modify the original).
    4. Add all x elements from the memory (x_sum) and all y from the memory (y_sum)
    5. Publish the message with x_sum and y_sum to the output_data topic. (The same partition key should be used in the input_data and output_data topic).
    """
    raise NotImplementedError()
11 changes: 11 additions & 0 deletions search/examples/example_add_and_publish_with_key2/description.txt
@@ -0,0 +1,11 @@
Develop a FastStream application using localhost kafka broker.
The app should consume messages from the input_data topic.
The input message is a JSON encoded object including two attributes:
- x: float
- y: float

input_data topic should use partition key.

Keep only the last 100 messages in memory.
While consuming the message, add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic.
The same partition key should be used in the input_data and output_data topic.
21 changes: 21 additions & 0 deletions search/examples/example_add_and_publish_with_key2/test_app.py
@@ -0,0 +1,21 @@
import pytest

from faststream import Context, TestApp
from faststream.kafka import TestKafkaBroker

from .app import Point, app, broker


@broker.subscriber("output_data")
async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.key")):
    pass


@pytest.mark.asyncio
async def test_point_was_incremented():
    async with TestKafkaBroker(broker):
        async with TestApp(app):
            await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key")
            await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key")

            on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=4.0)))