Skip to content

Commit

Permalink
Pass on FullDocumentBeforeChange option to change stream iterable/pub…
Browse files Browse the repository at this point in the history
…lisher.

This commit ensures to pass on a potentially set FullDocumentBeforeChange option to the change stream iterable/publisher.
It also corrects false optional behavior within the change stream task which did some defaulting though the actual value is an optional one that must not be present.

Original pull request: #4541
Closes #4495
  • Loading branch information
christophstrobl authored and mp911de committed Nov 2, 2023
1 parent 6928515 commit bf54054
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ private ChangeStreamOptionsBuilder initOptionsBuilder() {
}
});
options.getFullDocumentLookup().ifPresent(builder::fullDocumentLookup);
options.getFullDocumentBeforeChangeLookup().ifPresent(builder::fullDocumentBeforeChangeLookup);
options.getCollation().ifPresent(builder::collation);

if (options.isResumeAfter()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2050,6 +2050,10 @@ public <T> Flux<ChangeStreamEvent<T>> changeStream(@Nullable String database, @N
publisher = options.getCollation().map(Collation::toMongoCollation).map(publisher::collation)
.orElse(publisher);
publisher = options.getResumeBsonTimestamp().map(publisher::startAtOperationTime).orElse(publisher);

if(options.getFullDocumentBeforeChangeLookup().isPresent()) {
publisher = publisher.fullDocumentBeforeChange(options.getFullDocumentBeforeChangeLookup().get());
}
return publisher.fullDocument(options.getFullDocumentLookup().orElse(fullDocument));
}) //
.flatMapMany(publisher -> Flux.from(publisher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.messaging.ChangeStreamRequest.ChangeStreamRequestOptions;
import org.springframework.data.mongodb.core.messaging.Message.MessageProperties;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest.RequestOptions;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -88,7 +87,7 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
Collation collation = null;
FullDocument fullDocument = ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP;
FullDocumentBeforeChange fullDocumentBeforeChange = FullDocumentBeforeChange.DEFAULT;
FullDocumentBeforeChange fullDocumentBeforeChange = null;
BsonTimestamp startAt = null;
boolean resumeAfter = true;

Expand Down Expand Up @@ -116,8 +115,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
.orElseGet(() -> ClassUtils.isAssignable(Document.class, targetType) ? FullDocument.DEFAULT
: FullDocument.UPDATE_LOOKUP);

fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup()
.orElse(FullDocumentBeforeChange.DEFAULT);
if(changeStreamOptions.getFullDocumentBeforeChangeLookup().isPresent()) {
fullDocumentBeforeChange = changeStreamOptions.getFullDocumentBeforeChangeLookup().get();
}

startAt = changeStreamOptions.getResumeBsonTimestamp().orElse(null);
}
Expand Down Expand Up @@ -158,7 +158,9 @@ protected MongoCursor<ChangeStreamDocument<Document>> initCursor(MongoTemplate t
}

iterable = iterable.fullDocument(fullDocument);
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
if(fullDocumentBeforeChange != null) {
iterable = iterable.fullDocumentBeforeChange(fullDocumentBeforeChange);
}

return iterable.iterator();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
import static org.springframework.data.mongodb.core.aggregation.Aggregation.*;
import static org.springframework.data.mongodb.test.util.Assertions.assertThat;

import com.mongodb.WriteConcern;
import org.springframework.data.mongodb.core.MongoTemplateUnitTests.Sith;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -101,6 +99,7 @@
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadConcern;
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.DeleteOptions;
Expand All @@ -110,6 +109,7 @@
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.model.TimeSeriesGranularity;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.InsertManyResult;
import com.mongodb.client.result.InsertOneResult;
Expand Down Expand Up @@ -1604,6 +1604,25 @@ void changeStreamOptionStartAftershouldApplied() {
verify(changeStreamPublisher).startAfter(eq(token));
}

@Test // GH-4495
void changeStreamOptionFullDocumentBeforeChangeShouldBeApplied() {

when(factory.getMongoDatabase(anyString())).thenReturn(Mono.just(db));

when(collection.watch(any(Class.class))).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.batchSize(anyInt())).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.startAfter(any())).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.fullDocument(any())).thenReturn(changeStreamPublisher);
when(changeStreamPublisher.fullDocumentBeforeChange(any())).thenReturn(changeStreamPublisher);

template
.changeStream("database", "collection", ChangeStreamOptions.builder().fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED).build(), Object.class)
.subscribe();

verify(changeStreamPublisher).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED));

}

@Test // GH-4462
void replaceShouldUseCollationWhenPresent() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
import org.springframework.data.mongodb.core.convert.MongoConverter;
Expand All @@ -39,6 +38,7 @@
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;

/**
* @author Christoph Strobl
Expand Down Expand Up @@ -68,8 +68,6 @@ void setUp() {
when(mongoCollection.watch(eq(Document.class))).thenReturn(changeStreamIterable);

when(changeStreamIterable.fullDocument(any())).thenReturn(changeStreamIterable);

when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable);
}

@Test // DATAMONGO-2258
Expand Down Expand Up @@ -125,6 +123,22 @@ void shouldApplyStartAfterToChangeStream() {
verify(changeStreamIterable).startAfter(eq(resumeToken));
}

@Test // GH-4495
void shouldApplyFullDocumentBeforeChangeToChangeStream() {

when(changeStreamIterable.fullDocumentBeforeChange(any())).thenReturn(changeStreamIterable);

ChangeStreamRequest request = ChangeStreamRequest.builder() //
.collection("start-wars") //
.fullDocumentBeforeChangeLookup(FullDocumentBeforeChange.REQUIRED) //
.publishTo(message -> {}) //
.build();

initTask(request, Document.class);

verify(changeStreamIterable).fullDocumentBeforeChange(eq(FullDocumentBeforeChange.REQUIRED));
}

private MongoCursor<ChangeStreamDocument<Document>> initTask(ChangeStreamRequest request, Class<?> targetType) {

ChangeStreamTask task = new ChangeStreamTask(template, request, targetType, er -> {});
Expand Down

0 comments on commit bf54054

Please sign in to comment.