在实现负载均衡之前,我们还需要进行一个小优化
因为在使用负载均衡的时候会启动相同服务的多个实例,之前我们都是将端口配置在yaml中
如果多个服务启动的时候还使用端口配置的方案,会导致端口冲突
所以我们需要先进行优化,可以动态获取可用端口
package utils
import (
"net"
)
func GetFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}
package main
import (
"fmt"
"github.com/spf13/viper"
"web_api/user_web/global"
"web_api/user_web/initialize"
"web_api/user_web/utils"
"github.com/gin-gonic/gin/binding"
ut "github.com/go-playground/universal-translator"
"github.com/go-playground/validator/v10"
"go.uber.org/zap"
myvalidator "web_api/user_web/validator"
)
func main() {
//1. 初始化logger
initialize.InitLogger()
//2. 初始化配置文件
initialize.InitConfig()
//3. 初始化routers
Router := initialize.Routers()
//4. 初始化翻译
if err := initialize.InitTrans("zh"); err != nil {
panic(err)
}
//5. 初始化srv的连接
initialize.InitSrvConn()
viper.AutomaticEnv()
//如果是本地开发环境端口号固定,线上环境启动获取端口号
debug := viper.GetBool("DEV_CONFIG")
if !debug {
port, err := utils.GetFreePort()
if err == nil {
global.ServerConfig.Port = port
}
}
//。。。省略
}
package utils
import (
"net"
)
func GetFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}
package main
import (
"flag"
"fmt"
"github.com/hashicorp/consul/api"
"go.uber.org/zap"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"nd/user_srv/global"
"nd/user_srv/handler"
"nd/user_srv/initialize"
"nd/user_srv/proto"
"nd/user_srv/utils"
"net"
"google.golang.org/grpc"
)
func main() {
IP := flag.String("ip", "0.0.0.0", "ip地址")
Port := flag.Int("port", 0, "端口号") // 这个修改为0,如果我们从命令行带参数启动的话就不会为0
//初始化
initialize.InitLogger()
initialize.InitConfig()
initialize.InitDB()
zap.S().Info(global.ServerConfig)
flag.Parse()
zap.S().Info("ip: ", *IP)
if *Port == 0 {
*Port, _ = utils.GetFreePort()
}
zap.S().Info("port: ", *Port)
//。。。省略
}

a、集中式LB:即在服务的消费方和提供方之间使用独立的LB设施(可以是硬件,如F5, 也可以是软件,如nginx),由该设施负责把访问请求通过某种策略转发至服务的提供方

b、进程内LB:将LB逻辑集成到消费方,消费方从服务注册中心获知有哪些地址可用,然后自己再从这些地址中选择出一个合适的服务器。

调用过程

c、独立进程LB:该方案是针对第二种方案的不足提出的一种折中方案,原理和第二种方案基本类似,不同之处是,它将LB和服务发现功能从进程内移出来,变成主机上的一个独立进程,主机上的一个或者多个服务要访问目标服务时,他们都通过同一主机上的独立LB进程做服务发现和负载均衡(简单的理解就是,将LB也部署成服务)


package main
import (
"flag"
"fmt"
"github.com/hashicorp/consul/api"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"nd/user_srv/global"
"nd/user_srv/handler"
"nd/user_srv/initialize"
"nd/user_srv/proto"
"nd/user_srv/utils"
"net"
"os"
"os/signal"
"syscall"
"google.golang.org/grpc"
)
func main() {
IP := flag.String("ip", "0.0.0.0", "ip地址")
Port := flag.Int("port", 0, "端口号") // 这个修改为0,如果我们从命令行带参数启动的话就不会为0
//初始化
initialize.InitLogger()
initialize.InitConfig()
initialize.InitDB()
zap.S().Info(global.ServerConfig)
flag.Parse()
zap.S().Info("ip: ", *IP)
if *Port == 0 {
*Port, _ = utils.GetFreePort()
}
zap.S().Info("port: ", *Port)
server := grpc.NewServer()
proto.RegisterUserServer(server, &handler.UserServer{})
lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", *IP, *Port))
if err != nil {
panic("failed to listen:" + err.Error())
}
//注册服务健康检查
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
//服务注册
cfg := api.DefaultConfig()
cfg.Address = fmt.Sprintf("%s:%d", global.ServerConfig.ConsulInfo.Host,
global.ServerConfig.ConsulInfo.Port)
client, err := api.NewClient(cfg)
if err != nil {
panic(err)
}
//生成对应的检查对象
check := &api.AgentServiceCheck{
GRPC: fmt.Sprintf("192.168.91.1:%d", *Port),
Timeout: "5s",
Interval: "5s",
DeregisterCriticalServiceAfter: "15s",
}
//生成注册对象
registration := new(api.AgentServiceRegistration)
registration.Name = global.ServerConfig.Name
serviceID := fmt.Sprintf("%s", uuid.NewV4())
registration.ID = serviceID
registration.Port = *Port
registration.Tags = []string{"imooc", "bobby", "user", "srv"}
registration.Address = "192.168.91.1"
registration.Check = check
err = client.Agent().ServiceRegister(registration)
if err != nil {
panic(err)
}
go func() {
err = server.Serve(lis)
if err != nil {
panic("failed to start grpc:" + err.Error())
}
}()
//接收终止信号
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
if err = client.Agent().ServiceDeregister(serviceID); err != nil {
zap.S().Info("注销失败")
}
zap.S().Info("注销成功")
}
func (s *UserServer) GetUserList(ctx context.Context, req *proto.PageInfo) (*proto.UserListResponse, error) {
//获取用户列表
var users []model.User
result := global.DB.Find(&users)
if result.Error != nil {
return nil, result.Error
}
fmt.Println(time.Now().Format("2006-01-02 15:04:05.000"), "用户列表")
rsp := &proto.UserListResponse{}
rsp.Total = int32(result.RowsAffected)
global.DB.Scopes(Paginate(int(req.Pn), int(req.PSize))).Find(&users)
for _, user := range users {
userInfoRsp := ModelToResponse(user)
rsp.Data = append(rsp.Data, &userInfoRsp)
}
return rsp, nil
}

package main
import (
"context"
"fmt"
"google.golang.org/grpc/credentials/insecure"
"log"
"test/proto"
_ "github.com/mbobakov/grpc-consul-resolver" // It's important
"google.golang.org/grpc"
)
func main() {
conn, err := grpc.Dial(
"consul://192.168.91.129:8500/user_srv?wait=14s&tag=srv",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
for i := 0; i < 10; i++ {
userSrvClient := proto.NewUserClient(conn)
rsp, err := userSrvClient.GetUserList(context.Background(), &proto.PageInfo{
Pn: 1,
PSize: 2,
})
if err != nil {
panic(err)
}
for index, data := range rsp.Data {
fmt.Println(index, data)
}
}
}



package initialize
import (
"fmt"
"github.com/hashicorp/consul/api"
_ "github.com/mbobakov/grpc-consul-resolver" // It's important
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"web_api/user_web/global"
"web_api/user_web/proto"
)
func InitSrvConn() {
consulInfo := global.ServerConfig.ConsulInfo
userConn, err := grpc.Dial(
fmt.Sprintf("consul://%s:%d/%s?wait=14s", consulInfo.Host, consulInfo.Port, global.ServerConfig.UserSrvInfo.Name),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy": "round_robin"}`),
)
if err != nil {
zap.S().Fatal("[InitSrvConn] 连接 【用户服务失败】")
}
userSrvClient := proto.NewUserClient(userConn)
global.UserSrvClient = userSrvClient
}



