Microservices in Golang - Part5

内容提要:事件驱动 & NATS

原作者:Ewan Valentine

原文链接:https://ewanvalentine.io/microservices-in-golang-part-5/

引言

上文中,我们接触到了用户认证与 JWT。在这篇文章中,我们将会使用到 go-micro 的服务代理功能。

正如之前文章中所提到的,go-micro 是一个可拔插式的架构,它可以使用很多常用开源插件,参见 plugins repo,你可以看到它所支持的插件。

在本节中,我们会用到 NATS 代理插件。

事件驱动

事件驱动 (Event driven architecture) 的核心理解起来很容易,通常我们认为好的软件架构应该是解耦的,即服务之间不应该相互耦合或者依赖。当我们使用如 gRPC 这样的协议时,某些情况下确实是像之前所说的那样 (解耦),比如说我们会将一个请求发送到 go.srv.user-service ,通过服务发现,我们找到 go.srv.user-service 的实际地址。尽管在这样的调用方式中,并没有将请求与实现完全的绑定到一起,但我们还是将请求绑定到了一个叫 go.srv.user-service 的东西上,所以这也不是完完全全解耦合的,我们的代码与服务之间还是有了比较直接的调用。

发布与订阅

所以到底怎样才能使事件驱动架构完完全全的解耦合?为了理解这个,让我们先来看看事件发布与订阅的步骤:服务 A 在完成 任务 X 后通知消息系统 “任务 X 已完成“服务 A 并不需要知道或者说根本不在意谁在监听这个事件、事件的发生造成了什么影响。此时就只剩客户端的事了。

客户端服务现在只需要监听这个事件,意味着这里需要一个“调解人”作为中间件来接受这些事件并且通知所有正在监听的客户端,这些事件发布了。

本文工作

通过一下两张图,应该可以比较清晰的理解系统架构上的改变。

Highly Coupled Model

gRPC 一般实现下,user-service 代码中需要实例化另外两个微服务 Client ,以此调用函数发送邮件和短信,这种实现的耦合度比较高。

Event-Driven model

事件驱动 的架构下,user-service 只需要向 NATS 发送一条 topic 为 “user.created” 的信息,其他两个订阅了此 topic 的客户端就能知道此时有用户注册了,拿到用户的信息后它们会自行发送邮件、发送短信。

本文中我们就需要创建一个事件,来实现上述 topic 触发信息发送的效果,这里暂时先不实现邮件部分,仅作模拟。

代码实现

引入 NATS 插件

首先我们需要做的是将 NATS 代理插件集成到我们的 user-service 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
// user-service/main.go
func main() {
...
// Init 方法会解析所有命令行参数
srv.Init()

// 生成默认 broker 实例
pubsub := srv.Server().Options().Broker

// 注册 handler
pb.RegisterUserServiceHandler(srv.Server(), &service{repo, tokenService, pubsub})
...
}

译者注:

另外,我们通过设置环境变量 GO_MICRO_BROKER 的方法来设定 go-micro 中所使用的消息代理插件。


事件发布

现在需要在创建新用户的时候发布一个事件 (完整代码) :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// user-service/handler.go
const topic = "user.created"

type service struct {
repo Repository
tokenService Authable
PubSub broker.Broker
}
...

func (srv *service) Create(ctx context.Context, req *pb.User, res *pb.Response) error {
// 对密码进行哈希
hashedPass, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
if err != nil {
return err
}
req.Password = string(hashedPass)
if err := srv.repo.Create(req); err != nil {
return err
}
res.User = req
if err := srv.publishEvent(req); err != nil {
return err
}
return nil
}

func (srv *service) publishEvent(user *pb.User) error {
// Marshal 序列化 JSON 字符串
body, err := json.Marshal(user)
if err != nil {
return err
}

// 创建创建事件消息
msg := &broker.Message{
Header: map[string]string{
"id": user.Id,
},
Body: body,
}

// 发布消息到消息代理中
if err := srv.PubSub.Publish(topic, msg); err != nil {
log.Printf("[pub] failed: %v", err)
}

return nil
}
...

首先确保你正在运行 Postgres ,之后来运行新的 user-service :

1
2
3
$ docker run -d -p 5432:5432 postgres
$ make build
$ make run

事件订阅与邮件服务

现在让我们来创建邮件服务,我创建了一个新的仓库.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// email-service/main.go
package main

import (
"encoding/json"
"log"

pb "github.com/EwanValentine/shippy-user-service/proto/user"
micro "github.com/micro/go-micro"
"github.com/micro/go-micro/broker"
_ "github.com/micro/go-plugins/broker/nats"
)

const topic = "user.created"

func main() {
srv := micro.NewService(
micro.Name("go.micro.srv.email"),
micro.Version("latest"),
)

srv.Init()

// 通过环境变量获取代理信息
pubsub := srv.Server().Options().Broker
if err := pubsub.Connect(); err != nil {
log.Fatal(err)
}

// 订阅消息,定义消息的回调函数,对消息进行反序列化
_, err := pubsub.Subscribe(topic, func(e broker.Event) error {
var user *pb.User
if err := json.Unmarshal(e.Message().Body, &user); err != nil {
return err
}
log.Println(user)
go sendEmail(user)
return nil
})

if err != nil {
log.Println(err)
}

// Run the server
if err := srv.Run(); err != nil {
log.Println(err)
}
}

func sendEmail(user *pb.User) error {
log.Println("Sending email to:", user.Name)
return nil
}

译者注:

Change Publication to Event #569 中将 broker 中的 Publication 改为 Event,注意修改代码

在运行这个之前,我们需要运行 NATS.

1
$ docker run -d -p 4222:4222 nats

另外记得先把 user-service 运行起来,接下来像之前一样构建并运行服务

