diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index d5752f2..f8c0bf1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,6 +2,7 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks +exclude: ^search/examples/ repos: - repo: https://github.com/pre-commit/pre-commit-hooks rev: "v4.4.0" diff --git a/.secrets.baseline b/.secrets.baseline index 434c1d6..eabcebe 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -111,6 +111,17 @@ "path": "detect_secrets.filters.heuristic.is_templated_secret" } ], - "results": {}, - "generated_at": "2023-08-08T10:13:31Z" + "results": { + "search/docs/getting-started/config/index.md": [ + { + "type": "Basic Auth Credentials", + "filename": "search/docs/getting-started/config/index.md", + "hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450", + "is_verified": false, + "line_number": 95, + "is_secret": false + } + ] + }, + "generated_at": "2023-10-09T11:12:30Z" } diff --git a/faststream_gen/__init__.py b/faststream_gen/__init__.py index 32efefd..5c796c0 100644 --- a/faststream_gen/__init__.py +++ b/faststream_gen/__init__.py @@ -1 +1 @@ -__version__ = "0.1.6" \ No newline at end of file +__version__ = "0.1.7rc0" \ No newline at end of file diff --git a/faststream_gen/_code_generator/chat.py b/faststream_gen/_code_generator/chat.py index 5f0098a..8c0599e 100644 --- a/faststream_gen/_code_generator/chat.py +++ b/faststream_gen/_code_generator/chat.py @@ -98,7 +98,7 @@ def _get_relevant_document(query: str) -> str: The content of the most relevant document as a string. """ db_path = get_root_data_path() / "docs" - db = FAISS.load_local(db_path, OpenAIEmbeddings()) # type: ignore + db = FAISS.load_local(db_path, OpenAIEmbeddings()) results = db.max_marginal_relevance_search(query, k=1, fetch_k=3) results_str = "\n".join([result.page_content for result in results]) return results_str diff --git a/faststream_gen/_code_generator/helper.py b/faststream_gen/_code_generator/helper.py index 74cebbc..edbe6d9 100644 --- a/faststream_gen/_code_generator/helper.py +++ b/faststream_gen/_code_generator/helper.py @@ -197,7 +197,7 @@ def get_relevant_prompt_examples(query: str) -> Dict[str, str]: The dictionary of the most relevant examples for each step. 
""" db_path = get_root_data_path() / "examples" - db = FAISS.load_local(db_path, OpenAIEmbeddings()) # type: ignore + db = FAISS.load_local(db_path, OpenAIEmbeddings()) results = db.similarity_search(query, k=3, fetch_k=5) results_page_content = [r.page_content for r in results] prompt_examples = _format_examples(results_page_content) diff --git a/nbs/Chat.ipynb b/nbs/Chat.ipynb index a667c70..2bb1694 100644 --- a/nbs/Chat.ipynb +++ b/nbs/Chat.ipynb @@ -230,7 +230,7 @@ " The content of the most relevant document as a string.\n", " \"\"\"\n", " db_path = get_root_data_path() / \"docs\"\n", - " db = FAISS.load_local(db_path, OpenAIEmbeddings()) # type: ignore\n", + " db = FAISS.load_local(db_path, OpenAIEmbeddings())\n", " results = db.max_marginal_relevance_search(query, k=1, fetch_k=3)\n", " results_str = \"\\n\".join([result.page_content for result in results])\n", " return results_str" diff --git a/nbs/Helper.ipynb b/nbs/Helper.ipynb index 73551da..cbb031a 100644 --- a/nbs/Helper.ipynb +++ b/nbs/Helper.ipynb @@ -594,7 +594,7 @@ " The dictionary of the most relevant examples for each step.\n", " \"\"\"\n", " db_path = get_root_data_path() / \"examples\"\n", - " db = FAISS.load_local(db_path, OpenAIEmbeddings()) # type: ignore\n", + " db = FAISS.load_local(db_path, OpenAIEmbeddings())\n", " results = db.similarity_search(query, k=3, fetch_k=5)\n", " results_page_content = [r.page_content for r in results]\n", " prompt_examples = _format_examples(results_page_content)\n", diff --git a/search/examples/__init__.py b/search/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_add_and_publish_with_key/__init__.py b/search/examples/example_add_and_publish_with_key/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_add_and_publish_with_key/app.py b/search/examples/example_add_and_publish_with_key/app.py new file mode 100644 index 0000000..bf92ecc --- /dev/null +++ b/search/examples/example_add_and_publish_with_key/app.py @@ -0,0 +1,55 @@ +from datetime import datetime +from typing import List + +from pydantic import BaseModel, Field + +from faststream import Context, ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Point(BaseModel): + x: float = Field( + ..., examples=[0.5], description="The X Coordinate in the coordinate system" + ) + y: float = Field( + ..., examples=[0.5], description="The Y Coordinate in the coordinate system" + ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@app.on_startup +async def app_setup(context: ContextRepo): + message_history: List[Point] = [] + context.set_global("message_history", message_history) + + +@broker.subscriber("input_data") +async def on_input_data( + msg: Point, + logger: Logger, + message_history: List[Point] = Context(), + key: bytes = Context("message.raw_message.key"), +) -> None: + logger.info(f"{msg=}") + + message_history.append(msg) + + x_sum = 0 + y_sum = 0 + for msg in message_history: + x_sum += msg.x + y_sum += msg.y + + point_sum = Point(x=x_sum, y=y_sum, time=datetime.now()) + await to_output_data.publish(point_sum, key=key) diff --git a/search/examples/example_add_and_publish_with_key/app_skeleton.py b/search/examples/example_add_and_publish_with_key/app_skeleton.py new file mode 100644 index 0000000..c123128 
--- /dev/null +++ b/search/examples/example_add_and_publish_with_key/app_skeleton.py @@ -0,0 +1,58 @@ +from datetime import datetime +from typing import List + +from pydantic import BaseModel, Field + +from faststream import Context, ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Point(BaseModel): + x: float = Field( + ..., examples=[0.5], description="The X Coordinate in the coordinate system" + ) + y: float = Field( + ..., examples=[0.5], description="The Y Coordinate in the coordinate system" + ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@app.on_startup +async def app_setup(context: ContextRepo): + """ + Set all necessary global variables inside ContextRepo object: + Set message_history for storing all input messages + """ + raise NotImplementedError() + + +@broker.subscriber("input_data") +async def on_input_data( + msg: Point, + logger: Logger, + message_history: List[Point] = Context(), + key: bytes = Context("message.raw_message.key"), +) -> None: + """ + Processes a message from the 'input_data' topic. + Add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic. + The same partition key should be used in the input_data and output_data topic. + + Instructions: + 1. Consume a message from 'input_data' topic. + 2. Create a new message object (do not directly modify the original). + 3. Add all x elements from the memory (x_sum) and all y from the memory (y_sum) + 4. Publish the message with x_sum and y_sum to the output_data topic. (The same partition key should be used in the input_data and output_data topic). + """ + raise NotImplementedError() diff --git a/search/examples/example_add_and_publish_with_key/description.txt b/search/examples/example_add_and_publish_with_key/description.txt new file mode 100644 index 0000000..7768996 --- /dev/null +++ b/search/examples/example_add_and_publish_with_key/description.txt @@ -0,0 +1,12 @@ +Develop a FastStream application using localhost kafka broker. +The app should consume messages from the input_data topic. +The input message is a JSON encoded object including three attributes: + - x: float + - y: float + - time: datetime + +input_data topic should use partition key. + +Keep all the previous messages in the memory. +While consuming the message, add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic. +The same partition key should be used in the input_data and output_data topic.
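For readers trying the example above by hand: a keyed message can be produced with the broker object directly. A minimal sketch, assuming the same local broker on localhost:9092 as the example; the payload values are illustrative only, not part of the committed files.

```python
# Hypothetical producer script for the example above (assumed setup, not from this diff).
import asyncio
from datetime import datetime

from faststream.kafka import KafkaBroker


async def main() -> None:
    broker = KafkaBroker("localhost:9092")
    await broker.connect()
    # Messages that share a key land in the same partition, preserving per-key order.
    await broker.publish(
        {"x": 0.5, "y": 0.5, "time": datetime.now().isoformat()},
        topic="input_data",
        key=b"point_key",
    )
    await broker.close()


asyncio.run(main())
```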
diff --git a/search/examples/example_add_and_publish_with_key/test_app.py b/search/examples/example_add_and_publish_with_key/test_app.py new file mode 100644 index 0000000..db9741a --- /dev/null +++ b/search/examples/example_add_and_publish_with_key/test_app.py @@ -0,0 +1,33 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time + +from faststream import Context, TestApp +from faststream._compat import model_to_jsonable +from faststream.kafka import TestKafkaBroker + +from .app import Point, app, broker + + +@broker.subscriber("output_data") +async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.key")): + pass + + +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") +@pytest.mark.asyncio +async def test_point_was_incremented(): + async with TestKafkaBroker(broker): + async with TestApp(app): + time = datetime.now() + await broker.publish( + Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key" + ) + await broker.publish( + Point(x=1.0, y=2.0, time=time), "input_data", key=b"point_key" + ) + + point_json = model_to_jsonable(Point(x=2.0, y=4.0, time=time)) + on_output_data.mock.assert_called_with(point_json) diff --git a/search/examples/example_add_and_publish_with_key2/__init__.py b/search/examples/example_add_and_publish_with_key2/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_add_and_publish_with_key2/app.py b/search/examples/example_add_and_publish_with_key2/app.py new file mode 100644 index 0000000..1735051 --- /dev/null +++ b/search/examples/example_add_and_publish_with_key2/app.py @@ -0,0 +1,54 @@ +from typing import List + +from pydantic import BaseModel, Field + +from faststream import Context, ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Point(BaseModel): + x: float = Field( + ..., examples=[0.5], description="The X Coordinate in the coordinate system" + ) + y: float = Field( + ..., examples=[0.5], description="The Y Coordinate in the coordinate system" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@app.on_startup +async def app_setup(context: ContextRepo): + message_history: List[Point] = [] + context.set_global("message_history", message_history) + + +@broker.subscriber("input_data") +async def on_input_data( + msg: Point, + logger: Logger, + message_history: List[Point] = Context(), + key: bytes = Context("message.raw_message.key"), +) -> None: + logger.info(f"{msg=}") + + message_history.append(msg) + + if len(message_history) > 100: + message_history.pop(0) + + last_100_messages = message_history[-100:] + + x_sum = 0 + y_sum = 0 + for msg in last_100_messages: + x_sum += msg.x + y_sum += msg.y + + point_sum = Point(x=x_sum, y=y_sum) + await to_output_data.publish(point_sum, key=key) diff --git a/search/examples/example_add_and_publish_with_key2/app_skeleton.py b/search/examples/example_add_and_publish_with_key2/app_skeleton.py new file mode 100644 index 0000000..e118f64 --- /dev/null +++ b/search/examples/example_add_and_publish_with_key2/app_skeleton.py @@ -0,0 +1,54 @@ +from typing import List + +from pydantic import BaseModel, Field + +from faststream import Context, ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Point(BaseModel): + x: float = Field( + ..., examples=[0.5], description="The X Coordinate in the coordinate system" + ) + y: float = Field( + ..., examples=[0.5],
description="The Y Coordinate in the coordinate system" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@app.on_startup +async def app_setup(context: ContextRepo): + """ + Set all necessary global variables inside ContextRepo object: + Set message_history for storing all input messages + """ + raise NotImplementedError() + + +@broker.subscriber("input_data") +async def on_input_data( + msg: Point, + logger: Logger, + message_history: List[Point] = Context(), + key: bytes = Context("message.raw_message.key"), +) -> None: + """ + Processes a message from the 'input_data' topic. + Keep only the last 100 messages in memory. + Add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic. + The same partition key should be used in the input_data and output_data topic. + + Instructions: + 1. Consume a message from 'input_data' topic. + 2. Keep only the last 100 messages in memory. + 3. Create a new message object (do not directly modify the original). + 4. Add all x elements from the memory (x_sum) and all y from the memory (y_sum) + 5. Publish the message with x_sum and y_sum to the output_data topic. (The same partition key should be used in the input_data and output_data topic). + """ + raise NotImplementedError() diff --git a/search/examples/example_add_and_publish_with_key2/description.txt b/search/examples/example_add_and_publish_with_key2/description.txt new file mode 100644 index 0000000..caf61b3 --- /dev/null +++ b/search/examples/example_add_and_publish_with_key2/description.txt @@ -0,0 +1,11 @@ +Develop a FastStream application using localhost kafka broker. +The app should consume messages from the input_data topic. +The input message is a JSON encoded object including two attributes: + - x: float + - y: float + +input_data topic should use partition key. + +Keep only the last 100 messages in memory. +While consuming the message, add all x elements from the memory (x_sum) and all y from the memory (y_sum) and publish the message with x_sum and y_sum to the output_data topic. +The same partition key should be used in the input_data and output_data topic. 
diff --git a/search/examples/example_add_and_publish_with_key2/test_app.py b/search/examples/example_add_and_publish_with_key2/test_app.py new file mode 100644 index 0000000..028244f --- /dev/null +++ b/search/examples/example_add_and_publish_with_key2/test_app.py @@ -0,0 +1,21 @@ +import pytest + +from faststream import Context, TestApp +from faststream.kafka import TestKafkaBroker + +from .app import Point, app, broker + + +@broker.subscriber("output_data") +async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.key")): + pass + + +@pytest.mark.asyncio +async def test_point_was_incremented(): + async with TestKafkaBroker(broker): + async with TestApp(app): + await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key") + await broker.publish(Point(x=1.0, y=2.0), "input_data", key=b"point_key") + + on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=4.0))) diff --git a/search/examples/example_calculate_mean_temperature/__init__.py b/search/examples/example_calculate_mean_temperature/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_calculate_mean_temperature/app.py b/search/examples/example_calculate_mean_temperature/app.py new file mode 100644 index 0000000..1498fce --- /dev/null +++ b/search/examples/example_calculate_mean_temperature/app.py @@ -0,0 +1,53 @@ +from datetime import datetime +from statistics import mean +from typing import Dict, List + +from pydantic import BaseModel, Field, NonNegativeFloat + +from faststream import Context, ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +class Weather(BaseModel): + temperature: float = Field( + ..., examples=[20], description="Temperature in Celsius degrees" + ) + windspeed: NonNegativeFloat = Field( + ..., examples=[20], description="Wind speed in kilometers per hour" + ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +publisher = broker.publisher("temperature_mean") + + +@app.on_startup +async def app_setup(context: ContextRepo): + message_history: Dict[str, List[float]] = {} + context.set_global("message_history", message_history) + + +@broker.subscriber("weather") +async def on_weather( + msg: Weather, + logger: Logger, + message_history: Dict[str, List[float]] = Context(), + key: bytes = Context("message.raw_message.key"), +) -> None: + logger.info(f"Weather info {msg=}") + + weather_key = key.decode("utf-8") + if weather_key not in message_history: + message_history[weather_key] = [] + + message_history[weather_key].append(msg.temperature) + + mean_temperature = mean(message_history[weather_key][-5:]) + await publisher.publish(mean_temperature, key=key) diff --git a/search/examples/example_calculate_mean_temperature/app_skeleton.py b/search/examples/example_calculate_mean_temperature/app_skeleton.py new file mode 100644 index 0000000..e26adc7 --- /dev/null +++ b/search/examples/example_calculate_mean_temperature/app_skeleton.py @@ -0,0 +1,57 @@ +from datetime import datetime +from typing import Dict, List + +from pydantic import BaseModel, Field, NonNegativeFloat + +from faststream import Context, ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +class Weather(BaseModel): + temperature: float = Field( + ..., examples=[20], description="Temperature in Celsius degrees" + 
) + windspeed: NonNegativeFloat = Field( + ..., examples=[20], description="Wind speed in kilometers per hour" + ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +publisher = broker.publisher("temperature_mean") + + +@app.on_startup +async def app_setup(context: ContextRepo): + """ + Set all necessary global variables inside ContextRepo object: + Set message_history for storing all input messages + """ + raise NotImplementedError() + + +@broker.subscriber("weather") +async def on_weather( + msg: Weather, + logger: Logger, + message_history: Dict[str, List[float]] = Context(), + key: bytes = Context("message.raw_message.key"), +) -> None: + """ + Processes a message from the 'weather' topic. This topic uses partition key. + Calculate the temperature mean of the last 5 messages for the given partition key + Publish the temperature mean to the temperature_mean topic and use the same partition key which the weather topic is using. + + Instructions: + 1. Consume a message from 'weather' topic. + 2. Save each message to a dictionary (global variable) - partition key should be used as a dictionary key and value should be a List of temperatures. + 3. Calculate the temperature mean of the last 5 messages for the given partition key + 4. Publish the temperature mean to the temperature_mean topic and use the same partition key which the weather topic is using. + """ + raise NotImplementedError() diff --git a/search/examples/example_calculate_mean_temperature/description.txt b/search/examples/example_calculate_mean_temperature/description.txt new file mode 100644 index 0000000..aa70dbb --- /dev/null +++ b/search/examples/example_calculate_mean_temperature/description.txt @@ -0,0 +1,11 @@ +Create a FastStream application for consuming messages from the weather topic. +This topic needs to use partition key. + +weather messages use JSON with three attributes: + - temperature (type float) + - windspeed (type float) + - timestamp (type datetime) + +Application should save each message to a dictionary (global variable) - partition key should be used as a dictionary key and value should be a List of temperatures. +Calculate the temperature mean of the last 5 messages for the given partition key +Publish the temperature mean to the temperature_mean topic and use the same partition key which the weather topic is using.
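The core of this example is a per-key rolling mean. The snippet below isolates that logic as plain Python — the record helper is a hypothetical name, not from the diff — and its second assert mirrors the 15.5 expectation in the test file that follows.

```python
from statistics import mean
from typing import Dict, List

message_history: Dict[str, List[float]] = {}


def record(key: str, temperature: float) -> float:
    # Append the reading, then average over at most the last five readings for this key.
    history = message_history.setdefault(key, [])
    history.append(temperature)
    return mean(history[-5:])


assert record("ZG", 20.5) == 20.5
assert record("ZG", 10.5) == 15.5  # matches on_temperature_mean in test_app.py below
```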
diff --git a/search/examples/example_calculate_mean_temperature/test_app.py b/search/examples/example_calculate_mean_temperature/test_app.py new file mode 100644 index 0000000..ddf934a --- /dev/null +++ b/search/examples/example_calculate_mean_temperature/test_app.py @@ -0,0 +1,47 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time + +from faststream import Context, TestApp +from faststream._compat import model_to_jsonable +from faststream.kafka import TestKafkaBroker + +from .app import Weather, app, broker, on_weather + + +@broker.subscriber("temperature_mean") +async def on_temperature_mean( + msg: float, key: bytes = Context("message.raw_message.key") +): + pass + + +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") +@pytest.mark.asyncio +async def test_point_was_incremented(): + async with TestKafkaBroker(broker): + async with TestApp(app): + timestamp = datetime.now() + await broker.publish( + Weather(temperature=20.5, windspeed=20, timestamp=timestamp), + "weather", + key=b"ZG", + ) + weather_json = model_to_jsonable( + Weather(temperature=20.5, windspeed=20, timestamp=timestamp) + ) + on_weather.mock.assert_called_with(weather_json) + + await broker.publish( + Weather(temperature=10.5, windspeed=20, timestamp=timestamp), + "weather", + key=b"ZG", + ) + weather_json = model_to_jsonable( + Weather(temperature=10.5, windspeed=20, timestamp=timestamp) + ) + on_weather.mock.assert_called_with(weather_json) + + on_temperature_mean.mock.assert_called_with(15.5) diff --git a/search/examples/example_consume_publish_with_key/__init__.py b/search/examples/example_consume_publish_with_key/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_consume_publish_with_key/app.py b/search/examples/example_consume_publish_with_key/app.py new file mode 100644 index 0000000..dec320b --- /dev/null +++ b/search/examples/example_consume_publish_with_key/app.py @@ -0,0 +1,36 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + +from faststream import Context, FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Point(BaseModel): + x: float = Field( + ..., examples=[0.5], description="The X Coordinate in the coordinate system" + ) + y: float = Field( + ..., examples=[0.5], description="The Y Coordinate in the coordinate system" + ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@broker.subscriber("input_data") +async def on_input_data( + msg: Point, logger: Logger, key: bytes = Context("message.raw_message.key") +) -> None: + logger.info(f"{msg=}") + incremented_point = Point(x=msg.x + 1, y=msg.y + 1, time=datetime.now()) + await to_output_data.publish(incremented_point, key=key) diff --git a/search/examples/example_consume_publish_with_key/app_skeleton.py b/search/examples/example_consume_publish_with_key/app_skeleton.py new file mode 100644 index 0000000..15138a1 --- /dev/null +++ b/search/examples/example_consume_publish_with_key/app_skeleton.py @@ -0,0 +1,45 @@ +from datetime import datetime + +from pydantic import BaseModel, Field + +from faststream import Context, FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Point(BaseModel): + x: float = Field( + ..., examples=[0.5], description="The X Coordinate in the coordinate system" + )
+ y: float = Field( + ..., examples=[0.5], description="The Y Coordinate in the coordinate system" + ) + time: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +to_output_data = broker.publisher("output_data") + + +@broker.subscriber("input_data") +async def on_input_data( + msg: Point, logger: Logger, key: bytes = Context("message.raw_message.key") +) -> None: + """ + Processes a message from the 'input_data' topic. + Increment msg x and y attributes by 1 and publish that message to the output_data topic. + The same partition key should be used in the input_data and output_data topic. + + Instructions: + 1. Consume a message from 'input_data' topic. + 2. Create a new message object (do not directly modify the original). + 3. Increment msg x and y attributes by 1. + 4. Publish that message to the output_data topic (The same partition key should be used in the input_data and output_data topic). + """ + raise NotImplementedError() diff --git a/search/examples/example_consume_publish_with_key/description.txt b/search/examples/example_consume_publish_with_key/description.txt new file mode 100644 index 0000000..29fd9aa --- /dev/null +++ b/search/examples/example_consume_publish_with_key/description.txt @@ -0,0 +1,10 @@ +Develop a FastStream application using localhost kafka broker. +The app should consume messages from the input_data topic. +The input message is a JSON encoded object including three attributes: + - x: float + - y: float + - time: datetime + +input_data topic should use partition key. +While consuming the message, increment x and y attributes by 1 and publish that message to the output_data topic. +The same partition key should be used in the input_data and output_data topic.
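Because the handler must not mutate the consumed message, the transformation reduces to a pure function. A stand-alone sketch — the Point fields are copied from the app above, while the incremented helper is a hypothetical name:

```python
from datetime import datetime

from pydantic import BaseModel


class Point(BaseModel):
    x: float
    y: float
    time: datetime


def incremented(msg: Point) -> Point:
    # Build a new Point rather than modifying the consumed message in place.
    return Point(x=msg.x + 1, y=msg.y + 1, time=datetime.now())


p = incremented(Point(x=1.0, y=2.0, time=datetime.now()))
assert (p.x, p.y) == (2.0, 3.0)  # the same values test_app.py asserts below
```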
diff --git a/search/examples/example_consume_publish_with_key/test_app.py b/search/examples/example_consume_publish_with_key/test_app.py new file mode 100644 index 0000000..6e50e72 --- /dev/null +++ b/search/examples/example_consume_publish_with_key/test_app.py @@ -0,0 +1,30 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time + +from faststream import Context +from faststream._compat import model_to_jsonable +from faststream.kafka import TestKafkaBroker + +from .app import Point, broker, on_input_data + + +@broker.subscriber("output_data") +async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.key")): + pass + + +# Freeze time so the datetime always uses the same time +@freeze_time("2023-01-01") +@pytest.mark.asyncio +async def test_point_was_incremented(): + async with TestKafkaBroker(broker): + time = datetime.now() + await broker.publish(Point(x=1.0, y=2.0, time=time), "input_data", key=b"key") + + point_json = model_to_jsonable(Point(x=1.0, y=2.0, time=time)) + on_input_data.mock.assert_called_with(point_json) + + point_json = model_to_jsonable(Point(x=2.0, y=3.0, time=time)) + on_output_data.mock.assert_called_with(point_json) diff --git a/search/examples/example_course_updates/__init__.py b/search/examples/example_course_updates/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_course_updates/app.py b/search/examples/example_course_updates/app.py new file mode 100644 index 0000000..8516bca --- /dev/null +++ b/search/examples/example_course_updates/app.py @@ -0,0 +1,38 @@ +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class CourseUpdates(BaseModel): + course_name: str = Field(..., examples=["Biology"], description="Course example") + new_content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +@broker.publisher("notify_updates") +@broker.subscriber("course_updates") +async def on_course_update(msg: CourseUpdates, logger: Logger) -> CourseUpdates: + logger.info(msg) + + if msg.new_content: + logger.info(f"Course has new content {msg.new_content=}") + msg = CourseUpdates( + course_name=("Updated: " + msg.course_name), + new_content=msg.new_content, + timestamp=datetime.now(), + ) + return msg diff --git a/search/examples/example_course_updates/app_skeleton.py b/search/examples/example_course_updates/app_skeleton.py new file mode 100644 index 0000000..30fbe55 --- /dev/null +++ b/search/examples/example_course_updates/app_skeleton.py @@ -0,0 +1,40 @@ +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class CourseUpdates(BaseModel): + course_name: str = Field(..., examples=["Biology"], description="Course example") + new_content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + timestamp: datetime = Field( + ..., + examples=["2020-04-23 10:20:30.400000"], + description="The timestamp of the record", + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +
+@broker.publisher("notify_updates") +@broker.subscriber("course_updates") +async def on_course_update(msg: CourseUpdates, logger: Logger) -> CourseUpdates: + """ + Processes a message from the 'course_updates' topic, If new_content attribute is set, then constructs a new message appending 'Updated: ' before the course_name attribute. + Finally, publishes the message to the 'notify_updates' topic. + + Instructions: + 1. Consume a message from 'course_updates' topic. + 2. Create a new message object (do not directly modify the original). + 3. Processes a message from the 'course_updates' topic, If new_content attribute is set, then constructs a new message appending 'Updated: ' before the course_name attribute. + 4. Publish the modified message to 'notify_updates' topic. + + """ + raise NotImplementedError() diff --git a/search/examples/example_course_updates/description.txt b/search/examples/example_course_updates/description.txt new file mode 100644 index 0000000..76a216b --- /dev/null +++ b/search/examples/example_course_updates/description.txt @@ -0,0 +1,4 @@ +Develop a FastStream application using localhost broker. +It should consume messages from 'course_updates' topic where the message is a JSON encoded object including three attributes: course_name, new_content and timestamp. +If new_content attribute is set, then construct a new message appending 'Updated: ' before the course_name attribute. +Finally, publish this message to the 'notify_updates' topic. diff --git a/search/examples/example_course_updates/test_app.py b/search/examples/example_course_updates/test_app.py new file mode 100644 index 0000000..20a7936 --- /dev/null +++ b/search/examples/example_course_updates/test_app.py @@ -0,0 +1,64 @@ +from datetime import datetime + +import pytest +from freezegun import freeze_time + +from faststream._compat import model_to_jsonable +from faststream.kafka import TestKafkaBroker + +from .app import CourseUpdates, broker, on_course_update + + +@broker.subscriber("notify_updates") +async def on_notify_update(msg: CourseUpdates): + pass + + +# Feeze time so the datetime always uses the same time +@freeze_time("2023-01-01") +@pytest.mark.asyncio +async def test_app_without_new_content(): + async with TestKafkaBroker(broker): + timestamp = datetime.now() + await broker.publish( + CourseUpdates(course_name="Biology", timestamp=timestamp), "course_updates" + ) + + course_json = model_to_jsonable( + CourseUpdates(course_name="Biology", timestamp=timestamp) + ) + on_course_update.mock.assert_called_with(course_json) + on_notify_update.mock.assert_called_with(course_json) + + +# Feeze time so the datetime always uses the same time +@freeze_time("2023-01-01") +@pytest.mark.asyncio +async def test_app_with_new_content(): + async with TestKafkaBroker(broker): + timestamp = datetime.now() + await broker.publish( + CourseUpdates( + course_name="Biology", + new_content="We have additional classes...", + timestamp=timestamp, + ), + "course_updates", + ) + course_json = model_to_jsonable( + CourseUpdates( + course_name="Biology", + new_content="We have additional classes...", + timestamp=timestamp, + ) + ) + on_course_update.mock.assert_called_with(course_json) + + on_update_json = model_to_jsonable( + CourseUpdates( + course_name="Updated: Biology", + new_content="We have additional classes...", + timestamp=timestamp, + ) + ) + on_notify_update.mock.assert_called_with(on_update_json) diff --git a/search/examples/example_execute_trade/__init__.py b/search/examples/example_execute_trade/__init__.py new 
file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_execute_trade/app.py b/search/examples/example_execute_trade/app.py new file mode 100644 index 0000000..ce6fb6e --- /dev/null +++ b/search/examples/example_execute_trade/app.py @@ -0,0 +1,33 @@ +from pydantic import BaseModel, Field, NonNegativeInt + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Trade(BaseModel): + trader_id: NonNegativeInt = Field(..., examples=[1], description="Int data example") + stock_symbol: str = Field(..., examples=["WS"], description="Stock example") + action: str = Field(..., examples=["Sell!!!"], description="Action example") + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +to_order_executed = broker.publisher("order_executed") + + +@broker.subscriber("execute_trade") +async def on_execute_trade(msg: Trade, logger: Logger) -> None: + logger.info(msg) + + if "Sell" in msg.action: + # price = retrieve_the_current_price(msg) + # Currently using a hardcoded placeholder price + price = 5 + await to_order_executed.publish( + Trade( + trader_id=msg.trader_id, + stock_symbol=msg.stock_symbol, + action=(msg.action + f" Price = {price}"), + ) + ) diff --git a/search/examples/example_execute_trade/app_skeleton.py b/search/examples/example_execute_trade/app_skeleton.py new file mode 100644 index 0000000..774492f --- /dev/null +++ b/search/examples/example_execute_trade/app_skeleton.py @@ -0,0 +1,32 @@ +from pydantic import BaseModel, Field, NonNegativeInt + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Trade(BaseModel): + trader_id: NonNegativeInt = Field(..., examples=[1], description="Int data example") + stock_symbol: str = Field(..., examples=["WS"], description="Stock example") + action: str = Field(..., examples=["Sell!!!"], description="Action example") + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +to_order_executed = broker.publisher("order_executed") + + +@broker.subscriber("execute_trade") +async def on_execute_trade(msg: Trade, logger: Logger) -> None: + """ + Processes a message from the 'execute_trade' topic. + Upon reception, the function should verify if the action attribute contains 'Sell'. If yes, retrieve the current price and append this detail to the message and publish the updated message to the 'order_executed' topic. + + Instructions: + 1. Consume a message from 'execute_trade' topic. + 2. Create a new message object (do not directly modify the original). + 3. Check if the action attribute contains 'Sell'. + 4. If 3. is True, retrieve the current price and append this detail to the message and publish the updated message to the 'order_executed' topic. + + """ + raise NotImplementedError() diff --git a/search/examples/example_execute_trade/description.txt b/search/examples/example_execute_trade/description.txt new file mode 100644 index 0000000..1f0b588 --- /dev/null +++ b/search/examples/example_execute_trade/description.txt @@ -0,0 +1,3 @@ +Develop a FastStream application with localhost broker for development. +The application should consume from the 'execute_trade' topic with messages including attributes: trader_id, stock_symbol, and action. +Upon reception, the function should verify if the action attribute contains 'Sell'. If yes, retrieve the current price and append this detail to the message and publish the updated message to the 'order_executed' topic.
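One behavior worth noting when adapting this example: the handler's check is a case-sensitive substring match on the action attribute, so it fires for any action containing 'Sell'. A quick plain-Python illustration:

```python
# The check used in on_execute_trade is: "Sell" in msg.action
assert "Sell" in "Sell!!!"         # the happy path exercised by the tests below
assert "Sell" in "Don't Sell yet"  # also matches, which may be surprising
assert "Sell" not in "sell"        # lowercase does not match
```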
diff --git a/search/examples/example_execute_trade/test_app.py b/search/examples/example_execute_trade/test_app.py new file mode 100644 index 0000000..0bb5db2 --- /dev/null +++ b/search/examples/example_execute_trade/test_app.py @@ -0,0 +1,36 @@ +import pytest + +from faststream.kafka import TestKafkaBroker + +from .app import Trade, broker, on_execute_trade + + +@broker.subscriber("order_executed") +async def on_order_executed(msg: Trade) -> None: + pass + + +@pytest.mark.asyncio +async def test_app_without_sell_action(): + async with TestKafkaBroker(broker): + await broker.publish( + Trade(trader_id=1, stock_symbol="WS", action="Nothing"), "execute_trade" + ) + on_execute_trade.mock.assert_called_with( + dict(Trade(trader_id=1, stock_symbol="WS", action="Nothing")) + ) + on_order_executed.mock.assert_not_called() + + +@pytest.mark.asyncio +async def test_app_with_sell_action(): + async with TestKafkaBroker(broker): + await broker.publish( + Trade(trader_id=1, stock_symbol="WS", action="Sell!"), "execute_trade" + ) + on_execute_trade.mock.assert_called_with( + dict(Trade(trader_id=1, stock_symbol="WS", action="Sell!")) + ) + on_order_executed.mock.assert_called_with( + dict(Trade(trader_id=1, stock_symbol="WS", action="Sell! Price = 5")) + ) diff --git a/search/examples/example_forward_to_another_topic/__init__.py b/search/examples/example_forward_to_another_topic/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_forward_to_another_topic/app.py b/search/examples/example_forward_to_another_topic/app.py new file mode 100644 index 0000000..7d9213d --- /dev/null +++ b/search/examples/example_forward_to_another_topic/app.py @@ -0,0 +1,24 @@ +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Document(BaseModel): + name: str = Field(..., examples=["doc_name.txt"], description="Name example") + content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +@broker.publisher("document_backup") +@broker.subscriber("document") +async def on_document(msg: Document, logger: Logger) -> Document: + logger.info(msg) + return msg diff --git a/search/examples/example_forward_to_another_topic/app_skeleton.py b/search/examples/example_forward_to_another_topic/app_skeleton.py new file mode 100644 index 0000000..e7577f2 --- /dev/null +++ b/search/examples/example_forward_to_another_topic/app_skeleton.py @@ -0,0 +1,31 @@ +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Document(BaseModel): + name: str = Field(..., examples=["doc_name.txt"], description="Name example") + content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +@broker.publisher("document_backup") +@broker.subscriber("document") +async def on_document(msg: Document, logger: Logger) -> Document: + """ + Processes a message from the 'document' topic and publishes the same message to the 'document_backup' topic. + + Instructions: + 1. Consume a message from 'document' topic. + 2. Publish the same message to 'document_backup' topic. 
+ + """ + raise NotImplementedError() diff --git a/search/examples/example_forward_to_another_topic/description.txt b/search/examples/example_forward_to_another_topic/description.txt new file mode 100644 index 0000000..4cd2643 --- /dev/null +++ b/search/examples/example_forward_to_another_topic/description.txt @@ -0,0 +1,2 @@ +Simple FastStream application which only forwards all messages from the 'document' topic to the 'document_backup' topic. +Each Document has two attributes: name and content. diff --git a/search/examples/example_forward_to_another_topic/test_app.py b/search/examples/example_forward_to_another_topic/test_app.py new file mode 100644 index 0000000..5b44b32 --- /dev/null +++ b/search/examples/example_forward_to_another_topic/test_app.py @@ -0,0 +1,24 @@ +import pytest + +from faststream.kafka import TestKafkaBroker + +from .app import Document, broker, on_document + + +@broker.subscriber("document_backup") +async def on_document_backup(msg: Document): + pass + + +@pytest.mark.asyncio +async def test_app(): + async with TestKafkaBroker(broker): + await broker.publish( + Document(name="doc.txt", content="Introduction to FastStream"), "document" + ) + on_document.mock.assert_called_with( + dict(Document(name="doc.txt", content="Introduction to FastStream")) + ) + on_document_backup.mock.assert_called_with( + dict(Document(name="doc.txt", content="Introduction to FastStream")) + ) diff --git a/search/examples/example_forward_to_multiple_topics/__init__.py b/search/examples/example_forward_to_multiple_topics/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_forward_to_multiple_topics/app.py b/search/examples/example_forward_to_multiple_topics/app.py new file mode 100644 index 0000000..b1a4605 --- /dev/null +++ b/search/examples/example_forward_to_multiple_topics/app.py @@ -0,0 +1,14 @@ +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +@broker.publisher("output_1") +@broker.publisher("output_2") +@broker.publisher("output_3") +@broker.subscriber("input") +async def on_input(msg: str, logger: Logger) -> str: + logger.info(msg) + return msg diff --git a/search/examples/example_forward_to_multiple_topics/app_skeleton.py b/search/examples/example_forward_to_multiple_topics/app_skeleton.py new file mode 100644 index 0000000..56d19fb --- /dev/null +++ b/search/examples/example_forward_to_multiple_topics/app_skeleton.py @@ -0,0 +1,20 @@ +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + + +@broker.publisher("output_1") +@broker.publisher("output_2") +@broker.publisher("output_3") +@broker.subscriber("input") +async def on_input(msg: str, logger: Logger) -> str: + """ + Processes a message from the 'input' topic and publishes the same message to the 'output_1', 'output_2' and 'output_3' topic. + + Instructions: + 1. Consume a message from 'input' topic. + 2. Publish the same message to 'output_1', 'output_2' and 'output_3' topic. 
+ """ + raise NotImplementedError() diff --git a/search/examples/example_forward_to_multiple_topics/description.txt b/search/examples/example_forward_to_multiple_topics/description.txt new file mode 100644 index 0000000..ea6f81c --- /dev/null +++ b/search/examples/example_forward_to_multiple_topics/description.txt @@ -0,0 +1,2 @@ +Simple FastStream application which only forwards all messages from the 'input' topic to the 'output_1', 'output_2' and 'output_3' topic. +Each message is a string diff --git a/search/examples/example_forward_to_multiple_topics/test_app.py b/search/examples/example_forward_to_multiple_topics/test_app.py new file mode 100644 index 0000000..c6d1676 --- /dev/null +++ b/search/examples/example_forward_to_multiple_topics/test_app.py @@ -0,0 +1,29 @@ +import pytest + +from faststream.kafka import TestKafkaBroker + +from .app import broker + + +@broker.subscriber("output_1") +async def on_output_1(msg: str): + pass + + +@broker.subscriber("output_2") +async def on_output_2(msg: str): + pass + + +@broker.subscriber("output_3") +async def on_output_3(msg: str): + pass + + +@pytest.mark.asyncio +async def test_app(): + async with TestKafkaBroker(broker): + await broker.publish("input string", "input") + on_output_1.mock.assert_called_once_with("input string") + on_output_2.mock.assert_called_once_with("input string") + on_output_3.mock.assert_called_once_with("input string") diff --git a/search/examples/example_forward_with_security/__init__.py b/search/examples/example_forward_with_security/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_forward_with_security/app.py b/search/examples/example_forward_with_security/app.py new file mode 100644 index 0000000..a495381 --- /dev/null +++ b/search/examples/example_forward_with_security/app.py @@ -0,0 +1,28 @@ +import ssl +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import Logger +from faststream.broker.security import BaseSecurity +from faststream.kafka import KafkaBroker + + +class Document(BaseModel): + name: str = Field(..., examples=["doc_name.txt"], description="Name example") + content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + + +ssl_context = ssl.create_default_context() +security = BaseSecurity(ssl_context=ssl_context) + +broker = KafkaBroker("localhost:9092", security=security) + + +@broker.publisher("document_backup") +@broker.subscriber("document") +async def on_document(msg: Document, logger: Logger) -> Document: + logger.info(msg) + return msg diff --git a/search/examples/example_forward_with_security/app_skeleton.py b/search/examples/example_forward_with_security/app_skeleton.py new file mode 100644 index 0000000..33e26d9 --- /dev/null +++ b/search/examples/example_forward_with_security/app_skeleton.py @@ -0,0 +1,36 @@ +import ssl +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.broker.security import BaseSecurity +from faststream.kafka import KafkaBroker + + +class Document(BaseModel): + name: str = Field(..., examples=["doc_name.txt"], description="Name example") + content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + + +ssl_context = ssl.create_default_context() +security = BaseSecurity(ssl_context=ssl_context) + +broker = KafkaBroker("localhost:9092", security=security) +app = FastStream(broker) + + 
+@broker.publisher("document_backup") +@broker.subscriber("document") +async def on_document(msg: Document, logger: Logger) -> Document: + """ + Processes a message from the 'document' topic and publishes the same message to the 'document_backup' topic. + + Instructions: + 1. Consume a message from 'document' topic. + 2. Publish the same message to 'document_backup' topic. + + """ + raise NotImplementedError() diff --git a/search/examples/example_forward_with_security/description.txt b/search/examples/example_forward_with_security/description.txt new file mode 100644 index 0000000..b058d36 --- /dev/null +++ b/search/examples/example_forward_with_security/description.txt @@ -0,0 +1,3 @@ +Simple FastStream application which only forwards all messages from the 'document' topic to the 'document_backup' topic. +Each Document has two attributes: name and content. +The communication with the broker is encrypted with ssl. diff --git a/search/examples/example_forward_with_security/test_app.py b/search/examples/example_forward_with_security/test_app.py new file mode 100644 index 0000000..5b44b32 --- /dev/null +++ b/search/examples/example_forward_with_security/test_app.py @@ -0,0 +1,24 @@ +import pytest + +from faststream.kafka import TestKafkaBroker + +from .app import Document, broker, on_document + + +@broker.subscriber("document_backup") +async def on_document_backup(msg: Document): + pass + + +@pytest.mark.asyncio +async def test_app(): + async with TestKafkaBroker(broker): + await broker.publish( + Document(name="doc.txt", content="Introduction to FastStream"), "document" + ) + on_document.mock.assert_called_with( + dict(Document(name="doc.txt", content="Introduction to FastStream")) + ) + on_document_backup.mock.assert_called_with( + dict(Document(name="doc.txt", content="Introduction to FastStream")) + ) diff --git a/search/examples/example_infinity_publishing/__init__.py b/search/examples/example_infinity_publishing/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_infinity_publishing/app.py b/search/examples/example_infinity_publishing/app.py new file mode 100644 index 0000000..c97de17 --- /dev/null +++ b/search/examples/example_infinity_publishing/app.py @@ -0,0 +1,45 @@ +import asyncio +from datetime import datetime + +from faststream import ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +publisher = broker.publisher("current_time") + + +@app.on_startup +async def app_setup(context: ContextRepo): + context.set_global("app_is_running", True) + + +@app.on_shutdown +async def app_shutdown(context: ContextRepo): + context.set_global("app_is_running", False) + + # Get the running task and await for it to finish + publish_task = context.get("publish_task") + await publish_task + + +async def publish_time_task( + logger: Logger, context: ContextRepo, time_interval: int = 5 +): + # Always use context: ContextRepo for storing app_is_running variable + while context.get("app_is_running"): + current_time = datetime.now() + await publisher.publish(current_time.isoformat()) + logger.info(f"Current time published: {current_time}") + await asyncio.sleep(time_interval) + + +@app.after_startup +async def publish_time(logger: Logger, context: ContextRepo): + logger.info("Starting publishing:") + + publish_task = asyncio.create_task(publish_time_task(logger, context)) + + # you need to save asyncio task so you can wait for it to finish at app shutdown (the function 
with @app.on_shutdown function) + context.set_global("publish_task", publish_task) diff --git a/search/examples/example_infinity_publishing/app_skeleton.py b/search/examples/example_infinity_publishing/app_skeleton.py new file mode 100644 index 0000000..0b811af --- /dev/null +++ b/search/examples/example_infinity_publishing/app_skeleton.py @@ -0,0 +1,47 @@ +from faststream import ContextRepo, FastStream, Logger +from faststream.kafka import KafkaBroker + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +publisher = broker.publisher("current_time") + + +@app.on_startup +async def app_setup(context: ContextRepo): + """ + Set all necessary global variables inside ContextRepo object: + Set app_is_running to True - we will use this variable as running loop condition + """ + raise NotImplementedError() + + +@app.on_shutdown +async def app_shutdown(context: ContextRepo): + """ + Set all necessary global variables inside ContextRepo object: + Set app_is_running to False + + Get executed task from context and wait for it to finish + """ + raise NotImplementedError() + + +async def publish_time_task( + logger: Logger, context: ContextRepo, time_interval: int = 5 +): + """ + While app_is_running variable inside context is True, repeat the following process: + publish the current time to the 'current_time' topic. + asynchronous sleep for time_interval + """ + raise NotImplementedError() + + +@app.after_startup +async def publish_time(logger: Logger, context: ContextRepo): + """ + Create asynchronous task for executing publish_time_task function. + Save asyncio task so you can wait for it to finish at app shutdown (the function with @app.on_shutdown function) + """ + raise NotImplementedError() diff --git a/search/examples/example_infinity_publishing/description.txt b/search/examples/example_infinity_publishing/description.txt new file mode 100644 index 0000000..5a35053 --- /dev/null +++ b/search/examples/example_infinity_publishing/description.txt @@ -0,0 +1,2 @@ +Develop a simple FastStream application which publishes the current time to the 'current_time' topic. +App should publish messages every five seconds until the app shuts down. 
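The start/stop pattern above generalizes beyond FastStream: flip a shared flag at shutdown, then await the background task so it can exit cleanly. Below is a self-contained asyncio sketch of the same idea, with the interval shortened and ContextRepo replaced by a plain dict purely for illustration.

```python
import asyncio


async def worker(state: dict, interval: float = 0.1) -> int:
    ticks = 0
    while state["running"]:  # the app reads context.get("app_is_running") here
        ticks += 1           # stands in for publisher.publish(...)
        await asyncio.sleep(interval)
    return ticks


async def main() -> None:
    state = {"running": True}
    task = asyncio.create_task(worker(state))  # what publish_time does at startup
    await asyncio.sleep(0.35)
    state["running"] = False                   # what app_shutdown does via the context
    print("ticks:", await task)                # await the task before exiting


asyncio.run(main())
```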
diff --git a/search/examples/example_infinity_publishing/test_app.py b/search/examples/example_infinity_publishing/test_app.py new file mode 100644 index 0000000..135187d --- /dev/null +++ b/search/examples/example_infinity_publishing/test_app.py @@ -0,0 +1,21 @@ +from datetime import datetime + +import pytest + +from faststream import TestApp +from faststream.kafka import TestKafkaBroker + +from .app import app, broker + + +@broker.subscriber("current_time") +async def on_current_time(msg: datetime): + pass + + +@pytest.mark.asyncio +async def test_message_was_published(): + async with TestKafkaBroker(broker): + async with TestApp(app): + await on_current_time.wait_call(3) + on_current_time.mock.assert_called() diff --git a/search/examples/example_investment_updates/__init__.py b/search/examples/example_investment_updates/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_investment_updates/app.py b/search/examples/example_investment_updates/app.py new file mode 100644 index 0000000..1619530 --- /dev/null +++ b/search/examples/example_investment_updates/app.py @@ -0,0 +1,32 @@ +from pydantic import BaseModel, Field, NonNegativeFloat, NonNegativeInt + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Investment(BaseModel): + investor_id: NonNegativeInt = Field( + ..., examples=[1], description="Int data example" + ) + investment_amount: NonNegativeFloat = Field( + ..., examples=[100.5], description="Float data example" + ) + portfolio_value: NonNegativeFloat = Field( + ..., examples=[1000.5], description="Float data example" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +to_risk_management = broker.publisher("risk_management") + + +@broker.subscriber("investment_updates") +async def on_investment_updates(msg: Investment, logger: Logger) -> None: + logger.info(msg) + + default_threshold = 1000 + + if msg.investment_amount > default_threshold: + await to_risk_management.publish(msg) diff --git a/search/examples/example_investment_updates/app_skeleton.py b/search/examples/example_investment_updates/app_skeleton.py new file mode 100644 index 0000000..b4f9090 --- /dev/null +++ b/search/examples/example_investment_updates/app_skeleton.py @@ -0,0 +1,38 @@ +from pydantic import BaseModel, Field, NonNegativeFloat, NonNegativeInt + +from faststream import FastStream, Logger +from faststream.kafka import KafkaBroker + + +class Investment(BaseModel): + investor_id: NonNegativeInt = Field( + ..., examples=[1], description="Int data example" + ) + investment_amount: NonNegativeFloat = Field( + ..., examples=[100.5], description="Float data example" + ) + portfolio_value: NonNegativeFloat = Field( + ..., examples=[1000.5], description="Float data example" + ) + + +broker = KafkaBroker("localhost:9092") +app = FastStream(broker) + +to_risk_management = broker.publisher("risk_management") + + +@broker.subscriber("investment_updates") +async def on_investment_updates(msg: Investment, logger: Logger) -> None: + """ + Processes a message from the 'investment_updates' topic. + Upon reception, the function should verify if the investment_amount exceeds a predetermined threshold (default threshold is 1000). + If yes, forward the message to the 'risk_management' topic. + + Instructions: + 1. Consume a message from 'investment_updates' topic. + 2. Check if the investment_amount exceeds a predetermined threshold (default threshold is 1000). + 3. If 2.
is True, forward the message to the 'risk_management' topic. + + """ + raise NotImplementedError() diff --git a/search/examples/example_investment_updates/description.txt b/search/examples/example_investment_updates/description.txt new file mode 100644 index 0000000..d841174 --- /dev/null +++ b/search/examples/example_investment_updates/description.txt @@ -0,0 +1,3 @@ +Develop a FastStream application which consumes messages from the 'investment_updates' topic. +The consumed message has the following attributes: investor_id, investment_amount, and portfolio_value. +If investment_amount exceeds a predetermined threshold (default threshold is 1000), forward the message to the 'risk_management' topic for further investigation. diff --git a/search/examples/example_investment_updates/test_app.py b/search/examples/example_investment_updates/test_app.py new file mode 100644 index 0000000..e18a5a9 --- /dev/null +++ b/search/examples/example_investment_updates/test_app.py @@ -0,0 +1,42 @@ +import pytest + +from faststream.kafka import TestKafkaBroker + +from .app import Investment, broker, on_investment_updates + + +@broker.subscriber("risk_management") +async def on_risk_management(msg: Investment) -> None: + pass + + +@pytest.mark.asyncio +async def test_invest_smaller_amount_than_threshold(): + async with TestKafkaBroker(broker): + await broker.publish( + Investment(investor_id=1, investment_amount=100, portfolio_value=1000), + "investment_updates", + ) + on_investment_updates.mock.assert_called_with( + dict(Investment(investor_id=1, investment_amount=100, portfolio_value=1000)) + ) + on_risk_management.mock.assert_not_called() + + +@pytest.mark.asyncio +async def test_invest_greater_amount_than_threshold(): + async with TestKafkaBroker(broker): + await broker.publish( + Investment(investor_id=1, investment_amount=1500, portfolio_value=1000), + "investment_updates", + ) + on_investment_updates.mock.assert_called_with( + dict( + Investment(investor_id=1, investment_amount=1500, portfolio_value=1000) + ) + ) + on_risk_management.mock.assert_called_with( + dict( + Investment(investor_id=1, investment_amount=1500, portfolio_value=1000) + ) + ) diff --git a/search/examples/example_log_msgs_with_plaintext_security/__init__.py b/search/examples/example_log_msgs_with_plaintext_security/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/search/examples/example_log_msgs_with_plaintext_security/app.py b/search/examples/example_log_msgs_with_plaintext_security/app.py new file mode 100644 index 0000000..f619e0b --- /dev/null +++ b/search/examples/example_log_msgs_with_plaintext_security/app.py @@ -0,0 +1,27 @@ +import ssl +from typing import Optional + +from pydantic import BaseModel, Field + +from faststream import FastStream, Logger +from faststream.broker.security import SASLPlaintext +from faststream.kafka import KafkaBroker + + +class Document(BaseModel): + name: str = Field(..., examples=["doc_name.txt"], description="Name example") + content: Optional[str] = Field( + default=None, examples=["New content"], description="Content example" + ) + + +ssl_context = ssl.create_default_context() +security = SASLPlaintext(ssl_context=ssl_context, username="admin", password="admin") + +broker = KafkaBroker("localhost:9092", security=security) +app = FastStream(broker) + + +@broker.subscriber("document") +async def on_document(msg: Document, logger: Logger): + logger.info(msg) diff --git a/search/examples/example_log_msgs_with_plaintext_security/app_skeleton.py
diff --git a/search/examples/example_log_msgs_with_plaintext_security/app_skeleton.py b/search/examples/example_log_msgs_with_plaintext_security/app_skeleton.py
new file mode 100644
index 0000000..e7b0b10
--- /dev/null
+++ b/search/examples/example_log_msgs_with_plaintext_security/app_skeleton.py
@@ -0,0 +1,34 @@
+import ssl
+from typing import Optional
+
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.broker.security import SASLPlaintext
+from faststream.kafka import KafkaBroker
+
+
+class Document(BaseModel):
+    name: str = Field(..., examples=["doc_name.txt"], description="Name example")
+    content: Optional[str] = Field(
+        default=None, examples=["New content"], description="Content example"
+    )
+
+
+ssl_context = ssl.create_default_context()
+security = SASLPlaintext(ssl_context=ssl_context, username="admin", password="admin")
+
+broker = KafkaBroker("localhost:9092", security=security)
+app = FastStream(broker)
+
+
+@broker.subscriber("document")
+async def on_document(msg: Document, logger: Logger):
+    """
+    Processes a message from the 'document' topic and prints it to the log.
+
+    Instructions:
+    1. Consume a message from 'document' topic and print it to the log.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_log_msgs_with_plaintext_security/description.txt b/search/examples/example_log_msgs_with_plaintext_security/description.txt
new file mode 100644
index 0000000..f04ace5
--- /dev/null
+++ b/search/examples/example_log_msgs_with_plaintext_security/description.txt
@@ -0,0 +1,3 @@
+Simple FastStream application which consumes messages from 'document' topic and prints them to log.
+Each Document has two attributes: name and content.
+The communication with the broker is encrypted with ssl, and uses SASL Plaintext authorization with username "admin" and password "admin".
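Note: the app above hardcodes the SASL credentials for brevity. A minimal sketch of the same SASLPlaintext setup that reads them from environment variables instead; the KAFKA_USERNAME and KAFKA_PASSWORD variable names are hypothetical, not part of the example:

import os
import ssl

from faststream.broker.security import SASLPlaintext
from faststream.kafka import KafkaBroker

ssl_context = ssl.create_default_context()
security = SASLPlaintext(
    ssl_context=ssl_context,
    # Hypothetical env var names; fall back to the example's defaults.
    username=os.environ.get("KAFKA_USERNAME", "admin"),
    password=os.environ.get("KAFKA_PASSWORD", "admin"),
)
broker = KafkaBroker("localhost:9092", security=security)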
diff --git a/search/examples/example_log_msgs_with_plaintext_security/test_app.py b/search/examples/example_log_msgs_with_plaintext_security/test_app.py
new file mode 100644
index 0000000..7bfda9b
--- /dev/null
+++ b/search/examples/example_log_msgs_with_plaintext_security/test_app.py
@@ -0,0 +1,16 @@
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+from .app import Document, broker, on_document
+
+
+@pytest.mark.asyncio
+async def test_app():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Document(name="doc.txt", content="Introduction to FastStream"), "document"
+        )
+        on_document.mock.assert_called_with(
+            dict(Document(name="doc.txt", content="Introduction to FastStream"))
+        )
diff --git a/search/examples/example_new_employee/__init__.py b/search/examples/example_new_employee/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_new_employee/app.py b/search/examples/example_new_employee/app.py
new file mode 100644
index 0000000..64da9e3
--- /dev/null
+++ b/search/examples/example_new_employee/app.py
@@ -0,0 +1,31 @@
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Employee(BaseModel):
+    name: str = Field(..., examples=["Mickey"], description="name example")
+    surname: str = Field(..., examples=["Mouse"], description="surname example")
+    email: str = Field(
+        ..., examples=["mikey.mouse@mail.ai"], description="email example"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_notify_accounting = broker.publisher("notify_accounting")
+to_notify_all_employees = broker.publisher("notify_all_employees")
+
+
+@broker.subscriber("new_employee")
+async def on_new_employee(msg: Employee, logger: Logger) -> None:
+    logger.info(msg)
+
+    await to_notify_accounting.publish(
+        f"Please prepare all the paper work for: {msg.name} {msg.surname}"
+    )
+    await to_notify_all_employees.publish(
+        f"Please welcome our new colleague: {msg.name} {msg.surname}"
+    )
diff --git a/search/examples/example_new_employee/app_skeleton.py b/search/examples/example_new_employee/app_skeleton.py
new file mode 100644
index 0000000..1145a32
--- /dev/null
+++ b/search/examples/example_new_employee/app_skeleton.py
@@ -0,0 +1,35 @@
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Employee(BaseModel):
+    name: str = Field(..., examples=["Mickey"], description="name example")
+    surname: str = Field(..., examples=["Mouse"], description="surname example")
+    email: str = Field(
+        ..., examples=["mikey.mouse@mail.ai"], description="email example"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_notify_accounting = broker.publisher("notify_accounting")
+to_notify_all_employees = broker.publisher("notify_all_employees")
+
+
+@broker.subscriber("new_employee")
+async def on_new_employee(msg: Employee, logger: Logger) -> None:
+    """
+    Processes a message from the 'new_employee' topic.
+    Upon reception, the function should publish messages to two topics:
+    1. Send a message to 'notify_accounting' with content 'Please prepare all the paper work for:' followed by the employee's name and surname
+    2. Send a message to 'notify_all_employees' with content 'Please welcome our new colleague:' followed by the employee's name and surname
+
+    Instructions:
+    1. Consume a message from 'new_employee' topic.
+    2. Send a message to 'notify_accounting' with content 'Please prepare all the paper work for:' followed by the employee's name and surname
+    3. Send a message to 'notify_all_employees' with content 'Please welcome our new colleague:' followed by the employee's name and surname
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_new_employee/description.txt b/search/examples/example_new_employee/description.txt
new file mode 100644
index 0000000..57acd31
--- /dev/null
+++ b/search/examples/example_new_employee/description.txt
@@ -0,0 +1,5 @@
+FastStream app with one subscriber and two publisher functions.
+The app should subscribe to the 'new_employee' topic, which receives an Employee object with three attributes: name, surname and email.
+For each employee received on this topic, produce two messages:
+1. Send a message to 'notify_accounting' with content 'Please prepare all the paper work for:' followed by the employee's name and surname
+2. Send a message to 'notify_all_employees' with content 'Please welcome our new colleague:' followed by the employee's name and surname
diff --git a/search/examples/example_new_employee/test_app.py b/search/examples/example_new_employee/test_app.py
new file mode 100644
index 0000000..01e9247
--- /dev/null
+++ b/search/examples/example_new_employee/test_app.py
@@ -0,0 +1,33 @@
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+from .app import Employee, broker, on_new_employee
+
+
+@broker.subscriber("notify_accounting")
+async def on_notify_accounting(msg: str) -> None:
+    pass
+
+
+@broker.subscriber("notify_all_employees")
+async def on_notify_all_employees(msg: str) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_new_employee():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Employee(name="Liam", surname="Neeson", email="linee@mail"), "new_employee"
+        )
+        on_new_employee.mock.assert_called_once_with(
+            dict(Employee(name="Liam", surname="Neeson", email="linee@mail"))
+        )
+
+        on_notify_accounting.mock.assert_called_once_with(
+            "Please prepare all the paper work for: Liam Neeson"
+        )
+        on_notify_all_employees.mock.assert_called_once_with(
+            "Please welcome our new colleague: Liam Neeson"
+        )
diff --git a/search/examples/example_pets/__init__.py b/search/examples/example_pets/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_pets/app.py b/search/examples/example_pets/app.py
new file mode 100644
index 0000000..bba54b8
--- /dev/null
+++ b/search/examples/example_pets/app.py
@@ -0,0 +1,22 @@
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Pet(BaseModel):
+    pet_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+    species: str = Field(..., examples=["dog"], description="Pet example")
+    age: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+@broker.publisher("notify_adopters")
+@broker.subscriber("new_pet")
+async def on_new_pet(msg: Pet, logger: Logger) -> Pet:
+    logger.info(msg)
+
+    return msg
diff --git a/search/examples/example_pets/app_skeleton.py b/search/examples/example_pets/app_skeleton.py
new file mode 100644
index 0000000..d89d788
--- /dev/null
+++ b/search/examples/example_pets/app_skeleton.py
@@ -0,0 +1,28 @@
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Pet(BaseModel):
+    pet_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+    species: str = Field(..., examples=["dog"], description="Pet example")
+    age: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+@broker.publisher("notify_adopters")
+@broker.subscriber("new_pet")
+async def on_new_pet(msg: Pet, logger: Logger) -> Pet:
+    """
+    Processes a message from the 'new_pet' topic and sends the new pet's information to the 'notify_adopters' topic.
+
+    Instructions:
+    1. Consume a message from 'new_pet' topic.
+    2. Send the new pet's information to the 'notify_adopters' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_pets/description.txt b/search/examples/example_pets/description.txt
new file mode 100644
index 0000000..52ee9de
--- /dev/null
+++ b/search/examples/example_pets/description.txt
@@ -0,0 +1,3 @@
+Create a FastStream application with the localhost broker.
+Consume from the 'new_pet' topic, which includes a JSON encoded object with attributes: pet_id, species, and age.
+Whenever a new pet is added, send the new pet's information to the 'notify_adopters' topic.
diff --git a/search/examples/example_pets/test_app.py b/search/examples/example_pets/test_app.py
new file mode 100644
index 0000000..9ca7121
--- /dev/null
+++ b/search/examples/example_pets/test_app.py
@@ -0,0 +1,20 @@
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+from .app import Pet, broker, on_new_pet
+
+
+@broker.subscriber("notify_adopters")
+async def on_notify_adopters(msg: Pet) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_app():
+    async with TestKafkaBroker(broker):
+        await broker.publish(Pet(pet_id=2, species="Dog", age=2), "new_pet")
+        on_new_pet.mock.assert_called_with(dict(Pet(pet_id=2, species="Dog", age=2)))
+        on_notify_adopters.mock.assert_called_with(
+            dict(Pet(pet_id=2, species="Dog", age=2))
+        )
diff --git a/search/examples/example_plants/__init__.py b/search/examples/example_plants/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_plants/app.py b/search/examples/example_plants/app.py
new file mode 100644
index 0000000..ee68274
--- /dev/null
+++ b/search/examples/example_plants/app.py
@@ -0,0 +1,27 @@
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Plant(BaseModel):
+    plant_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+    species: str = Field(..., examples=["Apple"], description="Species example")
+    ready_to_sell: bool
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_sell_plant = broker.publisher("sell_plant")
+to_still_growing = broker.publisher("still_growing")
+
+
+@broker.subscriber("plant_growth")
+async def on_plant_growth(msg: Plant, logger: Logger) -> None:
+    logger.info(msg)
+
+    if msg.ready_to_sell:
+        await to_sell_plant.publish(msg.plant_id)
+    else:
+        await to_still_growing.publish(msg.plant_id)
diff --git a/search/examples/example_plants/app_skeleton.py b/search/examples/example_plants/app_skeleton.py
new file mode 100644
index 0000000..48d89bf
--- /dev/null
+++ b/search/examples/example_plants/app_skeleton.py
@@ -0,0 +1,33 @@
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Plant(BaseModel):
+    plant_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+    species: str = Field(..., examples=["Apple"], description="Species example")
+    ready_to_sell: bool
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_sell_plant = broker.publisher("sell_plant")
+to_still_growing = broker.publisher("still_growing")
+
+
+@broker.subscriber("plant_growth")
+async def on_plant_growth(msg: Plant, logger: Logger) -> None:
+    """
+    Processes a message from the 'plant_growth' topic.
+    Upon reception, the function should verify if the ready_to_sell attribute is True.
+    If yes, publish plant_id to the 'sell_plant' topic, otherwise, publish plant_id to the 'still_growing' topic.
+
+    Instructions:
+    1. Consume a message from 'plant_growth' topic.
+    2. Check if the ready_to_sell attribute is True.
+    3. If ready_to_sell is True, publish plant_id to the 'sell_plant' topic, otherwise, publish plant_id to the 'still_growing' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_plants/description.txt b/search/examples/example_plants/description.txt
new file mode 100644
index 0000000..949e221
--- /dev/null
+++ b/search/examples/example_plants/description.txt
@@ -0,0 +1,2 @@
+Create a FastStream application for consuming messages from the 'plant_growth' topic which includes a JSON encoded object with attributes: plant_id, species and ready_to_sell.
+If the ready_to_sell attribute is True, publish plant_id to the 'sell_plant' topic; otherwise, publish plant_id to the 'still_growing' topic.
diff --git a/search/examples/example_plants/test_app.py b/search/examples/example_plants/test_app.py
new file mode 100644
index 0000000..2041640
--- /dev/null
+++ b/search/examples/example_plants/test_app.py
@@ -0,0 +1,43 @@
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+from .app import Plant, broker, on_plant_growth
+
+
+@broker.subscriber("sell_plant")
+async def on_sell_plant(msg: int) -> None:
+    pass
+
+
+@broker.subscriber("still_growing")
+async def on_still_growing(msg: int) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_sell_plant_is_called():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Plant(plant_id=1, species="Orange", ready_to_sell=True), "plant_growth"
+        )
+        on_plant_growth.mock.assert_called_with(
+            dict(Plant(plant_id=1, species="Orange", ready_to_sell=True))
+        )
+
+        on_sell_plant.mock.assert_called_with(1)
+        on_still_growing.mock.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_still_growing_is_called():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Plant(plant_id=1, species="Orange", ready_to_sell=False), "plant_growth"
+        )
+        on_plant_growth.mock.assert_called_with(
+            dict(Plant(plant_id=1, species="Orange", ready_to_sell=False))
+        )
+
+        on_still_growing.mock.assert_called_with(1)
+        on_sell_plant.mock.assert_not_called()
diff --git a/search/examples/example_product_reviews/__init__.py b/search/examples/example_product_reviews/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_product_reviews/app.py b/search/examples/example_product_reviews/app.py
new file mode 100644
index 0000000..3d875be
--- /dev/null
+++ b/search/examples/example_product_reviews/app.py
@@ -0,0 +1,37 @@
+from datetime import datetime
+
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class ProductReview(BaseModel):
+    product_id: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    customer_id: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    review_grade: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    timestamp: datetime = Field(
+        ...,
+        examples=["2020-04-23 10:20:30.400000"],
+        description="The timestamp of the record",
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_customer_service = broker.publisher("customer_service")
+
+
+@broker.subscriber("product_reviews")
+async def on_product_reviews(msg: ProductReview, logger: Logger) -> None:
+    logger.info(msg)
+
+    if msg.review_grade < 5:
+        await to_customer_service.publish(msg)
diff --git a/search/examples/example_product_reviews/app_skeleton.py b/search/examples/example_product_reviews/app_skeleton.py
new file mode 100644
index 0000000..3a47d8c
--- /dev/null
+++ b/search/examples/example_product_reviews/app_skeleton.py
@@ -0,0 +1,45 @@
+from datetime import datetime
+
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class ProductReview(BaseModel):
+    product_id: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    customer_id: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    review_grade: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    timestamp: datetime = Field(
+        ...,
+        examples=["2020-04-23 10:20:30.400000"],
+        description="The timestamp of the record",
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_customer_service = broker.publisher("customer_service")
+
+
+@broker.subscriber("product_reviews")
+async def on_product_reviews(msg: ProductReview, logger: Logger) -> None:
+    """
+    Consumes a message from the 'product_reviews' topic.
+    Upon reception, the function should verify if the review_grade attribute is smaller than 5. If yes, publish an alert message to the 'customer_service' topic.
+
+    Instructions:
+    1. Consume a message from 'product_reviews' topic.
+    2. Create a new message object (do not directly modify the original).
+    3. Check if the review_grade attribute is smaller than 5.
+    4. If 3. is True, publish an alert message to the 'customer_service' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_product_reviews/description.txt b/search/examples/example_product_reviews/description.txt
new file mode 100644
index 0000000..5887909
--- /dev/null
+++ b/search/examples/example_product_reviews/description.txt
@@ -0,0 +1,3 @@
+Create a FastStream application using the localhost broker.
+The application should consume from the 'product_reviews' topic which includes JSON encoded objects with attributes: product_id, customer_id, review_grade and timestamp.
+If the review_grade attribute is smaller than 5, send an alert message to the 'customer_service' topic.
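Note: the test file that follows asserts against model_to_jsonable(...) rather than dict(...). ProductReview carries a datetime field, and the subscriber mock records the payload as it looks after JSON serialization, where the datetime has become a string. A minimal sketch of the difference, using a hypothetical Event model as a stand-in for ProductReview:

from datetime import datetime

from pydantic import BaseModel

from faststream._compat import model_to_jsonable


class Event(BaseModel):  # hypothetical stand-in for ProductReview
    grade: int
    timestamp: datetime


event = Event(grade=2, timestamp=datetime.now())
print(dict(event))               # keeps the datetime object as-is
print(model_to_jsonable(event))  # serializes it the way publishing does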
diff --git a/search/examples/example_product_reviews/test_app.py b/search/examples/example_product_reviews/test_app.py
new file mode 100644
index 0000000..49ff262
--- /dev/null
+++ b/search/examples/example_product_reviews/test_app.py
@@ -0,0 +1,54 @@
+from datetime import datetime
+
+import pytest
+
+from faststream._compat import model_to_jsonable
+from faststream.kafka import TestKafkaBroker
+
+from .app import ProductReview, broker, on_product_reviews
+
+
+@broker.subscriber("customer_service")
+async def on_customer_service(msg: ProductReview) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_app_where_review_grade_is_greater_than_5():
+    async with TestKafkaBroker(broker):
+        timestamp = datetime.now()
+        await broker.publish(
+            ProductReview(
+                product_id=1, customer_id=1, review_grade=6, timestamp=timestamp
+            ),
+            "product_reviews",
+        )
+
+        on_product_review_json = model_to_jsonable(
+            ProductReview(
+                product_id=1, customer_id=1, review_grade=6, timestamp=timestamp
+            )
+        )
+        on_product_reviews.mock.assert_called_with(on_product_review_json)
+        on_customer_service.mock.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_app_where_review_grade_is_less_than_5():
+    async with TestKafkaBroker(broker):
+        timestamp = datetime.now()
+        await broker.publish(
+            ProductReview(
+                product_id=1, customer_id=2, review_grade=2, timestamp=timestamp
+            ),
+            "product_reviews",
+        )
+
+        product_review_json = model_to_jsonable(
+            ProductReview(
+                product_id=1, customer_id=2, review_grade=2, timestamp=timestamp
+            )
+        )
+
+        on_product_reviews.mock.assert_called_with(product_review_json)
+        on_customer_service.mock.assert_called_with(product_review_json)
diff --git a/search/examples/example_publish_with_key/__init__.py b/search/examples/example_publish_with_key/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_publish_with_key/app.py b/search/examples/example_publish_with_key/app.py
new file mode 100644
index 0000000..04e675e
--- /dev/null
+++ b/search/examples/example_publish_with_key/app.py
@@ -0,0 +1,28 @@
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Point(BaseModel):
+    x: float = Field(
+        ..., examples=[0.5], description="The X Coordinate in the coordinate system"
+    )
+    y: float = Field(
+        ..., examples=[0.5], description="The Y Coordinate in the coordinate system"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+to_output_data = broker.publisher("output_data")
+
+
+@broker.subscriber("input_data")
+async def on_input_data(msg: Point, logger: Logger) -> None:
+    logger.info(f"{msg=}")
+    incremented_point = Point(x=msg.x + 1, y=msg.y + 1)
+    key = str(msg.x).encode("utf-8")
+    await to_output_data.publish(incremented_point, key=key)
diff --git a/search/examples/example_publish_with_key/app_skeleton.py b/search/examples/example_publish_with_key/app_skeleton.py
new file mode 100644
index 0000000..5096685
--- /dev/null
+++ b/search/examples/example_publish_with_key/app_skeleton.py
@@ -0,0 +1,37 @@
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Point(BaseModel):
+    x: float = Field(
+        ..., examples=[0.5], description="The X Coordinate in the coordinate system"
+    )
+    y: float = Field(
+        ..., examples=[0.5], description="The Y Coordinate in the coordinate system"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+to_output_data = broker.publisher("output_data")
+
+
+@broker.subscriber("input_data")
+async def on_input_data(msg: Point, logger: Logger) -> None:
+    """
+    Processes a message from the 'input_data' topic.
+    Increment the msg's x and y attributes by 1.
+    Publish that message to the 'output_data' topic.
+    Use the message's x attribute as the partition key when publishing to the 'output_data' topic.
+
+    Instructions:
+    1. Consume a message from 'input_data' topic.
+    2. Create a new message object (do not directly modify the original).
+    3. Increment msg x and y attributes by 1.
+    4. Publish that message to the output_data topic (use the message's x attribute as the partition key).
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_publish_with_key/description.txt b/search/examples/example_publish_with_key/description.txt
new file mode 100644
index 0000000..74a0f45
--- /dev/null
+++ b/search/examples/example_publish_with_key/description.txt
@@ -0,0 +1,8 @@
+Develop a FastStream application using localhost kafka broker.
+The app should consume messages from the input_data topic.
+The input message is a JSON encoded object including two attributes:
+ - x: float
+ - y: float
+
+While consuming the message, increment x and y attributes by 1 and publish that message to the output_data topic.
+Use the message's x attribute as the partition key when publishing to the output_data topic.
diff --git a/search/examples/example_publish_with_key/test_app.py b/search/examples/example_publish_with_key/test_app.py
new file mode 100644
index 0000000..653758e
--- /dev/null
+++ b/search/examples/example_publish_with_key/test_app.py
@@ -0,0 +1,19 @@
+import pytest
+
+from faststream import Context
+from faststream.kafka import TestKafkaBroker
+
+from .app import Point, broker, on_input_data
+
+
+@broker.subscriber("output_data")
+async def on_output_data(msg: Point, key: bytes = Context("message.raw_message.key")):
+    pass
+
+
+@pytest.mark.asyncio
+async def test_point_was_incremented():
+    async with TestKafkaBroker(broker):
+        await broker.publish(Point(x=1.0, y=2.0), "input_data")
+        on_input_data.mock.assert_called_with(dict(Point(x=1.0, y=2.0)))
+        on_output_data.mock.assert_called_with(dict(Point(x=2.0, y=3.0)))
diff --git a/search/examples/example_scram256_security/__init__.py b/search/examples/example_scram256_security/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_scram256_security/app.py b/search/examples/example_scram256_security/app.py
new file mode 100644
index 0000000..18e751e
--- /dev/null
+++ b/search/examples/example_scram256_security/app.py
@@ -0,0 +1,37 @@
+import os
+import ssl
+from datetime import date
+
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.broker.security import SASLScram256
+from faststream.kafka import KafkaBroker
+
+
+class Student(BaseModel):
+    name: str = Field(..., examples=["Student Studentis"], description="Name example")
+    birthdate: date = Field(
+        ...,
+        examples=["2023-09-05"],
+        description="Students birthdate",
+    )
+
+
+ssl_context = ssl.create_default_context()
+security = SASLScram256(
+    ssl_context=ssl_context,
+    username=os.environ["USERNAME"],
+    password=os.environ["PASSWORD"],
+)
+
+broker = KafkaBroker("localhost:9092", security=security)
+app = FastStream(broker)
+
+to_class = broker.publisher("class")
+
+
+@broker.subscriber("student_application")
+async def on_application(msg: Student, logger: Logger) -> None:
+    key = msg.name.encode("utf-8")
+    await to_class.publish(msg, key=key)
diff --git a/search/examples/example_scram256_security/app_skeleton.py b/search/examples/example_scram256_security/app_skeleton.py
new file mode 100644
index 0000000..9860c93
--- /dev/null
+++ b/search/examples/example_scram256_security/app_skeleton.py
@@ -0,0 +1,43 @@
+import os
+import ssl
+from datetime import date
+
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.broker.security import SASLScram256
+from faststream.kafka import KafkaBroker
+
+
+class Student(BaseModel):
+    name: str = Field(..., examples=["Student Studentis"], description="Name example")
+    birthdate: date = Field(
+        ...,
+        examples=["2023-09-05"],
+        description="Students birthdate",
+    )
+
+
+ssl_context = ssl.create_default_context()
+security = SASLScram256(
+    ssl_context=ssl_context,
+    username=os.environ["USERNAME"],
+    password=os.environ["PASSWORD"],
+)
+
+broker = KafkaBroker("localhost:9092", security=security)
+app = FastStream(broker)
+
+
+@broker.publisher("class")
+@broker.subscriber("student_application")
+async def on_document(msg: Student, logger: Logger) -> Student:
+    """
+    Processes a message from the 'student_application' topic and publishes the same message to the 'class' topic using the student's name as key.
+
+    Instructions:
+    1. Consume a message from 'student_application' topic.
+    2. Publish the same message to the 'class' topic, using the student's name as the key.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_scram256_security/description.txt b/search/examples/example_scram256_security/description.txt
new file mode 100644
index 0000000..3533865
--- /dev/null
+++ b/search/examples/example_scram256_security/description.txt
@@ -0,0 +1,5 @@
+FastStream application that handles the incoming students from "student_application" topic.
+The Student is then passed to the "class" topic using the student's name as key.
+Student has a name and birthdate.
+The communication with the broker is encrypted with ssl and uses SASL Scram256 for authorization.
+Username and password are loaded from environment variables.
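Note: the test file that follows pulls the Kafka partition key off the raw incoming record through FastStream's Context mechanism, mirroring the key the app sets when publishing. A minimal standalone sketch of the consuming side, assuming a local broker as in the examples:

from faststream import Context, FastStream, Logger
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("class")
async def on_class(
    msg: dict,
    logger: Logger,
    # "message.raw_message.key" resolves to the raw Kafka record's key.
    key: bytes = Context("message.raw_message.key"),
) -> None:
    logger.info(f"received {msg} with partition key {key!r}")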
diff --git a/search/examples/example_scram256_security/test_app.py b/search/examples/example_scram256_security/test_app.py
new file mode 100644
index 0000000..2cf2218
--- /dev/null
+++ b/search/examples/example_scram256_security/test_app.py
@@ -0,0 +1,40 @@
+import os
+from datetime import date
+from unittest import mock
+
+import pytest
+
+from faststream import Context
+from faststream._compat import model_to_jsonable
+from faststream.kafka import TestKafkaBroker
+
+with mock.patch.dict(
+    os.environ,
+    {"USERNAME": "username", "PASSWORD": "password"},  # pragma: allowlist secret
+):
+    from .app import Student, broker, on_application, to_class
+
+
+@broker.subscriber("class")
+async def on_class(
+    msg: Student, key: bytes = Context("message.raw_message.key")
+) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_app():
+    async with TestKafkaBroker(broker):
+        birthdate = date(2020, 9, 5)
+        await broker.publish(
+            Student(name="Student Studentis", birthdate=birthdate),
+            "student_application",
+        )
+
+        student_json = model_to_jsonable(
+            Student(name="Student Studentis", birthdate=birthdate)
+        )
+
+        on_application.mock.assert_called_with(student_json)
+        to_class.mock.assert_called_with(student_json)
+        on_class.mock.assert_called_with(student_json)
diff --git a/search/examples/example_scram512_security/__init__.py b/search/examples/example_scram512_security/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_scram512_security/app.py b/search/examples/example_scram512_security/app.py
new file mode 100644
index 0000000..4a1c2fb
--- /dev/null
+++ b/search/examples/example_scram512_security/app.py
@@ -0,0 +1,37 @@
+import os
+import ssl
+
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.broker.security import SASLScram512
+from faststream.kafka import KafkaBroker
+
+
+class Student(BaseModel):
+    name: str = Field(..., examples=["Student Studentis"], description="Name example")
+    age: int = Field(
+        ...,
+        examples=[
+            20,
+        ],
+        description="Student age",
+    )
+
+
+ssl_context = ssl.create_default_context()
+security = SASLScram512(
+    ssl_context=ssl_context,
+    username=os.environ["USERNAME"],
+    password=os.environ["PASSWORD"],
+)
+
+broker = KafkaBroker("localhost:9092", security=security)
+app = FastStream(broker)
+
+to_class = broker.publisher("class")
+
+
+@broker.subscriber("student_application")
+async def on_application(msg: Student, logger: Logger) -> None:
+    await to_class.publish(msg)
diff --git a/search/examples/example_scram512_security/app_skeleton.py b/search/examples/example_scram512_security/app_skeleton.py
new file mode 100644
index 0000000..3939dec
--- /dev/null
+++ b/search/examples/example_scram512_security/app_skeleton.py
@@ -0,0 +1,44 @@
+import os
+import ssl
+
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.broker.security import SASLScram512
+from faststream.kafka import KafkaBroker
+
+
+class Student(BaseModel):
+    name: str = Field(..., examples=["Student Studentis"], description="Name example")
+    age: int = Field(
+        ...,
+        examples=[
+            20,
+        ],
+        description="Student age",
+    )
+
+
+ssl_context = ssl.create_default_context()
+security = SASLScram512(
+    ssl_context=ssl_context,
+    username=os.environ["USERNAME"],
+    password=os.environ["PASSWORD"],
+)
+
+broker = KafkaBroker("localhost:9092", security=security)
+app = FastStream(broker)
+
+
+@broker.publisher("class")
+@broker.subscriber("student_application")
+async def on_document(msg: Student, logger: Logger) -> Student:
+    """
+    Processes a message from the 'student_application' topic and publishes the same message to the 'class' topic.
+
+    Instructions:
+    1. Consume a message from 'student_application' topic.
+    2. Publish the same message to 'class' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_scram512_security/description.txt b/search/examples/example_scram512_security/description.txt
new file mode 100644
index 0000000..b7b00fa
--- /dev/null
+++ b/search/examples/example_scram512_security/description.txt
@@ -0,0 +1,4 @@
+FastStream application that handles the incoming students from "student_application" topic.
+The Student is then passed to the "class" topic.
+Student has a name and age.
+The communication with the broker is encrypted with ssl and uses SASL Scram512 for authorization; username and password are loaded from environment variables.
diff --git a/search/examples/example_scram512_security/test_app.py b/search/examples/example_scram512_security/test_app.py
new file mode 100644
index 0000000..40fd2d3
--- /dev/null
+++ b/search/examples/example_scram512_security/test_app.py
@@ -0,0 +1,34 @@
+import os
+from unittest import mock
+
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+with mock.patch.dict(
+    os.environ,
+    {"USERNAME": "username", "PASSWORD": "password"},  # pragma: allowlist secret
+):
+    from .app import Student, broker, on_application, to_class
+
+
+@broker.subscriber("class")
+async def on_class(msg: Student) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_app():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Student(name="Student Studentis", age=12), "student_application"
+        )
+        on_application.mock.assert_called_with(
+            dict(Student(name="Student Studentis", age=12))
+        )
+        to_class.mock.assert_called_with(
+            dict(Student(name="Student Studentis", age=12))
+        )
+        on_class.mock.assert_called_with(
+            dict(Student(name="Student Studentis", age=12))
+        )
diff --git a/search/examples/example_scrape_weather/__init__.py b/search/examples/example_scrape_weather/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_scrape_weather/app.py b/search/examples/example_scrape_weather/app.py
new file mode 100644
index 0000000..4a2a2e7
--- /dev/null
+++ b/search/examples/example_scrape_weather/app.py
@@ -0,0 +1,103 @@
+import asyncio
+import json
+from datetime import datetime
+
+import httpx
+from pydantic import BaseModel, Field, NonNegativeFloat
+
+from faststream import ContextRepo, FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+publisher = broker.publisher("weather")
+
+
+class Weather(BaseModel):
+    latitude: NonNegativeFloat = Field(
+        ...,
+        examples=[22.5],
+        description="Latitude measures the distance north or south of the equator.",
+    )
+    longitude: NonNegativeFloat = Field(
+        ...,
+        examples=[55],
+        description="Longitude measures distance east or west of the prime meridian.",
+    )
+    temperature: float = Field(
+        ..., examples=[20], description="Temperature in Celsius degrees"
+    )
+    windspeed: NonNegativeFloat = Field(
+        ..., examples=[20], description="Wind speed in kilometers per hour"
+    )
+    time: str = Field(
+        ..., examples=["2023-09-13T07:00"], description="The time of the day"
+    )
+
+
+@app.on_startup
+async def app_setup(context: ContextRepo):
+    context.set_global("app_is_running", True)
+
+
+@app.on_shutdown
+async def shutdown(context: ContextRepo):
+    context.set_global("app_is_running", False)
+
+    # Get all the running tasks and wait for them to finish
+    publish_tasks = context.get("publish_tasks")
+    await asyncio.gather(*publish_tasks)
+
+
+async def _fetch_and_publish_weather(
+    latitude: float,
+    longitude: float,
+    logger: Logger,
+    context: ContextRepo,
+    time_interval: int = 5,
+) -> None:
+    # Always use context: ContextRepo for storing the app_is_running variable
+    while context.get("app_is_running"):
+        uri = f"https://api.open-meteo.com/v1/forecast?current_weather=true&latitude={latitude}&longitude={longitude}"
+        async with httpx.AsyncClient() as client:
+            response = await client.get(uri)
+
+        if response.status_code == 200:
+            # read the json response
+            raw_data = json.loads(response.content)
+            temperature = raw_data["current_weather"]["temperature"]
+            windspeed = raw_data["current_weather"]["windspeed"]
+            time = raw_data["current_weather"]["time"]
+
+            new_data = Weather(
+                latitude=latitude,
+                longitude=longitude,
+                temperature=temperature,
+                windspeed=windspeed,
+                time=time,
+            )
+            key = str(latitude) + "_" + str(longitude)
+            await publisher.publish(new_data, key=key.encode("utf-8"))
+        else:
+            logger.warning(f"Failed API request {uri} at time {datetime.now()}")
+
+        await asyncio.sleep(time_interval)
+
+
+@app.after_startup
+async def publish_weather(logger: Logger, context: ContextRepo):
+    logger.info("Starting publishing:")
+
+    latitudes = [13, 50, 44, 24]
+    longitudes = [17, 13, 45, 70]
+    # start scraping and producing to the kafka topic
+    publish_tasks = [
+        asyncio.create_task(
+            _fetch_and_publish_weather(latitude, longitude, logger, context)
+        )
+        for latitude, longitude in zip(latitudes, longitudes)
+    ]
+    # save the asyncio tasks so you can wait for them to finish at app shutdown (in the @app.on_shutdown hook)
+    context.set_global("publish_tasks", publish_tasks)
diff --git a/search/examples/example_scrape_weather/app_skeleton.py b/search/examples/example_scrape_weather/app_skeleton.py
new file mode 100644
index 0000000..e052fdc
--- /dev/null
+++ b/search/examples/example_scrape_weather/app_skeleton.py
@@ -0,0 +1,89 @@
+from pydantic import BaseModel, Field, NonNegativeFloat
+
+from faststream import ContextRepo, FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+class Weather(BaseModel):
+    latitude: NonNegativeFloat = Field(
+        ...,
+        examples=[22.5],
+        description="Latitude measures the distance north or south of the equator.",
+    )
+    longitude: NonNegativeFloat = Field(
+        ...,
+        examples=[55],
+        description="Longitude measures distance east or west of the prime meridian.",
+    )
+    temperature: float = Field(
+        ..., examples=[20], description="Temperature in Celsius degrees"
+    )
+    windspeed: NonNegativeFloat = Field(
+        ..., examples=[20], description="Wind speed in kilometers per hour"
+    )
+    time: str = Field(
+        ..., examples=["2023-09-13T07:00"], description="The time of the day"
+    )
+
+
+@app.on_startup
+async def app_setup(context: ContextRepo):
+    """
+    Set all necessary global variables inside ContextRepo object:
+    Set app_is_running to True - we will use this variable as the running-loop condition
+    """
+    raise NotImplementedError()
+
+
+@app.on_shutdown
+async def shutdown(context: ContextRepo):
+    """
+    Set all necessary global variables inside ContextRepo object:
+    Set app_is_running to False
+
+    Get all the created tasks from the context and wait for them to finish
+    """
+    raise NotImplementedError()
+
+
+async def fetch_and_publish_weather(
+    latitude: float,
+    longitude: float,
+    logger: Logger,
+    context: ContextRepo,
+    time_interval: int = 5,
+) -> None:
+    """
+    While the app_is_running variable inside the context is True, repeat the following process:
+    get the weather information by sending a GET request to "https://api.open-meteo.com/v1/forecast?current_weather=true"
+    At the end of the url you should add additional 'latitude' and 'longitude' parameters, which are of type float.
+    Here is an example url for fetching information for latitude=52.3 and longitude=13.2:
+        "https://api.open-meteo.com/v1/forecast?current_weather=true&latitude=52.3&longitude=13.2"
+
+    From the response we want to get info about the temperature (float), windspeed (float) and time (string), and you can find them in:
+    response["current_weather"]["temperature"], response["current_weather"]["windspeed"], and response["current_weather"]["time"]
+
+    We need to fetch this data, construct the Weather object and publish it to the 'weather' topic.
+    For each message you publish, use a key constructed as:
+        string value of latitude + '_' + string value of longitude
+
+    Asynchronously sleep for time_interval.
+    """
+    raise NotImplementedError()
+
+
+@app.after_startup
+async def publish_weather(logger: Logger, context: ContextRepo):
+    """
+    Create asynchronous tasks that execute the fetch_and_publish_weather function.
+    Run this process for the following latitude and longitude combinations:
+        - latitude=13 and longitude=17
+        - latitude=50 and longitude=13
+        - latitude=44 and longitude=45
+        - latitude=24 and longitude=70
+    Put all created tasks into a list and set it as a global variable in the context (this is needed so we can wait for these tasks at app shutdown)
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_scrape_weather/description.txt b/search/examples/example_scrape_weather/description.txt
new file mode 100644
index 0000000..593c2e8
--- /dev/null
+++ b/search/examples/example_scrape_weather/description.txt
@@ -0,0 +1,26 @@
+Develop a FastStream application which will fetch weather information from the web until the app shuts down.
+
+You can get the weather information by sending a GET request to "https://api.open-meteo.com/v1/forecast?current_weather=true"
+At the end of the url you should add additional 'latitude' and 'longitude' parameters, which are of type float.
+Here is an example url for fetching information for latitude=52.3 and longitude=13.2:
+    "https://api.open-meteo.com/v1/forecast?current_weather=true&latitude=52.3&longitude=13.2"
+
+From the response we want to get info about the temperature (float), windspeed (float) and time (string), and you can find them in:
+    response["current_weather"]["temperature"], response["current_weather"]["windspeed"], and response["current_weather"]["time"]
+
+We need to fetch this data every 5 seconds and publish it to the 'weather' topic.
+For each message you publish, use a key constructed as:
+    string value of latitude + '_' + string value of longitude
+
+The message that we publish needs to have the following parameters:
+ - latitude (type float)
+ - longitude (type float)
+ - temperature (type float)
+ - windspeed (type float)
+ - time (type string)
+
+We need to run this process for the following latitude and longitude combinations:
+ - latitude=13 and longitude=17
+ - latitude=50 and longitude=13
+ - latitude=44 and longitude=45
+ - latitude=24 and longitude=70
diff --git a/search/examples/example_scrape_weather/test_app.py b/search/examples/example_scrape_weather/test_app.py
new file mode 100644
index 0000000..41dbfdf
--- /dev/null
+++ b/search/examples/example_scrape_weather/test_app.py
@@ -0,0 +1,19 @@
+import pytest
+
+from faststream import Context, TestApp
+from faststream.kafka import TestKafkaBroker
+
+from .app import Weather, app, broker
+
+
+@broker.subscriber("weather")
+async def on_weather(msg: Weather, key: bytes = Context("message.raw_message.key")):
+    pass
+
+
+@pytest.mark.asyncio
+async def test_message_was_published():
+    async with TestKafkaBroker(broker):
+        async with TestApp(app):
+            await on_weather.wait_call(3)
+            on_weather.mock.assert_called()
diff --git a/search/examples/example_social_media_post/__init__.py b/search/examples/example_social_media_post/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_social_media_post/app.py b/search/examples/example_social_media_post/app.py
new file mode 100644
index 0000000..9b187e1
--- /dev/null
+++ b/search/examples/example_social_media_post/app.py
@@ -0,0 +1,34 @@
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Post(BaseModel):
+    user_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+    text: str = Field(..., examples=["Just another day"], description="text example")
+    number_of_likes: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+@broker.publisher("just_text")
+@broker.subscriber("popular_post")
+async def on_popular_post(msg: Post, logger: Logger) -> str:
+    logger.info(msg)
+    return msg.text
+
+
+to_popular_post = broker.publisher("popular_post")
+
+
+@broker.subscriber("new_post")
+async def on_new_post(msg: Post, logger: Logger) -> None:
+    logger.info(msg)
+
+    if msg.number_of_likes > 10:
+        await to_popular_post.publish(msg)
diff --git a/search/examples/example_social_media_post/app_skeleton.py b/search/examples/example_social_media_post/app_skeleton.py
new file mode 100644
index 0000000..a090821
--- /dev/null
+++ b/search/examples/example_social_media_post/app_skeleton.py
@@ -0,0 +1,49 @@
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class Post(BaseModel):
+    user_id: NonNegativeInt = Field(..., examples=[1], description="Int data example")
+    text: str = Field(..., examples=["Just another day"], description="text example")
+    number_of_likes: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+
+@broker.publisher("just_text")
+@broker.subscriber("popular_post")
+async def on_popular_post(msg: Post, logger: Logger) -> str:
+    """
+    Processes a message from the 'popular_post' topic.
+    Publishes the text attribute of the post to the 'just_text' topic.
+
+    Instructions:
+    1. Consume a message from 'popular_post' topic.
+    2. Publish the text attribute of the post to the 'just_text' topic.
+
+    """
+    raise NotImplementedError()
+
+
+to_popular_post = broker.publisher("popular_post")
+
+
+@broker.subscriber("new_post")
+async def on_new_post(msg: Post, logger: Logger) -> None:
+    """
+    Processes a message from the 'new_post' topic.
+    Upon reception, the function should check if the number_of_likes attribute is greater than 10. If yes, publish the current message to the 'popular_post' topic.
+
+    Instructions:
+    1. Consume a message from 'new_post' topic.
+    2. Check if the number_of_likes attribute is greater than 10.
+    3. If 2. is True, publish the current message to the 'popular_post' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_social_media_post/description.txt b/search/examples/example_social_media_post/description.txt
new file mode 100644
index 0000000..504b9d2
--- /dev/null
+++ b/search/examples/example_social_media_post/description.txt
@@ -0,0 +1,4 @@
+FastStream application for social media.
+Create a function which reads from the 'new_post' topic. Messages which come to this topic have 3 attributes: user_id, text and number_of_likes.
+If the received Post has more than 10 likes, publish it to the 'popular_post' topic.
+While consuming from the popular_post topic, publish the text attribute of the post to the 'just_text' topic.
diff --git a/search/examples/example_social_media_post/test_app.py b/search/examples/example_social_media_post/test_app.py
new file mode 100644
index 0000000..134d13b
--- /dev/null
+++ b/search/examples/example_social_media_post/test_app.py
@@ -0,0 +1,38 @@
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+from .app import Post, broker, on_new_post, on_popular_post
+
+
+@broker.subscriber("just_text")
+async def on_just_text(msg: str) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_post_with_just_two_likes():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Post(user_id=1, text="bad post", number_of_likes=2), "new_post"
+        )
+        on_new_post.mock.assert_called_with(
+            dict(Post(user_id=1, text="bad post", number_of_likes=2))
+        )
+        on_popular_post.mock.assert_not_called()
+        on_just_text.mock.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_post_with_many_likes():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            Post(user_id=1, text="cool post", number_of_likes=100), "new_post"
+        )
+        on_new_post.mock.assert_called_with(
+            dict(Post(user_id=1, text="cool post", number_of_likes=100))
+        )
+        on_popular_post.mock.assert_called_with(
+            dict(Post(user_id=1, text="cool post", number_of_likes=100))
+        )
+        on_just_text.mock.assert_called_with("cool post")
diff --git a/search/examples/example_student_query/__init__.py b/search/examples/example_student_query/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_student_query/app.py b/search/examples/example_student_query/app.py
new file mode 100644
index 0000000..4c2f17d
--- /dev/null
+++ b/search/examples/example_student_query/app.py
@@ -0,0 +1,46 @@
+from datetime import datetime
+
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class StudentQuery(BaseModel):
+    student_id: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    department: str = Field(
+        ..., examples=["academic_department"], description="Department example"
+    )
+    query: str = Field(
+        ..., examples=["Please help me with..."], description="Query example"
+    )
+    time: datetime = Field(
+        ...,
+        examples=["2020-04-23 10:20:30.400000"],
+        description="The timestamp of the record",
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_finance_department = broker.publisher("finance_department")
+to_academic_department = broker.publisher("academic_department")
+to_admissions_department = broker.publisher("admissions_department")
+to_unclassified_query = broker.publisher("unclassified_query")
+
+
+@broker.subscriber("student_query")
+async def on_student_query(msg: StudentQuery, logger: Logger) -> None:
+    logger.info(msg)
+
+    if msg.department == "finance_department":
+        await to_finance_department.publish(msg)
+    elif msg.department == "academic_department":
+        await to_academic_department.publish(msg)
+    elif msg.department == "admissions_department":
+        await to_admissions_department.publish(msg)
+    else:
+        await to_unclassified_query.publish(msg)
diff --git a/search/examples/example_student_query/app_skeleton.py b/search/examples/example_student_query/app_skeleton.py
new file mode 100644
index 0000000..117f2a1
--- /dev/null
+++ b/search/examples/example_student_query/app_skeleton.py
@@ -0,0 +1,49 @@
+from datetime import datetime
+
+from pydantic import BaseModel, Field, NonNegativeInt
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class StudentQuery(BaseModel):
+    student_id: NonNegativeInt = Field(
+        ..., examples=[1], description="Int data example"
+    )
+    department: str = Field(
+        ..., examples=["academic_department"], description="Department example"
+    )
+    query: str = Field(
+        ..., examples=["Please help me with..."], description="Query example"
+    )
+    time: datetime = Field(
+        ...,
+        examples=["2020-04-23 10:20:30.400000"],
+        description="The timestamp of the record",
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_finance_department = broker.publisher("finance_department")
+to_academic_department = broker.publisher("academic_department")
+to_admissions_department = broker.publisher("admissions_department")
+to_unclassified_query = broker.publisher("unclassified_query")
+
+
+@broker.subscriber("student_query")
+async def on_student_query(msg: StudentQuery, logger: Logger) -> None:
+    """
+    Processes a message from the 'student_query' topic.
+    Each query should then be forwarded to the corresponding department based on the department attribute.
+    The relevant department topics could be 'finance_department', 'academic_department', or 'admissions_department'.
+    If department is not one of these topics, forward the message to the 'unclassified_query' topic.
+
+    Instructions:
+    1. Consume a message from 'student_query' topic.
+    2. Check the department attribute - The relevant department topics could be 'finance_department', 'academic_department', or 'admissions_department'.
+    3. If the department is one of the relevant departments, forward the message to that topic. Otherwise forward the message to the 'unclassified_query' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_student_query/description.txt b/search/examples/example_student_query/description.txt
new file mode 100644
index 0000000..bc9c90d
--- /dev/null
+++ b/search/examples/example_student_query/description.txt
@@ -0,0 +1,4 @@
+Create a FastStream application using localhost as a broker. Consume from the 'student_query' topic, which includes attributes: student_id, department, query, and time.
+Each query should then be forwarded to the corresponding department based on the department attribute.
+The relevant department topics could be 'finance_department', 'academic_department', or 'admissions_department'.
+If department is not one of these topics, forward the message to the 'unclassified_query' topic.
diff --git a/search/examples/example_student_query/test_app.py b/search/examples/example_student_query/test_app.py
new file mode 100644
index 0000000..a4c13a8
--- /dev/null
+++ b/search/examples/example_student_query/test_app.py
@@ -0,0 +1,58 @@
+from datetime import datetime
+
+import pytest
+
+from faststream._compat import model_to_jsonable
+from faststream.kafka import TestKafkaBroker
+
+from .app import StudentQuery, broker, on_student_query
+
+
+@broker.subscriber("finance_department")
+async def on_finance_department(msg: StudentQuery) -> None:
+    pass
+
+
+@broker.subscriber("academic_department")
+async def on_academic_department(msg: StudentQuery) -> None:
+    pass
+
+
+@broker.subscriber("admissions_department")
+async def on_admissions_department(msg: StudentQuery) -> None:
+    pass
+
+
+@broker.subscriber("unclassified_query")
+async def on_unclassified_query(msg: StudentQuery) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_message_published_to_correct_topic():
+    async with TestKafkaBroker(broker):
+        time = datetime.now()
+        await broker.publish(
+            StudentQuery(
+                student_id=1,
+                department="admissions_department",
+                query="Help me with...",
+                time=time,
+            ),
+            "student_query",
+        )
+        student_query_json = model_to_jsonable(
+            StudentQuery(
+                student_id=1,
+                department="admissions_department",
+                query="Help me with...",
+                time=time,
+            )
+        )
+
+        on_student_query.mock.assert_called_with(student_query_json)
+        on_admissions_department.mock.assert_called_with(student_query_json)
+
+        on_finance_department.mock.assert_not_called()
+        on_academic_department.mock.assert_not_called()
+        on_unclassified_query.mock.assert_not_called()
diff --git a/search/examples/example_weather_updates/__init__.py b/search/examples/example_weather_updates/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/search/examples/example_weather_updates/app.py b/search/examples/example_weather_updates/app.py
new file mode 100644
index 0000000..780fbed
--- /dev/null
+++ b/search/examples/example_weather_updates/app.py
@@ -0,0 +1,30 @@
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class WeatherConditions(BaseModel):
+    city: str = Field(..., examples=["Zagreb"], description="City example")
+    temperature: float
+    conditions: str = Field(
+        ..., examples=["Mostly Cloudy"], description="Conditions example"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_weather_alerts = broker.publisher("weather_alerts")
+
+
+@broker.subscriber("weather_updates")
+async def on_weather_updates(msg: WeatherConditions, logger: Logger) -> None:
+    logger.info(msg)
+
+    if msg.temperature > 40 or msg.temperature < -10:
+        alert_city = "Alert: " + msg.city
+        alert_msg = WeatherConditions(
+            city=alert_city, temperature=msg.temperature, conditions=msg.conditions
+        )
+        await to_weather_alerts.publish(alert_msg)
diff --git a/search/examples/example_weather_updates/app_skeleton.py b/search/examples/example_weather_updates/app_skeleton.py
new file mode 100644
index 0000000..2c9e3c2
--- /dev/null
+++ b/search/examples/example_weather_updates/app_skeleton.py
@@ -0,0 +1,35 @@
+from pydantic import BaseModel, Field
+
+from faststream import FastStream, Logger
+from faststream.kafka import KafkaBroker
+
+
+class WeatherConditions(BaseModel):
+    city: str = Field(..., examples=["Zagreb"], description="City example")
+    temperature: float
+    conditions: str = Field(
+        ..., examples=["Mostly Cloudy"], description="Conditions example"
+    )
+
+
+broker = KafkaBroker("localhost:9092")
+app = FastStream(broker)
+
+to_weather_alerts = broker.publisher("weather_alerts")
+
+
+@broker.subscriber("weather_updates")
+async def on_weather_updates(msg: WeatherConditions, logger: Logger) -> None:
+    """
+    Processes a message from the 'weather_updates' topic.
+    Upon reception, the function should verify if the temperature attribute is above 40 or below -10.
+    If yes, prepend the string 'Alert: ' to the city attribute and publish this message to the 'weather_alerts' topic.
+
+    Instructions:
+    1. Consume a message from 'weather_updates' topic.
+    2. Create a new message object (do not directly modify the original).
+    3. Check if the temperature attribute is above 40 or below -10.
+    4. If 3. is True, prepend the string 'Alert: ' to the city attribute and publish this message to the 'weather_alerts' topic.
+
+    """
+    raise NotImplementedError()
diff --git a/search/examples/example_weather_updates/description.txt b/search/examples/example_weather_updates/description.txt
new file mode 100644
index 0000000..f826433
--- /dev/null
+++ b/search/examples/example_weather_updates/description.txt
@@ -0,0 +1,3 @@
+FastStream application for consuming messages from 'weather_updates' topic where the message includes attributes: city, temperature, and conditions.
+For every consumed message, prepend the string 'Alert: ' to the city attribute if the temperature attribute is above 40 or below -10.
+If so, publish the modified message to the 'weather_alerts' topic.
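Note: several of the skeletons above instruct the implementer to create a new message object rather than mutate the consumed one. A minimal sketch of one way to do that with pydantic (.copy(update=...) is the pydantic v1 spelling; v2 calls it model_copy):

from pydantic import BaseModel


class WeatherConditions(BaseModel):
    city: str
    temperature: float
    conditions: str


msg = WeatherConditions(city="Zagreb", temperature=45.0, conditions="Sunny")
if msg.temperature > 40 or msg.temperature < -10:
    # Build a fresh object instead of mutating the consumed message.
    alert_msg = msg.copy(update={"city": "Alert: " + msg.city})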
diff --git a/search/examples/example_weather_updates/test_app.py b/search/examples/example_weather_updates/test_app.py
new file mode 100644
index 0000000..aed2012
--- /dev/null
+++ b/search/examples/example_weather_updates/test_app.py
@@ -0,0 +1,44 @@
+import pytest
+
+from faststream.kafka import TestKafkaBroker
+
+from .app import WeatherConditions, broker, on_weather_updates
+
+
+@broker.subscriber("weather_alerts")
+async def on_weather_alerts(msg: WeatherConditions) -> None:
+    pass
+
+
+@pytest.mark.asyncio
+async def test_not_published_to_weather_alerts():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            WeatherConditions(city="Zagreb", temperature=20.5, conditions="Sunny"),
+            "weather_updates",
+        )
+        on_weather_updates.mock.assert_called_with(
+            dict(WeatherConditions(city="Zagreb", temperature=20.5, conditions="Sunny"))
+        )
+
+        on_weather_alerts.mock.assert_not_called()
+
+
+@pytest.mark.asyncio
+async def test_published_to_weather_alerts():
+    async with TestKafkaBroker(broker):
+        await broker.publish(
+            WeatherConditions(city="Zagreb", temperature=-15, conditions="Sunny"),
+            "weather_updates",
+        )
+        on_weather_updates.mock.assert_called_with(
+            dict(WeatherConditions(city="Zagreb", temperature=-15, conditions="Sunny"))
+        )
+
+        on_weather_alerts.mock.assert_called_with(
+            dict(
+                WeatherConditions(
+                    city="Alert: Zagreb", temperature=-15, conditions="Sunny"
+                )
+            )
+        )
diff --git a/settings.ini b/settings.ini
index 6c38cd3..9143a10 100644
--- a/settings.ini
+++ b/settings.ini
@@ -5,7 +5,7 @@
 ### Python library ###
 repo = faststream-gen
 lib_name = %(repo)s
-version = 0.1.6
+version = 0.1.7rc0
 min_python = 3.8
 license = apache2
 black_formatting = False
@@ -58,7 +58,7 @@
 dev_requirements = \
     pytest==7.4.2 \
     nbqa==1.7.0 \
     black==23.7.0 \
-    mypy==1.4.1 \
+    mypy==1.5.1 \
     isort==5.12.0 \
     pre-commit==3.3.3 \
     detect-secrets==1.4.0