Skip to content

Commit

Permalink
Merge branch 'master' into upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha authored Jun 19, 2023
2 parents 2c3b1e2 + 155e40d commit 2154873
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
1 change: 1 addition & 0 deletions faust/agents/replies.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ async def _start_fetcher(self, topic_name: str) -> None:
# declare the topic
topic = self._reply_topic(topic_name)
await topic.maybe_declare()
self.app.topics.add(topic)
await self.sleep(3.0)
# then create the future
self._fetchers[topic_name] = self.add_future(self._drain_replies(topic))
Expand Down
2 changes: 1 addition & 1 deletion faust/transport/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def add(self, topic: TopicT) -> None:

def _topic_contain_unsubscribed_topics(self, topic: TopicT) -> bool:
index = self._topic_name_index
return bool(index and any(t not in index for t in topic.topics))
return bool(any(t not in index for t in topic.topics))

def discard(self, topic: Any) -> None:
"""Unregister topic from conductor."""
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/agents/test_replies.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import json
from unittest.mock import Mock
from unittest.mock import MagicMock, Mock

import pytest

Expand Down Expand Up @@ -192,7 +192,7 @@ async def test_add(self, *, c):
async def test_start_fetcher(self, *, c):
c._drain_replies = Mock()
c._reply_topic = Mock(
return_value=Mock(
return_value=MagicMock(
maybe_declare=AsyncMock(),
),
)
Expand Down

0 comments on commit 2154873

Please sign in to comment.