Pull in hopping window table fix #412
Conversation
Hi, this PR is really good because it fixes a long-standing issue in Faust: the improper (wrong) way hopping windows are handled. I tried to fix the issue in this snippet (which is not perfect) if you want to use it:

```python
async def _del_old_keys(self) -> None:
    window = cast(WindowT, self.window)
    assert window
    for partition, timestamps in self._partition_timestamps.items():
        while timestamps and window.stale(timestamps[0], time.time()):
            timestamp = heappop(timestamps)
            triggered_windows = [
                self._partition_timestamp_keys.get((partition, window_range[1]))
                for window_range in self._window_ranges(timestamp)
            ]
            keys_to_remove = self._partition_timestamp_keys.pop(
                (partition, timestamp), None
            )
            if keys_to_remove:
                # TODO: need refactoring with dict comprehension?
                window_data = {}
                for windows in triggered_windows:
                    if windows:
                        for processed_window in windows:
                            # setdefault is used so data for the same key is
                            # merged rather than overwritten
                            # window[0] is the window's key; it is not related
                            # to the window's timestamp
                            # windows are in the format:
                            # (key, (window_start, window_end))
                            window_data.setdefault(processed_window[0], []).extend(
                                self.data.get(processed_window, [])
                            )
                for key_to_remove in keys_to_remove:
                    self.data.pop(key_to_remove, None)
                    if key_to_remove[1][0] > self.last_closed_window:
                        await self.on_window_close(
                            key_to_remove, window_data[key_to_remove[0]]
                        )
                self.last_closed_window = max(
                    self.last_closed_window,
                    max(key[1][0] for key in keys_to_remove),
                )
```

The idea is to collect the different triggered windows for each stale timestamp. The main issue I still have is that the window is shifted by the step. You will find below a full working and self-contained example. The window closing function does not do anything but print the elements in the window.

```python
import sys
import time
import types
from datetime import datetime, timedelta
from heapq import heappop
from typing import cast

import faust
from faust.types.windows import WindowT


async def _custom_del_old_keys(self) -> None:
    window = cast(WindowT, self.window)
    assert window
    for partition, timestamps in self._partition_timestamps.items():
        while timestamps and window.stale(timestamps[0], time.time()):
            timestamp = heappop(timestamps)
            triggered_windows = [
                self._partition_timestamp_keys.get((partition, window_range[1]))
                for window_range in self._window_ranges(timestamp)
            ]
            keys_to_remove = self._partition_timestamp_keys.pop(
                (partition, timestamp), None
            )
            if keys_to_remove:
                # TODO: need refactoring with dict comprehension?
                window_data = {}
                for windows in triggered_windows:
                    if windows:
                        for processed_window in windows:
                            # setdefault is used so data for the same key is
                            # merged rather than overwritten
                            # window[0] is the window's key; it is not related
                            # to the window's timestamp
                            # windows are in the format:
                            # (key, (window_start, window_end))
                            window_data.setdefault(processed_window[0], []).extend(
                                self.data.get(processed_window, [])
                            )
                for key_to_remove in keys_to_remove:
                    self.data.pop(key_to_remove, None)
                    if key_to_remove[1][0] > self.last_closed_window:
                        await self.on_window_close(
                            key_to_remove, window_data[key_to_remove[0]]
                        )
                self.last_closed_window = max(
                    self.last_closed_window,
                    max(key[1][0] for key in keys_to_remove),
                )


class RawModel(faust.Record):
    key: str
    date: datetime
    value: float


TOPIC = "raw-event"
TABLE = "hopping_table"
KAFKA = "kafka://localhost:29092"
CLEANUP_INTERVAL = 1
WINDOW = 10
STEP = 5
WINDOW_EXPIRES = WINDOW + 2
PARTITIONS = 1

app = faust.App(
    "windowed-hopping-2",
    broker=KAFKA,
    topic_partitions=PARTITIONS,
    topic_disable_leader=True,
)
app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)


def window_processor(key, events):
    """The real window is in fact window_start + step to window_end + step."""
    window_start = key[1][0]
    window_end = key[1][1]
    count = len(events)
    print(f"Window {window_start} - {window_end} has {count} events")
    # sort dict for a clear view
    res = {event.date: (event.key, event.value) for event in events}
    res = dict(sorted(res.items()))
    for key, value in res.items():
        print(f"{key}: {value}")


table = app.Table(
    TABLE,
    default=list,
    partitions=PARTITIONS,
    on_window_close=window_processor,
)
table._del_old_keys = types.MethodType(_custom_del_old_keys, table)
hopping_table = table.hopping(
    WINDOW, STEP, expires=timedelta(seconds=WINDOW_EXPIRES)
).relative_to_field(RawModel.date)


@app.agent(source)
async def print_windowed_events(stream):
    event: RawModel
    async for event in stream:
        value_list = hopping_table[event.key].value()
        value_list.append(event)
        hopping_table[event.key] = value_list


@app.timer(1)
async def produce_a():
    await source.send(value=RawModel(key="a", value=1, date=time.time()), key="a")


@app.timer(1)
async def produce_b():
    await source.send(value=RawModel(key="b", value=999, date=time.time()), key="b")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.argv.extend(["worker", "-l", "info"])
    app.main()
```
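The example above swaps out `_del_old_keys` on one table instance using `types.MethodType`, which binds a plain function to an object without touching the class. A minimal standalone sketch of that patching pattern (the `Table` class and method bodies here are made up for illustration):

```python
import types


class Table:
    """Stand-in for a Faust table; only the method we patch matters here."""

    def _del_old_keys(self):
        return "original"


def _custom_del_old_keys(self):
    # `self` is bound to the instance by types.MethodType below
    return "patched"


table = Table()
other = Table()

# Bind the replacement onto this one instance only; the class and all
# other instances keep the original method.
table._del_old_keys = types.MethodType(_custom_del_old_keys, table)

print(table._del_old_keys())  # patched
print(other._del_old_keys())  # original
```

This is why the patch in the example applies only to `table` and not to every `app.Table(...)` the application might create.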
I edited the previous message to use `window_data.setdefault(processed_window[0], []).extend(self.data.get(processed_window, []))`.
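The `setdefault(...).extend(...)` idiom from that edit accumulates values per key instead of overwriting them when the same key appears in several overlapping windows. A small sketch with hypothetical `(key, (start, end))` window tuples:

```python
# Hypothetical window keys mapping to their stored values, mimicking the
# shape of the table's self.data.
data = {
    ("a", (0, 9.9)): [1, 2],
    ("a", (5, 14.9)): [3],
    ("b", (0, 9.9)): [99],
}

window_data = {}
for processed_window in data:
    # processed_window[0] is the record key; data for the same key coming
    # from several overlapping windows is appended, not replaced.
    window_data.setdefault(processed_window[0], []).extend(
        data.get(processed_window, [])
    )

print(window_data)  # {'a': [1, 2, 3], 'b': [99]}
```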
Codecov Report

Attention: Patch coverage is …

Additional details and impacted files:

```
@@ Coverage Diff @@
##           master     #412      +/-   ##
==========================================
- Coverage   93.71%   93.69%   -0.03%
==========================================
  Files         102      102
  Lines       11110    11117       +7
  Branches     1545     1550       +5
==========================================
+ Hits        10412    10416       +4
- Misses        607      609       +2
- Partials       91       92       +1
```
I substituted your timed producers with (this also needs `import random` at the top of the example):

```python
@app.timer(1)
async def produce_a():
    await source.send(value=RawModel(key="a", value=random.randint(1, 10), date=time.time()), key="a")


@app.timer(1)
async def produce_b():
    await source.send(value=RawModel(key="b", value=random.randint(1, 10), date=time.time()), key="b")
```

And I see this:

[2022-11-22 15:55:27,366] [22633] [WARNING] Window 1669150505.0 - 1669150514.9 has 3 events
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150512.0197647: ('a', 8)
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150513.0207028: ('a', 2)
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150514.0224366: ('a', 1)
[2022-11-22 15:55:27,366] [22633] [WARNING] Window 1669150505.0 - 1669150514.9 has 3 events
[2022-11-22 15:55:27,366] [22633] [WARNING] 1669150512.0200157: ('b', 1)
[2022-11-22 15:55:27,367] [22633] [WARNING] 1669150513.0205472: ('b', 6)
[2022-11-22 15:55:27,367] [22633] [WARNING] 1669150514.0226176: ('b', 6)
[2022-11-22 15:55:32,366] [22633] [WARNING] Window 1669150510.0 - 1669150519.9 has 5 events
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150515.0222416: ('b', 8)
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150516.0247138: ('b', 3)
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150517.0247314: ('b', 7)
[2022-11-22 15:55:32,366] [22633] [WARNING] 1669150518.0267282: ('b', 4)
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150519.0277123: ('b', 5)
[2022-11-22 15:55:32,367] [22633] [WARNING] Window 1669150510.0 - 1669150519.9 has 5 events
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150515.0224657: ('a', 1)
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150516.024582: ('a', 9)
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150517.0249085: ('a', 10)
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150518.026591: ('a', 8)
[2022-11-22 15:55:32,367] [22633] [WARNING] 1669150519.0278525: ('a', 8)

Could you provide clarification on what you meant by the window being shifted?
Intro

I added some prints in the window processor (see below for the code generating these logs).

Note: first of all, if we use a …

Example

Then, I based my explanation on this output:

[2022-11-23 08:39:40,256] [46850] [WARNING] Window 2022-11-23 08:39:15 - 2022-11-23 08:39:24
[2022-11-23 08:39:40,257] [46850] [WARNING] Window 1669189155.0 - 1669189164.9 has 10 events
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189160.294888, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:20'}
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189161.302301, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:21'}
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189162.297276, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:22'}
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189163.30537, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:23'}
[2022-11-23 08:39:40,257] [46850] [WARNING] {'date': 1669189164.300895, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:24'}
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189165.310261, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:25'}
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189166.3034098, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:26'}
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189167.312484, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:27'}
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189168.305968, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:28'}
[2022-11-23 08:39:40,258] [46850] [WARNING] {'date': 1669189169.316384, 'key': 'b', 'value': 999, 'datetime': '2022-11-23 08:39:29'}

We have:
- a window of 10 seconds (`WINDOW = 10`)
- a step of 5 seconds (`STEP = 5`)
- an expiration of `WINDOW + 5` seconds (`WINDOW_EXPIRES`)
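The repeated ranges in these logs come from hopping windows overlapping: with a 10 s window and a 5 s step, every timestamp belongs to two windows. A rough sketch of the range computation (this mirrors the idea behind Faust's `_window_ranges`, not its exact implementation; the 0.1 s end offset is an assumption matching the `…4.9` window ends in the logs):

```python
def hopping_ranges(timestamp, size=10.0, step=5.0):
    """Return the (start, end) pairs of hopping windows containing timestamp."""
    # Latest window start at or before the timestamp.
    start = (timestamp // step) * step
    ranges = []
    # Walk backwards one step at a time while the window still covers the
    # timestamp; with size=10 and step=5 this always yields two windows.
    while start + size > timestamp and start >= 0:
        ranges.append((start, start + size - 0.1))
        start -= step
    return list(reversed(ranges))

print(hopping_ranges(12.0))  # [(5.0, 14.9), (10.0, 19.9)]
```

This is why each event above shows up in two consecutive window ranges before those windows are closed.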
Issue: elements in the window do not have the same datetime

In the example we see that we are processing the window from 2022-11-23 08:39:15 to 2022-11-23 08:39:24. However, we have elements containing data from 2022-11-23 08:39:25 to 2022-11-23 08:39:29. Note that it implies that the real window is in fact window_start + step to window_end + step.

Code

```python
import sys
import time
import types
from datetime import datetime, timedelta
from heapq import heappop
from typing import cast

import faust
from faust.types.windows import WindowT


async def _custom_del_old_keys(self) -> None:
    window = cast(WindowT, self.window)
    assert window
    for partition, timestamps in self._partition_timestamps.items():
        while timestamps and window.stale(timestamps[0], time.time()):
            timestamp = heappop(timestamps)
            triggered_windows = [
                self._partition_timestamp_keys.get((partition, window_range[1]))
                for window_range in self._window_ranges(timestamp)
            ]
            keys_to_remove = self._partition_timestamp_keys.pop(
                (partition, timestamp), None
            )
            if keys_to_remove:
                # TODO: need refactoring with dict comprehension?
                window_data = {}
                for windows in triggered_windows:
                    if windows:
                        for processed_window in windows:
                            # setdefault is used so data for the same key is
                            # merged rather than overwritten
                            # window[0] is the window's key; it is not related
                            # to the window's timestamp
                            # windows are in the format:
                            # (key, (window_start, window_end))
                            window_data.setdefault(processed_window[0], []).extend(
                                self.data.get(processed_window, [])
                            )
                for key_to_remove in keys_to_remove:
                    value = self.data.pop(key_to_remove, None)
                    if key_to_remove[1][0] > self.last_closed_window:
                        await self.on_window_close(
                            key_to_remove,
                            window_data[key_to_remove[0]]
                            if key_to_remove[0] in window_data
                            else value,
                        )
                self.last_closed_window = max(
                    self.last_closed_window,
                    max(key[1][0] for key in keys_to_remove),
                )


class RawModel(faust.Record):
    key: str
    date: datetime
    value: float


TOPIC = "raw-event"
TABLE = "hopping_table"
KAFKA = "kafka://localhost:29092"
CLEANUP_INTERVAL = 1
WINDOW = 10
STEP = 5
WINDOW_EXPIRES = WINDOW + 5
PARTITIONS = 1

app = faust.App(
    "windowed-hopping-3",
    broker=KAFKA,
    topic_partitions=PARTITIONS,
    topic_disable_leader=True,
)
app.conf.table_cleanup_interval = CLEANUP_INTERVAL
source = app.topic(TOPIC, value_type=RawModel)


def window_processor(key, events):
    """The real window is in fact window_start + step to window_end + step."""
    window_start = key[1][0]
    window_end = key[1][1]
    window_start_dt = datetime.fromtimestamp(window_start).strftime("%Y-%m-%d %H:%M:%S")
    window_end_dt = datetime.fromtimestamp(window_end).strftime("%Y-%m-%d %H:%M:%S")
    count = len(events)
    print("-" * 80)
    print(f"Window {window_start_dt} - {window_end_dt}")
    print(f"Window {window_start} - {window_end} has {count} events")
    for event in events:
        event["datetime"] = datetime.fromtimestamp(event["date"]).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        print(event)
    print("-" * 80)


table = app.Table(
    TABLE,
    default=list,
    partitions=PARTITIONS,
    on_window_close=window_processor,
)
table._del_old_keys = types.MethodType(_custom_del_old_keys, table)
hopping_table = table.hopping(
    WINDOW, STEP, expires=timedelta(seconds=WINDOW_EXPIRES)
).relative_to_field(RawModel.date)


@app.agent(source)
async def print_windowed_events(stream):
    event: RawModel
    async for event in stream:
        value_list = hopping_table[event.key].value()
        value_list.append({"date": event.date, "key": event.key, "value": event.value})
        hopping_table[event.key] = value_list


@app.timer(1)
async def produce_a():
    await source.send(value=RawModel(key="a", value=1, date=time.time()), key="a")


@app.timer(1)
async def produce_b():
    await source.send(value=RawModel(key="b", value=999, date=time.time()), key="b")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.argv.extend(["worker", "-l", "info"])
    app.main()
```
Just wondering whether there's a way to have this data stick around after the worker running it has died? The issue I'm finding is that the agent/worker that runs this, on restarting or after a rebalance, has no historic window information; it's simply gone. Even if the most recent window should still be around (that is, not closed), there appears to be no data available from it.
Correct, there's nothing in our RocksDB driver that makes it persist. Admittedly, I'm not an expert on the tumbling window scenario, so my progress on it has been delayed.
I wish I remembered why I shelved this. I think there's something I forgot to test, but I would like to revisit this PR.
Hi, quite recently I started using Faust for a project I am working on and stumbled upon the issue with hopping windows not behaving as they should. I would really like to see this PR get merged. I was wondering if this is still in progress or if there's anything to be done to land this change. I am not very familiar with the codebase, but I can take a crack at it if there's something missing.
Hey there, thanks for the note. I'll revisit this! I left it open because I wanted to review the changes further before merging, in case something got broken by them.
Okay, I think I'm satisfied now. I added a sanity test for the case where there are no window ranges, and it shows the same behavior as well.
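The no-window-ranges case exercises the fallback branch of the custom `_del_old_keys`: when no triggered windows match, `window_data` stays empty and the value popped from the table is passed to the close callback instead. A hypothetical plain-Python reproduction of that branch (no Faust internals, just the same control flow):

```python
# No window ranges matched, so every lookup returned None.
triggered_windows = [None, None]

window_data = {}
for windows in triggered_windows:
    if windows:  # skipped entirely when all entries are None
        for processed_window in windows:
            window_data.setdefault(processed_window[0], []).extend([])

key_to_remove = ("a", (0, 9.9))
value = [1, 2, 3]  # the value popped from the table for this key

# Mirror of the fallback expression in _custom_del_old_keys:
payload = window_data[key_to_remove[0]] if key_to_remove[0] in window_data else value
print(payload)  # [1, 2, 3]
```

So even with no triggered windows, `on_window_close` still receives the window's own data rather than a `KeyError`.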
Address additional changes to close #195 by pulling in forgotten changes from #349.