grpc使用etcd服务发现
运行效果
两个服务端, 一个客户端. 客户端发起请求, 服务器端通过etcd发现负载均衡相应.
初始化项目
- docker部署etcd
$ docker pull bitnami/etcd
$ docker run -d --name Etcd-server \
--network app-tier \
--publish 2379:2379 \
--publish 2380:2380 \
--env ALLOW_NONE_AUTHENTICATION=yes \
--env ETCD_ADVERTISE_CLIENT_URLS=http://etcd-server:2379 \
bitnami/etcd:latest
这里就不展开细说部署了
参考:Docker Hub
- 创建server、client文件夹和proto文件
$ ls
client makefile README.md server user.proto
- 初始化server、client
$ cd client/
$ go mod init m2dd.top/etcd-grpc/client
$ cd ../server/
$ go mod init m2dd.top/etcd-grpc/server
$ cd ..
$ go work init ./client/
$ go work use ./server/
go work 是go1.18的一种新依赖管理模式, 我这里为了独立依赖又在同一个文件夹内使用了go work,不用也没关系,但是vscode会爆红
- makefile (protoc生成代码)
client-codegen:
protoc --go_out=./client/rpc --go_opt=paths=source_relative \
--go-grpc_out=./client/rpc --go-grpc_opt=paths=source_relative \
user.proto
server-codegen:
protoc --go_out=./server/rpc --go_opt=paths=source_relative \
--go-grpc_out=./server/rpc --go-grpc_opt=paths=source_relative \
user.proto
code-gen: client-codegen server-codegen
@echo "生成sever client代码成功"
- protobuf文件
syntax = "proto3";
option go_package=".;rpc;";
message UserResponse{
string id = 1;
string name = 2;
string account = 3;
string passowrd = 4;
}
message UserRequest{
string id = 1;
}
message RegisterRequest{
string account = 1;
string password = 2;
}
message RegisterResponse{
string name = 1;
}
service User{
rpc GetUser(UserRequest) returns (UserResponse){}
rpc Register(RegisterRequest) returns (UserResponse){}
}
核心逻辑
使用etcd作为服务发现,则client向etcd发起连接获取真正请求,再由grpc的负载均衡策略去进行访问.
流程图
核心代码
- etcd
etcd依赖和全局变量
package etcd
import (
"context"
"errors"
"fmt"
"log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints"
)
const (
etcdUrl = "http://localhost:2379"
serverName = "etcd-m2/server"
ttl = 10
)
var cli *clientv3.Client
这里发现了官网一个错误,顺便提交了一个pr
地址
etcd注册
// etcd注册
func Register(addr string) error {
log.Printf("etcdRegister %s\b", addr)
cli, cerr := clientv3.NewFromURL(etcdUrl)
if cerr != nil {
return errors.New("etcd connect error")
}
lease, _ := cli.Grant(context.TODO(), ttl)
//服务注册
em, _ := endpoints.NewManager(cli, serverName)
err := em.AddEndpoint(context.TODO(), serverName+"/"+addr, endpoints.Endpoint{
Addr: addr,
}, clientv3.WithLease(lease.ID))
if err != nil {
return errors.New("Register error")
}
//存活检测
alive, err := cli.KeepAlive(context.TODO(), lease.ID)
if err != nil {
return err
}
//一直监听存活状态
go func() {
for {
<-alive
fmt.Println("etcd server keep alive")
}
}()
return nil
}
etcd注销
func UnRegister(addr string) error {
log.Printf("etcdUnRegister %s\b", addr)
if cli != nil {
em, _ := endpoints.NewManager(cli, serverName)
err := em.DeleteEndpoint(context.TODO(), serverName+"/"+addr)
if err != nil {
return errors.New("UnRegister error")
}
}
return nil
}
- 启动
var addr string
func main() {
etcdRegister()
lis, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
// 日志中间件
grpcserver := grpc.NewServer(grpc.UnaryInterceptor(LogUnaryInterceptor()))
rpc.RegisterUserServer(grpcserver, &service.UserService{})
if err := grpcserver.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
//监听程序运行状态,不在运行则立马下线
func etcdRegister() {
var port int
flag.IntVar(&port, "port", 8801, "port")
flag.Parse()
addr = fmt.Sprintf("localhost:%d", port)
//关闭信号处理
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
s := <-ch
etcd.UnRegister(addr)
if i, ok := s.(syscall.Signal); ok {
os.Exit(int(i))
} else {
os.Exit(0)
}
}()
err := etcd.Register(addr)
if err != nil {
panic(err)
}
}
//日志
func LogUnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println("call ", info.FullMethod)
resp, err = handler(ctx, req)
return resp, err
}
}
service代码就不贴出来了,比较简单的测试服务
- 客户端
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/google/uuid"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/resolver"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/credentials/insecure"
"m2dd.top/etcd-grpc/client/rpc"
)
const (
etcdUrl = "http://localhost:2379"
serverName = "etcd-m2/server"
ttl = 10
)
func main() {
cli, cerr := clientv3.NewFromURL(etcdUrl)
if cerr != nil {
panic(cerr)
}
etcdResolver, err := resolver.NewBuilder(cli)
if err != nil {
panic(err)
}
//服务发现核心代码, 以及负载均衡
conn, gerr := grpc.Dial(fmt.Sprintf("etcd:///%s", serverName), grpc.WithResolvers(etcdResolver), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)))
if gerr != nil {
panic(gerr)
}
defer conn.Close()
serverClient := rpc.NewUserClient(conn)
//循环请求
for {
response, err := serverClient.GetUser(context.TODO(), &rpc.UserRequest{
Id: []byte(uuid.NewString()),
})
if err != nil {
log.Println("ERROR :", err)
return
}
log.Println(response, conn.Target())
time.Sleep(1 * time.Second)
}
}
grpc负载均衡
round_robin
: 轮询, 我采用的负载均衡方式,会正常分担在不同的服务之间.
pick_first
: 选第一个, 找第一个能正常访问的服务,一直发请求,直到这个服务不能使用,才会去调用其他服务.
grpclb
: 已经被弃用了, 使用也只是找其中一个服务进行访问,并不会负载均衡.
参考链接
go_grpc : https://grpc.io/docs/languages/go/quickstart/
dockerhub: https://hub.docker.com/r/bitnami/etcd
gRPC naming and discovery | etcd: https://etcd.io/docs/v3.5/dev-guide/grpc_naming/
grpc/load-balancing.md at master:https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
video: https://www.bilibili.com/video/BV1Je4y1y74Z
Q.E.D.