Updated on 2017-07-29

https://docs.oracle.com/javase/10/docs/api/java/util/concurrent | 中文

Concept

  • 串行:多个线程 按照顺序 使用同一个核心。(单核心)(Serial)
  • 并发:多个线程 共同轮流 使用同一个核心。(单核心)(线程 同时存在)(Concurrent)
  • 并行:多个线程 各自分别 使用 一个核心。(多核心)(线程 同时执行)(Parallel)
    • 并行是并发的一个 子集,区别在于 CPU 是否为多核心。

  • Blocking:阻塞并发,内部使用 锁(如 ReentrantLock)
  • Concurrent:非阻塞并发,内部使用 CAS 操作

Code

线程体

无返回值:Runnable

import java.util.concurrent.TimeUnit;

/**
 * Minimal {@link Runnable} task body: waits one second, then reports completion.
 *
 * <p>{@code run()} has no return value and cannot declare checked exceptions, so the
 * {@link InterruptedException} raised by the sleep must be handled inside the method.
 */
public class Action implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // sleep() clears the interrupt flag when it throws; set it again so the
            // code that owns this thread (e.g. the pool) can still see the request.
            Thread.currentThread().interrupt();
        }
        System.out.println("Action Done");
    }
}

有返回值:Callable

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

/**
 * Minimal {@link Callable} task body: waits one second, reports completion and
 * yields a fixed result.
 *
 * <p>Unlike {@code Runnable.run()}, {@code call()} returns a value and is allowed to
 * throw checked exceptions.
 */
public class Task implements Callable<Integer> {
    /** Value handed back to whoever reads the task's result. */
    private static final int RESULT = 9527;

    /**
     * @return always {@code 9527}
     * @throws Exception if the one-second wait is interrupted
     */
    @Override
    public Integer call() throws Exception {
        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        System.out.println("Task Done");
        return RESULT;
    }
}

线程池

ThreadPoolExecutor

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();     创建线程池

                                 executorService.execute(new Action());    执行 Runnable 对象无返回值
        Future<Integer> future = executorService.submit (new Task());      执行 Callable 对象有返回值

        executorService.shutdown();     停止接收线程体并等待执行中的线程结束不阻塞当前线程

        System.out.println(future.get());     获得异步执行的结果若运算未完成则阻塞当前线程
    }
}

Note
int            corePoolSize     :线程池的核心(最小)线程数
int            maximumPoolSize  :线程池的最大线程数
long           keepAliveTime    :空闲线程的生存时间
TimeUnit       unit             :指示时间参数的单位
BlockingQueue  workQueue        :存储等待执行的任务

变种
newSingleThreadExecutor :只有一个线程,所有任务串行执行
newFixedThreadPool      :最多 n 个线程并发执行
newCachedThreadPool     :线程数不设上限,所有任务并发执行

ScheduledThreadPoolExecutor

import java.time.Instant;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * Scheduled-pool demo: prints the current time once per second and stops itself
 * after five seconds.
 */
public class Main {
    public static void main(String[] args) {
        final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

        // Repeating task: first run after 1 second, then once every second.
        final Runnable printNow = () -> System.out.println(Date.from(Instant.now()));
        final ScheduledFuture<?> ticker =
                scheduler.scheduleAtFixedRate(printNow, 1, 1, TimeUnit.SECONDS);

        // One-shot task, delayed by 5 seconds.
        final Runnable stop = () -> {
            ticker.cancel(true);    // cancel the repeating task
            scheduler.shutdown();   // shut the pool down
        };
        scheduler.schedule(stop, 5, TimeUnit.SECONDS);
    }
}
----
输出
Fri Jul 21 15:55:53 CST 2017
Fri Jul 21 15:55:54 CST 2017
Fri Jul 21 15:55:55 CST 2017
Fri Jul 21 15:55:56 CST 2017
Fri Jul 21 15:55:57 CST 2017

重复执行
scheduleAtFixedRate       任务开始时计时
scheduleWithFixedDelay    任务结束时计时

TimeUnit
 NANOSECONDS  纳秒  (1000 纳秒 = 1 微秒)
MICROSECONDS  微秒  (1000 微秒 = 1 毫秒)
MILLISECONDS  毫秒  (1000 毫秒 = 1 秒)
     SECONDS  秒    (60 秒 = 1 分)
     MINUTES  分    (60 分 = 1 时)
       HOURS  时    (24 时 = 1 天)
        DAYS  天

