@@ -259,34 +259,6 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
259259 assert .Nil (mt , cs .Err (), "change stream error: %v" , cs .Err ())
260260 })
261261
262- startAtOpTimeOpts := mtest .NewOptions ().MinServerVersion ("4.0" ).MaxServerVersion ("4.0.6" )
263- mt .RunOpts ("include startAtOperationTime" , startAtOpTimeOpts , func (mt * mtest.T ) {
264- // $changeStream stage for ChangeStream against a server >=4.0 and <4.0.7 that has not received any results yet
265- // MUST include a startAtOperationTime option when resuming a changestream.
266-
267- cs , err := mt .Coll .Watch (context .Background (), mongo.Pipeline {})
268- assert .Nil (mt , err , "Watch error: %v" , err )
269- defer closeStream (cs )
270-
271- generateEvents (mt , 1 )
272- // kill cursor to force resumable error
273- killChangeStreamCursor (mt , cs )
274-
275- mt .ClearEvents ()
276- // change stream should resume once and get new change
277- assert .True (mt , cs .Next (context .Background ()), "expected Next to return true, got false" )
278- // Next should cause getMore, killCursors, and aggregate to run
279- assert .NotNil (mt , mt .GetStartedEvent (), "expected getMore event, got nil" )
280- assert .NotNil (mt , mt .GetStartedEvent (), "expected killCursors event, got nil" )
281- aggEvent := mt .GetStartedEvent ()
282- assert .NotNil (mt , aggEvent , "expected aggregate event, got nil" )
283- assert .Equal (mt , "aggregate" , aggEvent .CommandName , "expected command name 'aggregate', got '%v'" , aggEvent .CommandName )
284-
285- // check for startAtOperationTime in pipeline
286- csStage := aggEvent .Command .Lookup ("pipeline" ).Array ().Index (0 ).Document () // $changeStream stage
287- _ , err = csStage .Lookup ("$changeStream" ).Document ().LookupErr ("startAtOperationTime" )
288- assert .Nil (mt , err , "startAtOperationTime not included in aggregate command" )
289- })
290262 mt .RunOpts ("decode does not panic" , noClientOpts , func (mt * mtest.T ) {
291263 testCases := []struct {
292264 name string
@@ -418,53 +390,6 @@ func TestChangeStream_ReplicaSet(t *testing.T) {
418390 })
419391 }
420392 })
421-
422- noPbrtOpts := mtest .NewOptions ().MaxServerVersion ("4.0.6" )
423- mt .RunOpts ("without PBRT support" , noPbrtOpts , func (mt * mtest.T ) {
424- collName := mt .Coll .Name ()
425- dbName := mt .Coll .Database ().Name ()
426- cs , err := mt .Coll .Watch (context .Background (), mongo.Pipeline {})
427- assert .Nil (mt , err , "Watch error: %v" , err )
428- defer closeStream (cs )
429-
430- compareResumeTokens (mt , cs , nil ) // should be no resume token because no PBRT
431- numEvents := 5
432- generateEvents (mt , numEvents )
433- // iterate once to get a resume token
434- assert .True (mt , cs .Next (context .Background ()), "expected Next to return true, got false" )
435- token := cs .ResumeToken ()
436- assert .NotNil (mt , token , "expected resume token, got nil" )
437-
438- testCases := []struct {
439- name string
440- opts * options.ChangeStreamOptionsBuilder
441- iterateStream bool // whether or not resulting change stream should be iterated
442- initialToken bson.Raw
443- numDocsExpected int
444- }{
445- {"resumeAfter" , options .ChangeStream ().SetResumeAfter (token ), true , token , numEvents - 1 },
446- {"no options" , nil , false , nil , 0 },
447- }
448- for _ , tc := range testCases {
449- mt .Run (tc .name , func (mt * mtest.T ) {
450- coll := mt .Client .Database (dbName ).Collection (collName )
451- cs , err := coll .Watch (context .Background (), mongo.Pipeline {}, tc .opts )
452- assert .Nil (mt , err , "Watch error: %v" , err )
453- defer closeStream (cs )
454-
455- compareResumeTokens (mt , cs , tc .initialToken )
456- if ! tc .iterateStream {
457- return
458- }
459-
460- for i := 0 ; i < tc .numDocsExpected ; i ++ {
461- assert .True (mt , cs .Next (context .Background ()), "expected Next to return true, got false" )
462- // current resume token should always equal _id of current document
463- compareResumeTokens (mt , cs , cs .Current .Lookup ("_id" ).Document ())
464- }
465- })
466- }
467- })
468393 })
469394 })
470395 mt .RunOpts ("try next" , noClientOpts , func (mt * mtest.T ) {
@@ -826,17 +751,6 @@ func generateEvents(mt *mtest.T, numEvents int) {
826751 }
827752}
828753
829- func killChangeStreamCursor (mt * mtest.T , cs * mongo.ChangeStream ) {
830- mt .Helper ()
831-
832- db := mt .Coll .Database ().Client ().Database ("admin" )
833- err := db .RunCommand (context .Background (), bson.D {
834- {"killCursors" , mt .Coll .Name ()},
835- {"cursors" , bson.A {cs .ID ()}},
836- }).Err ()
837- assert .Nil (mt , err , "killCursors error: %v" , err )
838- }
839-
840754// returns pbrt, operationTime from aggregate command response
841755func getAggregateResponseInfo (mt * mtest.T ) (bson.Raw , bson.Timestamp ) {
842756 mt .Helper ()
0 commit comments