Callable接口类似于Runnable ,因为它们都是为其实例可能由另一个线程执行的类设计的。 然而Runnable不返回结果,也不能抛出被检查的异常
Callable是创建线程的第三种方式,是一个函数式接口
-
- /**
- * A task that returns a result and may throw an exception.
- * Implementors define a single method with no arguments called
- * {@code call}.
- *
- *
The {@code Callable} interface is similar to {@link
- * java.lang.Runnable}, in that both are designed for classes whose
- * instances are potentially executed by another thread. A
- * {@code Runnable}, however, does not return a result and cannot
- * throw a checked exception.
- *
- *
The {@link Executors} class contains utility methods to
- * convert from other common forms to {@code Callable} classes.
- *
- * @see Executor
- * @since 1.5
- * @author Doug Lea
- * @param
the result type of method {@code call} - */
- @FunctionalInterface
- public interface Callable
{ - /**
- * Computes a result, or throws an exception if unable to do so.
- *
- * @return computed result
- * @throws Exception if unable to compute a result
- */
- V call() throws Exception;
- }
使用Callable接口创建线程时,发现Thread中构造方法并没有Callable参数
但是Java中提供了一实现Runnable接口的实现类 FutureTask
1、Future接口
FutureTask 实现了 RunnableFuture 接口,RunnableFuture 接口继承自 Runnable 接口
- public class FutureTask
implements RunnableFuture { - public FutureTask(Callable
callable) { - if (callable == null)
- throw new NullPointerException();
- this.callable = callable;
- this.state = NEW; // ensure visibility of callable
- }
-
- public FutureTask(Runnable runnable, V result) {
- this.callable = Executors.callable(runnable, result);
- this.state = NEW; // ensure visibility of callable
- }
- }
-
-
- public interface RunnableFuture
extends Runnable, Future { - void run();
- }
因此我们可以先创建一个 FutureTask 类,将 Callable 参数传进去,再将 FutureTask 作为参数传入创建 Thread类中
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.FutureTask;
-
- public class CallableTest {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // new Thread(new Runnable()).start();
- // new Thread(new FutureTask
()).start(); - // new Thread(new FutureTask
(Callable)).start(); -
- MyCallable myCallable = new MyCallable();
- FutureTask futureTask = new FutureTask(myCallable);
-
- new Thread(futureTask,"A").start();
- new Thread(futureTask,"B").start();
-
- Integer o = (Integer) futureTask.get(); //这个get 方法可能会产生阻塞!把他放到最后
- // 或者使用异步通信来处理!
- System.out.println(o);
- }
- }
-
- class MyCallable implements Callable
{ -
- @Override
- public Integer call() throws Exception {
- // 耗时操作
- System.out.println("call()");
- return 200;
- }
- }
输出
- call()
- 200
注:
1、Callable接口会有缓存,开启多个线程但是只返回一个 输出
2、返回结果会阻塞线程,比较耗时,尽量放在代码最后 或者使用异步通信处理
- //Callable 接口
- public interface Callable
{ - V call() throws Exception;
- }
-
- // Runnable 接口
- public interface Runnable {
- public abstract void run();
- }
1、Callable规定的方法是call(),Runnable规定的方法是run().
2、Callable的任务执行后可返回值,而Runnable的任务是不能返回值的
3、call方法可以抛出异常,run方法不可以
4、运行Callable任务可以拿到一个Future对象,Future 表示异步计算的结果(executorService.submit(Runnable task) 也会返回future, 但是没有future的效果 )
一般在多线程下,使用List
ArrayList是非线程安全的,在多线程的情况下,向list插入数据的时候,可能会造成数据丢失的情况。并且一个线程在遍历List,另一个线程修改List,会报ConcurrentModificationException(并发修改异常)错误
java.util.ConcurrentModificationException 并发修改异常
一般有三种解决方案
Vector是一个线程安全的List,但是它的线程安全实现方式是对所有操作都加上了synchronized关键字,这种方式严重影响效率.所以并不推荐使用Vector
首先 Collections.synchronizedList(new ArrayList<>());
- public static
List synchronizedList(List list) { - return (list instanceof RandomAccess ?
- new SynchronizedRandomAccessList<>(list) :
- new SynchronizedList<>(list));
- }
这个方法回根据你传入的List是否实现RandomAccess这个接口来返回的SynchronizedRandomAccessList还是SynchronizedList
SynchronizedList源码
- static class SynchronizedList
- extends SynchronizedCollection
- implements List
{ - private static final long serialVersionUID = -7754090372962971524L;
-
- final List
list; -
- SynchronizedList(List
list) { - super(list);
- this.list = list;
- }
- SynchronizedList(List
list, Object mutex) { - super(list, mutex);
- this.list = list;
- }
-
- public boolean equals(Object o) {
- if (this == o)
- return true;
- synchronized (mutex) {return list.equals(o);}
- }
- public int hashCode() {
- synchronized (mutex) {return list.hashCode();}
- }
-
- public E get(int index) {
- synchronized (mutex) {return list.get(index);}
- }
- public E set(int index, E element) {
- synchronized (mutex) {return list.set(index, element);}
- }
- public void add(int index, E element) {
- synchronized (mutex) {list.add(index, element);}
- }
- public E remove(int index) {
- synchronized (mutex) {return list.remove(index);}
- }
-
- public int indexOf(Object o) {
- synchronized (mutex) {return list.indexOf(o);}
- }
- public int lastIndexOf(Object o) {
- synchronized (mutex) {return list.lastIndexOf(o);}
- }
-
- public boolean addAll(int index, Collection extends E> c) {
- synchronized (mutex) {return list.addAll(index, c);}
- }
-
- public ListIterator
listIterator() { - return list.listIterator(); // Must be manually synched by user
- }
-
- public ListIterator
listIterator(int index) { - return list.listIterator(index); // Must be manually synched by user
- }
-
- public List
subList(int fromIndex, int toIndex) { - synchronized (mutex) {
- return new SynchronizedList<>(list.subList(fromIndex, toIndex),
- mutex);
- }
- }
-
- @Override
- public void replaceAll(UnaryOperator
operator) { - synchronized (mutex) {list.replaceAll(operator);}
- }
- @Override
- public void sort(Comparator super E> c) {
- synchronized (mutex) {list.sort(c);}
- }
- ... ...
- }
执行add()等方法的时候加了synchronized关键字,但是listIterator(),iterator()方法却没有加
CopyOnWriteArrayList 底层是数组实现的,主要有以下两个变量
- public class CopyOnWriteArrayList
- implements List
, RandomAccess, Cloneable, java.io.Serializable { -
- /** The lock protecting all mutators */
- final transient ReentrantLock lock = new ReentrantLock();
-
- /** The array, accessed only via getArray/setArray. */
- private transient volatile Object[] array;
- }
1、lock:ReentrantLock,独占锁,多线程运行的情况下,只有一个线程会获得这个锁,只有释放锁后其他线程才能获得
2、array:存放数据的数组,关键是被volatile修饰了,被volatile修饰,就保证了可见性,也就是一个线程修改后,其他线程立即可见
CopyOnWriteArrayList原理
1、CopyOnWriteArrayList实现了List接口,因此它是一个队列
2、CopyOnWriteArrayList包含了成员lock。每一个CopyOnWriteArrayList都和一个监视器锁lock绑定,通过lock,实现了对CopyOnWriteArrayList的互斥访问
3、CopyOnWriteArrayList包含了成员array数组,这说明CopyOnWriteArrayList本质上通过数组实现的
4、CopyOnWriteArrayList的“动态数组”机制 -- 它内部有个“volatile数组”(array)来保持数据。在“添加/修改/删除”数据时,都会新建一个数组,并将更新后的数据拷贝到新建的数组中,最后再将该数组赋值给“volatile数组”。这就是它叫做CopyOnWriteArrayList的原因!CopyOnWriteArrayList就是通过这种方式实现的动态数组;不过正由于它在“添加/修改/删除”数据时,都会新建数组,所以涉及到修改数据的操作,CopyOnWriteArrayList效率很 低;但是单单只是进行遍历查找的话,效率比较高
5、CopyOnWriteArrayList的“线程安全”机制 -- 是通过volatile和监视器锁Synchrnoized来实现的
6、CopyOnWriteArrayList是通过“volatile数组”来保存数据的。一个线程读取volatile数组时,总能看到其它线程对该volatile变量最后的写入;就这样,通过volatile提供了“读取到的数据总是最新的”这个机制的 保证
7、CopyOnWriteArrayList通过监视器锁Synchrnoized来保护数据。在“添加/修改/删除”数据时,会先“获取监视器锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”;这样,就达到了保护数据的目的
可以看到 add()方法使用 lock进行加锁
-
- /**
- * Appends the specified element to the end of this list.
- *
- * @param e element to be appended to this list
- * @return {@code true} (as specified by {@link Collection#add})
- */
- public boolean add(E e) {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- Object[] elements = getArray();
- int len = elements.length;
- Object[] newElements = Arrays.copyOf(elements, len + 1);
- newElements[len] = e;
- setArray(newElements);
- return true;
- } finally {
- lock.unlock();
- }
- }
CopyOnWriteArrayList 添加数组的步骤如下:
1、获得独占锁,将添加功能加锁
2、获取原来的数组,并得到其长度
3、创建一个长度为原来数组长度+1的数组,并拷贝原来的元素给新数组
4、追加元素到新数组末尾
5、指向新数组
6、释放锁
这个过程是线程安全的,写入时复制(COW)的核心思想就是每次修改的时候拷贝一个新的资源去修改,add()方法再拷贝新资源的时候将数组容量+1,这样虽然每次添加元素都会浪费一定的空间,但是数组的长度正好是元素的长度,也在一定程度上节省了扩容的开销
- /**
- * List 线程不安全
- * java.util.ConcurrentModificationException 并发修改异常
- */
- public class ListTest {
- public static void main(String[] args) {
- // 单线程下安全;并发下 线程不安全
- //List
list = new ArrayList<>(); - /* 解决方案
- 1、List
list = new Vector<>(); // 线程安全,源码中方法都有 synchronized 修饰 - 2、List
list = Collections.synchronizedList(new ArrayList<>()); - 3、List
list = new CopyOnWriteArrayList<>(); - CopyOnWrite 写入时复制(COW);计算机程序设计领域的一种优化策略
- 多个线程调用的时候,list,读取的时候,固定的,写入(覆盖);在写入的时候避免覆盖,造成数据问题
- 读写分离
- * */
- //List
list = new Vector<>(); // 线程安全,源码中方法都有 synchronized 修饰 - //List
list = Collections.synchronizedList(new ArrayList<>()); - List
list = new CopyOnWriteArrayList<>(); -
- for (int i = 0; i < 10; i++) {
- new Thread(() -> {
- list.add(UUID.randomUUID().toString().substring(0,5));
- System.out.println(list);
- },String.valueOf(i)).start();
- }
- }
- }
- /**
- * java.util.ConcurrentModificationException
- */
- public class SetTest {
- public static void main(String[] args) {
- //Set
set = new HashSet<>(); -
- //Set
set = Collections.synchronizedSet(new HashSet<>()); -
- Set
set = new CopyOnWriteArraySet<>(); -
- for (int i = 1; i <=10 ; i++) {
- new Thread(()->{
- set.add(UUID.randomUUID().toString().substring(0,5));
- System.out.println(set);
- },String.valueOf(i)).start();
- }
-
- }
- }
- // java.util.ConcurrentModificationException
- public class MapTest {
-
- public static void main(String[] args) {
- // 默认等价于 new HashMap<>(16,0.75);
- // Map
map = new HashMap<>(); -
- Map
map = new ConcurrentHashMap<>(); -
- for (int i = 1; i <=10; i++) {
- new Thread(()->{
- map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
- System.out.println(map);
- },String.valueOf(i)).start();
- }
-
- }
- }
ConcurrentHashMap源码分析
1、添加元素put/putVal方法
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
-
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- //如果有空值或者空键,直接抛异常
- if (key == null || value == null) throw new NullPointerException();
- //基于key计算hash值,并进行一定的扰动
- int hash = spread(key.hashCode());
- //记录某个桶上元素的个数,如果超过8个,会转成红黑树
- int binCount = 0;
- for (Node
[] tab = table;;) { - Node
f; int n, i, fh; - //如果数组还未初始化,先对数组进行初始化
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- //如果hash计算得到的桶位置没有元素,利用cas将元素添加
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- //cas+自旋(和外侧的for构成自旋循环),保证元素添加安全
- if (casTabAt(tab, i, null,
- new Node
(hash, key, value, null))) - break; // no lock when adding to empty bin
- }
- //如果hash计算得到的桶位置元素的hash值为MOVED,证明正在扩容,那么协助扩容
- else if ((fh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- else {
- //hash计算的桶位置元素不为空,且当前没有处于扩容操作,进行元素添加
- V oldVal = null;
- //对当前桶进行加锁,保证线程安全,执行元素添加操作
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- //普通链表节点
- if (fh >= 0) {
- binCount = 1;
- for (Node
e = f;; ++binCount) { - K ek;
- if (e.hash == hash &&
- ((ek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val;
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- Node
pred = e; - if ((e = e.next) == null) {
- pred.next = new Node
(hash, key, - value, null);
- break;
- }
- }
- }
- //树节点,将元素添加到红黑树中
- else if (f instanceof TreeBin) {
- Node
p; - binCount = 2;
- if ((p = ((TreeBin
)f).putTreeVal(hash, key, - value)) != null) {
- oldVal = p.val;
- if (!onlyIfAbsent)
- p.val = value;
- }
- }
- }
- }
- if (binCount != 0) {
- //链表长度大于/等于8,将链表转成红黑树
- if (binCount >= TREEIFY_THRESHOLD)
- treeifyBin(tab, i);
- //如果是重复键,直接将旧值返回
- if (oldVal != null)
- return oldVal;
- break;
- }
- }
- }
- //添加的是新元素,维护集合长度,并判断是否要进行扩容操作
- addCount(1L, binCount);
- return null;
- }
需要添加元素时,会针对当前元素所对应的桶位进行加锁操作,这样一方面保证元素添加时,多线程的安全,同时对某个桶位加锁不会影响其他桶位的操作,进一步提升多线程的并发效率
2、数组初始化,initTable方法
- private final Node
[] initTable() { - Node
[] tab; int sc; - //cas+自旋,保证线程安全,对数组进行初始化操作
- while ((tab = table) == null || tab.length == 0) {
- //如果sizeCtl的值(-1)小于0,说明此时正在初始化, 让出cpu
- if ((sc = sizeCtl) < 0)
- Thread.yield(); // lost initialization race; just spin
- //cas修改sizeCtl的值为-1,修改成功,进行数组初始化,失败,继续自旋
- else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- try {
- if ((tab = table) == null || tab.length == 0) {
- //sizeCtl为0,取默认长度16,否则去sizeCtl的值
- int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
- @SuppressWarnings("unchecked")
- //基于初始长度,构建数组对象
- Node
[] nt = (Node[])new Node,?>[n]; - table = tab = nt;
- //计算扩容阈值,并赋值给sc
- sc = n - (n >>> 2);
- }
- } finally {
- //将扩容阈值,赋值给sizeCtl
- sizeCtl = sc;
- }
- break;
- }
- }
- return tab;
- }
3、数组扩容
- private final void transfer(Node
[] tab, Node[] nextTab) { - int n = tab.length, stride;
- //如果是多cpu,那么每个线程划分任务,最小任务量是16个桶位的迁移
- if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
- stride = MIN_TRANSFER_STRIDE; // subdivide range
- //如果是扩容线程,此时新数组为null
- if (nextTab == null) { // initiating
- try {
- @SuppressWarnings("unchecked")
- //两倍扩容创建新数组
- Node
[] nt = (Node[])new Node,?>[n << 1]; - nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTable = nextTab;
- //记录线程开始迁移的桶位,从后往前迁移
- transferIndex = n;
- }
- //记录新数组的末尾
- int nextn = nextTab.length;
- //已经迁移的桶位,会用这个节点占位(这个节点的hash值为-1--MOVED)
- ForwardingNode
fwd = new ForwardingNode(nextTab); - boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- for (int i = 0, bound = 0;;) {
- Node
f; int fh; - while (advance) {
- int nextIndex, nextBound;
- //i记录当前正在迁移桶位的索引值
- //bound记录下一次任务迁移的开始桶位
-
- //--i >= bound 成立表示当前线程分配的迁移任务还没有完成
- if (--i >= bound || finishing)
- advance = false;
- //没有元素需要迁移 -- 后续会去将扩容线程数减1,并判断扩容是否完成
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- //计算下一次任务迁移的开始桶位,并将这个值赋值给transferIndex
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- //如果没有更多的需要迁移的桶位,就进入该if
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- //扩容结束后,保存新数组,并重新计算扩容阈值,赋值给sizeCtl
- if (finishing) {
- nextTable = null;
- table = nextTab;
- sizeCtl = (n << 1) - (n >>> 1);
- return;
- }
- //扩容任务线程数减1
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- //判断当前所有扩容任务线程是否都执行完成
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
- return;
- //所有扩容线程都执行完,标识结束
- finishing = advance = true;
- i = n; // recheck before commit
- }
- }
- //当前迁移的桶位没有元素,直接在该位置添加一个fwd节点
- else if ((f = tabAt(tab, i)) == null)
- advance = casTabAt(tab, i, null, fwd);
- //当前节点已经被迁移
- else if ((fh = f.hash) == MOVED)
- advance = true; // already processed
- else {
- //当前节点需要迁移,加锁迁移,保证多线程安全
- //此处迁移逻辑和jdk7的ConcurrentHashMap相同,不再赘述
- synchronized (f) {
- if (tabAt(tab, i) == f) {
- Node
ln, hn; - if (fh >= 0) {
- int runBit = fh & n;
- Node
lastRun = f; - for (Node
p = f.next; p != null; p = p.next) { - int b = p.hash & n;
- if (b != runBit) {
- runBit = b;
- lastRun = p;
- }
- }
- if (runBit == 0) {
- ln = lastRun;
- hn = null;
- }
- else {
- hn = lastRun;
- ln = null;
- }
- for (Node
p = f; p != lastRun; p = p.next) { - int ph = p.hash; K pk = p.key; V pv = p.val;
- if ((ph & n) == 0)
- ln = new Node
(ph, pk, pv, ln); - else
- hn = new Node
(ph, pk, pv, hn); - }
- setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- }
- else if (f instanceof TreeBin) {
- TreeBin
t = (TreeBin)f; - TreeNode
lo = null, loTail = null; - TreeNode
hi = null, hiTail = null; - int lc = 0, hc = 0;
- for (Node
e = t.first; e != null; e = e.next) { - int h = e.hash;
- TreeNode
p = new TreeNode - (h, e.key, e.val, null, null);
- if ((h & n) == 0) {
- if ((p.prev = loTail) == null)
- lo = p;
- else
- loTail.next = p;
- loTail = p;
- ++lc;
- }
- else {
- if ((p.prev = hiTail) == null)
- hi = p;
- else
- hiTail.next = p;
- hiTail = p;
- ++hc;
- }
- }
- ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
- (hc != 0) ? new TreeBin
(lo) : t; - hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
- (lc != 0) ? new TreeBin
(hi) : t; - setTabAt(nextTab, i, ln);
- setTabAt(nextTab, i + n, hn);
- setTabAt(tab, i, fwd);
- advance = true;
- }
- }
- }
- }
- }
- }