Conflicting Positional Arguments Using @app.page() Functionality #564

Open
ksuden opened this issue Nov 8, 2023 · 1 comment

ksuden commented Nov 8, 2023

Steps to reproduce

The actual topic, agent, and table should not matter; the issue is with the web request. I only included the agent, table, etc. in case the agent and page decorators conflict or interact with each other unexpectedly.

import faust
from faust import web
import pandas as pd
from datetime import timezone
import json

# Faust app setup
app = faust.App('realtime_processor', broker='kafka://localhost:9092')
topic_name = 'first_test_topic'
kafka_topic = app.topic(topic_name, value_type=bytes)
processed_data_table = app.Table('processed_data', default=float, partitions=1)

# A simple model for our items to enforce a schema
class DataItem(faust.Record, validation=True):
    timestamp: str
    value: float

@app.agent(kafka_topic)
async def process(stream):
    async for message in stream:
        data = message
        if 'Items' in data:
            for entry in data['Items']:
                if 'Items' in entry:
                    for item in entry['Items']:
                        # timestamp = pd.to_datetime(item['Timestamp']).replace(tzinfo=timezone.utc).timestamp()
                        timestamp = item['Timestamp']
                        value = item['Value']
                        # Update faust table
                        processed_data_table[timestamp] = value
                        # Print the entire table
                        # print("Current state of the table:")
                        # for key, value in processed_data_table.items():
                        #     print(f"{key}: {value}")
                        # print("-" * 40)  # Just a separator for readability

@app.page('/data/')
async def fetch_first_test_topic(request: web.Request) -> web.Response:
    # Extract query parameters for time range or other filters if needed
    # For example:
    # start_time = request.query['start_time']
    # end_time = request.query['end_time']


    # Convert table data to list of DataItem objects
    # data_list = [DataItem(timestamp=key, value=value).to_representation() 
    #              for key, value in processed_data_table.items()]

    # # Convert list of DataItem objects to JSON
    # json_data = json.dumps(data_list)
    json_data={'hello':'world'}
    
    # Return JSON response
    return web.json(json_data)

Script I used to reach this endpoint:

import pandas as pd
import requests

response = requests.get('http://127.0.0.1:6066/data/')
print(response.text)
data = response.json()  # This should be the JSON array of objects
print(data)

Expected behavior

I expected the print statement after the GET request to output:

{'hello':'world'}

Actual behavior

I got errors on both the GET request side and the Faust side. This suggests that the GET request reached the Faust app, but for some reason the handler is called with an extra positional argument, even though I only defined one.

Full traceback

On the faust app side

[2023-11-08 16:23:29,783] [2282037] [ERROR] Error handling request 
Traceback (most recent call last):
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/aiohttp/web_protocol.py", line 433, in _handle_request
    resp = await request_handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/aiohttp/web_app.py", line 504, in _handle
    resp = await handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/faust/web/drivers/aiohttp.py", line 246, in _dispatch
    return await handler(request)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/faust/web/views.py", line 102, in __call__
    return await self.dispatch(request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/faust/web/views.py", line 118, in dispatch
    response = await method(cast(Request, request), **kwargs)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: fetch_first_test_topic() takes 1 positional argument but 2 were given

On the GET request side

500 Internal Server Error

Server got itself in trouble
Traceback (most recent call last):
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/requests/models.py", line 971, in json
    return complexjson.loads(self.text, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)
json.decoder.JSONDecodeError: Extra data: line 1 column 5 (char 4)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "/home/ksuden/source/Tps-PricePrediction/data_works/data_streaming/faust/api_query.py", line 6, in <module>
    data = response.json()  # This should be the JSON array of objects
           ^^^^^^^^^^^^^^^
  File "/home/ksuden/anaconda3/envs/py311/lib/python3.11/site-packages/requests/models.py", line 975, in json
    raise RequestsJSONDecodeError(e.msg, e.doc, e.pos)
requests.exceptions.JSONDecodeError: Extra data: line 1 column 5 (char 4)
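
The client-side JSONDecodeError looks like a downstream symptom of the server's 500 response rather than a second bug. Here is a minimal sketch using only the standard library, assuming the response body is the plain text printed above (the exact body string is an assumption reconstructed from that output). The JSON parser reads `500` as a valid number and then stops at the first character of the remaining text.

```python
import json

# Assumed body of the 500 response, reconstructed from the printed output.
body = "500 Internal Server Error\n\nServer got itself in trouble"

try:
    json.loads(body)
    decode_error = None
except json.JSONDecodeError as exc:
    # "500" parses as a number; everything after it counts as extra data.
    decode_error = str(exc)

print(decode_error)  # Extra data: line 1 column 5 (char 4)
```

In the client script, checking `response.status_code` or calling `response.raise_for_status()` before `response.json()` would surface the HTTP error directly instead of the decode error.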

Versions

  • Python version: 3.11
  • Faust version: 0.10.16
  • Operating system: Linux
  • Kafka version: 2.0.2
  • RocksDB version (if applicable)

ksuden changed the title from “Does This Fork Not Support Original @app.page() Functionality?” to “Conflicting Positional Arguments Using @app.page() Functionality” on Nov 8, 2023

ksuden commented Nov 9, 2023

If anyone is having the same issue, I have found a workaround.

from aiohttp import web as aio_web

@app.page('/data/')
async def fetch_first_test_topic(request, web) -> web.Response:
    data_list = {'Hello':'World'}
    try:
        return aio_web.Response(text=json.dumps(data_list), status=200, content_type='application/json')
    except Exception as e:
        return aio_web.Response(text=json.dumps({'error': str(e)}), status=500, content_type='application/json')

If I had to guess, it has something to do with conflicting versions of the aiohttp package and related functionality underneath Faust, but the workaround above worked for me, allowing access to my streams via GET requests.
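
A possible alternative explanation for the TypeError is that the decorated function gets attached to a view class as its `get` method, so it is called with the view instance first and the request second. The sketch below is a simplified, hypothetical stand-in for that mechanism in plain Python. The `View` class, `from_handler`, and `dispatch` here are illustrative only and are not Faust's actual code. It reproduces the same error message with a one-argument handler and shows that a two-argument handler works.

```python
import asyncio


class View:
    """Hypothetical, simplified stand-in for a class-based web view."""

    @classmethod
    def from_handler(cls, fun):
        # Attach the plain function as the `get` method of a new subclass.
        # As a method, it receives the view instance as its first argument.
        return type(fun.__name__, (cls,), {"get": fun})

    async def dispatch(self, request):
        # Bound-method call: effectively get(self, request).
        return await self.get(request)


async def one_arg(request):
    return {"hello": "world"}


async def two_args(self, request):
    return {"hello": "world"}


def call(handler):
    view = View.from_handler(handler)()
    return asyncio.run(view.dispatch("fake-request"))


try:
    call(one_arg)
    error_message = None
except TypeError as exc:
    error_message = str(exc)

result = call(two_args)
print(error_message)
print(result)
```

If this is what happens, it would also explain why the workaround succeeds: its two parameters accept the view instance and the request, whatever they are named. Under the same assumption, declaring the handler as `async def fetch_first_test_topic(self, request)` and returning `self.json(...)`, as in Faust's documented `@app.page` examples, may be the intended usage.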
