Contents
比较 ConcurrentLinkedQueue 和 BlockingQueue的性能,并说明为什么。给出你的测试代码,和运行结果的截图。
ConcurrentLinkedQueue是基于CAS用无锁算法实现的。 LinkedBlockingQueu是基于ReentrantLock锁和Condition同步实现的。 一般情况,应该是ConcurrentLinkedQueue性能好一点的。
分析CopyOnWriteArrayList的核心代码,说明CopyOnWriteArrayList如何提高并行度。
每次变更操作(增删改),都是复制新的arraylist。
遍历和查询使用当前的snapshot(没有变更操作,多线程并行读不涉及线程安全,不会加锁)。
适用于遍历或查找多的情景。
线程池会吃掉堆栈,能不能想办法,在异常的时候 打印出 提交任务的线程的堆栈?给出你的实现代码和截图。
ExecutorService.submit().get()等待结果,如果报错,包含了主线程和线程池内执行此任务的堆栈。但是主线程的堆栈错误信息是获取结果位置的堆栈,不是提交任务时的堆栈。
覆写ThreadPoolExecutor类的protected void afterExecute(Runnable r, Throwable t) 方法,log或printStackTrace, 可以打印执行线程的堆栈,但是这样不会打印提交任务线程的堆栈。
进一步的想法是覆写 ThreadPoolExecutor,对 submit()和execute()做一层Callable/Runnable包裹。主线程提交任务时, new Exception()保存下来, 内层有异常时,catch时 同时抛出 之前保存的Exception和刚刚抛出的异常。
参考:https://www.cnblogs.com/satire/articles/14479707.html 参考:https://blog.csdn.net/qq_38658642/article/details/93890451 参考:https://blog.csdn.net/weixin_45987961/article/details/122783745
import java.util.concurrent.*;
public class Main2 {
public static void main(String[] args) {
// ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
ExecutorService executorService = new MyThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 5; i++) {
new DivideTask(100, i);
DivideTask task = submit(task);
executorService.
}shutdown();
executorService.
}
static class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public <T> Future<T> submit(Callable<T> task){
return super.submit(new WrapCallable<>(task));
}
static class WrapCallable<K> implements Callable<K> {
Callable<K> callable;
Exception submitException;
public WrapCallable(Callable<K> callable) {
this.submitException = new Exception();
this.callable = callable;
}@Override
public K call() {
null;
K result = try {
call();
result = callable.catch (Exception runnerException) {
} initCause(submitException);
runnerException.printStackTrace();
runnerException.
}return result;
}
}
}
static class DivideTask implements Callable<Integer> {
Integer dividend;
Integer divisor;
Integer quotient;
public DivideTask(Integer dividend, Integer divisor) {
this.dividend = dividend;
this.divisor = divisor;
}
@Override
public Integer call() {
quotient = dividend / divisor;System.out.println(quotient);
return quotient;
}
} }
请你阅读以下JDK 8的ForkJoinPool的代码,为它的scan()方法写一下详细的注释,说明它的基本原理。
源码在此(jdk17,好像和jdk8有点不一样),以后再说。mark#待学习
/**
* Scans for and if found executes top-level tasks: Tries to poll
* each queue starting at a random index with random stride,
* returning source id or retry indicator if contended or
* inconsistent.
*
* @param w caller's WorkQueue 调用者线程的任务队列
* @param prevSrc the previous queue stolen from in current phase, or 0
* @param r random seed 随机种子,用于生成随机数
* @return id of queue if taken, negative if none found, prevSrc for retry
*/
private int scan(WorkQueue w, int prevSrc, int r) {
//所有线程的任务队列
WorkQueue[] qs = queues;
int n = (w == null || qs == null) ? 0 : qs.length;
for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a;
if ((q = qs[j = r & (n - 1)]) != null && // poll at qs[j].array[k]
(a = q.array) != null && (cap = a.length) > 0) {
int k = (cap - 1) & (b = q.base), nextBase = b + 1;
int nextIndex = (cap - 1) & nextBase, src = j | SRC;
ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
if (q.base != b) // inconsistent
return prevSrc;
else if (t != null && WorkQueue.casSlotToNull(a, k, t)) {
q.base = nextBase;
ForkJoinTask<?> next = a[nextIndex];
if ((w.source = src) != prevSrc && next != null)
signalWork(); // propagate
w.topLevelExec(t, q);
return src;
}
else if (a[nextIndex] != null) // revisit
return prevSrc;
}
}
return (queues != qs) ? prevSrc: -1; // possibly resized
}
发表回复