05JDK并发包

JDK并发包

各种同步控制工具的使用

  • ReentrantLock
  • Condition
  • Semaphore
  • ReadWriteLock
  • CountDownLatch
  • CyclicBarrier
  • LockSupport
  • ReentrantLock的实现

并发容器及典型源码分析

各种同步控制工具的使用

最基本的同步工具有synchronized关键字和Object.wait,Object.notify

下面介绍一些更加高级的工具。功能上更强大一些;使用上,封装一些常用场景,方便大家使用;JDK大牛的实现比较稳定高效

ReentrantLock

ReentrantLock使用简单,但是功能比较薄弱。

jdk1.5之前ReentrantLock性能相对synchronized关键字较好;jdk1.6之后,jdk做了优化,两者性能不相上下。

如果只是简单功能的实现,没有必要追求使用比较复杂或者高级的功能。

ReentrantLock的特点相对synchronized, 除了实现了锁的功能,还有可重入,可中断,可限时,公平锁的功能或特点。

可重入

可重入:对于同一个线程,它必须是可重入的,否则有可能 将自己卡死。

synchronized可以通过语法检查,以及虚拟机实现的加锁与释放,保证了释放锁的正确性。ReentrantLock需要自己手动加锁和释放锁,提供了更大的灵活性,但有更大的危险性。

ReentrantLock使用

//...
class Xxx  implements Runnable{
    //...
        static ReentrantLock lock = new ReentrantLock();

    @Override
    public void run() {
        //...
        lock.lock();
        try {
            //...
        } finally {
            lock.unlock();
        }
    }

}

ReentrantLock同一线程可以获取多次锁(许可),但是获取几次就要释放几次。否则少释放锁时,其他线程会被阻塞。

比如:

        //...
        lock.lock();
        lock.lock();
        try {
            //...
        } finally {
            lock.unlock();
            lock.unlock();
        }

注意lock和unlock次数要一致。

可中断

由synchronized,是不响应中断的。 而ReentrantLock可被中断。

比如某个线程长期等待,想要停止它的等待,好的想法是给它发送中断信号。这时,就可以使用ReentrantLock. 加锁的同时,还会响应中断。

如果线程发生了死锁,或者意料之外的事情,线程在一个锁上卡很久, 给它发送一个中断信号,将其唤醒,不至于一直卡死下去。

可中断demo

import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.locks.ReentrantLock;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello world!");
        Thread thread0 = new TempThread(0, "thread0");
        Thread thread1 = new TempThread(1, "thread1");
        thread0.start();
        thread1.start();
//        thread0.join();
//        thread1.join();
        Thread.sleep(2000);
        DeadLockChecker.checkAndStop();
    }
}

class TempThread extends Thread {
    int flag = 0;
    static ReentrantLock lock0 = new ReentrantLock();
    static ReentrantLock lock1 = new ReentrantLock();

    public TempThread(int flag) {
        this.flag = flag;
    }

    public TempThread(int flag, String name) {
        super(name);
        this.flag = flag;
    }

    @Override
    public void run() {
        try {
            //以下尝试构造一个死锁情况
            if (flag % 2 == 0) {
                //获取锁lock0
//                lock0.lock();
                lock0.lockInterruptibly();
                try {
                    //休眠500毫秒
                    try {
                        sleep(500);
                    } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
                        System.out.println(Thread.currentThread().getName() + " with flag: " + flag + " interrupted in sleep().");
                    }
                    //尝试获取另一个锁
//                    lock1.lock();
                    lock1.lockInterruptibly();
                    lock1.unlock();
                } finally {
                    //释放锁lock0
                    lock0.unlock();
                }
            }
            else {
                //获取锁lock1
//                lock1.lock();
                lock1.lockInterruptibly();
                try {
                    //休眠500毫秒
                    try {
                        sleep(500);
                    } catch (InterruptedException e) {
//                    throw new RuntimeException(e);
                        System.out.println(Thread.currentThread().getName() + " with flag: " + flag + " interrupted in sleep().");
                    }
                    //尝试获取另一个锁
//                    lock0.lock();
                    lock0.lockInterruptibly();
                    lock0.unlock();
                } finally {
                    //释放锁lock1
                    lock1.unlock();
                }
            }
        } catch (InterruptedException exc) {
            System.out.println(Thread.currentThread().getName() + " with flag: " + flag + " interrupted in lockInterruptibly().");
        }
    }


}

