mongodb订阅发布模型
by Anshul Sanghi
通过Anshul Sanghi
如何使用Go,GQLgen和MongoDB处理GraphQL订阅 How to handle GraphQL subscriptions with Go, GQLgen and MongoDB使用GraphQL订阅和MongoDB ChangeStreams创建实时数据服务器 (Creating a real-time data server with GraphQL subscriptions and MongoDB ChangeStreams)
If you have used GQLgen in the past, you know that it indeed supports subscription models, but the implementation they use doesn’t exactly work with MongoDB properly.
如果您曾经使用过GQLgen,那么您知道它确实支持订阅模型,但是它们使用的实现不适用于MongoDB。
For those of you who haven’t heard of or used GQLgen yet, it is a go package that essentially generates boilerplate code automatically from your GraphQL schemas and provides you with added functionality like setting up a GraphQL server, etc. We are going to use this extensively for our GraphQL setup, so I suggest you go take a look at it before continuing as I won’t be covering it much here. A good starting point would be this.
对于尚未听说或使用GQLgen的人来说,这是一个go软件包,它实质上从GraphQL模式自动生成样板代码,并为您提供诸如设置GraphQL服务器等附加功能。我们将使用这在我们的GraphQL设置中已经广泛使用,所以我建议您在继续之前先对其进行研究,因为在这里我不会对其进行过多介绍。 一个很好的起点就是这个 。
We are going to build an API that handles creating/querying/updating a user and listens when a user has a new notification via a subscription.
我们将构建一个API,该API可以处理用户的创建/查询/更新,并在用户通过订阅收到新通知时进行侦听。
I had to make some changes to my code as well as the GQLgen generated code to make it work properly, but I’m not really sure if this is the best way to go from a performance perspective and I would love to have any suggestions. This is also not going to cover everything into detail except for the required parts since the post is already long enough as is.
我必须对代码以及GQLgen生成的代码进行一些更改才能使其正常运行,但是我不确定从性能角度来看这是否是最佳方法,我很乐意提出任何建议。 这也不会涵盖所有细节,除了所需的部分,因为帖子已经足够长了。
建立 (Setup)
GOPATHdb
GOPATHGOPATHdb
Next, install the following required packages:
接下来,安装以下必需的软件包:
go get github.com/99designs/gqlgengo get github.com/gorilla/muxgo get github.com/globalsign/mgo
The following packages are required to be installed and are only used by GQLgen internally. We are not going to work with these directly but they are required:
以下软件包是必须安装的,仅由GQLgen内部使用。 我们不会直接使用它们,但是它们是必需的:
go get github.com/pkg/errorsgo get github.com/urfave/cligo get golang.org/x/tools/go/ast/astutilgo get golang.org/x/tools/go/loadergo get golang.org/x/tools/importsgo get gopkg.in/yaml.v2
We are ready to start writing some code :)
我们准备开始编写一些代码了:)
项目设置 (Project Setup)
globalsign/mgo
globalsign/mgo
setup.go
setup.go
package dbimport ( "fmt" "github.com/globalsign/mgo")
var session *mgo.Sessionvar db *mgo.Database
func ConnectDB() { session, err := mgo.Dial("mongodb://localhost:27017,localhost:27018")
if err != nil { fmt.Println(err) } session.SetMode(mgo.Monotonic, true) db = session.DB("subscriptionstest")}
func GetCollection(collection string) *mgo.Collection { return db.C(collection)}
func CloseSession() { session.Close()}
2701727018scriptsgqlgen.go
2701727018scriptsgqlgen.go
// +build ignorepackage mainimport "github.com/99designs/gqlgen/cmd"func main() { cmd.Execute()}
This is just required to run the generator and so we are going to exclude it from our build.
这只是运行生成器所必需的,因此我们将其从构建中排除。
usersschema.graphql
usersschema.graphql
schema { query: Query mutation: Mutation}type Query { user(id: ID!): User!}
type Mutation { createUser(input: NewUser!): User! updateUser(input: UpdateUser!): User! updateNotification(input: UpdateNotification): User!}
type User { id: ID! first: String! last: String! email: String! notifications: [Notification!]!}
type Notification { id: ID! seen: Boolean! text: String! title: String!}
input NewUser { email: String!}input UpdateUser { id: ID! first: String last: String email: String}input UpdateNotification { id: ID! userID: ID! seen: Boolean!}
type Subscription { notificationAdded(id: ID!): User!}
Now, navigate to the users folder from the command line and run
现在,从命令行导航到users文件夹并运行
go run ../scripts/gqlgen.go init
resolver.gogenerated.gomodels_gen.gogqlgen.yml
resolver.gogenerated.gomodels_gen.gogqlgen.yml
resolver.gomodels_gen.gobson:"_id"
resolver.gomodels_gen.gobson:"_id"
type User struct { ID string `json:"id" bson:"_id"` First string `json:"first"` Last string `json:"last"` Email string `json:"email"` Notifications []Notification `json:"notifications"`}
Now, let’s quickly set up the basic resolvers without going into much detail. You’ll notice that at the top of the file, you’ll see some code similar to this:
现在,让我们快速设置基本的解析器,而无需太详细。 您会注意到,在文件顶部,您将看到一些类似于以下的代码:
type Resolver struct{}func (r *Resolver) Mutation() MutationResolver { return &mutationResolver{r}}func (r *Resolver) Query() QueryResolver { return &queryResolver{r}}func (r *Resolver) Subscription() SubscriptionResolver { return &subscriptionResolver{r}}
We are going to replace it with this:
我们将用以下内容替换它:
type Resolver struct { users *mgo.Collection}func New() Config { return Config{ Resolvers: &Resolver{ users: db.GetCollection("users"), }, }}func (r *Resolver) Mutation() MutationResolver { r.users = db.GetCollection("users") return &mutationResolver{r}}func (r *Resolver) Query() QueryResolver { r.users = db.GetCollection("users") return &queryResolver{r}}func (r *Resolver) Subscription() SubscriptionResolver { r.users = db.GetCollection("users") return &subscriptionResolver{r}}
New
New
Let’s quickly set up our basic resolvers.
让我们快速设置基本解析器。
CreateUser解析器 (CreateUser Resolver)
func (r *mutationResolver) CreateUser(ctx context.Context, input NewUser) (User, error) { var user User count, err := r.users.Find(bson.M{"email": input.Email}).Count() if err != nil { return User{}, err } else if count > 0 { return User{}, errors.New("user with that email already exists") } err = r.users.Insert(bson.M{"email": input.Email,}) if err != nil { return User{}, err } err = r.users.Find(bson.M{"email": input.Email}).One(&user) if err != nil { return User{}, err } return user, nil}
UpdateUser解析器 (UpdateUser Resolver)
func (r *mutationResolver) UpdateUser(ctx context.Context, input UpdateUser) (User, error) { var fields = bson.M{} var user User update := false if input.First != nil && *input.First != "" { fields["first"] = *input.First update = true } if input.Last != nil && *input.Last != "" { fields["last"] = *input.Last update = true } if input.Email != nil && *input.Email != "" { fields["email"] = *input.Email update = true } if !update { return User{}, errors.New("no fields present for updating data") } err := r.users.UpdateId(bson.ObjectIdHex(input.ID), fields) if err != nil { return User{}, err } err = r.users.Find(bson.M{"_id": bson.ObjectIdHex(input.ID)}).One(&user) if err != nil { return User{}, err }
user.ID = bson.ObjectId(user.ID).Hex()
return user, nil}
UpdateNotification解析器 (UpdateNotification Resolver)
func (r *mutationResolver) UpdateNotification(ctx context.Context, input *UpdateNotification) (User, error) { var user User var oid = bson.ObjectIdHex(input.UserID) if err := r.users.Find(bson.M{"_id": oid}).One(&user); err != nil { return User{}, err } for index, val := range user.Notifications { if bson.ObjectId(val.ID).Hex() == input.ID { val.Seen = input.Seen user.Notifications[index] = val break } } if err := r.users.UpdateId(oid, user); err != nil { return User{}, err } return user, nil}
QueryUser解析器 (QueryUser Resolver)
func (r *queryResolver) User(ctx context.Context, id string) (User, error) { var user User if err := r.users.FindId(bson.ObjectIdHex(id)).One(&user); err != nil { return User{}, err } user.ID = bson.ObjectId(user.ID).Hex() return user, nil}
Now that we are done with the setup, let’s move on to the main part.
现在我们完成了设置,让我们继续进行主要部分。
具有ChangeStreams的MongoDB实时数据 (MongoDB Real-time Data With ChangeStreams)
MongoDB now supports real-time data similar to firebase starting from version 3.6. The setup isn’t as easy though. There are a few important prerequisites for change streams to work properly:
从版本3.6开始,MongoDB现在支持类似于firebase的实时数据。 设置不是那么容易。 要使变更流正常工作,需要满足一些重要的先决条件:
Here’s what our method signature for NotificationAdded Resolver would look like:
这是NotificationAdded Resolver的方法签名如下所示:
func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string) (<-chan User, error) { panic("not implemented")}
There’s a problem with this implementation and we’ll need to change it slightly to make it work properly. But first, let’s look at the code required within the resolver which will also make it easier for us to understand why the change was required.
此实现存在问题,我们需要对其进行一些更改以使其正常运行。 但是首先,让我们看一下解析器中所需的代码,这也将使我们更容易理解为什么需要进行更改。
userDocchange
userDocchange
var userDoc Uservar change bson.Mcs, err := r.users.Watch([]bson.M{}, mgo.ChangeStreamOptions{MaxAwaitTimeMS: time.Hour, FullDocument: mgo.FullDocument("updateLookup")})
if err != nil { return err}if cs.Err() != nil { fmt.Println(err)}
Here, we are watching for changes in the user collection. We are also setting the timeout for ChangeStream as 1 hour. This is required to keep the change stream alive and not close automatically. We are also going to need the full document that was changed and so we define that setting in the ChangeStreamOptions as well. The watch function returns a cursor which we can then iterate over.
在这里,我们正在监视用户集合中的更改。 我们还将ChangeStream的超时设置为1小时。 这是使更改流保持活动状态并且不会自动关闭的必需条件。 我们还将需要已更改的完整文档,因此我们也在ChangeStreamOptions中定义该设置。 watch函数返回一个游标,然后可以对其进行迭代。
goroutine
goroutine
go func() { start := time.Now() for { ok := cs.Next(&change) if ok { byts, _ := bson.Marshal(change["fullDocument"].(bson.M)) bson.Unmarshal(byts, &userDoc) userDoc.ID = bson.ObjectId(userDoc.ID).Hex() if userDoc.ID == id { *userChan <- userDoc } } if time.Since(start).Minutes() >= 60 { break } continue }}()
cursor.Next()change
cursor.Next()change
type User
type User
This is also a good time to discuss the method signature for this method. Once again, you’d have something like this:
这也是讨论该方法的方法签名的好时机。 再一次,您将得到如下所示的内容:
func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string) (<-chan User, error) { ...}
generated.go
generated.go
type SubscriptionResolver interface { NotificationAdded(ctx context.Context, id string) (<-chan User, error)}
func (ec *executionContext) _Subscription_notificationAdded(ctx context.Context, field graphql.CollectedField) func() graphql.Marshaler { rawArgs := field.ArgumentMap(ec.Variables) args, err := field_Subscription_notificationAdded_args(rawArgs) if err != nil { ec.Error(ctx, err) return nil } ctx = graphql.WithResolverContext(ctx, &graphql.ResolverContext{ Field: field, }) rctx := ctx results, err := ec.resolvers.Subscription().NotificationAdded(rctx, args["id"].(string)) if err != nil { ec.Error(ctx, err) return nil } return func() graphql.Marshaler { res, ok := <-results if !ok { return nil } var out graphql.OrderedMap out.Add(field.Alias, func() graphql.Marshaler { return ec._User(ctx, field.Selections, &res) }()) return &out }}
The returned channel is then read by the generated code to get the updates and pass it on to our client. The problem is, once we return the channel from our resolver, that function execution is already over. Basically meaning that the channel would never receive any values here.
然后,返回的通道将由生成的代码读取以获取更新并将其传递给我们的客户端。 问题是,一旦我们从解析器返回通道,函数执行就结束了。 基本上意味着该通道在此永远不会接收任何值。
On the flip side, if values were added to the channel before returning it from the function, we are essentially going to have to wait an hour for all the updates to be pushed to the client since we are waiting an hour for the change streams to timeout (provided that we use a non-goroutine implementation for our ChangeStream cursor). It’s clear that this is not an ideal situation. Let’s make some changes to the above code to make it work for us.
在另一方面,如果在从函数返回通道之前将值添加到通道,则由于要等待一个小时的变更流发送到客户端,我们实际上将不得不等待一个小时才能将所有更新推送到客户端。超时(前提是我们对ChangeStream游标使用了非goroutine实现)。 显然,这不是理想的情况。 让我们对以上代码进行一些更改以使其对我们有用。
I’m first going to define a channel in the _Subscription_notificationAdded method whose pointer will then be passed to our resolver. It would look something like this:
我首先要在_Subscription_notificationAdded方法中定义一个通道,然后将其指针传递给我们的解析器。 它看起来像这样:
func (ec *executionContext) _Subscription_notificationAdded(ctx context.Context, field graphql.CollectedField) func() graphql.Marshaler { rawArgs := field.ArgumentMap(ec.Variables) args, err := field_Subscription_notificationAdded_args(rawArgs) if err != nil { ec.Error(ctx, err) return nil } ctx = graphql.WithResolverContext(ctx, &graphql.ResolverContext{ Field: field, })
userChan := make(chan User, 1) rctx := ctx go ec.resolvers.Subscription().NotificationAdded(rctx, args["id"].(string), &userChan)
return func() graphql.Marshaler { res, ok := <-userChan if !ok { return nil } var out graphql.OrderedMap out.Add(field.Alias, func() graphql.Marshaler { return ec._User(ctx, field.Selections, &res) }()) return &out }}
We are creating a new channel with a limit of 1 item at a time for performance reasons. We are then passing its pointer to our resolver and also making the call to this resolver a goroutine.
由于性能原因,我们正在创建一个新通道,一次限制为1个项目。 然后,我们将其指针传递给解析器,并使对该解析器的调用成为goroutine。
userChan
userChan
We also need to change the method signature for the method we just modified, we need to change
我们还需要为刚刚修改的方法更改方法签名,我们需要更改
type SubscriptionResolver interface { NotificationAdded(ctx context.Context, id string) (<-chan User, error)}
to
至
type SubscriptionResolver interface { NotificationAdded(ctx context.Context, id string, userChan *chan User) error}
That’s all the modification we need. Once that’s done, here’s what the complete NotificationAdded Subscription Resolver would look like:
这就是我们需要的所有修改。 完成后,完整的NotificationAdded Subscription Resolver如下所示:
func (r *subscriptionResolver) NotificationAdded(ctx context.Context, id string, userChan *chan User) error { var userDoc User var change bson.M cs, err := r.users.Watch([]bson.M{}, mgo.ChangeStreamOptions{MaxAwaitTimeMS: time.Hour, FullDocument: mgo.FullDocument("updateLookup")}) if err != nil { return err } if cs.Err() != nil { fmt.Println(err) } go func() { start := time.Now() for { ok := cs.Next(&change) if ok { byts, _ := bson.Marshal(change["fullDocument"].(bson.M)) bson.Unmarshal(byts, &userDoc) userDoc.ID = bson.ObjectId(userDoc.ID).Hex() if userDoc.ID == id { *userChan <- userDoc } } if time.Since(start).Minutes() >= 60 { break } continue } }() return nil}
Now the code that is sending an item to the channel and the one that is receiving it are both non-blocking and running in the background.
现在,将项目发送到通道的代码和正在接收项目的代码都是非阻塞的,并且在后台运行。
Phew! That was a lot of work but that was all the heavy lifting that we had to do. Let’s move on to the fun part and create a server and see the result of our efforts.
! 那是很多工作,但这是我们要做的所有繁重的工作。 让我们继续进行有趣的部分,创建服务器并查看我们的努力结果。
有趣的东西 (The fun stuff)
main.go
main.go
package main
import ( "fmt" "github.com/gorilla/mux" "github.com/gorilla/websocket" "github.com/rs/cors" "log" "net/http" "os" "github.com/99designs/gqlgen/handler" "<project path relative to GOPATH>/users" "<project path relative to GOPATH>/db")
const defaultPort = "8080"func main() { port := os.Getenv("PORT")if port == "" { port = defaultPort}
db.ConnectDB()
c := cors.New(cors.Options{ AllowedOrigins: []string{"http://localhost:" + port}, AllowCredentials: true,})r := mux.NewRouter()r.Handle("/", handler.Playground("User", "/users"))r.Handle("/users", c.Handler(handler.GraphQL(users.NewExecutableSchema(users.New()), handler.WebsocketUpgrader(websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, }))),)http.Handle("/", r)log.Fatal(http.ListenAndServe(":8080", nil))}
GQLgen provides us with some built-in handlers like Playground and WebsocketUpgrader which essentially creates a UI for testing our GraphQL server and for having a WebSocket connection with the clients.
GQLgen为我们提供了一些内置的处理程序,例如Playground和WebsocketUpgrader,它们实质上创建了一个UI来测试GraphQL服务器以及与客户端建立WebSocket连接。
Newusers.Config{Resolvers: &users.Resolvers{}}users
Newusers.Config{Resolvers: &users.Resolvers{}}users
At this point, we are ready to start our GraphQL server and test things out.
至此,我们准备启动GraphQL服务器并进行测试。
go build
go build
创建用户 (Create User)
更新用户 (Update User)
查询用户 (Query User)
通知已添加订阅 (NotificationAdded Subscription)
And there you have it!
在那里,您拥有了!
I once again, I want to stress that this might not be the optimal solution to the problem at hand, but it’s my take on a possible solution and I would love to have your feedback and suggestions on this.
我要再次强调,这可能不是解决当前问题的最佳方法,但这是我的一种可能的解决方案,我很乐意收到您对此的反馈和建议。
Thanks for reading. A few ? are always appreciated ?
谢谢阅读。 一些 ? 总是欣赏?
mongodb订阅发布模型