• 运筹系列75:LKH核心代码的python实现


    1. mst1tree模块

    这个模块是输入距离矩阵c和pi值,给出mst1tree。
    有两种方式:

    from scipy.sparse import csr_matrix
    from scipy.sparse.csgraph import minimum_spanning_tree 
    # mst算法可以替换为自己实现
    
    # 根据LKH的c代码实现
    def getLT1(c,pi):
        c = (c.T+pi).T+pi
        sec_dist = np.hstack(np.sort(c)[:,2:3]) # 这里可以优化
        mst = minimum_spanning_tree(csr_matrix(c))
        r = mst.toarray()
        v = np.sum(abs(r)>0,axis=0)+np.sum(abs(r)>0,axis=1)-2
        return np.sum(r)-2*sum(pi)+max((v<0)*sec_dist)
    
    # 这种方式是根据LKH report实现
    def getLT2(c,pi):
        max_v = 0
        for k in range(len(c)):
            trans = np.hstack([k,np.arange(k),np.arange(k+1,len(distA))])
            nc = c[trans,:][:,trans]
            nc = (nc.T+pi).T+pi
            i,j = np.argpartition(nc[0][1:], 2)[:2] 
            mst = minimum_spanning_tree(csr_matrix(nc[1:,1:]))
            r = mst.toarray()
            v = np.sum(r)+nc[0][i+1]+nc[0][j+1]-2*np.sum(pi)
            if v>max_v:max_v = v
        return max_v
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    其中minimum_spanning_tree模块,可以使用prim算法自己实现,cpu上性能和scipy差不多。

    from numba import njit
    @njit
    def prim(matrix):
        n = len(matrix)
        non_matrix = np.zeros_like(matrix)
        v = np.zeros(n)
        v[0] = 1
        while sum(v) < n:
            mst_len = np.inf
            for i in np.arange(n):
                if v[i] > 0:
                    for j in np.arange(i,n):
                        if v[j] == 0:
                            if matrix[i][j] < mst_len:
                                mst_len = matrix[i][j]
                                bi = i
                                bj = j
            v[bj] = 1
            non_matrix[bi][bj] = matrix[bi][bj]
        return non_matrix
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    2. ascent模块

    这个模块使用梯度下降法给出最后的pi值,首先给出个简单的版本:

    def ascent_simple(c):
        pi = np.zeros(len(c))
        t = np.ones(len(c))
        best_w, v = getLT(c, pi)
        period = max(len(c)//2, 100)
        for k in range(period):
            w, v = getLT(c, pi+t*v)
            if sum(abs(v)) == 0:
                return w, pi+t*v
            elif w > best_w:
                best_w = w
                pi += t*v
            else:
                t /= 2
        return best_w, pi
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    接下来是按照LKH_report的逻辑进行补全的版本,如下。这里去掉了initialPhase的判断。

    def ascent(c):
        pi = np.zeros(len(c), int)
        t = 1
        best_w, v = getLT(c, pi)
        best_deg = np.sum(np.abs(v)) #np.sum(v*v)
        last_v = v
        period = max(len(c)//2, 100)
        while period > 0 and t > 0.5:
            period_continue = False
            for k in range(period):
                for i in range(len(v)):
                    if v[i] == 0:
                        last_v[i] = 0
                pi = pi+t*(0.7*v+0.3*last_v)
                last_v = v
                w, v = getLT(c, pi)
                deg = np.sum(np.abs(v))#np.sum(v*v)
                if sum(abs(v)) == 0:
                    logger.info("* T = %d, Period = %d, BestW = %.1f, Norm = %d" % (t, period, w, deg))
                    return w, pi
                elif w > best_w or (w == best_w and deg < best_deg):
                    logger.info("* T = %d, Period = %d, BestW = %.1f, Norm = %d" % (t, period, w, deg))
                    best_w = w
                    best_deg = deg
                    t *= 2
                    if k == period-1:
                        period_continue = True
                else:
                    logger.info("  T = %d, Period = %d, BestW = %.1f, Norm = %d" % (t, period, w,deg))
                    if k > period /2:
                        t = t*3/4
                        break
            if not period_continue:
                t = t/2
                period //= 2
        return best_w, pi
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3. generateCandidate模块

    ascent模块给出来最佳的pi值,接下来用递推方式计算α-nearess值,以及生成candidate set。代码如下:

    def geneCandidate(r, c, first_node):
        # 首先要将树结构用suc和dad两个数组保存下来,mst是用来遍历树的堆栈
        x, y = np.where(r > 0)
        alpha = c.copy()
        beta = np.zeros_like(c)
        for i in range(len(x)):
            beta[x[i]][y[i]] = beta[y[i]][x[i]] = c[y[i]][x[i]]
        neighbour = beta > 0
    
        suc = [0]
        dad = {0: None}
        mst = [0]
        heapq.heapify(mst)
        uv = set(range(len(c)))
        while len(mst) > 0:
            j = mst.pop()
            uv.remove(j)
            for nj in np.where(neighbour[j])[0]:
                if nj in uv:
                    mst.append(nj)
                    dad[nj] = j
                    suc.append(nj)
    
        # 迭带所有的边
        for idx, i in enumerate(suc):
            if i == first_node:
                for jdx, j in enumerate(suc):
                    if i == j or j == first_node:
                        continue
                    if j == dad[i] or i == dad[j]:
                        alpha[i][j] = alpha[j][i] = 0
                    else:
                        second_cost = np.sort(c[i])[2]
                        alpha[i][j] = alpha[j][i] = c[i][j] - second_cost
            else:
                for jdx, j in enumerate(suc):
                    if idx >= jdx or j == first_node:
                        continue
                    if j == dad[i] or i == dad[j]:
                        alpha[i][j] = alpha[j][i] = 0
                    else:
                        beta[i][j] = beta[j][i] = max(beta[i][dad[j]], c[j][dad[j]])
                        alpha[i][j] = alpha[j][i] = c[i][j] - beta[i][j]
        candidate_index = np.argpartition(alpha, max_candicate)[:max_candicate]
        return alpha, candidate_index
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    4. LinKernighan

    这里仅放出简化版本,不使用Candidate,而是按顺序遍历所有点位。

    def LKH(c, t):
        n = len(t)
        max_swaps = len(t) # 在没有优化的情况下,最多允许swap的次数
        for i in range(n): # 探索第一个点位
            nt = t.copy()
            x1 = (nt[i], nt[(i + 1) % n])
            G0 = c[x1[0]][x1[1]]
            for swaps in range(max_swaps):
                nt, G0, G, j = bestOptMove(c, i, G0, nt) # 探索第二个点位
                if G > 0.01:
                    return nt, G
        return t, 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    探索第二个点位的bestOptMove如下:

    def bestOptMove(c, i, G0, t):
        n = len(t)
        best_g2 = -np.inf
        best_j = -1
        for j in np.arange(n): # 第二个点位也是按顺序遍历
            if (i + 1) % n != j and i != j and (i - 1) != j and (j + 1) % n != i:
                y1 = (t[(i + 1) % n], t[(j + 1) % n])
                G1 = G0 - c[y1[0]][y1[1]]
                if G1 > 0: # 探索条件:正收益原则
                    x2 = (t[(j + 1) % n], t[j])
                    y2 = (t[j], t[i])
                    G2 = G1 + c[x2[0]][x2[1]]
                    G = G2 - c[y2[0]][y2[1]]
                    if G > 0.01: # 一旦找到优化的解,立即返回
                        t = TwoOptMove(i, j, t)
                        return t, G2, G, j
                    if G2 > best_g2:
                        best_g2 = G2
                        best_j = j
        # 所有的j都不符合条件时,原样返回
        if best_j == -1: 
            return t, G0, 0, -1
            
        # 如果符合正收益原则的所有j都探索完都没有优化,那么选择其中最佳的j尝试进行swap
        return TwoOptMove(i, best_j, t), best_g2, 0, best_j
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    接下来是数组版本的边交换代码:

    def TwoOptMove(i, j, t):
        n = len(t)
        if i < j:
            t[i + 1:j + 1] = t[i + 1:j + 1][::-1]
        else:
            temp = np.zeros(n + j - i, np.int32)
            temp[:n - i - 1] = t[i + 1:]
            temp[n - i - 1:] = t[:j + 1]
            temp = temp[::-1]
            t[i + 1:] = temp[:n - i - 1]
            t[:j + 1] = temp[n - i - 1:]
        return t
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    vue--8.组件传值(provide--inject)、动态组件、缓存组件keep-alive、异步组件
    代码随想录二刷回溯算法-组合问题总结
    企业电子招标采购系统源码Spring Boot + Mybatis + Redis + Layui + 前后端分离 构建企业电子招采平台之立项流程图
    b站黑马JavaScript的Ajax案例代码——评论列表案例
    在 K8s 集群上部署 RabbitMQ 实战
    新王炸:文生视频Sora模型发布,能否引爆AI芯片热潮
    【Spring Security】使用 OncePerRequestFilter 过滤器校验登录过期、请求日志等操作
    基础选择器汇总——标签选择器,类选择器、id选择器、通配符选择器
    Android 12.0 app调用hal层接口功能实现系列二(jni层功能实现)
    面试官:字节流可以处理一切文件为什么还需要字符流呢?
  • 原文地址:https://blog.csdn.net/kittyzc/article/details/125708005