该逻辑的代码片段会有所帮助。


您可以使用Mongo Change Streams来做到这一点。

例如,要查看集合的更改,请使用以下Collection.Watch()方法 -


var collection *mongo.Collection


// specify a pipeline that will only match "insert" events

// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents

matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}

opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)

changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)

if err != nil {

    log.Fatal(err)

}


// print out all change stream events in the order they're received

// see the mongo.ChangeStream documentation for more examples of using change streams

for changeStream.Next(context.TODO()) {

    fmt.Println(changeStream.Current)

    // NewConsumer

}

然后创建一个新的消费者或者.SubscribeTopics()在你更新你的集合并且它符合你的标准时调用