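Through worked examples, this article examines gRPC's load-balancing policies: the default pick_first, the manually configured round_robin, and a hand-written weighted round robin policy.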
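What is load balancing
Goals of load balancing:
- Distribute requests: spread client requests across multiple backend servers.
- Prevent overload: keep any single server from taking too much load.
- Scalability: the system can scale horizontally by adding servers.
Responsibilities of a load-balancing policy:
- Input: receive a list of server IP addresses from the name resolver.
- Maintain connections (subchannels): establish and maintain gRPC connections (called subchannels) between the client and those server addresses.
- Choose a connection (picker): when an RPC is sent, the policy's picker decides which subchannel (that is, which backend server) carries it.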
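gRPC load balancing
Load balancing is a key feature of gRPC. It lets requests from a client be distributed across multiple servers, which keeps any one server from being overloaded and lets the system scale by adding more servers.
A gRPC load-balancing policy is given a list of server IP addresses by the name resolver. The policy is responsible for maintaining connections (subchannels) to those servers and for choosing which connection to use when an RPC is sent.
By default, gRPC uses the pick_first policy. This policy does no real load balancing: it tries each address obtained from the name resolver and uses the first one it can connect to. By updating the gRPC service config, you can switch to round_robin, which connects to every address it receives and rotates through the connected backends for each RPC. Other policies are available, but the exact set varies by language. If the built-in policies do not meet your needs, you can implement your own custom policy.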
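The policy switch described above is driven by a service config JSON string. As a minimal sketch, the helper below (lbServiceConfig is a hypothetical name, not part of gRPC) builds that string in the loadBalancingConfig list form, which the gRPC service config also accepts alongside the loadBalancingPolicy field used later in this article. It only uses the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lbServiceConfig is a hypothetical helper that returns a service config JSON
// string selecting the named load-balancing policy with an empty policy config.
func lbServiceConfig(policy string) (string, error) {
	cfg := map[string]any{
		"loadBalancingConfig": []map[string]any{
			{policy: map[string]any{}},
		},
	}
	b, err := json.Marshal(cfg)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := lbServiceConfig("round_robin")
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"loadBalancingConfig":[{"round_robin":{}}]}
}
```

The resulting string is the kind of value that would be passed to grpc.WithDefaultServiceConfig when creating the client.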
Next we work through concrete examples of pick_first, round_robin, and a custom gRPC load-balancing policy. The code is laid out as follows:
grpc-lb-demo/
├── go.mod
├── proto/
│ ├── greet.pb.go
│ ├── greet.proto
│ └── greet_grpc.pb.go
├── server/
│ └── server.go
├── client_pick_first/
│ └── client.go
├── client_round_robin/
│ └── client.go
├── client_custom_lb/
│ ├── client.go
│ └── weighted_round_robin_lb/
│ ├── weighted_round_robin_lb.go
│ └── weighted_resolver.go
└── resolver/
└── resolver.go
pick_first
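First we test the default pick_first policy. The test should show whether the client automatically selects a reachable address and keeps using it for subsequent requests.
proto implementation
This proto defines two messages, the client request EchoRequest and the server response EchoResponse, plus the Echo method that the server must implement.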
syntax = "proto3";
option go_package = "./proto";
package proto;
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
service Echo {
rpc Echo(EchoRequest) returns (EchoResponse) {}
}
Use protoc to generate the corresponding Go code.
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
proto/greet.proto
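Server
On the server side, we start two listeners on ports 50051 and 50052. The server struct carries an addr field that identifies which listener handled a request; returning this addr to the client tells us which address the client actually reached. The Echo method is what responds to the client's message.
The code is as follows: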
package main
import (
"context"
"fmt"
"log"
"net"
"os"
"os/signal"
"syscall"
pb "grpc-lb-example/proto"
"google.golang.org/grpc"
)
// The two listen addresses; they must match the ports the resolver hands to the client.
var addrs = []string{"localhost:50051", "localhost:50052"}
type server struct {
pb.UnimplementedEchoServer
addr string
}
func (s *server) Echo(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil
}
func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
for _, addr := range addrs {
go startServer(addr)
}
<-stop
log.Println("Shutting down servers...")
}
func startServer(addr string) {
lis, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
log.Printf("Server listening at %v", lis.Addr())
s := grpc.NewServer()
pb.RegisterEchoServer(s, &server{addr: addr})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
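Resolver implementation
For the client to receive multiple addresses, we need to implement a name resolver. It returns three addresses: backendAddr1, backendAddr2, and backendAddr3, of which only backendAddr2 and backendAddr3 are reachable. We want to see whether the client automatically picks a reachable address to connect to the server, and whether it uses only one of them.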
package custom_resolver
import (
"log"
"google.golang.org/grpc/resolver"
)
const (
myScheme = "example"
myServiceName = "my-custom-service:1234"
backendAddr1 = "127.0.0.1:1"
backendAddr2 = "127.0.0.1:50051"
backendAddr3 = "127.0.0.1:50052"
)
type myResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
}
type myResolverBuilder struct{}
func (*myResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
r := &myResolver{
target: target,
cc: cc,
addrsStore: map[string][]string{
myServiceName: {backendAddr1, backendAddr2, backendAddr3},
},
}
r.start()
return r, nil
}
func (*myResolverBuilder) Scheme() string { return myScheme }
func (r *myResolver) start() {
addrStrs := r.addrsStore[r.target.Endpoint()]
addrs := make([]resolver.Address, len(addrStrs))
for i, s := range addrStrs {
addrs[i] = resolver.Address{Addr: s}
}
err := r.cc.UpdateState(resolver.State{Addresses: addrs})
if err != nil {
log.Fatalf("UpdateState failed: %v", err)
}
}
func (r *myResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *myResolver) Close() {}
func init() {
resolver.Register(&myResolverBuilder{})
}
Client implementation
The address passed to the client is the target defined in the resolver.
package main
import (
"context"
"log"
"time"
pb "grpc-lb-example/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
// Important: import the custom resolver package so that its init() function runs and registers the resolver
_ "grpc-lb-example/resolver"
)
const (
address = "example:///my-custom-service:1234"
clientName = "Colin"
)
func main() {
log.Println("Client: Dialing with custom resolver address:", address)
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("Client: Did not connect: %v", err)
}
defer conn.Close()
log.Println("Client: Connected successfully!")
c := pb.NewEchoClient(conn)
for range 10 {
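ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
r, err := c.Echo(ctx, &pb.EchoRequest{Message: clientName})
// Release the timeout right after the call instead of deferring inside the loop,
// where the deferred calls would pile up until main returns.
cancel()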
if err != nil {
log.Fatalf("Client: Could not greet: %v", err)
}
log.Printf("Client: Echo from server: %s", r.GetMessage())
time.Sleep(500 * time.Millisecond)
}
}
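The target string example:///my-custom-service:1234 is what ties the client to the resolver: the scheme selects the registered resolver builder, and the endpoint is the key looked up in addrsStore. The following is a rough sketch of that split using only net/url. The helper splitTarget is hypothetical, and the assumption here is that the endpoint corresponds to the URL path with its leading slash removed.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// splitTarget is a hypothetical helper that splits a gRPC-style target URI
// into its scheme and its endpoint (the path without the leading slash).
func splitTarget(target string) (scheme, endpoint string, err error) {
	u, err := url.Parse(target)
	if err != nil {
		return "", "", err
	}
	return u.Scheme, strings.TrimPrefix(u.Path, "/"), nil
}

func main() {
	scheme, endpoint, err := splitTarget("example:///my-custom-service:1234")
	if err != nil {
		panic(err)
	}
	fmt.Println(scheme)   // prints example
	fmt.Println(endpoint) // prints my-custom-service:1234
}
```

This is why the constants myScheme and myServiceName in the resolver must agree with the address constant in the client.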
Testing the connection
Running the test, the client skips backendAddr1 and connects to backendAddr2.
[root@debian client]# go run client.go
2025/05/18 17:22:55 Client: Dialing with custom resolver address: example:///my-custom-service:1234
2025/05/18 17:22:55 Client: Connected successfully!
2025/05/18 17:22:55 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:56 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:56 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:57 Client: Echo from server: Colin (from :50051)
2025/05/18 17:22:57 Client: Echo from server: Colin (from :50051)
Now we swap the order of the addresses in the resolver by exchanging the values of backendAddr2 and backendAddr3.
const (
myScheme = "example"
myServiceName = "my-custom-service:1234"
backendAddr1 = "127.0.0.1:1"
backendAddr2 = "127.0.0.1:50052"
backendAddr3 = "127.0.0.1:50051"
)
Run it again:
[root@debian client]# go run client.go
2025/05/18 17:27:05 Client: Dialing with custom resolver address: example:///my-custom-service:1234
2025/05/18 17:27:05 Client: Connected successfully!
2025/05/18 17:27:05 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:05 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:06 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:06 Client: Echo from server: Colin (from :50052)
2025/05/18 17:27:07 Client: Echo from server: Colin (from :50052)
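The client again connects to backendAddr2 (now port 50052), which shows that pick_first connects to the first reachable address in list order and keeps using that address for subsequent requests.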
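The behavior observed in the two runs can be modeled in a few lines. This is a simplified sketch, not gRPC's actual implementation: pickFirst and the reachable callback are hypothetical names, and the callback stands in for a real connection attempt.

```go
package main

import "fmt"

// pickFirst is a simplified model of the pick_first policy: it walks the
// resolver's address list in order and returns the first reachable address.
// The reachable callback is a stand-in for a real connection attempt.
func pickFirst(addrs []string, reachable func(string) bool) (string, bool) {
	for _, a := range addrs {
		if reachable(a) {
			return a, true
		}
	}
	return "", false
}

func main() {
	up := map[string]bool{"127.0.0.1:50051": true, "127.0.0.1:50052": true}
	isUp := func(a string) bool { return up[a] }

	// Original order: the unreachable port 1 is skipped, 50051 wins.
	first, _ := pickFirst([]string{"127.0.0.1:1", "127.0.0.1:50051", "127.0.0.1:50052"}, isUp)
	fmt.Println(first) // prints 127.0.0.1:50051

	// Swapped order: 50052 now comes first among the reachable addresses.
	swapped, _ := pickFirst([]string{"127.0.0.1:1", "127.0.0.1:50052", "127.0.0.1:50051"}, isUp)
	fmt.Println(swapped) // prints 127.0.0.1:50052
}
```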
round_robin
To test round_robin, we only need to add one option to the client code:
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
// Add the round_robin policy here
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
Run it again:
[root@debian client]# go run client.go
2025/05/18 17:30:56 Client: Dialing with custom resolver address: example:///my-custom-service:1234
2025/05/18 17:30:56 Client: Connected successfully!
2025/05/18 17:30:56 Client: Echo from server: Colin (from :50051)
2025/05/18 17:30:57 Client: Echo from server: Colin (from :50052)
2025/05/18 17:30:57 Client: Echo from server: Colin (from :50051)
2025/05/18 17:30:58 Client: Echo from server: Colin (from :50052)
2025/05/18 17:30:58 Client: Echo from server: Colin (from :50051)
2025/05/18 17:30:59 Client: Echo from server: Colin (from :50052)
2025/05/18 17:30:59 Client: Echo from server: Colin (from :50051)
2025/05/18 17:31:00 Client: Echo from server: Colin (from :50052)
2025/05/18 17:31:00 Client: Echo from server: Colin (from :50051)
2025/05/18 17:31:01 Client: Echo from server: Colin (from :50052)
The client now rotates through the reachable server addresses, which is exactly the round-robin behavior.
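The alternating pattern in the log can be reproduced with a small counter-based picker. This is a simplified model under stated assumptions: rrPicker is a hypothetical type, it assumes at least one backend, and it always starts the rotation at the first backend, which a real implementation may not do.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// rrPicker is a simplified model of a round-robin picker over ready backends.
// It assumes the backends slice is non-empty.
type rrPicker struct {
	backends []string
	next     atomic.Uint64 // index of the next pick, incremented atomically
}

// pick returns the next backend in rotation; it is safe for concurrent callers.
func (p *rrPicker) pick() string {
	n := p.next.Add(1) - 1
	return p.backends[n%uint64(len(p.backends))]
}

func main() {
	p := &rrPicker{backends: []string{":50051", ":50052"}}
	for i := 0; i < 4; i++ {
		fmt.Println(p.pick()) // prints :50051, :50052, :50051, :50052 on separate lines
	}
}
```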
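Custom load balancing
The round_robin policy cannot account for servers with different capacities taking different shares of the load. Next we implement a custom load-balancing policy based on weighted round robin.
Weighted round robin assigns each server a weight according to its processing capacity, so that more capable servers take more requests and less capable servers take fewer.
Resolver implementation
We first need a name resolver. Unlike the previous one, it attaches metadata to each address, and that metadata is the weight. Here backendAddr2 gets weight 3 and backendAddr1 gets weight 1. If everything is configured correctly, requests will be split 1:3 between backendAddr1 and backendAddr2.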
package weighted_round_robin_lb
import (
"log"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)
func init() {
// Register the weighted resolver
resolver.Register(&weightedResolverBuilder{})
}
type AddrMetadata struct {
Weight int
}
const (
myScheme = "example"
myServiceName = "my-custom-service:1234"
backendAddr1 = "127.0.0.1:50051"
backendAddr2 = "127.0.0.1:50052"
backendAddr3 = "127.0.0.1:50053"
)
// Weight definitions
var weightMap = map[string]int{
backendAddr1: 1, // weight 1
backendAddr2: 3, // weight 3
backendAddr3: 0,
}
type weightedResolver struct {
target resolver.Target
cc resolver.ClientConn
addrsStore map[string][]string
}
type weightedResolverBuilder struct{}
// RegisterResolver registers the weighted resolver
func RegisterResolver() {
resolver.Register(&weightedResolverBuilder{})
}
func (*weightedResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
r := &weightedResolver{
target: target,
cc: cc,
addrsStore: map[string][]string{
myServiceName: {backendAddr1, backendAddr2, backendAddr3},
},
}
r.start()
return r, nil
}
func (*weightedResolverBuilder) Scheme() string { return myScheme }
func (r *weightedResolver) start() {
addrStrs := r.addrsStore[r.target.Endpoint()]
addrs := make([]resolver.Address, len(addrStrs))
for i, s := range addrStrs {
// Look up the weight
weight := weightMap[s]
// Build the weight metadata
meta := AddrMetadata{Weight: weight}
// Wrap the metadata in attributes
attrs := attributes.New(AddrMetadata{}, meta)
// Build the address together with its attributes
addrs[i] = resolver.Address{
Addr: s,
Attributes: attrs,
}
log.Printf("weighted resolver: added address %s with weight %d, Attributes: %v", s, weight, attrs)
}
err := r.cc.UpdateState(resolver.State{Addresses: addrs})
if err != nil {
log.Fatalf("UpdateState failed: %v", err)
}
}
func (r *weightedResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (r *weightedResolver) Close() {}
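Balancer implementation
In the init function we call balancer.Register() to add our builder to gRPC's global registry of load balancer builders. After that, when the client's service config names the "weighted_round_robin" policy, gRPC calls the registered Builder's Build method to create a balancer instance. Register takes a balancer.Builder, and the NewBalancerBuilder helper from the base package greatly simplifies implementing that interface.
NewBalancerBuilder takes a base.PickerBuilder, which our wrrPickerBuilder type implements. gRPC calls its Build method whenever it needs a new Picker. The Pick method of the wrrPicker that Build returns implements the smooth weighted round robin algorithm.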
package weighted_round_robin_lb
import (
"log"
"math"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
)
const (
Name = "weighted_round_robin"
DefaultWeight = 1
)
func init() {
log.Println("WeightedRoundRobinLB: Registering balancer builder with name:", Name)
balancer.Register(newBuilder())
}
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &wrrPickerBuilder{}, base.Config{HealthCheck: false})
}
type wrrPickerBuilder struct{}
func (pb *wrrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): Build called. Have %d Ready SubConns.", len(info.ReadySCs))
if len(info.ReadySCs) == 0 {
log.Println("WeightedRoundRobinLB (wrrPickerBuilder): No ready SubConns. Returning ErrNoSubConnAvailable.")
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}
var subConns []*weightedSubConn
totalWeight := 0
for sc, scInfo := range info.ReadySCs {
weight := DefaultWeight
var fetchedMetadata AddrMetadata
var metadataOk bool
if scInfo.Address.Attributes != nil {
if val := scInfo.Address.Attributes.Value(AddrMetadata{}); val != nil {
fetchedMetadata, metadataOk = val.(AddrMetadata)
}
}
if metadataOk {
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, successfully retrieved AddrMetadata: %+v", scInfo.Address.Addr, fetchedMetadata)
if fetchedMetadata.Weight > 0 {
weight = fetchedMetadata.Weight
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, using metadata weight: %d", scInfo.Address.Addr, weight)
} else {
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, metadata weight is %d (<=0). Using default weight: %d", scInfo.Address.Addr, fetchedMetadata.Weight, DefaultWeight)
}
} else {
// Log why metadata wasn't used
if scInfo.Address.Attributes == nil {
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, no attributes found. Using default weight: %d", scInfo.Address.Addr, DefaultWeight)
} else {
val := scInfo.Address.Attributes.Value(AddrMetadata{})
if val == nil {
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, attributes present, but key 'AddrMetadata{}' (or its equivalent) not found. Using default weight: %d", scInfo.Address.Addr, DefaultWeight)
} else {
log.Printf("WeightedRoundRobinLB (wrrPickerBuilder): SubConn %s, attributes present and key 'AddrMetadata{}' (or its equivalent) found, but value is of WRONG TYPE (got %T, expected AddrMetadata). Using default weight: %d", scInfo.Address.Addr, val, DefaultWeight)
}
}
}
subConns = append(subConns, &weightedSubConn{
sc: sc,
address: scInfo.Address.Addr,
weight: weight,
currentWeight: 0, // Initialized to 0 as per smooth WRR
})
totalWeight += weight
}
return &wrrPicker{
subConns: subConns,
totalWeight: totalWeight,
}
}
// weightedSubConn holds a SubConn and its associated weight information for WRR.
type weightedSubConn struct {
sc balancer.SubConn
address string
weight int // Static weight assigned to this SubConn
currentWeight int // Dynamic weight, updated during picking
}
// wrrPicker is a Picker that implements the weighted round-robin algorithm.
type wrrPicker struct {
subConns []*weightedSubConn
mu sync.Mutex
totalWeight int
}
// Pick implements the Picker interface.
func (p *wrrPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.subConns) == 0 {
// log.Println("WeightedRoundRobinLB (wrrPicker): Pick - No SubConns available. Returning ErrNoSubConnAvailable.")
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
var bestSc *weightedSubConn
maxCurrentWeight := math.MinInt
for _, wsc := range p.subConns {
wsc.currentWeight += wsc.weight
if wsc.currentWeight > maxCurrentWeight {
maxCurrentWeight = wsc.currentWeight
bestSc = wsc
}
}
if bestSc == nil {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
bestSc.currentWeight -= p.totalWeight
return balancer.PickResult{SubConn: bestSc.sc, Done: nil}, nil
}
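To see what Pick does step by step, the same smooth weighted round robin logic can be run in isolation. The sketch below uses only the standard library; node and next are hypothetical names, and the weights 1 and 3 mirror the resolver's weightMap. One difference from the picker above: here the backend order is fixed by the slice, whereas the picker builds its list from a map, so tie-breaking there may vary between runs.

```go
package main

import "fmt"

// node models one backend in smooth weighted round robin.
type node struct {
	name    string
	weight  int // static weight
	current int // running weight, adjusted on every pick
}

// next performs one smooth WRR step: add each static weight to its running
// weight, choose the largest running weight, then subtract the total weight
// from the winner. Ties go to the earlier node in the slice.
func next(nodes []*node) *node {
	total := 0
	var best *node
	for _, n := range nodes {
		n.current += n.weight
		total += n.weight
		if best == nil || n.current > best.current {
			best = n
		}
	}
	if best != nil {
		best.current -= total
	}
	return best
}

func main() {
	nodes := []*node{{name: ":50051", weight: 1}, {name: ":50052", weight: 3}}
	for i := 0; i < 8; i++ {
		fmt.Print(next(nodes).name, " ")
	}
	fmt.Println()
	// prints :50052 :50051 :50052 :50052 :50052 :50051 :50052 :50052
}
```

Each cycle of four picks sends one request to the weight-1 backend and three to the weight-3 backend, and the lighter backend's turn is interleaved rather than pushed to the end of the cycle.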
Client implementation
Finally, the client just selects the weighted_round_robin policy:
package main
import (
"context"
"log"
"time"
_ "grpc-lb-example/client_custom_lb/weighted_round_robin_lb"
pb "grpc-lb-example/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
address = "example:///my-custom-service:1234"
clientName = "Colin"
)
func main() {
log.Println("Client: connecting with the weighted round robin balancer:", address)
// Create the gRPC connection and select the weighted round robin policy
conn, err := grpc.NewClient(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"weighted_round_robin"}`),
)
if err != nil {
log.Fatalf("Client: connection failed: %v", err)
}
defer conn.Close()
log.Println("Client: Connected successfully!")
// Create the gRPC client stub
c := pb.NewEchoClient(conn)
// Send 10 requests
for range 10 {
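ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
r, err := c.Echo(ctx, &pb.EchoRequest{Message: clientName})
// Release the timeout right after the call instead of deferring inside the loop,
// where the deferred calls would pile up until main returns.
cancel()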
if err != nil {
log.Fatalf("Client: Could not greet: %v", err)
}
log.Printf("Client: Echo from server: %s", r.GetMessage())
time.Sleep(500 * time.Millisecond)
}
}
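Test results
Finally, run the client. In each group of four requests, one goes to 50051 and three go to 50052, a 1:3 split that matches our weights.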
2025/05/19 12:57:06 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:06 Client: Echo from server: Colin (from localhost:50051)
2025/05/19 12:57:07 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:07 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:08 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:08 Client: Echo from server: Colin (from localhost:50051)
2025/05/19 12:57:09 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:09 Client: Echo from server: Colin (from localhost:50052)
2025/05/19 12:57:10 Client: Echo from server: Colin (from localhost:50052)
The complete code for the custom load-balancing policy is available at: