JDK并发包
各种同步控制工具的使用
- ReentrantLock
- Condition
- Semaphore
- ReadWriteLock
- CountDownLatch
- CyclicBarrier
- LockSupport
- ReentrantLock的实现
并发容器及典型源码分析
各种同步控制工具的使用
最基本的同步工具有synchronized关键字和Object.wait,Object.notify
下面介绍一些更加高级的工具。功能上更强大一些;使用上,封装一些常用场景,方便大家使用;JDK大牛的实现比较稳定高效
ReentrantLock
ReentrantLock使用简单,但是功能比较薄弱。
jdk1.5之前ReentrantLock性能相对synchronized关键字较好;jdk1.6之后,jdk做了优化,两者性能不相上下。
如果只是简单功能的实现,没有必要追求使用比较复杂或者高级的功能。
ReentrantLock的特点相对synchronized, 除了实现了锁的功能,还有可重入,可中断,可限时,公平锁的功能或特点。
可重入
可重入:对于同一个线程,它必须是可重入的,否则有可能 将自己卡死。
synchronized可以通过语法检查,以及虚拟机实现的加锁与释放,保证了释放锁的正确性。ReentrantLock需要自己手动加锁和释放锁,提供了更大的灵活性,但有更大的危险性。
ReentrantLock使用
//...
class Xxx implements Runnable{
//...
static ReentrantLock lock = new ReentrantLock();
@Override
public void run() {
//...
lock();
lock.try {
//...
finally {
} unlock();
lock.
}
}
}
ReentrantLock同一线程可以获取多次锁(许可),但是获取几次就要释放几次。否则少释放锁时,其他线程会被阻塞。
比如:
//...
lock();
lock.lock();
lock.try {
//...
finally {
} unlock();
lock.unlock();
lock. }
注意lock和unlock次数要一致。
可中断
由synchronized,是不响应中断的。 而ReentrantLock可被中断。
比如某个线程长期等待,想要停止它的等待,好的想法是给它发送中断信号。这时,就可以使用ReentrantLock. 加锁的同时,还会响应中断。
如果线程发生了死锁,或者意料之外的事情,线程在一个锁上卡很久, 给它发送一个中断信号,将其唤醒,不至于一直卡死下去。
可中断demo
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.locks.ReentrantLock;
public class Main {
public static void main(String[] args) throws InterruptedException {
System.out.println("Hello world!");
Thread thread0 = new TempThread(0, "thread0");
Thread thread1 = new TempThread(1, "thread1");
start();
thread0.start();
thread1.// thread0.join();
// thread1.join();
Thread.sleep(2000);
checkAndStop();
DeadLockChecker.
}
}
class TempThread extends Thread {
int flag = 0;
static ReentrantLock lock0 = new ReentrantLock();
static ReentrantLock lock1 = new ReentrantLock();
public TempThread(int flag) {
this.flag = flag;
}
public TempThread(int flag, String name) {
super(name);
this.flag = flag;
}
@Override
public void run() {
try {
//以下尝试构造一个死锁情况
if (flag % 2 == 0) {
//获取锁lock0
// lock0.lock();
lockInterruptibly();
lock0.try {
//休眠500毫秒
try {
sleep(500);
catch (InterruptedException e) {
} // throw new RuntimeException(e);
System.out.println(Thread.currentThread().getName() + " with flag: " + flag + " interrupted in sleep().");
}//尝试获取另一个锁
// lock1.lock();
lockInterruptibly();
lock1.unlock();
lock1.finally {
} //释放锁lock0
unlock();
lock0.
}
}else {
//获取锁lock1
// lock1.lock();
lockInterruptibly();
lock1.try {
//休眠500毫秒
try {
sleep(500);
catch (InterruptedException e) {
} // throw new RuntimeException(e);
System.out.println(Thread.currentThread().getName() + " with flag: " + flag + " interrupted in sleep().");
}//尝试获取另一个锁
// lock0.lock();
lockInterruptibly();
lock0.unlock();
lock0.finally {
} //释放锁lock1
unlock();
lock1.
}
}catch (InterruptedException exc) {
} System.out.println(Thread.currentThread().getName() + " with flag: " + flag + " interrupted in lockInterruptibly().");
}
}
}
class DeadLockChecker {
private final static ThreadMXBean mbean = ManagementFactory.getThreadMXBean();
final static Runnable deadlockCheckRunnable = new Runnable() {
@Override
public void run() {
long[] deadlockedThreads = mbean.findDeadlockedThreads();
if (deadlockedThreads != null) {
ThreadInfo[] threadInfos = mbean.getThreadInfo(deadlockedThreads);
for (Thread thread : Thread.getAllStackTraces().keySet()) {
for (int i = 0; i < threadInfos.length; i++) {
if (thread.getId() == threadInfos[i].getThreadId()) {
interrupt();
thread.
}
}
}
}
}
};
public static void checkAndStop() {
Thread thread = new Thread(deadlockCheckRunnable);
start();
thread.
}
}
可限时
获取锁时,允许指定等待时间。
demo:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class Main implements Runnable{
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
System.out.println("Hello world!");
new Main();
Main runnable = Thread t1 = new Thread(runnable);
Thread t2 = new Thread(runnable);
start();
t1.start();
t2.
}
@Override
public void run() {
try {
if(lock.tryLock(5, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " tryLock() success");
Thread.sleep(6000);
else {
} System.out.println(Thread.currentThread().getName() + " tryLock() timeout");
//比如释放其他占用的资源
}catch (InterruptedException e) {
} System.out.println(Thread.currentThread().getName() + " interrupted by lock.tryLock() or Thread.sleep()");
}finally {
if(lock.isHeldByCurrentThread()) {
unlock();
lock.
}
}
} }
公平锁
ReentrantLock可以开启公平锁。先来先到。默认不开启。
开启公平锁: new ReentrantLock(true)
Condition
ReentrantLock和Condition的关系有点像 synchronized 和 Object.wait/notify的关系
类似的,使用Condition之前,要先获得ReentrantLock.lock()锁。
await/signal/signalAll
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
使用demo
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class Main implements Runnable {
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
System.out.println("Hello world!");
Runnable runnable = new Main();
Thread thread1 = new Thread(runnable);
start();
thread1.
Thread.sleep(2000);
lock();
lock.System.out.println(Thread.currentThread().getName() + " before condition.signalAll()");
signalAll();
condition.System.out.println(Thread.currentThread().getName() + " after condition.signalAll()");
unlock();
lock.
}
@Override
public void run() {
lock();
lock.try {
System.out.println(Thread.currentThread().getName() + " before condition.await()");
await();
condition.catch (InterruptedException e) {
} //throw new RuntimeException(e);
System.out.println(Thread.currentThread().getName() + " interrupted in condition.await()");
}System.out.println(Thread.currentThread().getName() + " finished await");
unlock();
lock.
} }
注意,condition.await()允许响应中断。
condition.awaitUninterruptibly()不响应中断
Semaphore
Semaphore信号量。
ReentrantLock等显式锁/synchronized代表的隐式锁,这些“锁”, 是排他的,互斥的。同一时间内只有一个线程可以进入到临界区。
Semaphore允许一定数量的线程同时进入临界区,但是超过许可数量的线程则必须等待。共享锁。
当Semaphore允许的许可数量等于1时,就相当于一把锁。
信号量实现了一种对资源的分配。
semaphore.aquire() semaphore.aqureUninterruptibly() semaphore.tryAquire() semaphore.tryAquire(long timeout, TimeUnit unit) semaphore.release()
使用demo
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
public class Main implements Runnable {
static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
System.out.println("Hello world!");
Runnable runnable = new Main();
ExecutorService executor = Executors.newFixedThreadPool(20);
for(int i = 0; i < 20 ; i++ ) {
submit(runnable);
executor.
}
}
@Override
public void run() {
try {
// semaphore.acquire();
acquire(2);
semaphore.Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + " finished a job");
catch (InterruptedException e) {
} System.out.println(Thread.currentThread().getName() + " interrupted in semaphore.acquire() or Thread.sleep()");
}finally {
// semaphore.release();
release(2);
semaphore.
}
} }
ReadWriteLock
读写锁ReadWriteLock
ReentrantLock和synchronized锁的问题: 不区分线程的功能的,只要可以获取锁,就可以进入临界区。
但是读和写是两种截然不同的操作。 进行读的线程操作时,写线程不允许操作; 进行写的线程操作时,读的线程不允许操作(读写互斥,防止读到中间状态的内容)。某个进行读的线程操作过程中,另一个线程也要进行读,此时应当允许,否则对性能会有一定的影响。两个线程都要进行写操作时,也要互斥。这就是读写锁要实现的功能。
读写锁将锁按功能划分为读锁和写锁。需要进行读操作的线程,使用读锁加锁;需要进行写操作的线程,使用写锁进行加锁。读锁与写锁互斥,读锁与读锁不互斥(共享)。
使用读写锁之后,程序的并行度可能会有提升。使用ReentrantLock或者synchronized进入临界区的线程最多有一个,而使用读写锁后,若加的是读锁,则允许有多个线程访问临界区,多个线程同时访问临界区资源。
多线程中使用ReentrantLock和synchronized属于阻塞的并发级别。使用ReadWriteLock且没有只用写锁时,是无等待的并发级别。
static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
//...
lock();
readLock.//...
unlock();
readLock.//...
//...
lock();
writeLock.//...
unlock();
writeLock.//...
CountDownLatch
倒数计数器Count DownLatch

static final CountDownLatch countDownLatch = new CountDownLatch(3);
//...
//thread1
await();
countDownLatch.//...
//...
//thread2
countDown();
countDownLatch.//...
//...
//thread3
countDown();
countDownLatch.//...
//...
//thread4
countDown();
countDownLatch.//...
使用demo
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main implements Runnable{
static CountDownLatch countDownLatch = new CountDownLatch(10);
public static void main(String[] args) throws InterruptedException {
System.out.println("Hello world!");
Runnable runnable = new Main();
ExecutorService exec = Executors.newFixedThreadPool(10);
for(int i = 0 ; i < 10; i ++) {
submit(runnable);
exec.
}
await();
countDownLatch.System.out.println("CountDownLatch finished. All work finished");
shutdown();
exec.
}
@Override
public void run() {
try {
Thread.sleep(new Random().nextInt(10) * 1000);
System.out.println(Thread.currentThread().getName() + " finished the work");
countDown();
countDownLatch.catch (InterruptedException e) {
} throw new RuntimeException(e);
}
} }
CyclicBarrier
循环栅栏CyclicBarrier
和CountDownLatch相比,CyclicBarrier可以多次启动。
public CyclicBarrier(int parties, Runnable barrierAction); //barrierAction是当计数器一次完成计数时,系统会执行的动作
await();