ExecutorCompletionService

import java.util.concurrent.*;

/**
 * {@link ExecutorCompletionService} demo: submits three tasks that each sleep a random
 * number of seconds and prints their results in the order the tasks finish.
 */
public class Main {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        final ExecutorService pool = Executors.newCachedThreadPool();
        // Results are placed on the service's internal queue in completion order.
        final CompletionService<Integer> results = new ExecutorCompletionService<>(pool);

        int submitted = 0;
        while (submitted < 3) {
            results.submit(() -> {
                // ThreadLocalRandom: random number generator local to the current thread.
                int seconds = ThreadLocalRandom.current().nextInt(5);
                TimeUnit.SECONDS.sleep(seconds);
                return seconds;
            });
            submitted++;
        }

        pool.shutdown();

        for (int remaining = submitted; remaining > 0; remaining--) {
            // take() removes the next finished Future from the internal queue and
            // blocks the current thread while none is available.
            final Integer value = results.take().get();
            System.out.println(value);
        }
    }
}
----
输出
0
1
4

ForkJoinPool(并行框架)

  • 主要用于 计算密集型 的任务,适合 任务分而治之、函数递归调用 的算法。
无返回值:RecursiveAction
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;

/**
 * Fork/join action without a result: replaces every element {@code x} of
 * {@code array[from, to)} with {@code sin(x) + cos(x) + tan(x)}, in place.
 */
public class ComputeAction extends RecursiveAction {
    /** Ranges shorter than this are processed directly, so it determines how many subtasks exist. */
    private static final int THRESHOLD = 200;
    private final double[] array;
    private final int from, to;