class DeadLockChecker {
    private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();

    final static Runnable deadlockCheckRunnable = new Runnable() {
        @Override
        public void run() {
            long[] deadlockedThreads = mbean.findDeadlockedThreads();
            if (deadlockedThreads != null) {
                ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreads);
                for (Thread thread : Thread.getAllStackTraces().keySet()) {
                    for (int i = 0; i < threadInfos.length; i++) {
                        if (thread.getId() == threadInfos[i].getThreadId()) {
                            thread.interrupt();
                        }
                    }
                }
            }
        }
    };

    public static void checkAndStop() {
        Thread thread = new Thread(deadlockCheckRunnable);
        thread.start();
    }

}

可限时

获取锁时,允许指定等待时间。

demo:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Main implements Runnable{
    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        System.out.println("Hello world!");

        Main runnable = new Main();
        Thread t1 = new Thread(runnable);
        Thread t2 = new Thread(runnable);
        t1.start();
        t2.start();
    }

    @Override
    public void run() {
        try {
            if(lock.tryLock(5, TimeUnit.SECONDS)) {
                System.out.println(Thread.currentThread().getName() + " tryLock() success");
                Thread.sleep(6000);
            } else {
                System.out.println(Thread.currentThread().getName() + " tryLock() timeout");
                //比如释放其他占用的资源
            }
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " interrupted by lock.tryLock() or Thread.sleep()");
        }
        finally {
            if(lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

公平锁

ReentrantLock可以开启公平锁。先来先到。默认不开启。

开启公平锁: new ReentrantLock(true)

Condition

ReentrantLock和Condition的关系有点像 synchronized 和 Object.wait/notify的关系

类似的,使用Condition之前,要先获得ReentrantLock.lock()锁。

await/signal/signalAll

static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();

使用demo

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class Main implements Runnable {
    static ReentrantLock lock = new ReentrantLock();
    static Condition condition = lock.newCondition();
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello world!");

        Runnable runnable = new Main();
        Thread thread1 = new Thread(runnable);
        thread1.start();

        Thread.sleep(2000);

        lock.lock();
        System.out.println(Thread.currentThread().getName() + " before condition.signalAll()");
        condition.signalAll();
        System.out.println(Thread.currentThread().getName() + " after condition.signalAll()");
        lock.unlock();
    }

    @Override
    public void run() {
        lock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " before condition.await()");
            condition.await();
        } catch (InterruptedException e) {
            //throw new RuntimeException(e);
            System.out.println(Thread.currentThread().getName() + " interrupted in condition.await()");
        }
        System.out.println(Thread.currentThread().getName() + " finished await");
        lock.unlock();
    }
}

注意,condition.await()允许响应中断。

condition.awaitUninterruptibly()不响应中断

Semaphore

Semaphore信号量。

ReentrantLock等显式锁/synchronized代表的隐式锁,这些“锁”, 是排他的,互斥的。同一时间内只有一个线程可以进入到临界区。

Semaphore允许一定数量的线程同时进入临界区,但是超过许可数量的线程则必须等待。共享锁。

当Semaphore允许的许可数量等于1时,就相当于一把锁。

信号量实现了一种对资源的分配。

semaphore.aquire() semaphore.aqureUninterruptibly() semaphore.tryAquire() semaphore.tryAquire(long timeout, TimeUnit unit) semaphore.release()

使用demo

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;

public class Main implements Runnable {
    static Semaphore semaphore = new Semaphore(5);
    public static void main(String[] args) {
        System.out.println("Hello world!");

        Runnable runnable = new Main();

        ExecutorService executor = Executors.newFixedThreadPool(20);
        for(int i = 0; i < 20 ; i++ ) {
            executor.submit(runnable);
        }

    }

    @Override
    public void run() {
        try {
//            semaphore.acquire();
            semaphore.acquire(2);
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " finished a job");
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " interrupted in semaphore.acquire() or Thread.sleep()");
        }
        finally {
//            semaphore.release();
            semaphore.release(2);
        }
    }
}

ReadWriteLock

读写锁ReadWriteLock

ReentrantLock和synchronized锁的问题: 不区分线程的功能的,只要可以获取锁,就可以进入临界区。

