Skip to content

Commit

Permalink
Include byte size of stateKey in estimated weight of WindmillBag, Win…
Browse files Browse the repository at this point in the history
…dmillValue, and WindmillWatermarkHold (#30654)

* Update WindmillBag.java

Include byte size of the stateKey on the BagState weight used to estimate and limit the total state cache size

* Update WindmillValue.java

Include stateKey size in the byte size of a WidnmillValue

* Update WindmillWatermarkHold.java

Include keyState size in the WatermarkHold estimated byte size

* Fix formatting issue

* Fix expected cache item weights in WindmillStateInternalsTest
  • Loading branch information
dmitryor authored Mar 18, 2024
1 parent 63aff7e commit c3b3fa6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForKeyA
}
// We now know the complete bag contents, and any read on it will yield a
// cached value, so cache it for future reads.
cache.put(namespace, address, this, encodedSize);
cache.put(namespace, address, this, encodedSize + stateKey.size());
}

// Don't reuse the localAdditions object; we don't want future changes to it to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected Windmill.WorkItemCommitRequest persistDirectly(WindmillStateCache.ForK
coder.encode(value, stream, Coder.Context.OUTER);
}
encoded = stream.toByteString();
cachedSize = encoded.size();
cachedSize = (long) encoded.size() + stateKey.size();
}

// Place in cache to avoid a future read.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,14 @@ public Future<Windmill.WorkItemCommitRequest> persist(
throw new IllegalStateException("Unreachable condition");
}

final int estimatedByteSize = ENCODED_SIZE + stateKey.size();
return Futures.lazyTransform(
result,
result1 -> {
cleared = false;
localAdditions = null;
if (cachedValue != null) {
cache.put(namespace, address, WindmillWatermarkHold.this, ENCODED_SIZE);
cache.put(namespace, address, WindmillWatermarkHold.this, estimatedByteSize);
}
return result1;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3043,15 +3043,15 @@ public void testCachedValue() throws Exception {
value.write("Hi");
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(132, cache.getWeight());
assertEquals(141, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, addr);
assertEquals("Hi", value.read());
value.clear();
underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(130, cache.getWeight());
assertEquals(139, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, addr);
Expand Down Expand Up @@ -3083,7 +3083,7 @@ public void testCachedBag() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(140, cache.getWeight());
assertEquals(147, cache.getWeight());

resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
Expand All @@ -3103,7 +3103,7 @@ public void testCachedBag() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(133, cache.getWeight());
assertEquals(140, cache.getWeight());

resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
Expand All @@ -3114,7 +3114,7 @@ public void testCachedBag() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(134, cache.getWeight());
assertEquals(141, cache.getWeight());

resetUnderTest();
bag = underTest.state(NAMESPACE, addr);
Expand Down Expand Up @@ -3145,7 +3145,7 @@ public void testCachedWatermarkHold() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(138, cache.getWeight());
assertEquals(151, cache.getWeight());

resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
Expand All @@ -3154,7 +3154,7 @@ public void testCachedWatermarkHold() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(138, cache.getWeight());
assertEquals(151, cache.getWeight());

resetUnderTest();
hold = underTest.state(NAMESPACE, addr);
Expand Down Expand Up @@ -3185,7 +3185,7 @@ public void testCachedCombining() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(131, cache.getWeight());
assertEquals(144, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
Expand All @@ -3196,7 +3196,7 @@ public void testCachedCombining() throws Exception {

underTest.persist(Windmill.WorkItemCommitRequest.newBuilder());

assertEquals(130, cache.getWeight());
assertEquals(143, cache.getWeight());

resetUnderTest();
value = underTest.state(NAMESPACE, COMBINING_ADDR);
Expand Down

0 comments on commit c3b3fa6

Please sign in to comment.