该逻辑的代码片段会有所帮助。
您可以使用Mongo Change Streams来做到这一点。
例如,要查看集合的更改,请使用以下Collection.Watch()方法 -
var collection *mongo.Collection
// specify a pipeline that will only match "insert" events
// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents
matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)
if err != nil {
log.Fatal(err)
}
// print out all change stream events in the order they're received
// see the mongo.ChangeStream documentation for more examples of using change streams
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
// NewConsumer
}
然后创建一个新的消费者或者.SubscribeTopics()在你更新你的集合并且它符合你的标准时调用