使用demo
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
new BarrierAction();
BarrierAction barrierAction = CyclicBarrier cyclicBarrier = new CyclicBarrier(10, barrierAction);
Thread[] threads = new Thread[10];
for(int i = 0; i < 10; i++) {
new Thread(new Solider("solider" + i, cyclicBarrier));
threads[i] = start();
threads[i].// if(i == 5) {
// threads[0].interrupt();
// }
}
}static class BarrierAction implements Runnable {
int step = 0;
@Override
public void run() {
if(step == 0) {
System.out.println("士兵报到/集合完毕");
else {
}System.out.println("士兵们任务完成");
}
step++;
}
}
static class Solider implements Runnable {
String soliderName;
CyclicBarrier cyclicBarrier;
public Solider(String name, CyclicBarrier cyclicBarrier) {
this.soliderName = name;
this.cyclicBarrier = cyclicBarrier;
}@Override
public void run() {
//如果这里出错了,如何保证CyclicBarrier
try {
doArrive();
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
try {
System.out.println(soliderName + " 报到");
await();
cyclicBarrier.catch (InterruptedException e) {
} System.out.println(soliderName + " 报到时被中断");
catch (BrokenBarrierException e) {
} System.out.println(soliderName + " 报到被取消,因为其他solider报到出现了问题");
}
//如果这里出错了,如何保证CyclicBarrier
try {
doSomeWork();
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
try {
System.out.println(soliderName + " 完成任务");
await();
cyclicBarrier.catch (InterruptedException e) {
} System.out.println(soliderName + " 进行任务时被中断");
catch (BrokenBarrierException e) {
} System.out.println(soliderName + " 任务被取消,因为其他solider出现了问题");
}
}
void doArrive() throws InterruptedException {
Thread.sleep(new Random().nextInt(10) * 1000);
}
void doSomeWork() throws InterruptedException {
Thread.sleep(new Random().nextInt(10) * 1000);
}
} }
LockSupport
提供比较底层的操作原语。线程阻塞,线程唤醒。
LockSupport.park();
LockSupport.unpark(thread1);
LockSupport.park()方法与Thread.suspend()方法的的比较。Thread.suspend()官方已不建议使用,将来可能会废除,使用不当可能会导致线程冻结。
LockSuport的思想类似于信号量Semaphore。LockSupport.park()的时候申请一个许可,LockSupport.unpark()的时候,释放一个许可。如果unpark不幸发生在park之前,并不会将线程阻塞住。这和Thread.suspend()是不一样的.
LockSupport.park()操作允许被中断(会响应中断),但是并不会抛出中断异常,而且会保留中断标志。注意:LockSupport.park()并没有像Thread.sleep()那样清中断。
使用demo
import java.util.concurrent.locks.LockSupport;
public class Main {
static final Object u = new Object();
public static void main(String[] args) throws InterruptedException {
System.out.println("Hello world!");
Thread t1 = new TmpThread();
Thread t2 = new TmpThread();
start();
t1.Thread.sleep(1000);
start();
t2.LockSupport.unpark(t1);
LockSupport.unpark(t2); //注意这里unpark实际可能发生在对应的park之前
join();
t1.join();
t2.
}
static class TmpThread extends Thread {
@Override
public void run() {
synchronized (u) {
LockSupport.park();
}
}
} }
ReentrantLock的实现
重入锁是一个应用级的实现。而LockSupport才是系统级的实现。
从本质上来说,ReentrantLock的实现都是一些Java实现。
ReentrantLock的实现有三块是比较重要的:CAS状态,等待队列,park()
ReentrantLock的CAS状态
cas操作是一种无锁操作,这里的cas状态用来判断这里的锁有没有被别人占用过。如果我要拿这个锁, 就对相应的状态字段做一次cas(0,1), 如果没有被别人占用,修改就会成功。如果锁已经被别人占用了,修改就会失败。通过它是否修改成功,来返回我是否获取到了这把锁。
ReentrantLock的等待队列和park及unpark
ReentrantLock内部实现了一个等待队列。
如果某个线程获取锁失败了,对应的线程就应该进入等待队列。多个线程进入等待队列,需要进行排队(公平或者不公平)。进入等待队列的线程,都要进行LockSupport.park()
操作,将其挂起。
当占用锁的线程,将这个锁unlock的时候,就会从等待队列中挑一个线程出来,进行unpark()
并发容器及典型源码分析
集合包装
有个实用的工具类Collections. 可以将非线程安全的集合类包装为线程安全的集合类。
HashMap是非线程安全的。可通过如下包装变为线程安全的。只适用于并发量比较小的情况。
public Map map = Collections.synchronizedMap(new HashMap());
高并发集合类ConcurrentHashMap
先说说HashMap的实现
HashMap是个数组table,里面存储了表项entry, entry里包含key和value以及next。 每个key进来,要放到哪个entry槽位里,是由hash算法决定的。但是hash算法并不能保证每个key被分配到不同的槽位当中。当出现hash冲突的时候,用next指向另一个被分配到同一个槽位中entry。
综上,HashMap是由数组+链表组成的。当HashMap发生大量hash冲突的时候,就退化为一个链表。

再看看ConcurrentHashMap实现
从主题思想和接口上讲,HashMap和ConcurrentHashMap是一致的。
但是ConcurrentHashMap可能有多个线程同时访问,如果对整个对象加锁,会大大影响多线程性能。它将hashmap分成多个区段Segment,如果多个线程能进入不同的区段,加锁也只对各自区段加锁,将大大提高多线程性能。
ConcurrentHashMap.put(),进入到某个HashMap子区段中,继续put。ConcurrentHashMap继承ReentrantLock, 在子区段中多次使用tryLock(),尽量直接获取到锁,实在不行再lock(), 然后就保证了安全的操作,判断是否hash冲突等,考虑各种情况,放入hashmap子区段中,操作完成后,最后unlock()释放锁。
ConcurrentHashMap.size()需要汇总各个Segment子区段的大小,需要锁定所有的Segment。性能可能不会太高。(锁分离的弊端,获取全局数据,需要拿到所有的锁)
BlockingQueue阻塞队列
首先, 它是线程安全的队列。
队列为空,再取数据时,取数据的线程阻塞。
队列为慢,再放数据时,放数据的线程阻塞。
它是一个非常好的,在多个线程中共享数据的容器。
它不是为高性能服务的。只是提供了必要时阻塞的功能。

BlockingQueue非常适合生产者-消费者情况。
ConcurrentLinkedQueue
高性能的,链表的,队列。
内部也是大量的使用无锁的算法实现的。
线程池的使用与源码分析
线程池的基本使用
扩展和增强线程池
线程池及其核心代码分析
线程池的基本使用
为什么需要线程池
JDK为我们提供了哪些支持
线程池的使用
线程池使用的小例子
为什么需要线程池
线程的创建和销毁也需要一定的代价(虽然相对进程的创建和销毁代价会小一点)。而且有时候重复创建线程这个过程是无效的(没有意义的)。如果每一次任务的提交,都要创建线程;每次任务的完成,都要销毁线程;对于业务来说,这个线程的创建和销毁是和业务没有关系的。我只关心线程执行的任务。因此我们可能希望尽可能多的把cpu用在任务的执行上,而不是用在辅助性质的线程的创建和销毁上。
然后有了线程池这个东西。线程复用。
如果有100个任务需要执行。如果每次都创建线程,线程需要前前后后创建和销毁100次。也可以使用线程池,线程池里放10个线程工作,这十个线程是不退出的。如果有任务的时候,它们会执行任务;如果没有任务,就会阻塞。
JDK内置线程池

Executor可以执行Runnable中的方法,线程池较为顶层的接口
ExecutorService提供了额外的方法,submit可以执行Callable中的方法
ThreadPoolExcutor是线程池的重要实现,执行者。
Executors是工厂类,有一些工厂方法创建线程池
线程池的种类
newFixedThreadPool,固定大小的线程池
newSingleThreadPool,只有一个线程的线程池
newCachedThreadPool,高峰期扩张线程池大小,低负载时缩减线程的线程池
newScheduledThreadPool,带计划任务功能的线程池
都是调用new ThreadPoolExecutor实现的。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters, the
* {@linkplain Executors#defaultThreadFactory default thread factory}
* and the {@linkplain ThreadPoolExecutor.AbortPolicy
* default rejected execution handler}.
*
* <p>It may be more convenient to use one of the {@link Executors}
* factory methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
看一下Executors.newCachedThreadPool的代码
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available. These pools will typically improve the performance
* of programs that execute many short-lived asynchronous tasks.
* Calls to {@code execute} will reuse previously constructed
* threads if available. If no existing thread is available, a new
* thread will be created and added to the pool. Threads that have
* not been used for sixty seconds are terminated and removed from
* the cache. Thus, a pool that remains idle for long enough will
* not consume any resources. Note that pools with similar
* properties but different details (for example, timeout parameters)
* may be created using {@link ThreadPoolExecutor} constructors.
*
* @return the newly created thread pool
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
先说一下BlockingQueue,这是一种队列。当队列长度小于某个最小值时,线程再从队列取数据时,就会被阻塞;当队列长度大于某个最大值时,线程再向队列放数据时,也会被阻塞。(当队列不满或者不过短时,也会通知哪些等待的线程 )
这里用到了SynchronousQueue, 这是一个最大允许队列长度为0的BlockingQueue的实现。若某一个线程要往队列放元素时,此时没有线程正要或者正在等待从队列中取元素,就阻塞这个线程;反之亦然(若某一个线程要从队列中取元素时,此时没有线程正要或者正在等待从队列中取元素,就阻塞这个线程)。比较适合两个线程间hand-off数据。
当有新任务提交时,进入SynchronousQueue,如果没有取任务的线程等待(空闲线程),就会创建新线程。如果有空闲线程阻塞在这里(还没有超时消亡),就会直接使用空闲线程。
线程池使用的例子
简单使用的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
new MyTask();
MyTask task = ExecutorService executor = Executors.newFixedThreadPool(10);
for(int i = 0; i < 1000; i++) {
submit(task);
executor.// executor.execute();
}
shutdown();
executor.
}
static class MyTask implements Runnable {
@Override
public void run() {
System.out.println("当前时间:" + System.currentTimeMillis() + "\t当前线程:"+ Thread.currentThread().getName());
try {
Thread.sleep(1000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}
} }
计划任务线程池的使用
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
new MyTask();
MyTask task = //每隔2s执行一次,但是每次执行都会花费1s。所以实际上这里运行起来是3秒一次。
scheduleWithFixedDelay(task, 0, 2, TimeUnit.SECONDS);
scheduledExecutorService.
}
static class MyTask implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
//每次执行都会花费1s
try {
Thread.sleep(1000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}
} }
ExecutorService.invokeAny()和ExecutorService.invokeAll()的使用剖析
扩展和增强线程池
重载方法或者回调方法
线程池ThreadPoolExecutor提供了一些可以重载的方法和回调接口,可用于增强和扩展线程池功能。
重载线程池中某些方法的例子:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
new MyTask();
MyTask task = //匿名子类实现,覆写了三个方法
ExecutorService exec = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("beforeExecute: 当前线程是:" + Thread.currentThread().getName() + "\t准备用来执行任务的线程是:" + t.getName() + "\t将要执行的任务是:" + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("afterExecute: 当前线程是:" + Thread.currentThread().getName() + "\t执行完的任务是:" + r.toString() + "\t可能有异常抛出:" + (t == null ? "" : t.toString()));
}
@Override
protected void terminated() {
System.out.println("terminated:线程池终结");
}
};for(int i = 0; i < 10; i++ )
submit(task);
exec.
shutdown();
exec.
}
static class MyTask implements Runnable {
@Override
public void run() {
System.out.println("当前线程是:" + Thread.currentThread().getName() + "\t当前时间是:" + System.currentTimeMillis() + "\t正在执行的任务是:" + this.toString());
}
} }
拒绝策略
高负载时,队列可能过大,内存激增,可能OOM(out of memory)。
当负载过大的时候,可能需要丢弃一些任务,而不是放在内存当中。丢弃任务也不应当是直接丢弃,可能需要记下来,丢了几个,丢了什么任务。
拒绝策略RejectedExecutionHandler. 有以下几种拒绝策略:
AbortPolicy, 抛出异常
DiscardPolicy,什么都不做,直接丢掉,不报错,不提示
CallerRunsPolicy,提交的任务,被拒绝时,交由提交任务的线程来做
DiscardOldestPolicy,也是丢弃任务,但是丢的是,最老的没有处理的请求
拒绝策略使用例子:
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
new MyTask();
MyTask task =
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS,
// new SynchronousQueue<>(),//同步队列,队列长度为0。如果线程都繁忙,没有工作线程从队列中取任务时; (非阻塞方式提交)新提交的任务会失败,触发拒绝策略
new LinkedBlockingQueue<>(20),
Executors.defaultThreadFactory(),
new MyDiscardExecutionHandler());
for(int i = 0; i < 1000; i++) {
submit(task);
exec.
}
shutdown();
exec.
}
static class MyTask implements Runnable {
@Override
public void run() {
System.out.println("任务正在执行。执行线程:" + Thread.currentThread().getName());
//假设任务执行消耗100ms
try {
Thread.sleep(100);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}
}
static class MyDiscardExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Runnable : " + r + " is discard.");
}
} }
线程工厂
ThreadFactory
相关源码
public interface ThreadFactory {
Thread newThread(Runnable r);
}
public class Executors {
//...
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
//...
//这是Executors类的内部类
private static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
@SuppressWarnings("removal")
SecurityManager s = System.getSecurityManager();
null) ? s.getThreadGroup() :
group = (s != Thread.currentThread().getThreadGroup();
"pool-" +
namePrefix = getAndIncrement() +
poolNumber."-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
getAndIncrement(),
namePrefix + threadNumber.0);
if (t.isDaemon())
setDaemon(false);
t.if (t.getPriority() != Thread.NORM_PRIORITY)
setPriority(Thread.NORM_PRIORITY);
t.return t;
}
}//...
}
线程池及其核心源码

