grpc使用etcd服务发现

运行效果

两个服务端, 一个客户端. 客户端发起请求, 服务器端通过etcd发现负载均衡相应.

grpcetcddiscover

初始化项目

  • docker部署etcd
$ docker pull bitnami/etcd
$ docker run -d --name Etcd-server \
    --network app-tier \
    --publish 2379:2379 \
    --publish 2380:2380 \
    --env ALLOW_NONE_AUTHENTICATION=yes \
    --env ETCD_ADVERTISE_CLIENT_URLS=http://etcd-server:2379 \
    bitnami/etcd:latest

这里就不展开细说部署了

参考:Docker Hub

  • 创建server、client文件夹和proto文件
$ ls
client  makefile  README.md  server  user.proto
  • 初始化server、client
$ cd client/
$ go mod init m2dd.top/etcd-grpc/client
$ cd ../server/
$ go mod init m2dd.top/etcd-grpc/server
$ cd ..
$ go work init ./client/
$ go work use ./server/

go work 是go1.18的一种新依赖管理模式, 我这里为了独立依赖又在同一个文件夹内使用了go work,不用也没关系,但是vscode会爆红

  • makefile (protoc生成代码)
client-codegen:
    protoc --go_out=./client/rpc --go_opt=paths=source_relative \
    --go-grpc_out=./client/rpc  --go-grpc_opt=paths=source_relative \
    user.proto
server-codegen: 
    protoc --go_out=./server/rpc --go_opt=paths=source_relative \
    --go-grpc_out=./server/rpc  --go-grpc_opt=paths=source_relative \
    user.proto
code-gen: client-codegen server-codegen
    @echo "生成sever client代码成功" 
  • protobuf文件
syntax = "proto3";

option go_package=".;rpc;";

message UserResponse{
  string id = 1;
  string name = 2;
  string account = 3;
  string passowrd = 4;
}

message UserRequest{
  string id = 1;
}

message RegisterRequest{
  string account = 1;
  string password = 2;
}

message RegisterResponse{
  string name = 1;
}

service User{
  rpc GetUser(UserRequest) returns (UserResponse){}
  rpc Register(RegisterRequest) returns (UserResponse){}
} 

核心逻辑

使用etcd作为服务发现,则client向etcd发起连接获取真正请求,再由grpc的负载均衡策略去进行访问.

流程图

etcdgrpcdiscover

核心代码

  • etcd

etcd依赖和全局变量

package etcd

import (
    "context"
    "errors"
    "fmt"
    "log"

    clientv3 "go.etcd.io/etcd/client/v3"
    "go.etcd.io/etcd/client/v3/naming/endpoints"
)

const (
    etcdUrl    = "http://localhost:2379"
    serverName = "etcd-m2/server"
    ttl        = 10
)

var cli *clientv3.Client

这里发现了官网一个错误,顺便提交了一个pr地址

etcd注册

//  etcd注册
func Register(addr string) error {
    log.Printf("etcdRegister %s\b", addr)
    cli, cerr := clientv3.NewFromURL(etcdUrl)
    if cerr != nil {
        return errors.New("etcd connect error")
    }
    lease, _ := cli.Grant(context.TODO(), ttl)
    //服务注册
    em, _ := endpoints.NewManager(cli, serverName)
    err := em.AddEndpoint(context.TODO(), serverName+"/"+addr, endpoints.Endpoint{
        Addr: addr,
    }, clientv3.WithLease(lease.ID))
    if err != nil {
        return errors.New("Register error")
    }
    //存活检测
    alive, err := cli.KeepAlive(context.TODO(), lease.ID)
    if err != nil {
        return err
    }
    //一直监听存活状态
    go func() {
        for {
            <-alive
            fmt.Println("etcd server keep alive")
        }
    }()
    return nil
}

etcd注销

func UnRegister(addr string) error {
    log.Printf("etcdUnRegister %s\b", addr)
    if cli != nil {
        em, _ := endpoints.NewManager(cli, serverName)
        err := em.DeleteEndpoint(context.TODO(), serverName+"/"+addr)
        if err != nil {
            return errors.New("UnRegister error")
        }
    }
    return nil
}
  • 启动
var addr string

func main() {
	etcdRegister()
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}
	// 日志中间件
	grpcserver := grpc.NewServer(grpc.UnaryInterceptor(LogUnaryInterceptor()))
	rpc.RegisterUserServer(grpcserver, &service.UserService{})
	if err := grpcserver.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

//监听程序运行状态,不在运行则立马下线
func etcdRegister() {
	var port int
	flag.IntVar(&port, "port", 8801, "port")
	flag.Parse()
	
	addr = fmt.Sprintf("localhost:%d", port)
	//关闭信号处理
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
	go func() {
		s := <-ch
		etcd.UnRegister(addr)
		if i, ok := s.(syscall.Signal); ok {
			os.Exit(int(i))
		} else {
			os.Exit(0)
		}
	}()
	err := etcd.Register(addr)
	if err != nil {
		panic(err)
	}
}
//日志
func LogUnaryInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		log.Println("call ", info.FullMethod)
		resp, err = handler(ctx, req)
		return resp, err
	}
}

service代码就不贴出来了,比较简单的测试服务

  • 客户端
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/naming/resolver"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer/roundrobin"
	"google.golang.org/grpc/credentials/insecure"
	"m2dd.top/etcd-grpc/client/rpc"
)

const (
	etcdUrl    = "http://localhost:2379"
	serverName = "etcd-m2/server"
	ttl        = 10
)

func main() {
	cli, cerr := clientv3.NewFromURL(etcdUrl)
	if cerr != nil {
		panic(cerr)
	}
	etcdResolver, err := resolver.NewBuilder(cli)
	if err != nil {
		panic(err)
	}
	//服务发现核心代码, 以及负载均衡
	conn, gerr := grpc.Dial(fmt.Sprintf("etcd:///%s", serverName), grpc.WithResolvers(etcdResolver), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)))
	if gerr != nil {
		panic(gerr)
	}

	defer conn.Close()
	serverClient := rpc.NewUserClient(conn)
	//循环请求
	for {
		response, err := serverClient.GetUser(context.TODO(), &rpc.UserRequest{
			Id: []byte(uuid.NewString()),
		})
		if err != nil {
			log.Println("ERROR :", err)
			return
		}
		log.Println(response, conn.Target())
		time.Sleep(1 * time.Second)
	}
}

grpc负载均衡

round_robin : 轮询, 我采用的负载均衡方式,会正常分担在不同的服务之间.

pick_first: 选第一个, 找第一个能正常访问的服务,一直发请求,直到这个服务不能使用,才会去调用其他服务.

grpclb: 已经被弃用了, 使用也只是找其中一个服务进行访问,并不会负载均衡.

参考链接

go_grpc : https://grpc.io/docs/languages/go/quickstart/

dockerhub: https://hub.docker.com/r/bitnami/etcd

gRPC naming and discovery | etcd: https://etcd.io/docs/v3.5/dev-guide/grpc_naming/

grpc/load-balancing.md at master:https://github.com/grpc/grpc/blob/master/doc/load-balancing.md

video: https://www.bilibili.com/video/BV1Je4y1y74Z

Q.E.D.