This is a lightweight library that allows Java developers to scale their usage of MongoDB Change Streams.
Mongo Change Stream Enhancer (MCSE) divides Change Stream events into partitions and lets you handle them in separate threads, increasing throughput. It does this by creating one Change Stream per partition (the number of partitions is configurable) and handling each Change Stream in a dedicated thread.
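The sketch below illustrates the thread-per-partition idea with plain JDK classes. It is not MCSE's implementation. MCSE opens a separate Change Stream per partition and lets the server filter events. Here, a hypothetical PartitionedDispatcher routes in-memory events itself and uses the key's hashCode() as a stand-in for the server-side hash. Each partition is served by a single-threaded executor, so events with the same key run strictly in arrival order while different partitions proceed in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical sketch: one single-threaded worker per partition.
// Each executor plays the role of the dedicated thread MCSE assigns to a partition.
class PartitionedDispatcher implements AutoCloseable {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final BiConsumer<Integer, String> handler;

    PartitionedDispatcher(int numberOfPartitions, BiConsumer<Integer, String> handler) {
        for (int i = 0; i < numberOfPartitions; i++) {
            // A single thread runs its tasks one at a time, in submission order
            workers.add(Executors.newSingleThreadExecutor());
        }
        this.handler = handler;
    }

    // Routes an event to the partition derived from its key.
    // key.hashCode() is only a stand-in for the hash MongoDB computes server-side.
    void dispatch(Object key, String event) {
        int partition = Math.abs(key.hashCode() % workers.size());
        workers.get(partition).submit(() -> handler.accept(partition, event));
    }

    // Stops accepting events and waits for already queued events to be handled.
    @Override
    public void close() {
        workers.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService worker : workers) {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

For example, dispatching a hundred events for the key "doc-1" delivers all of them to one partition, in the order they were dispatched.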
Inspired by Kafka, MCSE divides incoming events based on the document's _id field. The field's value is hashed to a number, and the absolute value of that number is divided by the number of partitions. The remainder of the division is the ID of the partition.
This directs all events related to the same document to the same partition, which guarantees that they are handled in order (as in Kafka, ordering is guaranteed only within a partition).
The partition filter is a $match stage with the following expression:
{"$expr": {"$eq": [{"$cond": ["$_id", {"$mod": [{"$abs": {"$toHashedIndexKey": "$_id"}}, $NUMBER_OF_PARTITIONS]}, -1]}, $PARTITION_ID]}}
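To make the arithmetic of this expression explicit, here is a hypothetical Java mirror of it. The hashedKey argument stands in for the 64-bit value returned by $toHashedIndexKey, and the sketch does not reproduce MongoDB's hash. A null argument stands for a missing key, for which the $cond branch yields -1, so the event matches no partition. Computing |h mod N| instead of |h| mod N gives the same result in Java without overflowing on Long.MIN_VALUE.

```java
// Hypothetical mirror of the $match expression's arithmetic:
// $cond: [key, $mod: [$abs: hash(key), N], -1], then compared with the partition ID.
final class PartitionFunction {
    private PartitionFunction() {}

    // hashedKey stands in for the value returned by $toHashedIndexKey;
    // MongoDB's actual hash is not reproduced here.
    static int partitionOf(Long hashedKey, int numberOfPartitions) {
        if (hashedKey == null) {
            return -1; // missing key: the $cond branch returns -1
        }
        // |h| mod N, computed as |h mod N| so that Long.MIN_VALUE cannot overflow
        return (int) Math.abs(hashedKey % numberOfPartitions);
    }

    // The $match stage keeps an event only if its computed partition equals the worker's partition ID.
    static boolean matches(Long hashedKey, int numberOfPartitions, int partitionId) {
        return partitionOf(hashedKey, numberOfPartitions) == partitionId;
    }
}
```

For example, partitionOf(7L, 3) and partitionOf(-7L, 3) both return 1, because the sign of the hash is discarded before the division.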
To use this library you need to:
- Create a MongoConfig
- Create the MongoCseManager
- Create and register your listeners by implementing the ChangeStreamListener interface
- Start the workers and begin listening to events
For this example, I've created two listeners that I want to use: MyChangeStreamListener and MetricsChangeStreamListener.
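import com.mongodb.client.model.changestream.ChangeStreamDocument;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyChangeStreamListener implements ChangeStreamListener {
    private static final Logger log = LoggerFactory.getLogger(MyChangeStreamListener.class);

    @Override
    public void handle(ChangeStreamDocument<Document> event) {
        // Called for each event from the partitions this listener is registered to
        log.info("I've received an event! {}", event);
    }
}

public class MetricsChangeStreamListener implements ChangeStreamListener {
    @Override
    public void handle(ChangeStreamDocument<Document> event) {
        // logEventMetrics stands for your own metrics-recording logic
        logEventMetrics(event);
    }
}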
Next, I configure the manager and register my listeners. I want to register myListener only for partitions 0 and 1, but I want metricsListener to handle events from all partitions.
As a result, both listeners receive all events from partitions 0 and 1, while only metricsListener receives events from partition 2. You can register as many listeners to each partition as you like. A partition doesn't need to have any listeners assigned to it: its events will still be consumed by the worker, and its resumeToken will still be updated.
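The registration rules above can be modelled as a map from partition ID to a list of listeners. The ListenerRegistry class below is a hypothetical, single-threaded sketch, not MCSE's internal code, and it uses java.util.function.Consumer in place of ChangeStreamListener. Registering one listener to partitions 0 and 1 and another to all three partitions reproduces the example: events from partition 2 reach only the second listener. A partition with no listeners dispatches to nobody, whereas in MCSE the event would still be consumed and its resumeToken recorded.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of partition -> listeners bookkeeping (not MCSE's internal classes).
class ListenerRegistry<E> {
    private final int numberOfPartitions;
    private final Map<Integer, List<Consumer<E>>> listeners = new HashMap<>();

    ListenerRegistry(int numberOfPartitions) {
        this.numberOfPartitions = numberOfPartitions;
    }

    // Adds the listener to each of the given partitions.
    void register(Consumer<E> listener, List<Integer> partitions) {
        for (int partition : partitions) {
            listeners.computeIfAbsent(partition, p -> new ArrayList<>()).add(listener);
        }
    }

    // Adds the listener to every partition from 0 to numberOfPartitions - 1.
    void registerToAll(Consumer<E> listener) {
        for (int partition = 0; partition < numberOfPartitions; partition++) {
            register(listener, List.of(partition));
        }
    }

    // Passes the event to every listener registered to the partition (possibly none).
    void dispatch(int partition, E event) {
        for (Consumer<E> listener : listeners.getOrDefault(partition, List.of())) {
            listener.accept(event);
        }
    }
}
```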
// Imports from the MongoDB Java driver (MongoConfig and MongoCseManager come from this library)
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import java.util.List;

// Create MongoConfig
MongoConfig mongoConfig = MongoConfig.builder()
        .connectionUri("mongodb://localhost:27017,localhost:27018,localhost:27019/?replicaSet=dbrs&retryWrites=true&w=majority")
        .databaseName("test-db")
        .collectionName("example")
        .match(Filters.lt("fullDocument.testValue", 2)) // keep only events whose fullDocument.testValue is less than 2
        .keyName("_id")
        .numberOfPartitions(3)
        .workerConfigCollectionName("changeStreamWorkerConfig")
        .clusterConfigCollectionName("changeStreamClusterConfig")
        .fullDocument(FullDocument.UPDATE_LOOKUP)
        .fullDocumentBeforeChange(FullDocumentBeforeChange.DEFAULT)
        .maxAwaitTimeInMs(1000)
        .build();

// Create the manager
MongoCseManager manager = new MongoCseManager(mongoConfig);

// Create a listener dedicated to partitions 0 and 1
MyChangeStreamListener myListener = new MyChangeStreamListener();
manager.registerListener(myListener, List.of(0, 1));

// Create a generic listener registered to all partitions
MetricsChangeStreamListener metricsListener = new MetricsChangeStreamListener();
manager.registerListenerToAllPartitions(metricsListener);
Finally, I want to start listening to events. This call will start the worker threads, read the resumeTokens from the configs (if available) and start consuming events.
// Start listening to events
manager.start();
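Starting the workers involves reading each partition's stored resumeToken and continuing from it. The stdlib-only sketch below models that checkpointing under several assumptions. TokenStore is an in-memory stand-in for the changeStreamWorkerConfig collection. Events are (token, payload) records with increasing numeric tokens rather than ChangeStreamDocuments. Skipping already-seen tokens imitates what the server does when a real Change Stream is opened after a resume token. ResumableWorker and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of per-partition resumeToken checkpointing.
class ResumableWorker {
    // Simulated event: a numeric, increasing token instead of a real resume token document
    record Event(long token, String payload) {}

    // In-memory stand-in for the changeStreamWorkerConfig collection
    static final class TokenStore {
        private final Map<Integer, Long> tokens = new ConcurrentHashMap<>();

        Optional<Long> load(int partition) {
            return Optional.ofNullable(tokens.get(partition));
        }

        void save(int partition, long token) {
            tokens.put(partition, token);
        }
    }

    private final int partition;
    private final TokenStore store;
    private final Consumer<Event> listener;

    ResumableWorker(int partition, TokenStore store, Consumer<Event> listener) {
        this.partition = partition;
        this.store = store;
        this.listener = listener;
    }

    // Reads the stored token (if any) and skips events up to and including it,
    // then handles each new event and records its token as the new checkpoint.
    void run(List<Event> stream) {
        long resumeAfter = store.load(partition).orElse(Long.MIN_VALUE);
        for (Event event : stream) {
            if (event.token() <= resumeAfter) {
                continue; // already handled before the restart
            }
            listener.accept(event);
            store.save(partition, event.token());
        }
    }
}
```

Running a fresh worker over a replayed stream after a restart handles each event only once, because it resumes after the last stored token.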
MongoConfig parameters:
- connectionUri - MongoDB connection URI
- databaseName - name of the database
- collectionName - name of the collection to which the change stream listener should be applied
- match - $match pipeline stage used to filter change stream events, e.g. on the fullDocument or operationType event fields. By default, all events are accepted.
- keyName - name of the key used as the partitioning key. The default value is _id. A key value is required for every document and should be of type ObjectId. The key's value doesn't have to be unique.
- numberOfPartitions - how many partitions should be used (i.e. how many listeners can run in parallel)
- workerConfigCollectionName - name of the collection in which the worker config is stored. Defaults to changeStreamWorkerConfig.
- clusterConfigCollectionName - name of the collection in which the cluster config is stored. Defaults to changeStreamClusterConfig.
- fullDocument - defaults to FullDocument.UPDATE_LOOKUP, which returns the latest version of the document.
- fullDocumentBeforeChange - defaults to FullDocumentBeforeChange.OFF. Used to return the version of the document before the change was applied.
- maxAwaitTimeInMs - defaults to 1000 ms. The maximum amount of time, in milliseconds, that the server waits for new data changes to report to the change stream cursor before returning an empty batch.
For more info about fullDocument, fullDocumentBeforeChange and maxAwaitTime, see https://www.mongodb.com/docs/manual/reference/method/db.collection.watch/
Each listener can be deregistered at any time, either from specific partitions or from all partitions it is assigned to.
// Deregister listener0 from every partition it is registered to
manager.deregisterListenerFromAllPartitions(listener0);
// Deregister listener0 from partition 0 only
manager.deregisterListener(listener0, List.of(0));
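Because listeners can be deregistered at any moment, including while worker threads are dispatching events, the listener lists must tolerate concurrent modification. One way to do this, shown in the hypothetical ConcurrentListenerRegistry below (not taken from MCSE), is a ConcurrentHashMap of CopyOnWriteArrayList instances. Dispatch iterates over a snapshot, so a concurrent deregistration never throws ConcurrentModificationException. Removal relies on equals, so you must deregister the same listener instance that you registered.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch (not MCSE's implementation): a registry that tolerates
// (de)registration while worker threads are dispatching events.
class ConcurrentListenerRegistry<E> {
    private final Map<Integer, List<Consumer<E>>> listeners = new ConcurrentHashMap<>();

    void register(Consumer<E> listener, List<Integer> partitions) {
        for (int partition : partitions) {
            listeners.computeIfAbsent(partition, p -> new CopyOnWriteArrayList<>()).add(listener);
        }
    }

    // Counterpart of deregisterListener(listener, partitions): removes only from the given partitions.
    void deregister(Consumer<E> listener, List<Integer> partitions) {
        for (int partition : partitions) {
            List<Consumer<E>> forPartition = listeners.get(partition);
            if (forPartition != null) {
                forPartition.remove(listener);
            }
        }
    }

    // Counterpart of deregisterListenerFromAllPartitions(listener).
    void deregisterFromAll(Consumer<E> listener) {
        listeners.values().forEach(list -> list.remove(listener));
    }

    // CopyOnWriteArrayList iterates over a snapshot, so concurrent removals are safe here.
    void dispatch(int partition, E event) {
        for (Consumer<E> listener : listeners.getOrDefault(partition, List.of())) {
            listener.accept(event);
        }
    }
}
```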
MCSE keeps its state in config collections. When used, it creates two collections in your MongoDB database: changeStreamWorkerConfig and changeStreamClusterConfig. The names of these collections can be changed (see the MongoConfig configuration). They are used for:
- Storing each worker's collection name, partition and resumeToken
- Making sure the number of partitions is maintained. If you need to re-partition the change streams, they will have to start over from the beginning of the change stream. You can modify the configs in those collections manually, but this is not recommended.
This means that the MongoDB user needs to be able to create collections (or write to those collections if you create them manually).
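The partition-count check kept in the cluster config can be sketched as follows. ClusterConfigGuard is hypothetical. A map stands in for the changeStreamClusterConfig collection, and throwing IllegalStateException on a mismatch is an assumption about how such a check could react, not documented MCSE behavior. The first start for a collection records its partition count, and every later start must use the same count.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; the map stands in for the changeStreamClusterConfig collection.
final class ClusterConfigGuard {
    private final Map<String, Integer> partitionsByCollection = new ConcurrentHashMap<>();

    // The first call for a collection stores its partition count; later calls must match it.
    void verify(String collectionName, int configuredPartitions) {
        Integer stored = partitionsByCollection.putIfAbsent(collectionName, configuredPartitions);
        if (stored != null && stored != configuredPartitions) {
            // Assumed reaction: refuse to start rather than silently re-partition
            throw new IllegalStateException("Collection " + collectionName + " uses " + stored
                    + " partitions, but " + configuredPartitions + " were configured; re-partitioning"
                    + " requires starting over from the beginning of the change stream");
        }
    }
}
```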
As of version 1.0.0 it is not yet possible to spread partitions across multiple JVMs. However, such a feature is planned for future releases.
Version 1.0.0 does, however, let you handle change streams from different collections on different JVMs, because MongoCseManager instances created for different collections are completely independent.
To use this library, your MongoDB deployment must support change streams, which are available for replica sets and sharded clusters. See the availability section of the MongoDB Change Streams documentation.
This library was tested with the following MongoDB versions, but it should also work with all later versions:
- 4.4
We will try to expand this list with tests on other versions in the future.
For the replica set to advertise its nodes' addresses properly, add the following mapping to your /etc/hosts file:
127.0.0.1 mongo1
127.0.0.1 mongo2
127.0.0.1 mongo3
After that, run sh startReplicaSetEnvironment.sh and wait for the replica set to start.
To run tests, simply run mvn test
With Gradle:
dependencies {
    implementation group: 'com.gravity9', name: 'mongo-change-stream-enhancer', version: 'yourVersionHere'
}
With Maven:
<dependency>
<groupId>com.gravity9</groupId>
<artifactId>mongo-change-stream-enhancer</artifactId>
<version>1.0.0-beta</version>
</dependency>