    /**
     * @param array data to transform in place
     * @param from  first index to process (inclusive)
     * @param to    end index (exclusive)
     */
    ComputeAction(double[] array, int from, int to) {
        this.array = array;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        final int length = to - from;
        if (length >= THRESHOLD) {
            // Too much work for one task: split the range in two and recurse.
            final int middle = (from + to) >>> 1;
            // invokeAll blocks until both halves have completed.
            ForkJoinTask.invokeAll(
                    new ComputeAction(array, from, middle),
                    new ComputeAction(array, middle, to));
            return;
        }
        // Small enough: do the computation in this task.
        for (int index = from; index < to; index++) {
            final double value = array[index];
            array[index] = Math.sin(value) + Math.cos(value) + Math.tan(value);
        }
    }
}
有返回值:RecursiveTask
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * Fork/join task with a result: returns the largest value in {@code array[from, to)}.
 *
 * <p>An empty range yields {@link Double#NEGATIVE_INFINITY}, the identity element of
 * {@code max}.
 */
public class FindTask extends RecursiveTask<Double> {
    /** Ranges shorter than this are scanned directly instead of being split. */
    private final static int THRESHOLD = 200;
    private final double[] array;
    private final int from, to;

    /**
     * @param array data to search
     * @param from  first index to inspect (inclusive)
     * @param to    end index (exclusive)
     */
    FindTask(double[] array, int from, int to) {
        this.array = array;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Double compute() {
        if (to - from < THRESHOLD) {
            // Seed with -infinity rather than 0: a seed of 0 would be reported as the
            // maximum of any range that contains only negative values.
            double max = Double.NEGATIVE_INFINITY;
            for (int i = from; i < to; i++) {
                max = Math.max(max, array[i]);
            }
            return max;
        } else {
            // Split the range in two, run both halves, then combine their maxima.
            int mid = (from + to) >>> 1;
            FindTask l = new FindTask(array, from, mid);
            FindTask r = new FindTask(array, mid, to);
            ForkJoinTask.invokeAll(l, r);   // blocks until both halves are done
            return Math.max(l.join(), r.join());
        }
    }
}
Main
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;

public class Main {
    public static void main(String[] args) {
        double[] array = ThreadLocalRandom.current().doubles(1000000).toArray();     生成随机数组

        ForkJoinPool forkJoinPool = new ForkJoinPool();

                        forkJoinPool.invoke(new ComputeAction(array, 0, array.length));  执行 RecursiveAction 对象无返回值阻塞
        Double result = forkJoinPool.invoke(new FindTask(array, 0, array.length));       执行 RecursiveTask   对象有返回值阻塞

        forkJoinPool.shutdown();
    }
}

ForkJoinPool.commonPool() 的默认并行度为 CPU 核心数减 1(即 Runtime.getRuntime().availableProcessors() - 1)

并发队列

ArrayBlockingQueue

  • 先入先出 队列,内部实现为数组,支持 公平访问策略

LinkedBlockingQueue

  • 先入先出 队列,内部实现为链表。
  • 生产者-消费者实现:
    • 生产者向队列 添加元素:当队列 已满 时,生产者会被阻塞;
    • 消费者从队列 移除元素:当队列 为空 时,消费者会被阻塞。
Producer
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer half of the blocking-queue demo: puts ten uniquely numbered products on
 * the queue, followed by the sentinel {@code -1} that tells one consumer to stop.
 */
public class Producer implements Runnable {
    /** Atomic counter shared by all producers so that every product id is distinct. */
    private static final AtomicInteger NEXT_ID = new AtomicInteger();
    private final BlockingQueue<Integer> queue;

    /**
     * @param queue queue the products are put on
     */
    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                TimeUnit.MILLISECONDS.sleep(25);
                int id = NEXT_ID.getAndIncrement();
                queue.put(id);      // blocks while the queue is full
                System.out.println("生产:" + id);
            }
            queue.put(-1);          // end-of-stream marker; blocks while the queue is full
            System.out.println("生产结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // The blocking call cleared the interrupt flag when it threw; restore it
            // so the thread's owner can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
Consumer
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Consumer half of the blocking-queue demo: takes products from the queue until it
 * receives the sentinel {@code -1}.
 */
public class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    /**
     * @param queue queue the products are taken from
     */
    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            int i;
            while ((i = queue.take()) != -1) {     // take() blocks while the queue is empty
                TimeUnit.MILLISECONDS.sleep(50);
                System.out.println("消费:" + i);
            }
            System.out.println("消费结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // The blocking call cleared the interrupt flag when it threw; restore it
            // so the thread's owner can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
Main
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {
    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);     指定队列容量为 10
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            executorService.execute(new Producer(queue));
            executorService.execute(new Consumer(queue));
        }
        executorService.shutdown();
    }
}

PriorityBlockingQueue

  • 队列中的元素会 根据给定规则进行排序

DelayQueue

  • 队列内部持有一个 PriorityQueue(配合 ReentrantLock 保证线程安全),用于存储元素并 根据延迟时间进行排序
  • 队列中的元素需要 实现 Delayed 接口
    • getDelay 方法用于获取剩余延迟。
    • compareTo 方法对元素进行比较。
DelayObject
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * Element for a {@code DelayQueue}: becomes available once a fixed delay, measured
 * from its creation, has elapsed.
 */
public class DelayObject implements Delayed {
    /** Unit in which all internal time values are kept. */
    private final static TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS;
    /** Requested delay, in {@link #TIME_UNIT}. */
    private final long delay;
    /** Creation time as returned by {@code System.currentTimeMillis()}. */
    private final long submit;
    /** Expiry time: {@code submit + delay}. */
    private final long expired;

    /**
     * @param delay how long the element stays unavailable
     * @param unit  unit of {@code delay}
     */
    public DelayObject(long delay, TimeUnit unit) {
        this.delay = TIME_UNIT.convert(delay, unit);
        this.submit = System.currentTimeMillis();
        this.expired = this.submit + this.delay;
    }

    /**
     * @param unit unit of the returned value
     * @return remaining delay; zero or negative once the element has expired
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expired - System.currentTimeMillis(), TIME_UNIT);
    }

    /**
     * Orders elements by expiry, earliest first.
     *
     * <p>Two {@code DelayObject}s are compared on their fixed expiry timestamps. Going
     * through {@code getDelay} for both would read the clock twice, so two elements
     * with the same expiry could compare as unequal when the clock ticks in between.
     */
    @Override
    public int compareTo(Delayed o) {
        if (o == this) {
            return 0;
        }
        if (o instanceof DelayObject) {
            return Long.compare(this.expired, ((DelayObject) o).expired);
        }
        // Foreign Delayed implementation: remaining delay is the only common measure.
        return Long.compare(this.getDelay(TIME_UNIT), o.getDelay(TIME_UNIT));
    }

    @Override
    public String toString() {
        return "DelayObject{" +
                "submit=" + submit +
                ", expired=" + expired +
                ", delay=" + delay +
                '}';
    }
}
Main
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

/**
 * {@link DelayQueue} demo: enqueues five elements with delays of 0 to 4 seconds and
 * prints each one as it is taken from the queue.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final DelayQueue<DelayObject> pending = new DelayQueue<>();
        for (int seconds = 0; seconds <= 4; seconds++) {
            pending.put(new DelayObject(seconds, TimeUnit.SECONDS));
        }
        while (pending.size() > 0) {
            final DelayObject ready = pending.take();
            System.out.println(ready);
        }
    }
}
----
输出
DelayObject{submit=1501135689170, expired=1501135689170, delay=0}
DelayObject{submit=1501135689170, expired=1501135690170, delay=1000}
DelayObject{submit=1501135689170, expired=1501135691170, delay=2000}
DelayObject{submit=1501135689170, expired=1501135692170, delay=3000}
DelayObject{submit=1501135689170, expired=1501135693170, delay=4000}

SynchronousQueue

  • 提供线程间进行 数据传递的场所,支持 公平访问策略
  • 调用其插入方法时,必须 等待另一个线程调用其移除方法,队列本身 不存储任何元素

LinkedTransferQueue(推荐)

  • SynchronousQueue、ConcurrentLinkedQueue、LinkedBlockingQueue 的超集。
Producer
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Producer half of the transfer-queue demo: hands ten uniquely numbered products
 * directly to consumers, followed by the sentinel {@code -1} that tells one
 * consumer to stop.
 */
public class Producer implements Runnable {
    /** Atomic counter shared by all producers so that every product id is distinct. */
    private static final AtomicInteger NEXT_ID = new AtomicInteger();
    private final TransferQueue<Integer> queue;

    /**
     * @param queue queue used to hand products to consumers
     */
    public Producer(TransferQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 10; i++) {
                int id = NEXT_ID.getAndIncrement();
                queue.transfer(id);     // waits until another thread removes the element
                System.out.println("生产:" + id);
            }
            queue.transfer(-1);         // end-of-stream marker; waits for a consumer as well
            System.out.println("生产结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // transfer() cleared the interrupt flag when it threw; restore it so the
            // thread's owner can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}

void    transfer(E e)                                  :若没有线程调用其移除方法,则一直等待,直到另一个线程调用其移除方法
boolean tryTransfer(E e)                               :若没有线程调用其移除方法,则丢弃该元素并立即返回 false
boolean tryTransfer(E e, long timeout, TimeUnit unit)  :若没有线程调用其移除方法,则在指定时间内等待;若超时,则丢弃该元素并返回 false
Consumer
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

/**
 * Consumer half of the transfer-queue demo: takes products from the queue until it
 * receives the sentinel {@code -1}, spending one second on each product.
 */
public class Consumer implements Runnable {
    private final TransferQueue<Integer> queue;

    /**
     * @param queue queue the products are taken from
     */
    public Consumer(TransferQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            int i;
            while ((i = queue.take()) != -1) {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("消费:" + i);
            }
            System.out.println("消费结束");
        } catch (InterruptedException e) {
            e.printStackTrace();
            // The blocking call cleared the interrupt flag when it threw; restore it
            // so the thread's owner can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
Main
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

/**
 * Transfer-queue demo: two producers and two consumers exchange products through one
 * {@link LinkedTransferQueue}.
 */
public class Main {
    public static void main(String[] args) {
        final TransferQueue<Integer> handoff = new LinkedTransferQueue<>();
        final ExecutorService workers = Executors.newCachedThreadPool();
        for (int pairs = 2; pairs > 0; pairs--) {
            workers.execute(new Producer(handoff));
            workers.execute(new Consumer(handoff));
        }
        workers.shutdown();
    }
}
----
输出
生产:0
生产:1
消费:0
消费:1
生产:2
生产:3
消费:2
消费:3
生产:4
生产:5
消费:5
消费:4
...
生产:16
生产:17
消费:16
消费:17
生产:18
生产:19
消费:18
消费:19
消费结束
生产结束
生产结束
消费结束

原子变量

  • 原子性 -> Unsafe 类 -> CAS 操作 -> cmpxchg 指令

基本数据类型:AtomicInteger

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * {@link AtomicInteger} demo: five pool tasks each increment a shared counter 1000
 * times; the final value is printed once all tasks have finished.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger();

        final Runnable bump = () -> {
            int remaining = 1000;
            while (remaining-- > 0) {
                counter.getAndIncrement();  // atomic operation
            }
        };

        final ExecutorService pool = Executors.newCachedThreadPool();
        for (int worker = 0; worker < 5; worker++) {
            pool.execute(bump);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);

        System.out.println(counter.get());
    }
}
----
输出
5000

引用数据类型:AtomicReference

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        AtomicReference<Element> reference = new AtomicReference<>(new Element(0, 0));

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int n = 0; n < 5; n++) {
            executorService.execute(() -> {
                for (int j = 0; j < 1000; j++) {
                    boolean flag = false;
                    while (!flag) {     自旋锁
                        Element oldElement = reference.get();
                        Element newElement = new Element(oldElement.x + 1, oldElement.y + 1);
                        flag = reference.compareAndSet(oldElement, newElement);     原子替换
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);

        System.out.println(reference.get());
    }
}
----
输出
Element{x=5000, y=5000}

/** Pair of int values used as the payload of the AtomicReference demo. */
class Element {
    public int x;
    public int y;

    public Element(int x, int y) {
        this.x = x;
        this.y = y;
    }

    /** @return text of the form {@code Element{x=1, y=2}} */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("Element{");
        text.append("x=").append(x);
        text.append(", y=").append(y);
        text.append('}');
        return text.toString();
    }
}

属性原子更新:AtomicIntegerFieldUpdater

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        AtomicIntegerFieldUpdater<Element> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(Element.class, "id");     内部使用反射
        Element element = new Element(0);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int n = 0; n < 5; n++) {
            executorService.execute(() -> {
                for (int j = 0; j < 1000; j++) {
                    fieldUpdater.getAndIncrement(element);     原子更新属性
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);

        System.out.println(element);
    }
}
----
输出
Element{id=5000}

/** Holder of a single int field that is updated through an atomic field updater. */
class Element {
    // A field updated through an atomic field updater must be volatile, which
    // guarantees visibility between threads.
    public volatile int id;

    public Element(int id) {
        this.id = id;
    }

    /** @return text of the form {@code Element{id=5}} */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("Element{");
        text.append("id=").append(id);
        text.append('}');
        return text.toString();
    }
}

  原子数组:AtomicIntegerArray

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * {@link AtomicIntegerArray} demo: five pool tasks each increment the last slot of a
 * five-element atomic array 1000 times; the array is printed afterwards.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        final AtomicIntegerArray cells = new AtomicIntegerArray(5);
        cells.set(0, 1);

        final Runnable bump = () -> {
            int remaining = 1000;
            while (remaining-- > 0) {
                cells.getAndIncrement(4);
            }
        };

        final ExecutorService pool = Executors.newCachedThreadPool();
        for (int worker = 0; worker < 5; worker++) {
            pool.execute(bump);
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.DAYS);

        System.out.println(cells);
    }
}
----
输出
[1, 0, 0, 0, 5000]

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Shows the difference between {@code shutdown()} and {@code awaitTermination()} by
 * printing the pool's state right after shutdown and again after the wait.
 */
