• go 语言 负载均衡 为反向代理添加负载均衡 拓展ReverseProxy


    随机负载

    package random
    
    import (
    	"errors"
    	"math/rand"
    )
    
    type RandomBalance struct {
    	//当前索引
    	curIndex int
    	//存储负载均衡的地址
    	rss []string
    	//观察主体
    	//conf LoadBalanceConf
    }
    
    func (r *RandomBalance) Add(params ...string) error {
    	if len(params) == 0 {
    		return errors.New("param len 1 at least")
    	}
    	addr := params[0]
    	r.rss = append(r.rss, addr)
    	return nil
    }
    
    // Next 随机到下一个index
    func (r *RandomBalance) Next() string {
    	if len(r.rss) == 0 {
    		return ""
    	}
    	r.curIndex = rand.Intn(len(r.rss))
    	return r.rss[r.curIndex]
    }
    
    // Get 拿到下一个地址
    func (r *RandomBalance) Get(key string) (string, error) {
    	return r.Next(), nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    轮询负载

    package round
    
    import "errors"
    
    type RoundRobinBalance struct {
    	curIndex int
    	rss      []string
    	//观察主体
    	//conf LoadBalanceConf
    }
    
    func (r *RoundRobinBalance) Add(params ...string) error {
    	if len(params) == 0 {
    		return errors.New("param len 1 at least")
    	}
    	addr := params[0]
    	r.rss = append(r.rss, addr)
    	return nil
    }
    
    func (r *RoundRobinBalance) Next() string {
    	if len(r.rss) == 0 {
    		return ""
    	}
    	lens := len(r.rss)
    	if r.curIndex >= lens {
    		r.curIndex = 0
    	}
    	curAddr := r.rss[r.curIndex]
    	r.curIndex = (r.curIndex + 1) % lens
    	return curAddr
    }
    
    func (r *RoundRobinBalance) Get(key string) (string, error) {
    	return r.Next(), nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    加权负载

    package weight
    
    import (
    	"errors"
    	"strconv"
    )
    
    type WeightRoundRobinBalance struct {
    	curIndex int
    	rss      []*WeightNode
    	rsw      []int
    	//观察主体
    	//conf LoadBalanceConf
    
    }
    
    type WeightNode struct {
    	addr            string
    	weight          int //权重值
    	currentWeight   int //节点当前权重
    	effectiveWeight int //有效权重
    }
    
    func (r *WeightRoundRobinBalance) Add(params ...string) error {
    	if len(params) != 2 {
    		return errors.New("param len need 2")
    	}
    	parInt, err := strconv.ParseInt(params[1], 10, 64)
    	if err != nil {
    		return err
    	}
    	node := &WeightNode{addr: params[0], weight: int(parInt)}
    	node.effectiveWeight = node.weight
    	r.rss = append(r.rss, node)
    	return nil
    }
    
    func (r *WeightRoundRobinBalance) Next() string {
    	total := 0
    	var best *WeightNode
    	for i := 0; i < len(r.rss); i++ {
    		w := r.rss[i]
    		//step 1 统计所以有效权重之和
    		total += w.effectiveWeight
    		//step 2 变更节点 临时权重
    		w.currentWeight += w.effectiveWeight
    
    		//step 3 有效权重默认与权重相同,通讯异常时-1, 通讯成功+1,直到恢复到weight大小
    		if w.effectiveWeight < w.weight {
    			w.effectiveWeight++
    		}
    		//step 4 选择最大临时权重点节点
    		if best == nil || w.currentWeight > best.currentWeight {
    			best = w
    		}
    	}
    	if best == nil {
    		return ""
    	}
    	//step 5 变更临时权重为 临时权重-有效权重之和
    	best.currentWeight -= total
    	return best.addr
    }
    func (r *WeightRoundRobinBalance) Get(key string) (string, error) {
    	return r.Next(), nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    在这里插入图片描述
    节点有效权重:故障一次,权重减一 为了检测服务器故障准备的
    在这里插入图片描述

    加权轮询过程

    在这里插入图片描述

    一致性哈希负载

    package consist_hash
    
    import (
    	"errors"
    	"hash/crc32"
    	"sort"
    	"strconv"
    	"sync"
    )
    
    type Hash func(data []byte) uint32
    
    type UInt32Slice []uint32
    
    func (s UInt32Slice) Len() int {
    	return len(s)
    }
    
    func (s UInt32Slice) Less(i, j int) bool {
    	return s[i] < s[j]
    }
    
    func (s UInt32Slice) Swap(i, j int) {
    	s[i], s[j] = s[j], s[i]
    }
    
    type ConsistentHashBanlance struct {
    	mux      sync.RWMutex
    	hash     Hash
    	replicas int               //复制因子
    	keys     UInt32Slice       //已排序的节点hash切片
    	hashMap  map[uint32]string //节点哈希和Key的map,键是hash值,值是节点key
    
    	//观察主体
    	//conf LoadBalanceConf
    }
    
    func NewConsistentHashBanlance(replicas int, fn Hash) *ConsistentHashBanlance {
    	m := &ConsistentHashBanlance{
    		replicas: replicas,
    		hash:     fn,
    		hashMap:  make(map[uint32]string),
    	}
    	if m.hash == nil {
    		//最多32位,保证是一个2^32-1环
    		m.hash = crc32.ChecksumIEEE
    	}
    	return m
    }
    
    // 验证是否为空
    func (c *ConsistentHashBanlance) IsEmpty() bool {
    	return len(c.keys) == 0
    }
    
    // Add 方法用来添加缓存节点,参数为节点key,比如使用IP
    func (c *ConsistentHashBanlance) Add(params ...string) error {
    	if len(params) == 0 {
    		return errors.New("param len 1 at least")
    	}
    	addr := params[0]
    	c.mux.Lock()
    	defer c.mux.Unlock()
    	// 结合复制因子计算所有虚拟节点的hash值,并存入m.keys中,同时在m.hashMap中保存哈希值和key的映射
    	for i := 0; i < c.replicas; i++ {
    		hash := c.hash([]byte(strconv.Itoa(i) + addr))
    		c.keys = append(c.keys, hash)
    		c.hashMap[hash] = addr
    	}
    	// 对所有虚拟节点的哈希值进行排序,方便之后进行二分查找
    	sort.Sort(c.keys)
    	return nil
    }
    
    // Get 方法根据给定的对象获取最靠近它的那个节点
    func (c *ConsistentHashBanlance) Get(key string) (string, error) {
    	if c.IsEmpty() {
    		return "", errors.New("node is empty")
    	}
    	hash := c.hash([]byte(key))
    
    	// 通过二分查找获取最优节点,第一个"服务器hash"值大于"数据hash"值的就是最优"服务器节点"
    	idx := sort.Search(len(c.keys), func(i int) bool { return c.keys[i] >= hash })
    
    	// 如果查找结果 大于 服务器节点哈希数组的最大索引,表示此时该对象哈希值位于最后一个节点之后,那么放入第一个节点中
    	if idx == len(c.keys) {
    		idx = 0
    	}
    	c.mux.RLock()
    	defer c.mux.RUnlock()
    	return c.hashMap[c.keys[idx]], nil
    }
    
    //func (c *ConsistentHashBanlance) SetConf(conf LoadBalanceConf) {
    //	c.conf = conf
    //}
    //
    //func (c *ConsistentHashBanlance) Update() {
    //	if conf, ok := c.conf.(*LoadBalanceZkConf); ok {
    //		fmt.Println("Update get conf:", conf.GetConf())
    //		c.keys = nil
    //		c.hashMap = nil
    //		for _, ip := range conf.GetConf() {
    //			c.Add(strings.Split(ip, ",")...)
    //		}
    //	}
    //	if conf, ok := c.conf.(*LoadBalanceCheckConf); ok {
    //		fmt.Println("Update get conf:", conf.GetConf())
    //		c.keys = nil
    //		c.hashMap = map[uint32]string{}
    //		for _, ip := range conf.GetConf() {
    //			c.Add(strings.Split(ip, ",")...)
    //		}
    //	}
    //}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    请求访问指定的IP

    • 单调性
    • 平衡性 可以引入虚拟结点 拷贝服务器结点
    • 分散性
      在这里插入图片描述
      在这里插入图片描述

    为反向代理添加负载均衡

    使用工厂方法

    // LoadBalanceFactory 负载均衡的工厂模式
    func LoadBalanceFactory(lbType LbType) LoadBalance {
    	switch lbType {
    	case LbRandom:
    		return &random.RandomBalance{}
    	case LbRoundRobin:
    		return &round.RoundRobinBalance{}
    	case LbWeightRoundRobin:
    		return &weight.WeightRoundRobinBalance{}
    	case LbConsistentHash:
    		return consist_hash.NewConsistentHashBanlance(10, nil)
    	default:
    		return &random.RandomBalance{}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    使用接口统一封装

    type LoadBalance interface {
    	Add(...string) error
    	Get(string) (string, error)
    
    	// Update 后期服务发现补充
    	//Update()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    package main
    
    import (
    	"bytes"
    	"gateway-detail/load_balance/lb_factory"
    	"io"
    	"log"
    	"net"
    	"net/http"
    	"net/http/httputil"
    	"net/url"
    	"strconv"
    	"strings"
    	"time"
    )
    
    var (
    	addr      = "127.0.0.1:2224"
    	transport = &http.Transport{
    		DialContext: (&net.Dialer{
    			Timeout:   30 * time.Second, //连接超时
    			KeepAlive: 30 * time.Second, //长连接超时时间
    		}).DialContext,
    		MaxIdleConns:          100,              //最大空闲连接
    		IdleConnTimeout:       90 * time.Second, //空闲超时时间
    		TLSHandshakeTimeout:   10 * time.Second, //tls握手超时时间
    		ExpectContinueTimeout: 1 * time.Second,  //100-continue状态码超时时间
    	}
    )
    
    func NewMultipleHostsReverseProxy(lb lb_factory.LoadBalance) *httputil.ReverseProxy {
    	// 请求协调者
    	director := func(req *http.Request) {
    		nextAddr, err := lb.Get(req.RemoteAddr)
    		if err != nil {
    			log.Println(err.Error())
    			log.Fatal("get next addr fail")
    
    		}
    		target, err := url.Parse(nextAddr)
    		if err != nil {
    			log.Fatal(err)
    		}
    		//改造req
    		targetQuery := target.RawQuery
    		req.URL.Scheme = target.Scheme
    		req.URL.Host = target.Host
    		req.URL.Path = singleJoiningSlash(target.Path, req.URL.Path)
    		if targetQuery == "" || req.URL.RawQuery == "" {
    			req.URL.RawQuery = targetQuery + req.URL.RawQuery
    		} else {
    			req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
    		}
    		if _, ok := req.Header["User-Agent"]; !ok {
    			req.Header.Set("User-Agent", "user-agent")
    		}
    	}
    
    	//更改内容
    	modifyFunc := func(resp *http.Response) error {
    		//请求以下命令:curl 'http://127.0.0.1:2002/error'
    		if resp.StatusCode != 200 {
    			//获取内容
    		}
    		//追加内容
    		oldPayload, err := io.ReadAll(resp.Body)
    		if err != nil {
    			return err
    		}
    		newPayload := []byte("Hello :" + string(oldPayload))
    		resp.Body = io.NopCloser(bytes.NewBuffer(newPayload))
    		resp.ContentLength = int64(len(newPayload))
    		resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
    		return nil
    	}
    	//错误回调 :关闭real_server时测试,错误回调
    	//范围:transport.RoundTrip发生的错误、以及ModifyResponse发生的错误
    	errFunc := func(w http.ResponseWriter, r *http.Request, err error) {
    		//todo 如果是权重的负载则调整临时权重
    		http.Error(w, "ErrorHandler error:"+err.Error(), 500)
    	}
    	return &httputil.ReverseProxy{Director: director, Transport: transport, ModifyResponse: modifyFunc, ErrorHandler: errFunc}
    }
    func singleJoiningSlash(a, b string) string {
    	aslash := strings.HasSuffix(a, "/")
    	bslash := strings.HasPrefix(b, "/")
    	switch {
    	case aslash && bslash:
    		return a + b[1:]
    	case !aslash && !bslash:
    		return a + "/" + b
    	}
    	return a + b
    }
    
    func main() {
    	rb := lb_factory.LoadBalanceFactory(lb_factory.LbConsistentHash)
    	if err := rb.Add("http://127.0.0.1:2003/base", "10"); err != nil {
    		log.Println(err)
    	}
    	if err := rb.Add("http://127.0.0.1:2004/base", "50"); err != nil {
    		log.Println(err)
    	}
    	proxy := NewMultipleHostsReverseProxy(rb)
    	log.Println("Starting httpserver at " + addr)
    	log.Fatal(http.ListenAndServe(addr, proxy))
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
  • 相关阅读:
    javascript中依次输出元素并不断循环实现echarts柱图动画效果
    基于windows WSL安装Docker Desktop,修改默认安装到C盘及默认下载镜像到C盘
    使用SSH反向转发服务器上的请求到个人电脑
    python中的code object
    javascript(2)高级
    Web前端—盒子模型:选择器、PxCook、盒子模型、正则表达式、综合案例(产品卡片与新闻列表)
    “扫一扫,不一定是二维码” ScanCan GitHub开源项目发起
    【ES6标准入门】JavaScript中的模块Module的加载实现:循环加载和Node加载,非常详细,建议收藏!!!
    docker安装tomcat
    嵌入式面试常见问题(二)
  • 原文地址:https://blog.csdn.net/weixin_43898670/article/details/133098596