07并行设计模式

并行设计模式

多线程设计模式

什么设计模式

单例模式

不变模式

Future模式

生产者消费者模式

什么是设计模式

对软件设计过程中普遍存在(反复出现)的各种问题,提出的解决方案。

有Eric Gamma 从建筑学引入。

Gof 这四个人《设计模式:可复用面向对象软件的基础》 收录了23种设计模式,后面还有其他人总结一些其他的设计模式。

从广义上来讲,模式有以下三类:

  • 架构模式
    • MVC
    • 分层
  • 设计模式
    • 提炼系统种的组件
  • 代码模式(成例Idiom)
    • 低层次,与编码直接相关
    • 如DCL

单讲设计模式,一般认为可以分为 结构模式和行为模式两类。

单例模式

单例对象的类,必须保证只有一个实例存在。必须保证多个线程同时操作这个单例是安全的。

许多时候整个系统只需要拥有一个全局对象,这有利于我们协调系统整体的行为。

比如全局信息配置

简单的单例模式

public class Singleton {
    private Singleton() {
        System.out.println("Singleton is created");
    }
    //在类的初始化时创建实例,这是线程安全的,有jvm保证。
    private static Singleton instance = new Singleton();
    
    public static Singleton getInstance() {
        return instance;
    }
}

有个问题是, 何时产生实例,不好控制。(Singleton类第一次被访问的时候,第一次getInstance()的时候)

如果类中有比如 public static int STATUS = 1;这样的字段,第一次访问Singleton.STATUS的时候,可能会提前创建实例。

public class Singleton {
    public static int STATUS = 1;
    private Singleton() {
        System.out.println("Singleton is created");
    }
    //在类的初始化时创建实例,这是线程安全的,有jvm保证。
    private static Singleton instance = new Singleton();
    
    public static Singleton getInstance() {
        return instance;
    }
}

然后执行

System.out.println(Singleton.STATUS);

结果是

Singleton is created
1

如果介意这个问题,有种延迟加载的改进

public class LazySingleton {
    private LazySingleton() {
        System.out.println("Singleton is created");
    }
    private static Singleton instance = null;
    
    public static synchronized LazySingleton getInstance() {
        if(instance == null) 
            instance = new LazySingleton();
        return instance;
    }
}

但这里有个问题,高并发的时候,synchronized隐式加锁影响了性能。

有一种更绝的方式,是延迟加载的,而且没有锁的影响

public class Singleton {
    private Singleton() {
        System.out.println("Singleton is created");
    }
    
    //静态内部类
    private static class SigletonHolder {
        private static StaticSingleton instance = new StaticSingleton();
    }
    
    //只有第一次访问getInstance的时候才会创建SigletonHolder.instance
    public static Singleton getInstance() {
        return SigletonHolder.instance;
    }
}

注意这几种方案, 构造函数都是private

不变模式

一个类的内部状态创建后,在整个生命期间都不会发生变化时,这就是不变类。

不变模式不需要同步。(只需要读取,不会修改,不需要同步)

不需要set

final不会允许二次赋值

final class 不允许有子类(里氏替换原则说,子类可以替换父类。若父类不可变,子类中改变了,就不符合需求了)

对象创建的时候赋值

eg

package org.example;

public final class Product {
    private final String no;

    private final String name;

    private final String price;

    public Product(String no, String name, String price) {
        this.no = no;
        this.name = name;
        this.price = price;
    }

    public String getNo() {
        return no;
    }

    public String getName() {
        return name;
    }

    public String getPrice() {
        return price;
    }
}

Java中不变模式的范例: java.lang.String java.lang.Boolean java.lang.Byte java.lang.Character java.lang.Double java.lang.Float java.lang.Integer java.lang.Long java.lang.Short

比如String中类似有修改String内容的操作,都是生成了新的String,来替换旧的对象。 比如subString等

Integer i = 0;
i++; //实际上这是i=i+1;的语法糖,同时做了自动装箱操作

Future模式

核心思想是异步调用

同步调用与异步调用的比较:

image-20230325111811156

Future模式简单实现的一个类图:

image-20230325112556698

这里主要关注Data接口,FutureData,RealData。

Future模式的实现demo:

public interface Data<V> {
    V get();
}


public class FutureData<V> implements Data<V> {
    Data<V> realData;

    boolean isReady = false;

    public synchronized void setRealData(Data<V> realData){
        if(isReady) return;
        this.realData = realData;
        isReady = true;
        notifyAll();
    }