public class Main {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        executorService.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // sleep() cleared the interrupt flag when it threw; restore it so the
                // pool thread can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            System.out.println("Action Done");
        });

        // Stop accepting new tasks; running tasks continue and the current thread
        // is not blocked.
        executorService.shutdown();

        System.out.println(executorService.isShutdown());
        System.out.println(executorService.isTerminated());

        // Block the current thread until the running tasks have finished.
        executorService.awaitTermination(1, TimeUnit.DAYS);

        System.out.println(executorService.isShutdown());
        System.out.println(executorService.isTerminated());
    }
}
----
输出
true
false
Action Done
true
true
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Bucket-sort demo: distributes a large random int array into fixed-width value
 * buckets, sorts the buckets in parallel on a {@link ForkJoinPool}, concatenates
 * them into one array, then prints per-bucket counts, the time spent in each phase
 * and whether the merged array is in non-decreasing order.
 */
public class Main {
    private final static int arraySize = 100_000_000;
    private final static int arrayMin = 0;
    private final static int arrayMax = 100_000_000;
    // Random input: arraySize values requested with origin arrayMin and bound arrayMax.
    private final static int[] array = ThreadLocalRandom.current().ints(arraySize, arrayMin, arrayMax).toArray();

    private final static int bucketCount = 1000;
    // Capacity of every bucket: ten times the average number of elements per bucket.
    private final static int bucketSize = (arraySize / bucketCount) * 10;
    // Width of the value range that maps to one bucket.
    private final static int bucketInterval = arrayMax / bucketCount;
    // bucketIndex[j] is the number of elements stored in bucket j so far.
    private final static int[] bucketIndex = new int[bucketCount];
    // NOTE(review): allocates bucketCount * bucketSize ints up front (10^9 with the
    // constants above) — confirm the JVM heap is sized for this.
    private final static int[][] bucketArr = new int[bucketCount][bucketSize];

