• DolphinScheduler load-balancing source code


    🐬Official documentation


    The official documentation on this topic is the same from version 1.3 through 3.1; the source code below is taken from version 2.0.6.

    🐬Load balancing


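    The load-balancing algorithm can be specified in master.properties.
    MasterConfig.java defaults to linear load (lowerweight).
    HostManagerConfig.java selects the algorithm according to the configuration.
    The parent class CommonHostManager obtains the list of available workers.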
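As a rough illustration of how a configured name can be mapped to one of the three algorithms, here is a minimal sketch. The names `SelectorConfigSketch`, `SelectorKind` and `fromConfig` are hypothetical and exist only for this example; this is not the code of HostManagerConfig. Only the three algorithm names (random, roundrobin, lowerweight) and the lowerweight default come from this article.

```java
import java.util.Locale;

/** Hypothetical sketch: map a configured selector name to an algorithm kind. */
final class SelectorConfigSketch {

    /** The three algorithms named in this article. */
    enum SelectorKind { RANDOM, ROUNDROBIN, LOWERWEIGHT }

    /**
     * Parses the configured value case-insensitively.
     * A missing or blank value falls back to LOWERWEIGHT (the default described above).
     */
    static SelectorKind fromConfig(String configured) {
        if (configured == null || configured.trim().isEmpty()) {
            return SelectorKind.LOWERWEIGHT;
        }
        String name = configured.trim().toUpperCase(Locale.ROOT);
        try {
            return SelectorKind.valueOf(name);
        } catch (IllegalArgumentException e) {
            // Unknown names are rejected instead of being silently replaced by the default.
            throw new IllegalArgumentException("unsupported selector: " + configured, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromConfig(null));
        System.out.println(fromConfig("RoundRobin"));
    }
}
```

Failing fast on an unknown name is a design choice of this sketch; a real configuration class might prefer to log a warning and fall back to the default.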

    🐠Weighted random (random)


    RandomSelector
    RandomHostManager
    AbstractSelector
    
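For readers who want the idea without the project classes, the following is a minimal sketch of generic weighted random selection. It is not the RandomSelector source; the `Worker` class, its fields and the `select` signature are assumptions made for this example, and weights are assumed to be positive integers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/** Generic weighted random selection (illustrative, not DolphinScheduler code). */
final class WeightedRandomSketch {

    /** Hypothetical worker with a positive integer weight. */
    static final class Worker {
        final String address;
        final int weight;

        Worker(String address, int weight) {
            this.address = address;
            this.weight = weight;
        }
    }

    /** Picks a worker with probability weight / totalWeight. */
    static Worker select(List<Worker> workers, Random random) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("no workers");
        }
        int total = 0;
        for (Worker w : workers) {
            total += w.weight;
        }
        // Uniform offset in [0, total); walk the list until the offset is used up.
        int offset = random.nextInt(total);
        for (Worker w : workers) {
            offset -= w.weight;
            if (offset < 0) {
                return w;
            }
        }
        throw new IllegalStateException("unreachable when all weights are positive");
    }

    public static void main(String[] args) {
        List<Worker> workers = Arrays.asList(new Worker("w1:1234", 1), new Worker("w2:1234", 3));
        Random random = new Random(42);
        int[] hits = new int[2];
        for (int i = 0; i < 10_000; i++) {
            hits[select(workers, random) == workers.get(0) ? 0 : 1]++;
        }
        System.out.println("w1=" + hits[0] + " w2=" + hits[1]);
    }
}
```

With weights 1 and 3, the second worker should receive roughly three quarters of the picks over many draws.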

    🐠Smooth round robin (roundrobin)


    It uses the atomic classes:

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicLong;
    
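To show why these two classes are handy for round robin, here is a minimal sketch: an `AtomicLong` serves as a shared position counter and an `AtomicBoolean` as a lightweight try-lock. The class and method names (`AtomicRoundRobinSketch`, `next`, `tryLock`, `unlock`) are hypothetical and this is plain, unweighted round robin, not the project's RoundRobinSelector.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Plain round robin built on atomics (illustrative sketch). */
final class AtomicRoundRobinSketch {

    // Shared position counter; getAndIncrement is atomic, so no two callers see the same value.
    private final AtomicLong position = new AtomicLong();

    // Flag used as a non-blocking lock: only one caller can flip it from false to true.
    private final AtomicBoolean updateLock = new AtomicBoolean(false);

    /** Returns the next host in rotation. */
    String next(List<String> hosts) {
        long n = position.getAndIncrement();
        // floorMod keeps the index non-negative even if the counter ever overflows.
        return hosts.get((int) Math.floorMod(n, (long) hosts.size()));
    }

    /** Tries to take the lock without blocking; returns false if someone else holds it. */
    boolean tryLock() {
        return updateLock.compareAndSet(false, true);
    }

    /** Releases the lock taken by tryLock. */
    void unlock() {
        updateLock.set(false);
    }

    public static void main(String[] args) {
        AtomicRoundRobinSketch rr = new AtomicRoundRobinSketch();
        List<String> hosts = Arrays.asList("a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.println(rr.next(hosts));
        }
    }
}
```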
    • AtomicBoolean

      /*
       * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
       * Written by Doug Lea with assistance from members of JCP JSR-166
       * Expert Group and released to the public domain, as explained at
       * http://creativecommons.org/publicdomain/zero/1.0/
       */
      
      package java.util.concurrent.atomic;
      import sun.misc.Unsafe;
      
      /**
       * A {@code boolean} value that may be updated atomically. See the
       * {@link java.util.concurrent.atomic} package specification for
       * description of the properties of atomic variables. An
       * {@code AtomicBoolean} is used in applications such as atomically
       * updated flags, and cannot be used as a replacement for a
       * {@link java.lang.Boolean}.
       *
       * @since 1.5
       * @author Doug Lea
       */
      public class AtomicBoolean implements java.io.Serializable {
          private static final long serialVersionUID = 4654671469794556979L;
          // setup to use Unsafe.compareAndSwapInt for updates
          private static final Unsafe unsafe = Unsafe.getUnsafe();
          private static final long valueOffset;
      
          static {
              try {
                  valueOffset = unsafe.objectFieldOffset
                      (AtomicBoolean.class.getDeclaredField("value"));
              } catch (Exception ex) { throw new Error(ex); }
          }
      
          private volatile int value;
      
          /**
           * Creates a new {@code AtomicBoolean} with the given initial value.
           *
           * @param initialValue the initial value
           */
          public AtomicBoolean(boolean initialValue) {
              value = initialValue ? 1 : 0;
          }
      
          /**
           * Creates a new {@code AtomicBoolean} with initial value {@code false}.
           */
          public AtomicBoolean() {
          }
      
          /**
           * Returns the current value.
           *
           * @return the current value
           */
          public final boolean get() {
              return value != 0;
          }
      
          /**
           * Atomically sets the value to the given updated value
           * if the current value {@code ==} the expected value.
           *
           * @param expect the expected value
           * @param update the new value
           * @return {@code true} if successful. False return indicates that
           * the actual value was not equal to the expected value.
           */
          public final boolean compareAndSet(boolean expect, boolean update) {
              int e = expect ? 1 : 0;
              int u = update ? 1 : 0;
              return unsafe.compareAndSwapInt(this, valueOffset, e, u);
          }
      
          /**
           * Atomically sets the value to the given updated value
           * if the current value {@code ==} the expected value.
           *
           * <p>May fail spuriously and does not provide ordering guarantees, so is
           * only rarely an appropriate alternative to {@code compareAndSet}.
           *
           * @param expect the expected value
           * @param update the new value
           * @return {@code true} if successful
           */
          public boolean weakCompareAndSet(boolean expect, boolean update) {
              int e = expect ? 1 : 0;
              int u = update ? 1 : 0;
              return unsafe.compareAndSwapInt(this, valueOffset, e, u);
          }

          /**
           * Unconditionally sets to the given value.
           *
           * @param newValue the new value
           */
          public final void set(boolean newValue) {
              value = newValue ? 1 : 0;
          }

          /**
           * Eventually sets to the given value.
           *
           * @param newValue the new value
           * @since 1.6
           */
          public final void lazySet(boolean newValue) {
              int v = newValue ? 1 : 0;
              unsafe.putOrderedInt(this, valueOffset, v);
          }

          /**
           * Atomically sets to the given value and returns the previous value.
           *
           * @param newValue the new value
           * @return the previous value
           */
          public final boolean getAndSet(boolean newValue) {
              boolean prev;
              do {
                  prev = get();
              } while (!compareAndSet(prev, newValue));
              return prev;
          }

          /**
           * Returns the String representation of the current value.
           * @return the String representation of the current value
           */
          public String toString() {
              return Boolean.toString(get());
          }
      }

    • AtomicLong

      /*
       * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
       * Written by Doug Lea with assistance from members of JCP JSR-166
       * Expert Group and released to the public domain, as explained at
       * http://creativecommons.org/publicdomain/zero/1.0/
       */
      
      package java.util.concurrent.atomic;
      import java.util.function.LongUnaryOperator;
      import java.util.function.LongBinaryOperator;
      import sun.misc.Unsafe;
      
      /**
       * A {@code long} value that may be updated atomically.  See the
       * {@link java.util.concurrent.atomic} package specification for
       * description of the properties of atomic variables. An
       * {@code AtomicLong} is used in applications such as atomically
       * incremented sequence numbers, and cannot be used as a replacement
       * for a {@link java.lang.Long}. However, this class does extend
       * {@code Number} to allow uniform access by tools and utilities that
       * deal with numerically-based classes.
       *
       * @since 1.5
       * @author Doug Lea
       */
      public class AtomicLong extends Number implements java.io.Serializable {
          private static final long serialVersionUID = 1927816293512124184L;
      
          // setup to use Unsafe.compareAndSwapLong for updates
          private static final Unsafe unsafe = Unsafe.getUnsafe();
          private static final long valueOffset;
      
          /**
           * Records whether the underlying JVM supports lockless
           * compareAndSwap for longs. While the Unsafe.compareAndSwapLong
           * method works in either case, some constructions should be
           * handled at Java level to avoid locking user-visible locks.
           */
          static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
      
          /**
           * Returns whether underlying JVM supports lockless CompareAndSet
           * for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
           */
          private static native boolean VMSupportsCS8();
      
          static {
              try {
                  valueOffset = unsafe.objectFieldOffset
                      (AtomicLong.class.getDeclaredField("value"));
              } catch (Exception ex) { throw new Error(ex); }
          }
      
          private volatile long value;
      
          /**
           * Creates a new AtomicLong with the given initial value.
           *
           * @param initialValue the initial value
           */
          public AtomicLong(long initialValue) {
              value = initialValue;
          }
      
          /**
           * Creates a new AtomicLong with initial value {@code 0}.
           */
          public AtomicLong() {
          }
      
          /**
           * Gets the current value.
           *
           * @return the current value
           */
          public final long get() {
              return value;
          }
      
          /**
           * Sets to the given value.
           *
           * @param newValue the new value
           */
          public final void set(long newValue) {
              value = newValue;
          }
      
          /**
           * Eventually sets to the given value.
           *
           * @param newValue the new value
           * @since 1.6
           */
          public final void lazySet(long newValue) {
              unsafe.putOrderedLong(this, valueOffset, newValue);
          }
      
          /**
           * Atomically sets to the given value and returns the old value.
           *
           * @param newValue the new value
           * @return the previous value
           */
          public final long getAndSet(long newValue) {
              return unsafe.getAndSetLong(this, valueOffset, newValue);
          }
      
          /**
           * Atomically sets the value to the given updated value
           * if the current value {@code ==} the expected value.
           *
           * @param expect the expected value
           * @param update the new value
           * @return {@code true} if successful. False return indicates that
           * the actual value was not equal to the expected value.
           */
          public final boolean compareAndSet(long expect, long update) {
              return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
          }
      
          /**
           * Atomically sets the value to the given updated value
           * if the current value {@code ==} the expected value.
           *
           * <p>May fail spuriously and does not provide ordering guarantees, so is
           * only rarely an appropriate alternative to {@code compareAndSet}.
           *
           * @param expect the expected value
           * @param update the new value
           * @return {@code true} if successful
           */
          public final boolean weakCompareAndSet(long expect, long update) {
              return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
          }

          /**
           * Atomically increments by one the current value.
           *
           * @return the previous value
           */
          public final long getAndIncrement() {
              return unsafe.getAndAddLong(this, valueOffset, 1L);
          }

          /**
           * Atomically decrements by one the current value.
           *
           * @return the previous value
           */
          public final long getAndDecrement() {
              return unsafe.getAndAddLong(this, valueOffset, -1L);
          }

          /**
           * Atomically adds the given value to the current value.
           *
           * @param delta the value to add
           * @return the previous value
           */
          public final long getAndAdd(long delta) {
              return unsafe.getAndAddLong(this, valueOffset, delta);
          }

          /**
           * Atomically increments by one the current value.
           *
           * @return the updated value
           */
          public final long incrementAndGet() {
              return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
          }

          /**
           * Atomically decrements by one the current value.
           *
           * @return the updated value
           */
          public final long decrementAndGet() {
              return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
          }

          /**
           * Atomically adds the given value to the current value.
           *
           * @param delta the value to add
           * @return the updated value
           */
          public final long addAndGet(long delta) {
              return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
          }

          /**
           * Atomically updates the current value with the results of
           * applying the given function, returning the previous value. The
           * function should be side-effect-free, since it may be re-applied
           * when attempted updates fail due to contention among threads.
           *
           * @param updateFunction a side-effect-free function
           * @return the previous value
           * @since 1.8
           */
          public final long getAndUpdate(LongUnaryOperator updateFunction) {
              long prev, next;
              do {
                  prev = get();
                  next = updateFunction.applyAsLong(prev);
              } while (!compareAndSet(prev, next));
              return prev;
          }

          /**
           * Atomically updates the current value with the results of
           * applying the given function, returning the updated value. The
           * function should be side-effect-free, since it may be re-applied
           * when attempted updates fail due to contention among threads.
           *
           * @param updateFunction a side-effect-free function
           * @return the updated value
           * @since 1.8
           */
          public final long updateAndGet(LongUnaryOperator updateFunction) {
              long prev, next;
              do {
                  prev = get();
                  next = updateFunction.applyAsLong(prev);
              } while (!compareAndSet(prev, next));
              return next;
          }

          /**
           * Atomically updates the current value with the results of
           * applying the given function to the current and given values,
           * returning the previous value. The function should be
           * side-effect-free, since it may be re-applied when attempted
           * updates fail due to contention among threads.  The function
           * is applied with the current value as its first argument,
           * and the given update as the second argument.
           *
           * @param x the update value
           * @param accumulatorFunction a side-effect-free function of two arguments
           * @return the previous value
           * @since 1.8
           */
          public final long getAndAccumulate(long x,
                                             LongBinaryOperator accumulatorFunction) {
              long prev, next;
              do {
                  prev = get();
                  next = accumulatorFunction.applyAsLong(prev, x);
              } while (!compareAndSet(prev, next));
              return prev;
          }

          /**
           * Atomically updates the current value with the results of
           * applying the given function to the current and given values,
           * returning the updated value. The function should be
           * side-effect-free, since it may be re-applied when attempted
           * updates fail due to contention among threads.  The function
           * is applied with the current value as its first argument,
           * and the given update as the second argument.
           *
           * @param x the update value
           * @param accumulatorFunction a side-effect-free function of two arguments
           * @return the updated value
           * @since 1.8
           */
          public final long accumulateAndGet(long x,
                                             LongBinaryOperator accumulatorFunction) {
              long prev, next;
              do {
                  prev = get();
                  next = accumulatorFunction.applyAsLong(prev, x);
              } while (!compareAndSet(prev, next));
              return next;
          }

          /**
           * Returns the String representation of the current value.
           * @return the String representation of the current value
           */
          public String toString() {
              return Long.toString(get());
          }

          /**
           * Returns the value of this {@code AtomicLong} as an {@code int}
           * after a narrowing primitive conversion.
           * @jls 5.1.3 Narrowing Primitive Conversions
           */
          public int intValue() {
              return (int)get();
          }

          /**
           * Returns the value of this {@code AtomicLong} as a {@code long}.
           */
          public long longValue() {
              return get();
          }

          /**
           * Returns the value of this {@code AtomicLong} as a {@code float}
           * after a widening primitive conversion.
           * @jls 5.1.2 Widening Primitive Conversions
           */
          public float floatValue() {
              return (float)get();
          }

          /**
           * Returns the value of this {@code AtomicLong} as a {@code double}
           * after a widening primitive conversion.
           * @jls 5.1.2 Widening Primitive Conversions
           */
          public double doubleValue() {
              return (double)get();
          }
      }

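The JDK listings above are long, so a short usage sketch may help. It exercises a few of the public methods shown in those listings (`getAndIncrement`, `incrementAndGet`, `compareAndSet`, `updateAndGet`, `accumulateAndGet`, `getAndSet`). The class name `AtomicApiSketch` and the helper `countConcurrently` are made up for this example.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Small tour of the AtomicLong / AtomicBoolean methods listed above. */
final class AtomicApiSketch {

    /** Increments one shared counter from several threads and returns the final value. */
    static long countConcurrently(int threads, int incrementsPerThread) {
        AtomicLong counter = new AtomicLong();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet(); // atomic read-modify-write, no lost updates
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        AtomicLong value = new AtomicLong(5);
        System.out.println(value.getAndIncrement());              // returns the previous value
        System.out.println(value.incrementAndGet());              // returns the updated value
        System.out.println(value.compareAndSet(5, 100));          // fails unless the current value is 5
        System.out.println(value.updateAndGet(v -> v * 2));       // applies the function atomically
        System.out.println(value.accumulateAndGet(6, Math::max)); // combines current value and 6

        AtomicBoolean flag = new AtomicBoolean();
        System.out.println(flag.getAndSet(true));                 // returns the previous flag value
        System.out.println(countConcurrently(4, 10_000));
    }
}
```

The functions passed to `updateAndGet` and `accumulateAndGet` should be free of side effects, because the retry loops in the listings may call them more than once under contention.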

    The actual source is RoundRobinSelector... what on earth is this thing?!
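Since the section title mentions smooth round robin, here is a minimal sketch of the generic smooth weighted round-robin technique: every node's running value grows by its weight on each pick, the node with the largest running value wins, and the winner is then reduced by the total weight. This is an illustration only and not the RoundRobinSelector source. The names `SmoothRoundRobinSketch` and `Node` are hypothetical, and the sketch uses `synchronized` rather than the atomic classes to stay short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Generic smooth weighted round robin (illustrative sketch). */
final class SmoothRoundRobinSketch {

    /** Hypothetical node: a fixed weight plus a running "current" value. */
    static final class Node {
        final String name;
        final int weight;
        long current;

        Node(String name, int weight) {
            this.name = name;
            this.weight = weight;
        }
    }

    private final List<Node> nodes;

    SmoothRoundRobinSketch(List<Node> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    /** Picks the node with the largest running value, then pulls it back by the total weight. */
    synchronized Node select() {
        long total = 0;
        Node best = null;
        for (Node n : nodes) {
            n.current += n.weight;   // every node advances by its own weight
            total += n.weight;
            if (best == null || n.current > best.current) {
                best = n;
            }
        }
        best.current -= total;       // the winner steps back so others get their turn
        return best;
    }

    public static void main(String[] args) {
        SmoothRoundRobinSketch selector = new SmoothRoundRobinSketch(Arrays.asList(
                new Node("a", 5), new Node("b", 1), new Node("c", 1)));
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 7; i++) {
            order.append(selector.select().name);
        }
        System.out.println(order); // prints "aabacaa"
    }
}
```

With weights 5, 1 and 1, node a receives five of every seven picks, but the picks are spread out instead of arriving as five in a row.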

    🐠Linear load (lowerweight)


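    This is the default load-balancing algorithm and probably the one that is actually used; the other two seem to be there just to fill out the list. Even its select method is written separately.
    ExecutorDispatcher: only LowerWeightHostManager overrides select(ExecutionContext context).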

    • LowerWeightRoundRobin
      /*
       * Licensed to the Apache Software Foundation (ASF) under one or more
       * contributor license agreements.  See the NOTICE file distributed with
       * this work for additional information regarding copyright ownership.
       * The ASF licenses this file to You under the Apache License, Version 2.0
       * (the "License"); you may not use this file except in compliance with
       * the License.  You may obtain a copy of the License at
       *
       *    http://www.apache.org/licenses/LICENSE-2.0
       *
       * Unless required by applicable law or agreed to in writing, software
       * distributed under the License is distributed on an "AS IS" BASIS,
       * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       * See the License for the specific language governing permissions and
       * limitations under the License.
       */
      
      package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
      
      import java.util.Collection;
      import java.util.Comparator;
      import java.util.List;
      import java.util.stream.Collectors;
      
      import com.google.common.collect.Lists;
      
      /**
       * lower weight round robin
       */
      public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
      
          /**
           * select
           *
           * @param sources sources
           * @return HostWeight
           */
          @Override
          public HostWeight doSelect(Collection<HostWeight> sources) {
              double totalWeight = 0;
              double lowWeight = 0;
              HostWeight lowerNode = null;
              List<HostWeight> weights = canAssignTaskHost(sources);
              for (HostWeight hostWeight : weights) {
                  totalWeight += hostWeight.getWeight();
                  hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
                  if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
                      lowerNode = hostWeight;
                      lowWeight = hostWeight.getCurrentWeight();
                  }
              }
              lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
              return lowerNode;
          }
      
          private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
              List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());
              if (!zeroWaitingTask.isEmpty()) {
                  return zeroWaitingTask;
              }
              HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
              List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
              List<HostWeight> equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount())
                  .collect(Collectors.toList());
              if (!equalWaitingTask.isEmpty()) {
                  waitingTask.addAll(equalWaitingTask);
              }
              return waitingTask;
          }
      }
      
      
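To see how the selection in doSelect behaves over several calls, here is a self-contained sketch that mirrors its logic with a simplified stand-in class. `HostLoad` is hypothetical and replaces the project's HostWeight. Starting `currentWeight` at 0 and reading the weight as a load figure (lower means less loaded) are assumptions of this sketch. The candidate filter is written as "all hosts sharing the minimum waiting-task count", which gives the same result as canAssignTaskHost above when counts are non-negative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Stand-alone replay of the lower-weight selection logic (illustrative sketch). */
final class LowerWeightSketch {

    /** Hypothetical, simplified stand-in for HostWeight. */
    static final class HostLoad {
        final String host;
        final double weight;
        final int waitingTaskCount;
        double currentWeight; // assumed to start at 0 in this sketch

        HostLoad(String host, double weight, int waitingTaskCount) {
            this.host = host;
            this.weight = weight;
            this.waitingTaskCount = waitingTaskCount;
        }
    }

    /** Hosts that share the smallest waiting-task count. */
    static List<HostLoad> candidates(Collection<HostLoad> sources) {
        int min = Integer.MAX_VALUE;
        for (HostLoad h : sources) {
            min = Math.min(min, h.waitingTaskCount);
        }
        List<HostLoad> result = new ArrayList<>();
        for (HostLoad h : sources) {
            if (h.waitingTaskCount == min) {
                result.add(h);
            }
        }
        return result;
    }

    /** Same steps as doSelect: advance each candidate, take the lowest, then penalise it. */
    static HostLoad select(Collection<HostLoad> sources) {
        if (sources.isEmpty()) {
            throw new IllegalArgumentException("no hosts"); // guard added for this sketch
        }
        double totalWeight = 0;
        double lowWeight = 0;
        HostLoad lowerNode = null;
        for (HostLoad h : candidates(sources)) {
            totalWeight += h.weight;
            h.currentWeight += h.weight;
            if (lowerNode == null || lowWeight > h.currentWeight) {
                lowerNode = h;
                lowWeight = h.currentWeight;
            }
        }
        lowerNode.currentWeight += totalWeight;
        return lowerNode;
    }

    public static void main(String[] args) {
        List<HostLoad> hosts = Arrays.asList(
                new HostLoad("A", 10, 0), new HostLoad("B", 30, 0), new HostLoad("C", 5, 2));
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            order.append(select(hosts).host);
        }
        System.out.println(order); // prints "AABA"
    }
}
```

In this run host C is never chosen because it has waiting tasks while A and B have none, and the lighter host A is picked more often than B.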

    🐵Miscellaneous


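    @Bean annotation (official documentation)

    Spring's @Bean annotation marks a method as a factory for a bean that Spring then manages. With the default singleton scope, Spring calls the method only once and keeps the resulting bean in its IoC container.

    The Spring IoC container manages one or more beans. Beans declared this way are usually created inside a class annotated with @Configuration; putting @Bean on a method tells Spring to register that method's return value as a managed bean.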

  • Original article: https://blog.csdn.net/qq_36434219/article/details/127849367