但是读和写是两种截然不同的操作。 进行读的线程操作时,写线程不允许操作; 进行写的线程操作时,读的线程不允许操作(读写互斥,防止读到中间状态的内容)。某个进行读的线程操作过程中,另一个线程也要进行读,此时应当允许,否则对性能会有一定的影响。两个线程都要进行写操作时,也要互斥。这就是读写锁要实现的功能。

读写锁将锁按功能划分为读锁和写锁。需要进行读操作的线程,使用读锁加锁;需要进行写操作的线程,使用写锁进行加锁。读锁与写锁互斥,读锁与读锁不互斥(共享)。

使用读写锁之后,程序的并行度可能会有提升。使用ReentrantLock或者synchronized进入临界区的线程最多有一个,而使用读写锁后,若加的是读锁,则允许有多个线程访问临界区,多个线程同时访问临界区资源。

多线程中使用ReentrantLock和synchronized属于阻塞的并发级别。使用ReadWriteLock且没有只用写锁时,是无等待的并发级别。

static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();

//...
readLock.lock();
//...
readLock.unlock();
//...


//...
writeLock.lock();
//...
writeLock.unlock();
//...

CountDownLatch

倒数计数器Count DownLatch

image-20230312180352877
static final CountDownLatch countDownLatch = new CountDownLatch(3);


//...
//thread1
countDownLatch.await();
//...


//...
//thread2
countDownLatch.countDown();
//...

//...
//thread3
countDownLatch.countDown();
//...

//...
//thread4
countDownLatch.countDown();
//...