    public static void main(String[] args) {
        // Phase 1: distribute into buckets (could be multi-threaded, but the gain is small).
        long l1 = System.currentTimeMillis();
        for (int i : array) {
            // NOTE(review): the bucket number ignores arrayMin — presumably relies on arrayMin == 0; confirm.
            int j = i / bucketInterval;
            bucketArr[j][bucketIndex[j]++] = i;
        }

        // Phase 2: sort every bucket, in parallel.
        long l2 = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        forkJoinPool.invoke(new SortAction(0, bucketIndex.length));
        forkJoinPool.shutdown();

        // Phase 3: merge — copy the filled part of each bucket, in bucket order, into one array.
        long l3 = System.currentTimeMillis();
        int[] ints = new int[array.length];
        for (int i = 0, n = 0; i < bucketIndex.length; i++) {
            System.arraycopy(bucketArr[i], 0, ints, n, bucketIndex[i]);
            n += bucketIndex[i];
        }

        // Phase 4: display each bucket's value range and element count.
        long l4 = System.currentTimeMillis();
        String format1 = formatNum(arrayMax) + " - " + formatNum(arrayMax);
        String format2 = formatNum(bucketSize) + " 个:";
        for (int i = 0, n = arrayMin; i < bucketIndex.length; i++) {
            // Arguments are evaluated left to right: the first is the lower bound,
            // the second advances n to the upper bound.
            System.out.printf(format1, n, n += bucketInterval);
            System.out.printf(format2, bucketIndex[i]);
//            for (int j = 0; j < bucketIndex[i]; j++) {
//                System.out.print(bucketArr[i][j] + " ");
//            }
            System.out.println();
        }

        // Elapsed milliseconds per phase; the display phase is not included in the total.
        System.out.println("分桶:" + (l2 - l1));
        System.out.println("排序:" + (l3 - l2));
        System.out.println("合并:" + (l4 - l3));
        System.out.println("总共:" + (l4 - l1));

        check(ints);
    }

