Skip to content

Commit

Permalink
Merge branch 'trunk' into hb-error-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
lianetm committed Nov 16, 2024
2 parents 64d020a + a8f84ca commit 5a8d851
Show file tree
Hide file tree
Showing 62 changed files with 2,494 additions and 465 deletions.
16 changes: 16 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2697,6 +2697,7 @@ project(':streams') {
':streams:upgrade-system-tests-36:test',
':streams:upgrade-system-tests-37:test',
':streams:upgrade-system-tests-38:test',
':streams:upgrade-system-tests-39:test',
':streams:examples:test'
]
)
Expand Down Expand Up @@ -3244,6 +3245,21 @@ project(':streams:upgrade-system-tests-38') {
}
}

project(':streams:upgrade-system-tests-39') {
base {
archivesName = "kafka-streams-upgrade-system-tests-39"
}

dependencies {
testImplementation libs.kafkaStreams_39
testRuntimeOnly libs.junitJupiter
}

systemTestLibs {
dependsOn testJar
}
}

project(':jmh-benchmarks') {

apply plugin: 'io.github.goooler.shadow'
Expand Down
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
files="ClientUtils.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,10 @@ void maybeReconcile() {
revokedPartitions
);

// Mark partitions as pending revocation to stop fetching from the partitions (no new
// fetches sent out, and no in-flight fetches responses processed).
markPendingRevocationToPauseFetching(revokedPartitions);

// Commit offsets if auto-commit enabled before reconciling a new assignment. Request will
// be retried until it succeeds, fails with non-retriable error, or timer expires.
CompletableFuture<Void> commitResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,12 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
Expand Down Expand Up @@ -1571,10 +1573,12 @@ public void unsubscribe() {
subscriptions.assignedPartitions());

try {
// If users subscribe to an invalid topic name, they will get InvalidTopicException in error events,
// because network thread keeps trying to send MetadataRequest in the background.
// Ignore it to avoid unsubscribe failed.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e instanceof InvalidTopicException);
// If users subscribe to a topic with invalid name or without permission, they will get some exceptions.
// Because network thread keeps trying to send MetadataRequest or ConsumerGroupHeartbeatRequest in the background,
// there will be some error events in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> e instanceof InvalidTopicException || e instanceof TopicAuthorizationException || e instanceof GroupAuthorizationException);
log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
Expand Down Expand Up @@ -282,6 +283,23 @@ public void testCloseWithInvalidTopicException() {
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testUnsubscribeWithTopicAuthorizationException() {
consumer = newConsumer();
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.unsubscribe());
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCloseWithTopicAuthorizationException() {
consumer = newConsumer();
backgroundEventQueue.add(new ErrorEvent(new TopicAuthorizationException(Set.of("test-topic"))));
completeUnsubscribeApplicationEventSuccessfully();
assertDoesNotThrow(() -> consumer.close());
}

@Test
public void testCommitAsyncWithNullCallback() {
consumer = newConsumer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -86,6 +87,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -1433,6 +1435,7 @@ public void testReconcilePartitionsRevokedNoAutoCommitNoCallbacks() {
membershipManager.poll(time.milliseconds());

testRevocationOfAllPartitionsCompleted(membershipManager);
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
}

@Test
Expand All @@ -1456,6 +1459,10 @@ public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks()

// Complete commit request
commitResult.complete(null);
InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));

testRevocationOfAllPartitionsCompleted(membershipManager);
}
Expand All @@ -1480,6 +1487,7 @@ public void testReconcilePartitionsRevokedWithFailedAutoCommitCompletesRevocatio
// Complete commit request
commitResult.completeExceptionally(new KafkaException("Commit request failed with " +
"non-retriable error"));
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition("topic1", 0)));

testRevocationOfAllPartitionsCompleted(membershipManager);
}
Expand Down Expand Up @@ -1579,11 +1587,11 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());

// Member received assignment to reconcile;

receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState).markPendingRevocation(Set.of());

// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
Expand All @@ -1607,6 +1615,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);

membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));

// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
Expand Down Expand Up @@ -2551,7 +2560,6 @@ private void testRevocationCompleted(ConsumerMembershipManager membershipManager
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());

verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable

verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
verify(subscriptionState).markPendingRevocation(Set.of());

// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
Expand All @@ -1123,6 +1124,7 @@ public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable
receiveAssignment(topicId, Collections.singletonList(1), membershipManager);

membershipManager.poll(time.milliseconds());
verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new TopicPartition(topicName, 0)));

// Revocation should complete without requesting any metadata update given that the topic
// received in target assignment should exist in local topic name cache.
Expand Down Expand Up @@ -1423,7 +1425,6 @@ private void testRevocationCompleted(ShareMembershipManager membershipManager,
assertEquals(assignmentByTopicId, membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());

verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new HashSet<>(expectedTopicPartitionAssignment);
Expand Down
Loading

0 comments on commit 5a8d851

Please sign in to comment.