使用demo

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main implements Runnable{
    static CountDownLatch countDownLatch = new CountDownLatch(10);
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello world!");

        Runnable runnable = new Main();

        ExecutorService exec = Executors.newFixedThreadPool(10);

        for(int i = 0 ; i < 10; i ++) {
            exec.submit(runnable);
        }

        countDownLatch.await();
        System.out.println("CountDownLatch finished. All work finished");

        exec.shutdown();
    }

    @Override
    public void run() {
        try {
            Thread.sleep(new Random().nextInt(10) * 1000);
            System.out.println(Thread.currentThread().getName() + " finished the work");
            countDownLatch.countDown();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

CyclicBarrier

循环栅栏CyclicBarrier

和CountDownLatch相比,CyclicBarrier可以多次启动。

public CyclicBarrier(int parties, Runnable barrierAction);  //barrierAction是当计数器一次完成计数时,系统会执行的动作 

await();
image-20230312183400243

使用demo

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Main {

    public static void main(String[] args) {
        System.out.println("Hello world!");

        BarrierAction barrierAction = new BarrierAction();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(10, barrierAction);

        Thread[] threads = new Thread[10];
        for(int i = 0; i < 10; i++) {
            threads[i] = new Thread(new Solider("solider" + i, cyclicBarrier));
            threads[i].start();
//            if(i == 5) {
//                threads[0].interrupt();
//            }
        }


    }
    static class BarrierAction implements Runnable {
        int step = 0;
        @Override
        public void run() {
            if(step == 0) {
                System.out.println("士兵报到/集合完毕");
            }else {
                System.out.println("士兵们任务完成");
            }
            step++;
        }
    }

    static class Solider implements Runnable {
        String soliderName;

        CyclicBarrier cyclicBarrier;

        public Solider(String name, CyclicBarrier cyclicBarrier) {
            this.soliderName = name;
            this.cyclicBarrier = cyclicBarrier;
        }
        @Override
        public void run() {
            //如果这里出错了,如何保证CyclicBarrier
            try {
                doArrive();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            try {
                System.out.println(soliderName + " 报到");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                System.out.println(soliderName + " 报到时被中断");
            } catch (BrokenBarrierException e) {
                System.out.println(soliderName + " 报到被取消,因为其他solider报到出现了问题");
            }

            //如果这里出错了,如何保证CyclicBarrier
            try {
                doSomeWork();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            try {
                System.out.println(soliderName + " 完成任务");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                System.out.println(soliderName + " 进行任务时被中断");
            } catch (BrokenBarrierException e) {
                System.out.println(soliderName + " 任务被取消,因为其他solider出现了问题");
            }

        }

        void doArrive() throws InterruptedException {
            Thread.sleep(new Random().nextInt(10) * 1000);
        }

        void doSomeWork() throws InterruptedException {
            Thread.sleep(new Random().nextInt(10) * 1000);
        }
    }
}

LockSupport

提供比较底层的操作原语。线程阻塞,线程唤醒。

LockSupport.park();
LockSupport.unpark(thread1);

LockSupport.park()方法与Thread.suspend()方法的的比较。Thread.suspend()官方已不建议使用,将来可能会废除,使用不当可能会导致线程冻结。

LockSuport的思想类似于信号量Semaphore。LockSupport.park()的时候申请一个许可,LockSupport.unpark()的时候,释放一个许可。如果unpark不幸发生在park之前,并不会将线程阻塞住。这和Thread.suspend()是不一样的.

LockSupport.park()操作允许被中断(会响应中断),但是并不会抛出中断异常,而且会保留中断标志。注意:LockSupport.park()并没有像Thread.sleep()那样清中断。

使用demo

import java.util.concurrent.locks.LockSupport;

public class Main {
    static final Object u = new Object();
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello world!");

        Thread t1 = new TmpThread();
        Thread t2 = new TmpThread();

        t1.start();
        Thread.sleep(1000);
        t2.start();
        LockSupport.unpark(t1);
        LockSupport.unpark(t2); //注意这里unpark实际可能发生在对应的park之前

        t1.join();
        t2.join();
    }

    static class TmpThread extends Thread {

        @Override
        public void run() {
            synchronized (u) {
                LockSupport.park();
            }
        }
    }
}

ReentrantLock的实现

重入锁是一个应用级的实现。而LockSupport才是系统级的实现。

从本质上来说,ReentrantLock的实现都是一些Java实现。

ReentrantLock的实现有三块是比较重要的:CAS状态,等待队列,park()

ReentrantLock的CAS状态

cas操作是一种无锁操作,这里的cas状态用来判断这里的锁有没有被别人占用过。如果我要拿这个锁, 就对相应的状态字段做一次cas(0,1), 如果没有被别人占用,修改就会成功。如果锁已经被别人占用了,修改就会失败。通过它是否修改成功,来返回我是否获取到了这把锁。

ReentrantLock的等待队列和park及unpark

ReentrantLock内部实现了一个等待队列。

如果某个线程获取锁失败了,对应的线程就应该进入等待队列。多个线程进入等待队列,需要进行排队(公平或者不公平)。进入等待队列的线程,都要进行LockSupport.park()操作,将其挂起。

当占用锁的线程,将这个锁unlock的时候,就会从等待队列中挑一个线程出来,进行unpark()

并发容器及典型源码分析

集合包装

有个实用的工具类Collections. 可以将非线程安全的集合类包装为线程安全的集合类。

HashMap是非线程安全的。可通过如下包装变为线程安全的。只适用于并发量比较小的情况。

public Map map = Collections.synchronizedMap(new HashMap());

高并发集合类ConcurrentHashMap

先说说HashMap的实现

HashMap是个数组table,里面存储了表项entry, entry里包含key和value以及next。 每个key进来,要放到哪个entry槽位里,是由hash算法决定的。但是hash算法并不能保证每个key被分配到不同的槽位当中。当出现hash冲突的时候,用next指向另一个被分配到同一个槽位中entry。

综上,HashMap是由数组+链表组成的。当HashMap发生大量hash冲突的时候,就退化为一个链表。

image-20230316220525170

再看看ConcurrentHashMap实现

从主题思想和接口上讲,HashMap和ConcurrentHashMap是一致的。

但是ConcurrentHashMap可能有多个线程同时访问,如果对整个对象加锁,会大大影响多线程性能。它将hashmap分成多个区段Segment,如果多个线程能进入不同的区段,加锁也只对各自区段加锁,将大大提高多线程性能。

ConcurrentHashMap.put(),进入到某个HashMap子区段中,继续put。ConcurrentHashMap继承ReentrantLock, 在子区段中多次使用tryLock(),尽量直接获取到锁,实在不行再lock(), 然后就保证了安全的操作,判断是否hash冲突等,考虑各种情况,放入hashmap子区段中,操作完成后,最后unlock()释放锁。

ConcurrentHashMap.size()需要汇总各个Segment子区段的大小,需要锁定所有的Segment。性能可能不会太高。(锁分离的弊端,获取全局数据,需要拿到所有的锁)

BlockingQueue阻塞队列

首先, 它是线程安全的队列。

队列为空,再取数据时,取数据的线程阻塞。

队列为慢,再放数据时,放数据的线程阻塞。

它是一个非常好的,在多个线程中共享数据的容器。

它不是为高性能服务的。只是提供了必要时阻塞的功能。

image-20230316234926287

BlockingQueue非常适合生产者-消费者情况。

ConcurrentLinkedQueue

高性能的,链表的,队列。

内部也是大量的使用无锁的算法实现的。

线程池的使用与源码分析

线程池的基本使用

扩展和增强线程池

线程池及其核心代码分析

线程池的基本使用

为什么需要线程池

JDK为我们提供了哪些支持

线程池的使用

线程池使用的小例子

为什么需要线程池

线程的创建和销毁也需要一定的代价(虽然相对进程的创建和销毁代价会小一点)。而且有时候重复创建线程这个过程是无效的(没有意义的)。如果每一次任务的提交,都要创建线程;每次任务的完成,都要销毁线程;对于业务来说,这个线程的创建和销毁是和业务没有关系的。我只关心线程执行的任务。因此我们可能希望尽可能多的把cpu用在任务的执行上,而不是用在辅助性质的线程的创建和销毁上。

然后有了线程池这个东西。线程复用。

如果有100个任务需要执行。如果每次都创建线程,线程需要前前后后创建和销毁100次。也可以使用线程池,线程池里放10个线程工作,这十个线程是不退出的。如果有任务的时候,它们会执行任务;如果没有任务,就会阻塞。

JDK内置线程池

image-20230317220627890

Executor可以执行Runnable中的方法,线程池较为顶层的接口

ExecutorService提供了额外的方法,submit可以执行Callable中的方法

ThreadPoolExcutor是线程池的重要实现,执行者。

Executors是工厂类,有一些工厂方法创建线程池

线程池的种类

newFixedThreadPool,固定大小的线程池

newSingleThreadPool,只有一个线程的线程池

newCachedThreadPool,高峰期扩张线程池大小,低负载时缩减线程的线程池

newScheduledThreadPool,带计划任务功能的线程池

都是调用new ThreadPoolExecutor实现的。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters, the
     * {@linkplain Executors#defaultThreadFactory default thread factory}
     * and the {@linkplain ThreadPoolExecutor.AbortPolicy
     * default rejected execution handler}.
     *
     * <p>It may be more convenient to use one of the {@link Executors}
     * factory methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
看一下Executors.newCachedThreadPool的代码
    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

先说一下BlockingQueue,这是一种队列。当队列长度小于某个最小值时,线程再从队列取数据时,就会被阻塞;当队列长度大于某个最大值时,线程再向队列放数据时,也会被阻塞。(当队列不满或者不过短时,也会通知哪些等待的线程 )

这里用到了SynchronousQueue, 这是一个最大允许队列长度为0的BlockingQueue的实现。若某一个线程要往队列放元素时,此时没有线程正要或者正在等待从队列中取元素,就阻塞这个线程;反之亦然(若某一个线程要从队列中取元素时,此时没有线程正要或者正在等待从队列中取元素,就阻塞这个线程)。比较适合两个线程间hand-off数据。

当有新任务提交时,进入SynchronousQueue,如果没有取任务的线程等待(空闲线程),就会创建新线程。如果有空闲线程阻塞在这里(还没有超时消亡),就会直接使用空闲线程。

线程池使用的例子

简单使用的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello world!");

        MyTask task = new MyTask();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for(int i = 0; i < 1000; i++) {
            executor.submit(task);
//            executor.execute();
        }

        executor.shutdown();
    }

    static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println("当前时间:" + System.currentTimeMillis() + "\t当前线程:"+ Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

计划任务线程池的使用

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello world!");

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
        MyTask task = new MyTask();
        //每隔2s执行一次,但是每次执行都会花费1s。所以实际上这里运行起来是3秒一次。
        scheduledExecutorService.scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS);
    }

    static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis());
            //每次执行都会花费1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

        }
    }
}

