官网资料,从1.3开始到3.1都是一样的,源码将以2.0.6版本为例,官网介绍如下:
可以修改master.properties
,指定负载均衡算法:
MasterConfig.java
默认为线性负载lowerweight
:
HostManagerConfig.java
根据配置选择算法:
父类CommonHostManager
,获取有效的worker列表:
RandomSelector
RandomHostManager
AbstractSelector
用到了原子类
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
AtomicBoolean
/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.atomic;
import sun.misc.Unsafe;
/**
* A {@code boolean} value that may be updated atomically. See the
* {@link java.util.concurrent.atomic} package specification for
* description of the properties of atomic variables. An
* {@code AtomicBoolean} is used in applications such as atomically
* updated flags, and cannot be used as a replacement for a
* {@link java.lang.Boolean}.
*
* @since 1.5
* @author Doug Lea
*/
public class AtomicBoolean implements java.io.Serializable {
private static final long serialVersionUID = 4654671469794556979L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicBoolean.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
/**
* Creates a new {@code AtomicBoolean} with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicBoolean(boolean initialValue) {
value = initialValue ? 1 : 0;
}
/**
* Creates a new {@code AtomicBoolean} with initial value {@code false}.
*/
public AtomicBoolean() {
}
/**
* Returns the current value.
*
* @return the current value
*/
public final boolean get() {
return value != 0;
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* May fail
* spuriously and does not provide ordering guarantees, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public boolean weakCompareAndSet(boolean expect, boolean update) {
int e = expect ? 1 : 0;
int u = update ? 1 : 0;
return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}
/**
* Unconditionally sets to the given value.
*
* @param newValue the new value
*/
public final void set(boolean newValue) {
value = newValue ? 1 : 0;
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(boolean newValue) {
int v = newValue ? 1 : 0;
unsafe.putOrderedInt(this, valueOffset, v);
}
/**
* Atomically sets to the given value and returns the previous value.
*
* @param newValue the new value
* @return the previous value
*/
public final boolean getAndSet(boolean newValue) {
boolean prev;
do {
prev = get();
} while (!compareAndSet(prev, newValue));
return prev;
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value
*/
public String toString() {
return Boolean.toString(get());
}
}
AtomicLong
/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent.atomic;
import java.util.function.LongUnaryOperator;
import java.util.function.LongBinaryOperator;
import sun.misc.Unsafe;
/**
* A {@code long} value that may be updated atomically. See the
* {@link java.util.concurrent.atomic} package specification for
* description of the properties of atomic variables. An
* {@code AtomicLong} is used in applications such as atomically
* incremented sequence numbers, and cannot be used as a replacement
* for a {@link java.lang.Long}. However, this class does extend
* {@code Number} to allow uniform access by tools and utilities that
* deal with numerically-based classes.
*
* @since 1.5
* @author Doug Lea
*/
public class AtomicLong extends Number implements java.io.Serializable {
private static final long serialVersionUID = 1927816293512124184L;
// setup to use Unsafe.compareAndSwapLong for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
/**
* Records whether the underlying JVM supports lockless
* compareAndSwap for longs. While the Unsafe.compareAndSwapLong
* method works in either case, some constructions should be
* handled at Java level to avoid locking user-visible locks.
*/
static final boolean VM_SUPPORTS_LONG_CAS = VMSupportsCS8();
/**
* Returns whether underlying JVM supports lockless CompareAndSet
* for longs. Called only once and cached in VM_SUPPORTS_LONG_CAS.
*/
private static native boolean VMSupportsCS8();
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicLong.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile long value;
/**
* Creates a new AtomicLong with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicLong(long initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicLong with initial value {@code 0}.
*/
public AtomicLong() {
}
/**
* Gets the current value.
*
* @return the current value
*/
public final long get() {
return value;
}
/**
* Sets to the given value.
*
* @param newValue the new value
*/
public final void set(long newValue) {
value = newValue;
}
/**
* Eventually sets to the given value.
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(long newValue) {
unsafe.putOrderedLong(this, valueOffset, newValue);
}
/**
* Atomically sets to the given value and returns the old value.
*
* @param newValue the new value
* @return the previous value
*/
public final long getAndSet(long newValue) {
return unsafe.getAndSetLong(this, valueOffset, newValue);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
/**
* Atomically sets the value to the given updated value
* if the current value {@code ==} the expected value.
*
* May fail
* spuriously and does not provide ordering guarantees, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(long expect, long update) {
return unsafe.compareAndSwapLong(this, valueOffset, expect, update);
}
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final long getAndIncrement() {
return unsafe.getAndAddLong(this, valueOffset, 1L);
}
/**
* Atomically decrements by one the current value.
*
* @return the previous value
*/
public final long getAndDecrement() {
return unsafe.getAndAddLong(this, valueOffset, -1L);
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the previous value
*/
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
/**
* Atomically decrements by one the current value.
*
* @return the updated value
*/
public final long decrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, -1L) - 1L;
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}
/**
* Atomically updates the current value with the results of
* applying the given function, returning the previous value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the previous value
* @since 1.8
*/
public final long getAndUpdate(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return prev;
}
/**
* Atomically updates the current value with the results of
* applying the given function, returning the updated value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the updated value
* @since 1.8
*/
public final long updateAndGet(LongUnaryOperator updateFunction) {
long prev, next;
do {
prev = get();
next = updateFunction.applyAsLong(prev);
} while (!compareAndSet(prev, next));
return next;
}
/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the previous value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the previous value
* @since 1.8
*/
public final long getAndAccumulate(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the updated value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the updated value
* @since 1.8
*/
public final long accumulateAndGet(long x,
LongBinaryOperator accumulatorFunction) {
long prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsLong(prev, x);
} while (!compareAndSet(prev, next));
return next;
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value
*/
public String toString() {
return Long.toString(get());
}
/**
* Returns the value of this {@code AtomicLong} as an {@code int}
* after a narrowing primitive conversion.
* @jls 5.1.3 Narrowing Primitive Conversions
*/
public int intValue() {
return (int)get();
}
/**
* Returns the value of this {@code AtomicLong} as a {@code long}.
*/
public long longValue() {
return get();
}
/**
* Returns the value of this {@code AtomicLong} as a {@code float}
* after a widening primitive conversion.
* @jls 5.1.2 Widening Primitive Conversions
*/
public float floatValue() {
return (float)get();
}
/**
* Returns the value of this {@code AtomicLong} as a {@code double}
* after a widening primitive conversion.
* @jls 5.1.2 Widening Primitive Conversions
*/
public double doubleValue() {
return (double)get();
}
}
真正的源码RoundRobinSelector
…神马玩意!!!!
默认的负载均衡算法,估计也是真正使用的,另外两个就是凑数的,select方法都是单独写的
ExecutorDispatcher
:只有LowerWeightHostManager
重写了select(ExecutionContext context)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.host.assign;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
/**
* lower weight round robin
*/
public class LowerWeightRoundRobin extends AbstractSelector<HostWeight> {
/**
* select
*
* @param sources sources
* @return HostWeight
*/
@Override
public HostWeight doSelect(Collection<HostWeight> sources) {
double totalWeight = 0;
double lowWeight = 0;
HostWeight lowerNode = null;
List<HostWeight> weights = canAssignTaskHost(sources);
for (HostWeight hostWeight : weights) {
totalWeight += hostWeight.getWeight();
hostWeight.setCurrentWeight(hostWeight.getCurrentWeight() + hostWeight.getWeight());
if (lowerNode == null || lowWeight > hostWeight.getCurrentWeight()) {
lowerNode = hostWeight;
lowWeight = hostWeight.getCurrentWeight();
}
}
lowerNode.setCurrentWeight(lowerNode.getCurrentWeight() + totalWeight);
return lowerNode;
}
private List<HostWeight> canAssignTaskHost(Collection<HostWeight> sources) {
List<HostWeight> zeroWaitingTask = sources.stream().filter(h -> h.getWaitingTaskCount() == 0).collect(Collectors.toList());
if (!zeroWaitingTask.isEmpty()) {
return zeroWaitingTask;
}
HostWeight hostWeight = sources.stream().min(Comparator.comparing(HostWeight::getWaitingTaskCount)).get();
List<HostWeight> waitingTask = Lists.newArrayList(hostWeight);
List<HostWeight> equalWaitingTask = sources.stream().filter(h -> !h.getHost().equals(hostWeight.getHost()) && h.getWaitingTaskCount() == hostWeight.getWaitingTaskCount())
.collect(Collectors.toList());
if (!equalWaitingTask.isEmpty()) {
waitingTask.addAll(equalWaitingTask);
}
return waitingTask;
}
}
Spring的@Bean注解用于告诉方法,产生一个Bean对象,然后这个Bean对象交给Spring管理。产生这个Bean对象的方法Spring只会调用一次,随后将这个Bean对象放在自己的IOC容器中。
SpringIOC
容器管理一个或者多个bean,这些bean都需要在@Configuration注解下进行创建,在一个方法上使用@Bean注解就表明这个方法需要交给Spring进行管理。