这个示例是一个类似 twitter 的 web 应用程序,使用 Server-Sent Events 来支持实时刷新。
运行
docker-compose up
然后, 浏览 http://localhost:8080
您可以添加自己的帖子或点击按钮获得随机生成的帖子。
无论哪种方式,feeds 列表和 feed 中的帖子都应该是最新的。尝试使用第二个浏览器窗口查看更新。
它是如何工作的
- 可以创建和更新帖子。
- 帖子可以包含标签。
- 每个标签都有自己的 feed,其中包含来自该标签的所有帖子。
- 所有的帖子都存储在 MySQL 中。这就是写模型。
- 所有 feed 都异步更新并存储在 MongoDB 中。这是读模型。
为什么要使用单独的写和读模型?
对于这个示例应用程序,使用多语言持久性(两个数据库引擎)当然有些过头了。我们这样做是为了展示这个技术,以及如何很容易地将它应用到 Watermill。
专用的读模型对于具有高读/写比率的应用程序是一种有用的模式。所有写操作都被原子地应用到写模型(在我们的例子中是 MySQL)。事件处理程序异步更新读模型(我们使用 Mongo)。
读取模型中的数据可以按原样使用。也可以独立于写模型进行扩展。
请记住,要使用此模式,应用程序中必须接受最终的一致性。而且,在大多数用例中,您可能不需要使用它。务实!
SSE Router
SSERouter
在本例中,我们使用 NATS 作为 Pub/Sub,但这可以是 Watermill 支持的任何 Pub/Sub。
sseRouter, err := watermillHTTP.NewSSERouter(
watermillHTTP.SSERouterConfig{
UpstreamSubscriber: router.Subscriber,
ErrorHandler: watermillHTTP.DefaultErrorHandler,
},
router.Logger,
)
Stream Adapters(流适配器)
SSERouterStreamAdapter
GetResponse
ValidateMessage
type StreamAdapter interface {
// GetResponse returns the response to be sent back to client.
// Any errors that occur should be handled and written to `w`, returning false as `ok`.
GetResponse(w http.ResponseWriter, r *http.Request) (response interface{}, ok bool)
// Validate validates if the incoming message should be handled by this handler.
// Typically this involves checking some kind of model ID.
Validate(r *http.Request, msg *message.Message) (ok bool)
}
Validate
func (p postStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
postUpdated := PostUpdated{}
err := json.Unmarshal(msg.Payload, &postUpdated)
if err != nil {
return false
}
postID := chi.URLParam(r, "id")
return postUpdated.OriginalPost.ID == postID
}
true
func (f allFeedsStreamAdapter) Validate(r *http.Request, msg *message.Message) (ok bool) {
return true
}
SSERouterAddHandler
postHandler := sseRouter.AddHandler(PostUpdatedTopic, postStream)
// ...
r.Get("/posts/{id}", postHandler)
Event handlers(事件处理程序)
该示例使用 Watermill 进行所有异步通信,包括 SSE。
发布了以下事件:
PostCreatedFeedUpdatedPostUpdated
前端 app
前端应用程序是使用 Vue.js 和 Bootstrap 构建的。
EventSource
this.es = new EventSource('/api/feeds/' + this.feed)
this.es.addEventListener('data', event => {
let data = JSON.parse(event.data);
this.posts_stream = data.posts;
}, false);
Refs
- watermill.io