ExecutorService的十个使用技巧

ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析

扩展和增强线程池

重载方法或者回调方法

线程池ThreadPoolExecutor提供了一些可以重载的方法和回调接口,可用于增强和扩展线程池功能。

重载线程池中某些方法的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        MyTask task = new MyTask();
        //匿名子类实现,覆写了三个方法
        ExecutorService exec = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("beforeExecute: 当前线程是:" + Thread.currentThread().getName() + "\t准备用来执行任务的线程是:" + t.getName() + "\t将要执行的任务是:" + r.toString());
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t)  {
                System.out.println("afterExecute: 当前线程是:" + Thread.currentThread().getName() + "\t执行完的任务是:" + r.toString() + "\t可能有异常抛出:" + (t == null ? "" : t.toString()));
            }

            @Override
            protected void terminated() {
                System.out.println("terminated:线程池终结");
            }
        };
        for(int i = 0; i < 10; i++ )
            exec.submit(task);

        exec.shutdown();
    }

    static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println("当前线程是:" + Thread.currentThread().getName() + "\t当前时间是:" + System.currentTimeMillis() + "\t正在执行的任务是:" + this.toString());
        }
    }
}

拒绝策略

高负载时,队列可能过大,内存激增,可能OOM(out of memory)。

当负载过大的时候,可能需要丢弃一些任务,而不是放在内存当中。丢弃任务也不应当是直接丢弃,可能需要记下来,丢了几个,丢了什么任务。

