Skip to content

Commit

Permalink
fix: correct Nats ObjectStorage get file behavior inside watch subscr…
Browse files Browse the repository at this point in the history
…iber (#1523)

* tests: fix flacking nats-real test

* fix: correct NATS ObjectStorage watch processing

* tests: fix NATS OS tes
  • Loading branch information
Lancetnik authored Jun 15, 2024
1 parent 2355603 commit dfec397
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 29 deletions.
19 changes: 9 additions & 10 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -1014,20 +1014,19 @@ async def _create_subscription( # type: ignore[override]
declare=self.obj_watch.declare,
)

self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](
await self.bucket.watch(
ignore_deletes=self.obj_watch.ignore_deletes,
include_history=self.obj_watch.include_history,
meta_only=self.obj_watch.meta_only,
)
)

self.add_task(self._consume_watch())

async def _consume_watch(self) -> None:
assert self.subscription, "You should call `create_subscription` at first." # nosec B101
assert self.bucket, "You should call `create_subscription` at first." # nosec B101

# Should be created inside task to avoid nats-py lock
obj_watch = await self.bucket.watch(
ignore_deletes=self.obj_watch.ignore_deletes,
include_history=self.obj_watch.include_history,
meta_only=self.obj_watch.meta_only,
)

obj_watch = self.subscription.obj
self.subscription = UnsubscribeAdapter["ObjectStore.ObjectWatcher"](obj_watch)

while self.running:
with suppress(TimeoutError):
Expand Down
22 changes: 4 additions & 18 deletions tests/docs/nats/js/test_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,9 @@
async def test_basic():
from docs.docs_src.nats.js.object import app, broker, handler

async with TestNatsBroker(broker, with_real=True):
await broker.start()

os = await broker.object_storage("example-bucket")
try:
existed_files = await os.list()
except Exception:
existed_files = ()

call = True
for file in existed_files:
if file.name == "file.txt":
call = False

if call:
async with TestApp(app):
pass

async with (
TestNatsBroker(broker, with_real=True, connect_only=True),
TestApp(app),
):
await handler.wait_call(3.0)
handler.mock.assert_called_once_with("file.txt")
2 changes: 1 addition & 1 deletion tests/opentelemetry/nats/test_nats.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def test_batch(
@broker.subscriber(
queue,
stream=stream,
pull_sub=PullSub(3, batch=True),
pull_sub=PullSub(3, batch=True, timeout=30.0),
**self.subscriber_kwargs,
)
async def handler(m):
Expand Down

0 comments on commit dfec397

Please sign in to comment.