05JDK并发包_作业

比较 ConcurrentLinkedQueue 和 BlockingQueue的性能,并说明为什么。给出你的测试代码,和运行结果的截图。

ConcurrentLinkedQueue是基于CAS用无锁算法实现的。 LinkedBlockingQueu是基于ReentrantLock锁和Condition同步实现的。 一般情况,应该是ConcurrentLinkedQueue性能好一点的。

分析CopyOnWriteArrayList的核心代码,说明CopyOnWriteArrayList如何提高并行度。

每次变更操作(增删改),都是复制新的arraylist。

遍历和查询使用当前的snapshot(没有变更操作,多线程并行读不涉及线程安全,不会加锁)。

适用于遍历或查找多的情景。

线程池会吃掉堆栈,能不能想办法,在异常的时候 打印出 提交任务的线程的堆栈?给出你的实现代码和截图。

ExecutorService.submit().get()等待结果,如果报错,包含了主线程和线程池内执行此任务的堆栈。但是主线程的堆栈错误信息是获取结果位置的堆栈,不是提交任务时的堆栈。

覆写ThreadPoolExecutor类的protected void afterExecute(Runnable r, Throwable t) 方法,log或printStackTrace, 可以打印执行线程的堆栈,但是这样不会打印提交任务线程的堆栈。

进一步的想法是覆写 ThreadPoolExecutor,对 submit()和execute()做一层Callable/Runnable包裹。主线程提交任务时, new Exception()保存下来, 内层有异常时,catch时 同时抛出 之前保存的Exception和刚刚抛出的异常。

参考:https://www.cnblogs.com/satire/articles/14479707.html 参考:https://blog.csdn.net/qq_38658642/article/details/93890451 参考:https://blog.csdn.net/weixin_45987961/article/details/122783745


import java.util.concurrent.*;

public class Main2 {
    public static void main(String[] args) {
//        ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        ExecutorService executorService = new MyThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 5; i++) {
            DivideTask task = new DivideTask(100, i);
            executorService.submit(task);
        }
        executorService.shutdown();
    }

    static class MyThreadPoolExecutor extends ThreadPoolExecutor {
        public MyThreadPoolExecutor(int corePoolSize,
                                    int maximumPoolSize,
                                    long keepAliveTime,
                                    TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        public <T> Future<T> submit(Callable<T> task){
            return super.submit(new WrapCallable<>(task));
        }


        static class WrapCallable<K> implements Callable<K> {
            Callable<K> callable;
            Exception submitException;
            public WrapCallable(Callable<K> callable) {
                this.submitException = new Exception();
                this.callable = callable;
            }
            @Override
            public K call() {
                K result = null;
                try {
                   result = callable.call();
                } catch (Exception runnerException) {
                    runnerException.initCause(submitException);
                    runnerException.printStackTrace();
                }
                return result;
            }
        }
    }

    static class DivideTask implements Callable<Integer> {
        Integer dividend;
        Integer divisor;
        Integer quotient;

        public DivideTask(Integer dividend, Integer divisor) {
            this.dividend = dividend;
            this.divisor = divisor;
        }

        @Override
        public Integer call() {
            quotient = dividend / divisor;
            System.out.println(quotient);
            return quotient;
        }
    }
}

请你阅读以下JDK 8的ForkJoinPool的代码,为它的scan()方法写一下详细的注释,说明它的基本原理。

源码在此(jdk17,好像和jdk8有点不一样),以后再说。mark#待学习

/**
 * Scans for and if found executes top-level tasks: Tries to poll
 * each queue starting at a random index with random stride,
 * returning source id or retry indicator if contended or
 * inconsistent.
 *
 * @param w caller's WorkQueue  调用者线程的任务队列
 * @param prevSrc the previous queue stolen from in current phase, or 0
 * @param r random seed  随机种子,用于生成随机数
 * @return id of queue if taken, negative if none found, prevSrc for retry
 */
private int scan(WorkQueue w, int prevSrc, int r) {
    //所有线程的任务队列
    WorkQueue[] qs = queues;
    int n = (w == null || qs == null) ? 0 : qs.length;
    for (int step = (r >>> 16) | 1, i = n; i > 0; --i, r += step) {
        int j, cap, b; WorkQueue q; ForkJoinTask<?>[] a;
        if ((q = qs[j = r & (n - 1)]) != null && // poll at qs[j].array[k]
            (a = q.array) != null && (cap = a.length) > 0) {
            int k = (cap - 1) & (b = q.base), nextBase = b + 1;
            int nextIndex = (cap - 1) & nextBase, src = j | SRC;
            ForkJoinTask<?> t = WorkQueue.getSlot(a, k);
            if (q.base != b)                // inconsistent
                return prevSrc;
            else if (t != null && WorkQueue.casSlotToNull(a, k, t)) {
                q.base = nextBase;
                ForkJoinTask<?> next = a[nextIndex];
                if ((w.source = src) != prevSrc && next != null)
                    signalWork();           // propagate
                w.topLevelExec(t, q);
                return src;
            }
            else if (a[nextIndex] != null)  // revisit
                return prevSrc;
        }
    }
    return (queues != qs) ? prevSrc: -1;    // possibly resized
}

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注