拒绝策略RejectedExecutionHandler. 有以下几种拒绝策略:

AbortPolicy, 抛出异常

DiscardPolicy,什么都不做,直接丢掉,不报错,不提示

CallerRunsPolicy,提交的任务,被拒绝时,交由提交任务的线程来做

DiscardOldestPolicy,也是丢弃任务,但是丢的是,最老的没有处理的请求

拒绝策略使用例子:

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello world!");


        MyTask task = new MyTask();

        ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
//                new SynchronousQueue<>(),//同步队列,队列长度为0。如果线程都繁忙,没有工作线程从队列中取任务时; (非阻塞方式提交)新提交的任务会失败,触发拒绝策略
                new LinkedBlockingQueue<>(20),
                Executors.defaultThreadFactory(),
                new MyDiscardExecutionHandler());

        for(int i = 0; i < 1000; i++) {
            exec.submit(task);
        }

        exec.shutdown();
    }

    static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println("任务正在执行。执行线程:" + Thread.currentThread().getName());
            //假设任务执行消耗100ms
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class MyDiscardExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("Runnable : " + r + " is discard.");
        }
    }
}

线程工厂

ThreadFactory

相关源码

public interface ThreadFactory {
    Thread newThread(Runnable r);
}


public class Executors {
    //...
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }

    //...
    //这是Executors类的内部类
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            @SuppressWarnings("removal")
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    //...
}

线程池及其核心源码

image-20230317220627890

Executor接口

This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.


package java.util.concurrent;

/**
 * An object that executes submitted {@link Runnable} tasks. This
 * interface provides a way of decoupling task submission from the
 * mechanics of how each task will be run, including details of thread
 * use, scheduling, etc.  An {@code Executor} is normally used
 * instead of explicitly creating threads. For example, rather than
 * invoking {@code new Thread(new RunnableTask()).start()} for each
 * of a set of tasks, you might use:
 *
 * <pre> {@code
 * Executor executor = anExecutor();
 * executor.execute(new RunnableTask1());
 * executor.execute(new RunnableTask2());
 * ...}</pre>
 *
 * However, the {@code Executor} interface does not strictly require
 * that execution be asynchronous. 
 * ...
 */
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

ExecutorService接口

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.Collection;
import java.util.List;

