Steps to reproduce
The actual topic, agent, and table should not matter. The issue is with the web request. I only included the agent, table, etc. in case something about the agent and page decorators conflicted or interacted with each other unexpectedly.
import faust
from faust import web
import pandas as pd
from datetime import timezone
import json

# Faust app setup
app = faust.App('realtime_processor', broker='kafka://localhost:9092')
topic_name = 'first_test_topic'
kafka_topic = app.topic(topic_name, value_type=bytes)
processed_data_table = app.Table('processed_data', default=float, partitions=1)

# A simple model for our items to enforce a schema
class DataItem(faust.Record, validation=True):
    timestamp: str
    value: float

@app.agent(kafka_topic)
async def process(stream):
    async for message in stream:
        data = message
        if 'Items' in data:
            for entry in data['Items']:
                if 'Items' in entry:
                    for item in entry['Items']:
                        # timestamp = pd.to_datetime(item['Timestamp']).replace(tzinfo=timezone.utc).timestamp()
                        timestamp = item['Timestamp']
                        value = item['Value']
                        # Update faust table
                        processed_data_table[timestamp] = value
        # Print the entire table
        # print("Current state of the table:")
        # for key, value in processed_data_table.items():
        #     print(f"{key}: {value}")
        # print("-" * 40)  # Just a separator for readability

@app.page('/data/')
async def fetch_first_test_topic(request: web.Request) -> web.Response:
    # Extract query parameters for time range or other filters if needed
    # For example:
    # start_time = request.query['start_time']
    # end_time = request.query['end_time']
    # Convert table data to list of DataItem objects
    # data_list = [DataItem(timestamp=key, value=value).to_representation()
    #              for key, value in processed_data_table.items()]
    # # Convert list of DataItem objects to JSON
    # json_data = json.dumps(data_list)
    json_data = {'hello': 'world'}
    # Return JSON response
    return web.json(json_data)
Script I used to reach this endpoint:
import pandas as pd
import requests
response = requests.get('http://127.0.0.1:6066/data/')
print(response.text)
data = response.json() # This should be the JSON array of objects
print(data)
Expected behavior
I expected the print statement after the GET request to output:
{'hello':'world'}
Actual behavior
I got errors on both the GET request side and the Faust side, suggesting that the GET request reached the Faust app, but for some reason there is an error about positional arguments, despite none being defined.
Full traceback
On the faust app side
[2023-11-08 16:23:29,783] [2282037] [ERROR] Error handling request
Traceback (most recent call last):
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/aiohttp/web_protocol.py", line 433, in _handle_request
    resp = await request_handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/aiohttp/web_app.py", line 504, in _handle
    resp = await handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/faust/web/drivers/aiohttp.py", line 246, in _dispatch
    return await handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/faust/web/views.py", line 102, in __call__
    return await self.dispatch(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/faust/web/views.py", line 118, in dispatch
    response = await method(cast(Request, request), **kwargs)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: fetch_first_test_topic() takes 1 positional argument but 2 were given
On the GET request side
500 Internal Server Error
Server got itself in trouble
Traceback (most recent call last):
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/requests/models.py", line 971, in json
    return complexjson.loads(self.text, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 1 column 5 (char 4)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/ksuden/source/Tps-PricePrediction/data_works/data_streaming/faust/api_query.py", line 6, in <module>
    data = response.json() # This should be the JSON array of objects
           ^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/requests/models.py", line 975, in json
    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)
requests.exceptions.JSONDecodeError: Extra data: line 1 column 5 (char 4)
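The client-side JSONDecodeError looks like a side effect of the server error rather than a second problem: response.json() is being asked to parse the plain-text 500 body printed above. A minimal sketch using only the standard library, assuming the body is the two lines shown (the exact whitespace between them is a guess and does not affect the result):

```python
import json

# Plain-text body of the 500 response, as printed by response.text.
# The blank line between the two sentences is an assumption.
body = "500 Internal Server Error\n\nServer got itself in trouble"

try:
    json.loads(body)
except json.JSONDecodeError as exc:
    # The parser reads "500" as a valid JSON number, skips one space,
    # and then reports everything after it as extra data.
    message = str(exc)

print(message)
```

Checking response.status_code (or calling response.raise_for_status()) before response.json() would surface the 500 directly instead of the decode error.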
Versions
Python version: 3.11
Faust version: 0.10.16
Operating system: Linux
Kafka version: 2.0.2
RocksDB version (if applicable)
ksuden changed the title from "Does This Fork Not Support Original @app.page() Functionality?" to "Conflicting Positional Arguments Using @app.page() Functionality" on Nov 8, 2023
If anyone is having the same issue, I have found a workaround.
from aiohttp import web as aio_web

@app.page('/data/')
async def fetch_first_test_topic(request, web) -> web.Response:
    data_list = {'Hello':'World'}
    try:
        return aio_web.Response(text=json.dumps(data_list), status=200, content_type='application/json')
    except Exception as e:
        return aio_web.Response(text=json.dumps({'error': str(e)}), status=500, content_type='application/json')
If I had to guess, it has something to do with conflicting versions of the aiohttp package and the related functionality underneath Faust, but the solution above worked for me, allowing access to my streams via GET request.
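Another possible explanation, going by the faust/web/views.py frames in the traceback and offered as a sketch rather than a confirmed diagnosis: Faust appears to wrap an @app.page handler in a View class and install the function as that class's get method, so the handler is called as handler(self, request). A handler that declares only request then receives two positional arguments. Under that reading, the workaround succeeds because its two-parameter signature matches: the parameter named request receives the view, and the parameter named web receives the actual request. The example below reproduces the mechanism with a hypothetical stand-in class (MiniView, from_handler and dispatch here are illustrative and simplified, not Faust's actual code):

```python
import asyncio


class MiniView:
    """Hypothetical stand-in for a class-based web view."""

    @classmethod
    def from_handler(cls, fun):
        # Install the plain function as the `get` method of a new subclass.
        # Python will then call it as a bound method: fun(self, request).
        return type(fun.__name__, (cls,), {"get": fun})

    async def dispatch(self, request):
        return await self.get(request)


async def one_arg(request):
    # Mirrors the failing handler: only `request` is declared.
    return {"hello": "world"}


async def two_args(self, request):
    # Mirrors a handler that also accepts the view instance.
    return {"hello": "world"}


def call(handler):
    view = MiniView.from_handler(handler)()
    return asyncio.run(view.dispatch("fake-request"))


try:
    call(one_arg)
except TypeError as exc:
    error_message = str(exc)

result = call(two_args)
print(error_message)
print(result)
```

If this is what happens inside Faust, declaring the handler as async def fetch_first_test_topic(self, request) and returning self.json(...) should work without importing aiohttp directly; as far as I can tell, that is also the form the Faust documentation uses for page handlers.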