Skip to content

Commit

Permalink
client: add method to stream messages asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
james-rms committed Oct 3, 2023
1 parent 5cfba9e commit 52242b2
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 15 deletions.
88 changes: 73 additions & 15 deletions foxglove_data_platform/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,43 @@ def get_messages(
for _, channel, message, decoded_message in reader.iter_decoded_messages()
]

def iter_messages(
self,
*,
device_id: Optional[str] = None,
device_name: Optional[str] = None,
start: datetime.datetime,
end: datetime.datetime,
topics: List[str] = [],
decoder_factories: Optional[List[DecoderFactory]] = None,
):
"""
yields a stream of (schema, channel, message, decoded message) values.
device_id: The id of the device that originated the desired data.
device_name: The name of the device that originated the desired data.
start: The earliest time from which to retrieve data.
end: The latest time from which to retrieve data.
topics: An optional list of topics to retrieve.
All topics will be retrieved if this is omitted.
decoder_factories: an optional list of :py:class:`~mcap.decoder.DecoderFactory` instances
used to decode message content.
"""

stream_link = self._make_stream_link(
device_id=device_id,
device_name=device_name,
start=start,
end=end,
topics=topics,
)
response = requests.get(stream_link, headers=self.__headers, stream=True)
response.raise_for_status()
if decoder_factories is None:
decoder_factories = DEFAULT_DECODER_FACTORIES
reader = make_reader(response.raw, decoder_factories=decoder_factories)
return reader.iter_decoded_messages()

def download_recording_data(
self,
*,
Expand Down Expand Up @@ -350,7 +387,7 @@ def download_recording_data(

return _download_stream_with_progress(json["link"], callback=callback)

def download_data(
def _make_stream_link(
self,
*,
device_id: Optional[str] = None,
Expand All @@ -359,19 +396,7 @@ def download_data(
end: datetime.datetime,
topics: List[str] = [],
output_format: OutputFormat = OutputFormat.mcap0,
callback: Optional[ProgressCallback] = None,
) -> bytes:
"""
Returns raw data bytes for a device and time range.
device_id: The id of the device that originated the desired data.
device_name: The name of the device that originated the desired data.
start: The earliest time from which to retrieve data.
end: The latest time from which to retrieve data.
topics: An optional list of topics to retrieve.
All topics will be retrieved if this is omitted.
output_format: The output format of the data, either .bag or .mcap, defaulting to .mcap.
"""
) -> str:
params = {
"device.id": device_id,
"device.name": device_name,
Expand All @@ -387,8 +412,41 @@ def download_data(
)

json = json_or_raise(link_response)
return json["link"]

return _download_stream_with_progress(json["link"], callback=callback)
def download_data(
self,
*,
device_id: Optional[str] = None,
device_name: Optional[str] = None,
start: datetime.datetime,
end: datetime.datetime,
topics: List[str] = [],
output_format: OutputFormat = OutputFormat.mcap0,
callback: Optional[ProgressCallback] = None,
) -> bytes:
"""
Returns raw data bytes for a device and time range.
device_id: The id of the device that originated the desired data.
device_name: The name of the device that originated the desired data.
start: The earliest time from which to retrieve data.
end: The latest time from which to retrieve data.
topics: An optional list of topics to retrieve.
All topics will be retrieved if this is omitted.
output_format: The output format of the data, either .bag or .mcap, defaulting to .mcap.
"""
return _download_stream_with_progress(
self._make_stream_link(
device_id=device_id,
device_name=device_name,
start=start,
end=end,
topics=topics,
output_format=output_format,
),
callback=callback,
)

def get_coverage(
self,
Expand Down
39 changes: 39 additions & 0 deletions tests/test_stream_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from io import BytesIO
from datetime import datetime
from unittest.mock import MagicMock, patch

from mcap.records import Schema, Channel, Message

from foxglove_data_platform.client import Client

from .generate import generate_json_data


def get_generated_data(url, **kwargs):
assert url == "the_link"

class Resp:
def __init__(self):
self.raw = BytesIO(generate_json_data())

def raise_for_status(self):
return None

return Resp()


@patch("requests.get", side_effect=get_generated_data)
def test_boot(arg):
client = Client("test")
client._make_stream_link = MagicMock(return_value="the_link")
count = 0
for schema, channel, message, decoded_message in client.iter_messages(
device_id="test_id", start=datetime.now(), end=datetime.now()
):
assert "level" in decoded_message
assert isinstance(schema, Schema)
assert isinstance(channel, Channel)
assert isinstance(message, Message)
count += 1

assert count == 10

0 comments on commit 52242b2

Please sign in to comment.