/**
 * An {@link Executor} that provides methods to manage termination and
 * methods that can produce a {@link Future} for tracking progress of
 * one or more asynchronous tasks.
 *
 * <p>An {@code ExecutorService} can be shut down, which will cause
 * it to reject new tasks.  Two different methods are provided for
 * shutting down an {@code ExecutorService}. The {@link #shutdown}
 * method will allow previously submitted tasks to execute before
 * terminating, while the {@link #shutdownNow} method prevents waiting
 * tasks from starting and attempts to stop currently executing tasks.
 * Upon termination, an executor has no tasks actively executing, no
 * tasks awaiting execution, and no new tasks can be submitted.  An
 * unused {@code ExecutorService} should be shut down to allow
 * reclamation of its resources.
 *
 * <p>Method {@code submit} extends base method {@link
 * Executor#execute(Runnable)} by creating and returning a {@link Future}
 * that can be used to cancel execution and/or wait for completion.
 * Methods {@code invokeAny} and {@code invokeAll} perform the most
 * commonly useful forms of bulk execution, executing a collection of
 * tasks and then waiting for at least one, or all, to
 * complete. (Class {@link ExecutorCompletionService} can be used to
 * write customized variants of these methods.)
 *
 * <p>The {@link Executors} class provides factory methods for the
 * executor services provided in this package.
 *
 * <h2>Usage Examples</h2>
 *
 * Here is a sketch of a network service in which threads in a thread
 * pool service incoming requests. It uses the preconfigured {@link
 * Executors#newFixedThreadPool} factory method:
 *
 * <pre> {@code
 * class NetworkService implements Runnable {
 *   private final ServerSocket serverSocket;
 *   private final ExecutorService pool;
 *
 *   public NetworkService(int port, int poolSize)
 *       throws IOException {
 *     serverSocket = new ServerSocket(port);
 *     pool = Executors.newFixedThreadPool(poolSize);
 *   }
 *
 *   public void run() { // run the service
 *     try {
 *       for (;;) {
 *         pool.execute(new Handler(serverSocket.accept()));
 *       }
 *     } catch (IOException ex) {
 *       pool.shutdown();
 *     }
 *   }
 * }
 *
 * class Handler implements Runnable {
 *   private final Socket socket;
 *   Handler(Socket socket) { this.socket = socket; }
 *   public void run() {
 *     // read and service request on socket
 *   }
 * }}</pre>
 *
 * The following method shuts down an {@code ExecutorService} in two phases,
 * first by calling {@code shutdown} to reject incoming tasks, and then
 * calling {@code shutdownNow}, if necessary, to cancel any lingering tasks:
 *
 * <pre> {@code
 * void shutdownAndAwaitTermination(ExecutorService pool) {
 *   pool.shutdown(); // Disable new tasks from being submitted
 *   try {
 *     // Wait a while for existing tasks to terminate
 *     if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
 *       pool.shutdownNow(); // Cancel currently executing tasks
 *       // Wait a while for tasks to respond to being cancelled
 *       if (!pool.awaitTermination(60, TimeUnit.SECONDS))
 *           System.err.println("Pool did not terminate");
 *     }
 *   } catch (InterruptedException ex) {
 *     // (Re-)Cancel if current thread also interrupted
 *     pool.shutdownNow();
 *     // Preserve interrupt status
 *     Thread.currentThread().interrupt();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: Actions in a thread prior to the
 * submission of a {@code Runnable} or {@code Callable} task to an
 * {@code ExecutorService}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * any actions taken by that task, which in turn <i>happen-before</i> the
 * result is retrieved via {@code Future.get()}.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface ExecutorService extends Executor {

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    void shutdown();

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  For example, typical
     * implementations will cancel via {@link Thread#interrupt}, so any
     * task that fails to respond to interrupts may never terminate.
     *
     * @return list of tasks that never commenced execution
     * @throws SecurityException if a security manager exists and
     *         shutting down this ExecutorService may manipulate
     *         threads that the caller is not permitted to modify
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")},
     *         or the security manager's {@code checkAccess} method
     *         denies access.
     */
    List<Runnable> shutdownNow();

    /**
     * Returns {@code true} if this executor has been shut down.
     *
     * @return {@code true} if this executor has been shut down
     */
    boolean isShutdown();

    /**
     * Returns {@code true} if all tasks have completed following shut down.
     * Note that {@code isTerminated} is never {@code true} unless
     * either {@code shutdown} or {@code shutdownNow} was called first.
     *
     * @return {@code true} if all tasks have completed following shut down
     */
    boolean isTerminated();

    /**
     * Blocks until all tasks have completed execution after a shutdown
     * request, or the timeout occurs, or the current thread is
     * interrupted, whichever happens first.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return {@code true} if this executor terminated and
     *         {@code false} if the timeout elapsed before termination
     * @throws InterruptedException if interrupted while waiting
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     *
     * <p>
     * If you would like to immediately block waiting
     * for a task, you can use constructions of the form
     * {@code result = exec.submit(aCallable).get();}
     *
     * <p>Note: The {@link Executors} class includes a set of methods
     * that can convert some other common closure-like objects,
     * for example, {@link java.security.PrivilegedAction} to
     * {@link Callable} form so they can be submitted.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     *
     * @param task the task to submit
     * @param result the result to return
     * @param <T> the type of the result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     *
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    Future<?> submit(Runnable task);

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results when all complete.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param <T> the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list, each of which has completed
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks or any of its elements are {@code null}
     * @throws RejectedExecutionException if any task cannot be
     *         scheduled for execution
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     * Executes the given tasks, returning a list of Futures holding
     * their status and results
     * when all complete or the timeout expires, whichever happens first.
     * {@link Future#isDone} is {@code true} for each
     * element of the returned list.
     * Upon return, tasks that have not completed are cancelled.
     * Note that a <em>completed</em> task could have
     * terminated either normally or by throwing an exception.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param <T> the type of the values returned from the tasks
     * @return a list of Futures representing the tasks, in the same
     *         sequential order as produced by the iterator for the
     *         given task list. If the operation did not time out,
     *         each task will have completed. If it did time out, some
     *         of these tasks will not have completed.
     * @throws InterruptedException if interrupted while waiting, in
     *         which case unfinished tasks are cancelled
     * @throws NullPointerException if tasks, any of its elements, or
     *         unit are {@code null}
     * @throws RejectedExecutionException if any task cannot be scheduled
     *         for execution
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do. Upon normal or exceptional return,
     * tasks that have not completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param <T> the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks or any element task
     *         subject to execution is {@code null}
     * @throws IllegalArgumentException if tasks is empty
     * @throws ExecutionException if no task successfully completes
     * @throws RejectedExecutionException if tasks cannot be scheduled
     *         for execution
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * Executes the given tasks, returning the result
     * of one that has completed successfully (i.e., without throwing
     * an exception), if any do before the given timeout elapses.
     * Upon normal or exceptional return, tasks that have not
     * completed are cancelled.
     * The results of this method are undefined if the given
     * collection is modified while this operation is in progress.
     *
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @param <T> the type of the values returned from the tasks
     * @return the result returned by one of the tasks
     * @throws InterruptedException if interrupted while waiting
     * @throws NullPointerException if tasks, or unit, or any element
     *         task subject to execution is {@code null}
     * @throws TimeoutException if the given timeout elapses before
     *         any task successfully completes
     * @throws ExecutionException if no task successfully completes
     * @throws RejectedExecutionException if tasks cannot be scheduled
     *         for execution
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

ThreadPoolExecutor类

略,以及相应的Abstract类

ForkJoin

主要关注两个类: RecursiveAction,无返回值 RecursiveTask,有返回值

image-20230320230318736

工作窃取:ForkJoinPool有意思的设计是,线程取任务的时候,先从任选一个其他线程任务队列,取任务执行(帮其他队列执行任务),再执行一个自己工作队列中的任务。 这种设计有利于减少线程饥饿。

image-20230320230132200

将多个字段位打包到一个基础类型中,可以进行执行多字段位的同时更新。 因为OS级别只存在基本类型的CAS,不支持多个变量的CAS同时进行。

使用demo

比如要实现1+2++…+1000

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class Main2 {

    static class MyTask extends RecursiveTask<Long> {
        final int THREADHOLD = 100;
        long start;
        long end;

        public MyTask(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            long count = 0;
            if (end - start < THREADHOLD) {
                for (long i = start; i <= end; i++) {
                    count += i;
                    //模拟其他工作,每次用时10ms
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                return count;
            }

            //分成多个小任务
//            long step = (end - start) / THREADHOLD;
            long step = THREADHOLD;
            List<MyTask> subTasks = new ArrayList<>();
            for (long i = start; i <= end; i+=step) {
                long s = i;
                long e = Math.min(i + step - 1, end);
                MyTask t = new MyTask(s, e);
                subTasks.add(t);
                t.fork();

            }
            for (MyTask t : subTasks)
                count += t.join();
            return count;
        }
    }

    public static void main(String[] args) {
        long sum = 0;
        long startTime = System.currentTimeMillis();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        MyTask task = new MyTask(1, 1000);
        try {
            sum = forkJoinPool.submit(task).get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

        long endTime = System.currentTimeMillis();
        System.out.println(sum);
        long duration = endTime - startTime;
        System.out.println("用时" + duration + "ms");
    }
}

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注