    public synchronized V get() {
        while(!isReady) {
            try {
                wait();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return realData.get();
    }
}

//依赖于实际场景
public class RealData implements Data<String> {
    String answer;
    public RealData() {
        this(null);
    }

    public RealData(String name) {
        //模拟一个需要很长时间才能获取到数据的调用
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        answer = (name == null ? "":name) +" This is the answer.";
    }
    public String get() {

        return answer;
    }
}


//模拟客户端请求
public class Main {
    public static void main(String[] args) {
        System.out.println("Hello world!");

        Client client = new Client();

        long startTime = System.currentTimeMillis();

        //模拟客户端发起一个需要很长时间才能处理好结果的请求(3s)
        Data<String> data = client.request("FirstClient");

        //模拟后面的无关业务(2s)
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        String result = data.get();
        System.out.println(result);

        long endTime = System.currentTimeMillis();
        //实际大概会是3秒出头,而不是2+3=5秒出头
        System.out.println(endTime-startTime);
    }

    static class Client {
        public Data<String> request(String reqName) {
            //模拟客户端请求会先返回一个FutureData
            final FutureData<String> futureData = new FutureData<>();

            //完整的数据会由其他线程稍后返回
            new Thread() {
                @Override
                public void run() {
                    RealData realData = new RealData();
                    futureData.setRealData(realData);
                }
            }.start();
            //模拟客户端请求会先返回一个FutureData
            return futureData;
        }
    }
}

JDK对Future模式的支持:

image-20230326155356144

使用JDK的api实现上面的逻辑:

import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        System.out.println("Hello world!");
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        long startTime = System.currentTimeMillis();
        //一个需要很长时间才能处理好结果的请求(3s)
        //方式一:
//        FutureTask<String> futureTask = new FutureTask<>(new RealData("readTask"));
//        executorService.submit(futureTask);
        //方式二:
        Future<String> futureTask = executorService.submit(new RealData("readTask"));



        //模拟后面的无关业务(2s)
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        String answer = null;
        try {
            answer = futureTask.get();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
        System.out.println(answer);

        long endTime = System.currentTimeMillis();

        long duration = endTime - startTime;
        System.out.println(duration);

        executorService.shutdown();
    }


    static class RealData implements Callable<String> {
        String name;
        String answer;
        public RealData() {
            this(null);
        }

        public RealData(String name) {
            this.name = name;
        }
        @Override
        public String call() {
            //模拟一个需要很长时间才能获取到数据的调用
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            answer = (name == null ? "":name) +" This is the answer.";
            return answer;
        }
    }
}

同时,同步的调用,变成了异步的调用。

生产者消费者模式

生产者消费者模式是经典的线程间共享数据(线程协作)的模式。

但是生产者消费者模式不太符合 松散耦合的原则(系统尽可能保持松散,多个线程之间最好不要知道对方是存在的,知道的东西越少越好,将来改了其他线程其他东西也不会有影响 )

生产者消费者模式中,通常分为两类线程,即,若干个生产者线程和若干个消费者线程。

生产者线程负责提交用户请求(任务),消费者线程负责处理生产者提交的任务。

生产者和消费者之间通过共享内存缓冲区进行通信。

生产者和消费者(线程),彼此之间不要知道对方的存在,往公共缓冲区(一般是BlockingQueue)存、取数据。

image-20230326164505084

一个基本实现

image-20230326164645027

demo(有点问题,terminate时,最后一个在执行中的任务,可能会被直接关掉)

import java.io.PrintStream;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.*;

public class Main {
    public static void main(String[] args) {
        ExecutorService consumers = Executors.newCachedThreadPool();
        ExecutorService producers = Executors.newCachedThreadPool();
        consumers.submit(new Consumer());
        consumers.submit(new Consumer());
        producers.submit(new Producer());
        producers.submit(new Producer());

        Scanner scanner = new Scanner(System.in);
        PrintStream ps = new PrintStream(System.out);
        ps.println("usage:");
        ps.println("start\t开启生产者线程");
        ps.println("stop\t关闭生产者线程");
        ps.println("terminate\t关闭生产者和消费者线程(队列中的已有任务执行完才会关闭),并结束程序");
        while (true) {
            String s = scanner.next();
            if ("start".equals(s)) {
                if(Producer.isRunning == true) continue;

                //volatile类型保证修改实时可见。只有此处赋值,且是boolean类型,自然是原子的。不必再加锁了
                Producer.isRunning = true;
                producers.submit(new Producer());
                producers.submit(new Producer());
            }
            else if ("stop".equals(s)) {
                Producer.isRunning = false;
            }
            else if ("terminate".equals(s)) {
                //停生产者
                Producer.isRunning = false;
                //关生产者
                producers.shutdown();
                //如果队列中还有没完成的任务,先处理完再说
                while (!messageQueue.isEmpty()) {
                    ps.println("任务队列还有未完成的任务");
                    //停1s后再次检查
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                //关消费者
                consumers.shutdown();
                try {
                    if (!consumers.awaitTermination(2, TimeUnit.SECONDS)) {
                        consumers.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    consumers.shutdownNow();
//                    throw new RuntimeException(e);
                }
                //跳出循环
                break;
            }
            else {
                //忽略
            }
        }
    }

    static BlockingDeque<String> messageQueue = new LinkedBlockingDeque<>();

    static class Producer implements Runnable {
        volatile static boolean isRunning = true;

        @Override
        public void run() {
            while (isRunning) {
                //任务生产慢一点
                int taskTime = new Random().nextInt(5) * 1000;
                try {
                    Thread.sleep(taskTime);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                String msg = "message:" + taskTime;
                System.out.println(Thread.currentThread().getName() + ":生产者准备发出:" + msg);
                try {
                    if (!messageQueue.offer(msg, 2, TimeUnit.SECONDS)) {
                        System.out.println("生产者提交任务失败");
                    }
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    static class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    String taskContent = messageQueue.take();
                    System.out.println(Thread.currentThread().getName() + ":消费者接收到:" + taskContent);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注