Executor接口
This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.
package java.util.concurrent;
/**
* An object that executes submitted {@link Runnable} tasks. This
* interface provides a way of decoupling task submission from the
* mechanics of how each task will be run, including details of thread
* use, scheduling, etc. An {@code Executor} is normally used
* instead of explicitly creating threads. For example, rather than
* invoking {@code new Thread(new RunnableTask()).start()} for each
* of a set of tasks, you might use:
*
* <pre> {@code
* Executor executor = anExecutor();
* executor.execute(new RunnableTask1());
* executor.execute(new RunnableTask2());
* ...}</pre>
*
* However, the {@code Executor} interface does not strictly require
* that execution be asynchronous.
* ...
*/
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
ExecutorService接口
/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation. Oracle designates this
* particular file as subject to the "Classpath" exception as provided
* by Oracle in the LICENSE file that accompanied this code.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
/*
* This file is available under and governed by the GNU General Public
* License version 2 only, as published by the Free Software Foundation.
* However, the following notice accompanied the original version of this
* file:
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/
package java.util.concurrent;
import java.util.Collection;
import java.util.List;
/**
* An {@link Executor} that provides methods to manage termination and
* methods that can produce a {@link Future} for tracking progress of
* one or more asynchronous tasks.
*
* <p>An {@code ExecutorService} can be shut down, which will cause
* it to reject new tasks. Two different methods are provided for
* shutting down an {@code ExecutorService}. The {@link #shutdown}
* method will allow previously submitted tasks to execute before
* terminating, while the {@link #shutdownNow} method prevents waiting
* tasks from starting and attempts to stop currently executing tasks.
* Upon termination, an executor has no tasks actively executing, no
* tasks awaiting execution, and no new tasks can be submitted. An
* unused {@code ExecutorService} should be shut down to allow
* reclamation of its resources.
*
* <p>Method {@code submit} extends base method {@link
* Executor#execute(Runnable)} by creating and returning a {@link Future}
* that can be used to cancel execution and/or wait for completion.
* Methods {@code invokeAny} and {@code invokeAll} perform the most
* commonly useful forms of bulk execution, executing a collection of
* tasks and then waiting for at least one, or all, to
* complete. (Class {@link ExecutorCompletionService} can be used to
* write customized variants of these methods.)
*
* <p>The {@link Executors} class provides factory methods for the
* executor services provided in this package.
*
* <h2>Usage Examples</h2>
*
* Here is a sketch of a network service in which threads in a thread
* pool service incoming requests. It uses the preconfigured {@link
* Executors#newFixedThreadPool} factory method:
*
* <pre> {@code
* class NetworkService implements Runnable {
* private final ServerSocket serverSocket;
* private final ExecutorService pool;
*
* public NetworkService(int port, int poolSize)
* throws IOException {
* serverSocket = new ServerSocket(port);
* pool = Executors.newFixedThreadPool(poolSize);
* }
*
* public void run() { // run the service
* try {
* for (;;) {
* pool.execute(new Handler(serverSocket.accept()));
* }
* } catch (IOException ex) {
* pool.shutdown();
* }
* }
* }
*
* class Handler implements Runnable {
* private final Socket socket;
* Handler(Socket socket) { this.socket = socket; }
* public void run() {
* // read and service request on socket
* }
* }}</pre>
*
* The following method shuts down an {@code ExecutorService} in two phases,
* first by calling {@code shutdown} to reject incoming tasks, and then
* calling {@code shutdownNow}, if necessary, to cancel any lingering tasks:
*
* <pre> {@code
* void shutdownAndAwaitTermination(ExecutorService pool) {
* pool.shutdown(); // Disable new tasks from being submitted
* try {
* // Wait a while for existing tasks to terminate
* if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
* pool.shutdownNow(); // Cancel currently executing tasks
* // Wait a while for tasks to respond to being cancelled
* if (!pool.awaitTermination(60, TimeUnit.SECONDS))
* System.err.println("Pool did not terminate");
* }
* } catch (InterruptedException ex) {
* // (Re-)Cancel if current thread also interrupted
* pool.shutdownNow();
* // Preserve interrupt status
* Thread.currentThread().interrupt();
* }
* }}</pre>
*
* <p>Memory consistency effects: Actions in a thread prior to the
* submission of a {@code Runnable} or {@code Callable} task to an
* {@code ExecutorService}
* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
* any actions taken by that task, which in turn <i>happen-before</i> the
* result is retrieved via {@code Future.get()}.
*
* @since 1.5
* @author Doug Lea
*/
public interface ExecutorService extends Executor {
/**
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.
*
* <p>This method does not wait for previously submitted tasks to
* complete execution. Use {@link #awaitTermination awaitTermination}
* to do that.
*
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
void shutdown();
/**
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.
*
* <p>This method does not wait for actively executing tasks to
* terminate. Use {@link #awaitTermination awaitTermination} to
* do that.
*
* <p>There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
*
* @return list of tasks that never commenced execution
* @throws SecurityException if a security manager exists and
* shutting down this ExecutorService may manipulate
* threads that the caller is not permitted to modify
* because it does not hold {@link
* java.lang.RuntimePermission}{@code ("modifyThread")},
* or the security manager's {@code checkAccess} method
* denies access.
*/
List<Runnable> shutdownNow();
/**
* Returns {@code true} if this executor has been shut down.
*
* @return {@code true} if this executor has been shut down
*/
boolean isShutdown();
/**
* Returns {@code true} if all tasks have completed following shut down.
* Note that {@code isTerminated} is never {@code true} unless
* either {@code shutdown} or {@code shutdownNow} was called first.
*
* @return {@code true} if all tasks have completed following shut down
*/
boolean isTerminated();
/**
* Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return {@code true} if this executor terminated and
* {@code false} if the timeout elapsed before termination
* @throws InterruptedException if interrupted while waiting
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Submits a value-returning task for execution and returns a
* Future representing the pending results of the task. The
* Future's {@code get} method will return the task's result upon
* successful completion.
*
* <p>
* If you would like to immediately block waiting
* for a task, you can use constructions of the form
* {@code result = exec.submit(aCallable).get();}
*
* <p>Note: The {@link Executors} class includes a set of methods
* that can convert some other common closure-like objects,
* for example, {@link java.security.PrivilegedAction} to
* {@link Callable} form so they can be submitted.
*
* @param task the task to submit
* @param <T> the type of the task's result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<T> submit(Callable<T> task);
<T>
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return the given result upon successful completion.
*
* @param task the task to submit
* @param result the result to return
* @param <T> the type of the result
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<T> submit(Runnable task, T result);
<T>
/**
* Submits a Runnable task for execution and returns a Future
* representing that task. The Future's {@code get} method will
* return {@code null} upon <em>successful</em> completion.
*
* @param task the task to submit
* @return a Future representing pending completion of the task
* @throws RejectedExecutionException if the task cannot be
* scheduled for execution
* @throws NullPointerException if the task is null
*/
Future<?> submit(Runnable task);
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results when all complete.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list, each of which has completed
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks or any of its elements are {@code null}
* @throws RejectedExecutionException if any task cannot be
* scheduled for execution
*/
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
<T> throws InterruptedException;
/**
* Executes the given tasks, returning a list of Futures holding
* their status and results
* when all complete or the timeout expires, whichever happens first.
* {@link Future#isDone} is {@code true} for each
* element of the returned list.
* Upon return, tasks that have not completed are cancelled.
* Note that a <em>completed</em> task could have
* terminated either normally or by throwing an exception.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return a list of Futures representing the tasks, in the same
* sequential order as produced by the iterator for the
* given task list. If the operation did not time out,
* each task will have completed. If it did time out, some
* of these tasks will not have completed.
* @throws InterruptedException if interrupted while waiting, in
* which case unfinished tasks are cancelled
* @throws NullPointerException if tasks, any of its elements, or
* unit are {@code null}
* @throws RejectedExecutionException if any task cannot be scheduled
* for execution
*/
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
<T> long timeout, TimeUnit unit)
throws InterruptedException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do. Upon normal or exceptional return,
* tasks that have not completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks or any element task
* subject to execution is {@code null}
* @throws IllegalArgumentException if tasks is empty
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
invokeAny(Collection<? extends Callable<T>> tasks)
<T> T throws InterruptedException, ExecutionException;
/**
* Executes the given tasks, returning the result
* of one that has completed successfully (i.e., without throwing
* an exception), if any do before the given timeout elapses.
* Upon normal or exceptional return, tasks that have not
* completed are cancelled.
* The results of this method are undefined if the given
* collection is modified while this operation is in progress.
*
* @param tasks the collection of tasks
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param <T> the type of the values returned from the tasks
* @return the result returned by one of the tasks
* @throws InterruptedException if interrupted while waiting
* @throws NullPointerException if tasks, or unit, or any element
* task subject to execution is {@code null}
* @throws TimeoutException if the given timeout elapses before
* any task successfully completes
* @throws ExecutionException if no task successfully completes
* @throws RejectedExecutionException if tasks cannot be scheduled
* for execution
*/
invokeAny(Collection<? extends Callable<T>> tasks,
<T> T long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ThreadPoolExecutor类
略,以及相应的Abstract类
ForkJoin
主要关注两个类: RecursiveAction,无返回值 RecursiveTask,有返回值

工作窃取:ForkJoinPool有意思的设计是,线程取任务的时候,先从任选一个其他线程任务队列,取任务执行(帮其他队列执行任务),再执行一个自己工作队列中的任务。 这种设计有利于减少线程饥饿。

将多个字段位打包到一个基础类型中,可以进行执行多字段位的同时更新。 因为OS级别只存在基本类型的CAS,不支持多个变量的CAS同时进行。
使用demo
比如要实现1+2++…+1000
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class Main2 {
static class MyTask extends RecursiveTask<Long> {
final int THREADHOLD = 100;
long start;
long end;
public MyTask(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long count = 0;
if (end - start < THREADHOLD) {
for (long i = start; i <= end; i++) {
count += i;//模拟其他工作,每次用时10ms
try {
Thread.sleep(10);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}return count;
}
//分成多个小任务
// long step = (end - start) / THREADHOLD;
long step = THREADHOLD;
List<MyTask> subTasks = new ArrayList<>();
for (long i = start; i <= end; i+=step) {
long s = i;
long e = Math.min(i + step - 1, end);
new MyTask(s, e);
MyTask t = add(t);
subTasks.fork();
t.
}for (MyTask t : subTasks)
join();
count += t.return count;
}
}
public static void main(String[] args) {
long sum = 0;
long startTime = System.currentTimeMillis();
new ForkJoinPool();
ForkJoinPool forkJoinPool = new MyTask(1, 1000);
MyTask task = try {
submit(task).get();
sum = forkJoinPool.catch (InterruptedException e) {
} throw new RuntimeException(e);
catch (ExecutionException e) {
} throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println(sum);
long duration = endTime - startTime;
System.out.println("用时" + duration + "ms");
} }
发表回复