    /** Prints {@code true} if {@code ints} is in non-decreasing order, {@code false} otherwise. */
    private static void check(int[] ints) {
        for (int i = 0; i < ints.length - 1; i++) {
            if (ints[i] > ints[i + 1]) {
                System.out.println(false);
                return;
            }
        }
        System.out.println(true);
    }

    /** Builds a {@code %Nd} format specifier whose width N is the number of decimal digits of {@code i}. */
    private static String formatNum(int i) {
        return "%" + countDigits(i) + "d";
    }

    /** Counts the decimal digits of {@code i} by repeated division by ten; yields 0 when {@code i} is 0. */
    private static int countDigits(int i) {
        int count = 0;
        for (; i != 0; i /= 10) {
            count++;
        }
        return count;
    }

    /** Fork/join action that sorts the filled part of every bucket with index in {@code [from, to)}. */
    private static class SortAction extends RecursiveAction {
        // Index ranges shorter than this are sorted directly instead of being split.
        private final static int THRESHOLD = 200;
        private final int from, to;

        private SortAction(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected void compute() {
            if (to - from < THRESHOLD) {
                for (int i = from; i < to; i++) {
                    // Only the first bucketIndex[i] slots of a bucket hold data.
                    Arrays.sort(bucketArr[i], 0, bucketIndex[i]);
                }
            } else {
                // Split the bucket range in two and process both halves.
                int mid = (from + to) >>> 1;
                SortAction l = new SortAction(from, mid);
                SortAction r = new SortAction(mid, to);
                ForkJoinTask.invokeAll(l, r);
            }
        }
    }
}
----
输出
...
分桶:1571
排序:1697
合并:94
总共:3362
true