Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change stream enhancements #326

Open
wants to merge 16 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 37 additions & 47 deletions changestreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ const (
type ChangeStream struct {
iter *Iter
isClosed bool
timedout bool
options ChangeStreamOptions
pipeline interface{}
resumeToken *bson.Raw
collection *Collection
readPreference *ReadPreference
err error
m sync.Mutex
sessionCopied bool
domainType changeDomainType
session *Session
database *Database
Expand All @@ -59,6 +59,10 @@ type ChangeStreamOptions struct {

// Collation specifies the way the server should collate returned data.
//TODO Collation *Collation

// Specify StartAtOperationTime to open the cursor at a particular point in time.
// If the specified starting point is in the past, it must be in the time range of the oplog.
StartAtOperationTime bson.MongoTimestamp
}

var errMissingResumeToken = errors.New("resume token missing from result")
Expand Down Expand Up @@ -89,14 +93,14 @@ func (c *Collection) Watch(pipeline interface{},
return nil, err
}

pIter.isChangeStream = true
return &ChangeStream{
iter: pIter,
collection: c,
resumeToken: nil,
options: options,
pipeline: pipeline,
domainType: changeDomainCollection,
session: c.Database.Session,
}, nil
}

Expand Down Expand Up @@ -126,7 +130,6 @@ func (sess *Session) Watch(pipeline interface{},
return nil, err
}

pIter.isChangeStream = true
return &ChangeStream{
iter: pIter,
resumeToken: nil,
Expand Down Expand Up @@ -163,7 +166,6 @@ func (db *Database) Watch(pipeline interface{},
return nil, err
}

pIter.isChangeStream = true
return &ChangeStream{
iter: pIter,
resumeToken: nil,
Expand Down Expand Up @@ -204,6 +206,8 @@ func (changeStream *ChangeStream) Next(result interface{}) bool {

defer changeStream.m.Unlock()

changeStream.timedout = false

// if we are in a state of error, then don't continue.
if changeStream.err != nil {
return false
Expand All @@ -219,13 +223,20 @@ func (changeStream *ChangeStream) Next(result interface{}) bool {
// attempt to fetch the change stream result.
err = changeStream.fetchResultSet(result)
if err == nil {
if changeStream.iter.Timeout() {
changeStream.timedout = true
return false
}
return true
}

// if we get no results we return false with no errors so the user can call Next
// again, resuming is not needed as the iterator is simply timed out as no events happened.
// The user will call Timeout in order to understand if this was the case.
if err == ErrNotFound {
if err == ErrNotFound && changeStream.iter.op.cursorId != 0 {
// must reset error if cursor is to be reused
changeStream.iter.err = nil
changeStream.timedout = true
return false
}

Expand All @@ -245,15 +256,10 @@ func (changeStream *ChangeStream) Next(result interface{}) bool {
return false
}

// we've successfully resumed the changestream.
// try to fetch the next result.
err = changeStream.fetchResultSet(result)
if err != nil {
changeStream.err = err
return false
}

return true
// resume was ok so return a timeout to the client indicating
// it is ok to call Next again for results
changeStream.timedout = true
return false
}

// Err returns nil if no errors happened during iteration, or the actual
Expand All @@ -274,10 +280,6 @@ func (changeStream *ChangeStream) Close() error {
if err != nil {
changeStream.err = err
}
if changeStream.sessionCopied {
changeStream.iter.session.Close()
changeStream.sessionCopied = false
}
return err
}

Expand All @@ -296,7 +298,7 @@ func (changeStream *ChangeStream) ResumeToken() *bson.Raw {

// Timeout returns true if the last call of Next returned false because of an iterator timeout.
func (changeStream *ChangeStream) Timeout() bool {
return changeStream.iter.Timeout()
return changeStream.timedout
}

func constructChangeStreamPipeline(pipeline interface{},
Expand All @@ -318,6 +320,9 @@ func constructChangeStreamPipeline(pipeline interface{},
if options.ResumeAfter != nil {
changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
}
if options.StartAtOperationTime > 0 {
changeStreamStageOptions["startAtOperationTime"] = options.StartAtOperationTime
}
if domain == changeDomainCluster {
changeStreamStageOptions["allChangesForCluster"] = true
}
Expand All @@ -339,35 +344,21 @@ func constructChangeStreamPipeline(pipeline interface{},
}

func (changeStream *ChangeStream) resume() error {
// copy the information for the new socket.

// Thanks to Copy() future uses will acquire a new socket against the newly selected DB.
newSession := changeStream.iter.session.Copy()

// fetch the cursor from the iterator and use it to run a killCursors
// on the connection.
cursorId := changeStream.iter.op.cursorId
err := runKillCursorsOnSession(newSession, cursorId)
if err != nil {
newSession.Close()
// Close and kill the cursor of the current iterator
if err := changeStream.iter.Close(); err != nil {
return err
}

// change out the old connection to the database with the new connection.
if changeStream.sessionCopied {
changeStream.collection.Database.Session.Close()
}
changeStream.collection.Database.Session = newSession
changeStream.sessionCopied = true

opts := changeStream.options
if changeStream.resumeToken != nil {
opts.ResumeAfter = changeStream.resumeToken
opts.StartAtOperationTime = bson.MongoTimestamp(0)
}
// make a new pipeline containing the resume token.
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts, changeStream.domainType)

// generate the new iterator with the new connection.
// generate the new pipe
var newPipe *Pipe
if changeStream.domainType == changeDomainCollection {
newPipe = changeStream.collection.Pipe(changeStreamPipeline)
Expand All @@ -377,11 +368,19 @@ func (changeStream *ChangeStream) resume() error {
newPipe = changeStream.database.pipe(changeStreamPipeline)
}

// apply any options set to the new pipe
if opts.MaxAwaitTimeMS > 0 {
newPipe.SetMaxTime(opts.MaxAwaitTimeMS)
}
if opts.BatchSize > 0 {
newPipe.Batch(opts.BatchSize)
}

// pipes internally create new socket connections
changeStream.iter = newPipe.Iter()
if err := changeStream.iter.Err(); err != nil {
return err
}
changeStream.iter.isChangeStream = true
return nil
}

Expand Down Expand Up @@ -440,12 +439,3 @@ func isResumableError(err error) bool {
//and is not a missingResumeToken error (caused by the user provided pipeline)
return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken)
}

// runKillCursorsOnSession acquires a socket from session and issues a
// killCursorsOp for cursorId on it. The socket is released before returning.
// It returns the socket acquisition error, if any, or else the result of the
// query call.
func runKillCursorsOnSession(session *Session, cursorId int64) error {
	sock, acquireErr := session.acquireSocket(true)
	if acquireErr != nil {
		return acquireErr
	}
	defer sock.Release()

	// a single-element id list: only the given cursor is targeted.
	killOp := &killCursorsOp{[]int64{cursorId}}
	return sock.Query(killOp)
}
67 changes: 67 additions & 0 deletions changestreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type changeEvent struct {
Ns evNamespace `bson:"ns"`
DocumentKey M `bson:"documentKey"`
UpdateDescription *updateDesc `bson:"updateDescription,omitempty"`
ClusterTime bson.MongoTimestamp `bson:"clusterTime,omitempty"`
}

type watchable interface {
Expand Down Expand Up @@ -58,6 +59,69 @@ func (s *S) TestStreamsWatch(c *C) {
}
}

// TestStreamsStartAtOperationTime checks that a change stream opened with
// StartAtOperationTime set to the cluster time of a previously observed
// insert event delivers that same event again. The scenario is exercised
// through each watchable passed to testF below: the collection, the
// database and the session.
func (s *S) TestStreamsStartAtOperationTime(c *C) {
	if !s.versionAtLeast(4, 0) {
		c.Skip("StartAtOperationTime only works on 4.0+")
	}
	session, err := mgo.Dial("localhost:40011")
	c.Assert(err, IsNil)
	defer session.Close()
	coll := session.DB("mydb").C("mycoll")

	// id and ts are shared with checkEv: id is the _id of the document
	// inserted by the current testF run, ts the cluster time captured from
	// its insert event.
	var id bson.ObjectId
	var ts bson.MongoTimestamp

	// checkEv will check the event is correct
	var checkEv = func(ev changeEvent) {
		// assert the type explicitly so a malformed event fails the test
		// with a readable message instead of a panic.
		oid, ok := ev.DocumentKey["_id"].(bson.ObjectId)
		c.Assert(ok, Equals, true)
		ots := ev.ClusterTime
		c.Assert(oid, Equals, id)
		c.Assert(ots > 0, Equals, true)
		c.Assert(ots, Equals, ts)
	}
	var testF = func(w watchable) {
		var hasEvent bool
		pipeline := []bson.M{}
		changeStream, e := w.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500})
		c.Assert(e, IsNil)

		//insert a new document while the change stream is listening
		id = bson.NewObjectId()
		e = coll.Insert(M{"_id": id, "a": 1})
		c.Assert(e, IsNil)

		//read the insert event
		changeEv := changeEvent{}
		hasEvent = changeStream.Next(&changeEv)
		c.Assert(hasEvent, Equals, true)

		//capture timestamp of insert event
		ts = changeEv.ClusterTime

		//verify the insert event
		checkEv(changeEv)
		e = changeStream.Close()
		c.Assert(e, IsNil)

		//start another change stream starting at the insert event
		changeStream, e = w.Watch(pipeline, mgo.ChangeStreamOptions{StartAtOperationTime: ts, MaxAwaitTimeMS: 1500})
		// the Watch error must be checked before the stream is used,
		// otherwise a failed Watch leads to a nil dereference in Next.
		c.Assert(e, IsNil)
		changeEv = changeEvent{}
		hasEvent = changeStream.Next(&changeEv)
		c.Assert(hasEvent, Equals, true)

		//check that we restarted at the insert event
		checkEv(changeEv)
		e = changeStream.Close()
		c.Assert(e, IsNil)
	}
	//collection level
	testF(coll)
	//db level
	testF(session.DB("mydb"))
	//cluster level
	testF(session)
}

func (s *S) TestStreamsInsert(c *C) {
if !s.versionAtLeast(3, 6) {
c.Skip("ChangeStreams only work on 3.6+")
Expand Down Expand Up @@ -163,6 +227,9 @@ func (s *S) TestStreamsNextNoEventTimeout(c *C) {
c.Assert(changeStream.Err(), IsNil)
c.Assert(changeStream.Timeout(), Equals, true)

err = changeStream.Close()
c.Assert(err, IsNil)

//test the same with default timeout (MaxTimeMS=1000)
//create the stream
changeStream, err = w.Watch(pipeline, mgo.ChangeStreamOptions{})
Expand Down
16 changes: 2 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ type Iter struct {
limit int32
timedout bool
isFindCmd bool
isChangeStream bool
maxTimeMS int64
}

Expand Down Expand Up @@ -4464,11 +4463,6 @@ func (iter *Iter) Next(result interface{}) bool {
iter.m.Lock()
iter.timedout = false
timeout := time.Time{}
// for a ChangeStream iterator we have to call getMore before the loop otherwise
// we'll always return false
if iter.isChangeStream {
iter.getMore()
}
// check should we expect more data.
for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) {
// we should expect more data.
Expand All @@ -4485,12 +4479,6 @@ func (iter *Iter) Next(result interface{}) bool {
return false
}
}
// for a ChangeStream one loop is enough to declare the timeout
if iter.isChangeStream {
iter.timedout = true
iter.m.Unlock()
return false
}
// run a getmore to fetch more data.
iter.getMore()
if iter.err != nil {
Expand Down Expand Up @@ -4716,7 +4704,7 @@ func (iter *Iter) getMore() {
}
}
var op interface{}
if iter.isFindCmd || iter.isChangeStream {
if iter.isFindCmd {
op = iter.getMoreCmd()
} else {
op = &iter.op
Expand Down Expand Up @@ -5389,7 +5377,7 @@ func (iter *Iter) replyFunc() replyFunc {
iter.err = err
} else if !findReply.Ok && findReply.Errmsg != "" {
iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg}
} else if !iter.isChangeStream && len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 {
} else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 {
iter.err = ErrNotFound
} else {
batch := findReply.Cursor.FirstBatch
Expand Down