1
$ make build && make run

注:

记得在 user-service/Makefile 以及 email-service/Makefile 中加入 nats 的环境变量 MICRO_BROKER=natsMICRO_BROKER_ADDRESS=0.0.0.0:4222

最后使用 user-cli 创建一个新的用户,观察 email-service 的输出:

nats

email-service 隐式地接收到了 user.created 事件,尽管这个例子很简单,但是希望你能够从中了解到如何写出低耦合的服务。


micro.Option

我想理解 go-micro 这个框架的一些运作原理对于理解整个程序还是很有帮助的,所以在还是简单的说明一下 go-micro 中的一些机制,看这一段代码:

1
2
srv.Init()
pubsub := srv.Server().Options().Broker

当我们在 go-micro 中创建一个新的服务的时候,srv.Init() 会开始加载诸如插件、环境变量、命令行参数等配置,这些配置之后会作为服务的一部分来进行初始化。为了在之后能够顺利地使用实例,我们需要从服务实例中获取这些信息。在 srv.Server().Options() 中,你还能找到 TransportRegistry.

译者注:

micro.Option 是控制微服务的关键,目前共有29个 Option 可用。但是这么重要的选项竟然没有一份文档来描述。先解释下 Transport 和 Registry 选项的含义:

micro.Transport(t transport.Transport) Option 用来指定传输协议,默认使用 http ;

micro.Registry(r registry.Registry) Option 指定用于服务发现的注册机制,默认为基于 mDNS 的注册机制。

在我们的示例中,可以找到一个名为 GO_MICRO_BROKER 的环境变量,通过这个变量可以指定代理插件为 NATS,并且创建一个 NATS 的实例。

如果你是通过命令行工具来传入参数,那么你将会用到 cmd.Init(),也可以做到上述同样的效果。


切换消息代理插件

值得一提的是,在 NATS 中使用 JSON 会造成比 gRPC 更多的性能开销,这是由序列化与反序列化 JSON 字符串的开销造成的。但是在一些情况下,这种开销是完全可以接受的。NATS的效率很高,非常适合 ”即发即弃“ (Fire and Forget) 的事件。

也有一些其他的广泛应用的 “消息队列/发布订阅” (queue/pubsub) 技术,同样可以用在 go-micro 中,可以参考 go-micro的支持列表。由于 go-micro 在消息代理和你的代码实现中做了抽象,所以换用消息代理插件并不需要修改你的代码,你只需要将环境变量由 MICRO_BROKER=nats 修改为 MICRO_BROKER=googlepubsub,之后变更一下包引入:

1
2
3
_ "github.com/micro/go-plugins/broker/nats"
// 修改为
_ "github.com/micro/go-plugins/broker/googlepubsub

脱离 go-micro ,NATS 也提供了一个面向 go 语言的 (NATS 本身就是用 Go 写的,所以对 Go 的支持自然非常好).

事件发布如下:

1
2
3
4
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("user.created", userJsonString)

事件订阅:

1
2
3
4
5
// Simple Async Subscriber
nc.Subscribe("user.created", func(m *nats.Msg) {
user := convertUserString(m.Data)
go sendEmail(user)
})

之前提到使用第三方的消息代理,比如 NATS,会导致微服务之间失去直接利用 protobuf 进行二进制通信的能力,换用 JSON 也确实会在序列化上造成更多的开销,不过 go-micro 对此也有对策。

pubsub 层

go-micro 内置一个 pubsub 层,处于消息代理层的上方,所以无需使用如 NATS 这样的第三方消息代理,最厉害的是,它可以使用 protobuf 定义,这就意味着,我们又可以在微服务之间使用低延迟的二进制流了。

所以现在再来更新一下 user-service,替换掉现用的 NATS 代理,开始使用 go-micro 的发布订阅:

1
2
3
4
5
6
7
8
9
// user-service/main.go
func main() {
...
publisher := micro.NewPublisher("user.created", srv.Client())

// Register handler
pb.RegisterUserServiceHandler(srv.Server(), &service{repo, tokenService, publisher})
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// user-service/handler.go
func (srv *service) Create(ctx context.Context, req *pb.User, res *pb.Response) error {

// 对密码进行哈希
hashedPass, err := bcrypt.GenerateFromPassword([]byte(req.Password), bcrypt.DefaultCost)
if err != nil {
return err
}
req.Password = string(hashedPass)

// 新的 publisher 代码更简洁
if err := srv.repo.Create(req); err != nil {
return err
}
res.User = req
if err := srv.Publisher.Publish(ctx, req); err != nil {
return err
}
return nil
}

接下来是邮件服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// email-service/main.go
const topic = "user.created"

type Subscriber struct{}

func (sub *Subscriber) Process(ctx context.Context, user *pb.User) error {
log.Println("Picked up a new message")
log.Println("Sending email to:", user.Name)
return nil
}

func main() {
...
micro.RegisterSubscriber(topic, srv.Server(), new(Subscriber))
...
}

现在我们微服务底层就可以使用用户定义 protobuf 利用 gRPC 进行通信了,没有用任何第三方的插件,太棒了!

看一下运行情况:

Built-in Pubsub

注:

你可能看到图中依然用了 MICRO_BROKER=nats ,这个其实可以删除的。

总结

到此结束了! 接下来的教程我们将研究如何为我们的服务创建一个用户界面,并看看 Web 客户端如何能够与我们的服务进行交互。

任何漏洞,错误或者反馈,欢迎你通过邮件[告诉我](mailto: ewan.valentine89@gmail.com)。


觉得这系列文章对您有帮助,可以请原作者喝杯咖啡,Cheers! https://monzo.me/ewanvalentine.

或者通过 Patreon 来支持原作者。