并行设计模式
多线程设计模式
什么设计模式
单例模式
不变模式
Future模式
生产者消费者模式
什么是设计模式
对软件设计过程中普遍存在(反复出现)的各种问题,提出的解决方案。
有Eric Gamma 从建筑学引入。
Gof 这四个人《设计模式:可复用面向对象软件的基础》 收录了23种设计模式,后面还有其他人总结一些其他的设计模式。
从广义上来讲,模式有以下三类:
- 架构模式
- MVC
- 分层
- 设计模式
- 提炼系统种的组件
- 代码模式(成例Idiom)
- 低层次,与编码直接相关
- 如DCL
单讲设计模式,一般认为可以分为 结构模式和行为模式两类。
单例模式
单例对象的类,必须保证只有一个实例存在。必须保证多个线程同时操作这个单例是安全的。
许多时候整个系统只需要拥有一个全局对象,这有利于我们协调系统整体的行为。
比如全局信息配置
简单的单例模式
public class Singleton {
private Singleton() {
System.out.println("Singleton is created");
}//在类的初始化时创建实例,这是线程安全的,有jvm保证。
private static Singleton instance = new Singleton();
public static Singleton getInstance() {
return instance;
} }
有个问题是, 何时产生实例,不好控制。(Singleton类第一次被访问的时候,第一次getInstance()的时候)
如果类中有比如 public static int STATUS = 1;
这样的字段,第一次访问Singleton.STATUS的时候,可能会提前创建实例。
public class Singleton {
public static int STATUS = 1;
private Singleton() {
System.out.println("Singleton is created");
}//在类的初始化时创建实例,这是线程安全的,有jvm保证。
private static Singleton instance = new Singleton();
public static Singleton getInstance() {
return instance;
} }
然后执行
System.out.println(Singleton.STATUS);
结果是
Singleton is created
1
如果介意这个问题,有种延迟加载的改进
public class LazySingleton {
private LazySingleton() {
System.out.println("Singleton is created");
}private static Singleton instance = null;
public static synchronized LazySingleton getInstance() {
if(instance == null)
new LazySingleton();
instance = return instance;
} }
但这里有个问题,高并发的时候,synchronized隐式加锁影响了性能。
有一种更绝的方式,是延迟加载的,而且没有锁的影响
public class Singleton {
private Singleton() {
System.out.println("Singleton is created");
}
//静态内部类
private static class SigletonHolder {
private static StaticSingleton instance = new StaticSingleton();
}
//只有第一次访问getInstance的时候才会创建SigletonHolder.instance
public static Singleton getInstance() {
return SigletonHolder.instance;
} }
注意这几种方案, 构造函数都是private
不变模式
一个类的内部状态创建后,在整个生命期间都不会发生变化时,这就是不变类。
不变模式不需要同步。(只需要读取,不会修改,不需要同步)
不需要set
final不会允许二次赋值
final class 不允许有子类(里氏替换原则说,子类可以替换父类。若父类不可变,子类中改变了,就不符合需求了)
对象创建的时候赋值
eg
package org.example;
public final class Product {
private final String no;
private final String name;
private final String price;
public Product(String no, String name, String price) {
this.no = no;
this.name = name;
this.price = price;
}
public String getNo() {
return no;
}
public String getName() {
return name;
}
public String getPrice() {
return price;
} }
Java中不变模式的范例: java.lang.String java.lang.Boolean java.lang.Byte java.lang.Character java.lang.Double java.lang.Float java.lang.Integer java.lang.Long java.lang.Short
比如String中类似有修改String内容的操作,都是生成了新的String,来替换旧的对象。 比如subString等
Integer i = 0;
//实际上这是i=i+1;的语法糖,同时做了自动装箱操作 i++;
Future模式
核心思想是异步调用
同步调用与异步调用的比较:

Future模式简单实现的一个类图:

这里主要关注Data接口,FutureData,RealData。
Future模式的实现demo:
public interface Data<V> {
get();
V
}
public class FutureData<V> implements Data<V> {
Data<V> realData;
boolean isReady = false;
public synchronized void setRealData(Data<V> realData){
if(isReady) return;
this.realData = realData;
true;
isReady = notifyAll();
}
public synchronized V get() {
while(!isReady) {
try {
wait();
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}return realData.get();
}
}
//依赖于实际场景
public class RealData implements Data<String> {
String answer;
public RealData() {
this(null);
}
public RealData(String name) {
//模拟一个需要很长时间才能获取到数据的调用
try {
Thread.sleep(3000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}null ? "":name) +" This is the answer.";
answer = (name ==
}public String get() {
return answer;
}
}
//模拟客户端请求
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
new Client();
Client client =
long startTime = System.currentTimeMillis();
//模拟客户端发起一个需要很长时间才能处理好结果的请求(3s)
Data<String> data = client.request("FirstClient");
//模拟后面的无关业务(2s)
try {
Thread.sleep(2000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
String result = data.get();
System.out.println(result);
long endTime = System.currentTimeMillis();
//实际大概会是3秒出头,而不是2+3=5秒出头
System.out.println(endTime-startTime);
}
static class Client {
public Data<String> request(String reqName) {
//模拟客户端请求会先返回一个FutureData
final FutureData<String> futureData = new FutureData<>();
//完整的数据会由其他线程稍后返回
new Thread() {
@Override
public void run() {
new RealData();
RealData realData = setRealData(realData);
futureData.
}start();
}.//模拟客户端请求会先返回一个FutureData
return futureData;
}
} }
JDK对Future模式的支持:

使用JDK的api实现上面的逻辑:
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
System.out.println("Hello world!");
ExecutorService executorService = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
//一个需要很长时间才能处理好结果的请求(3s)
//方式一:
// FutureTask<String> futureTask = new FutureTask<>(new RealData("readTask"));
// executorService.submit(futureTask);
//方式二:
Future<String> futureTask = executorService.submit(new RealData("readTask"));
//模拟后面的无关业务(2s)
try {
Thread.sleep(2000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
String answer = null;
try {
get();
answer = futureTask.catch (InterruptedException e) {
} throw new RuntimeException(e);
catch (ExecutionException e) {
} throw new RuntimeException(e);
}System.out.println(answer);
long endTime = System.currentTimeMillis();
long duration = endTime - startTime;
System.out.println(duration);
shutdown();
executorService.
}
static class RealData implements Callable<String> {
String name;
String answer;
public RealData() {
this(null);
}
public RealData(String name) {
this.name = name;
}@Override
public String call() {
//模拟一个需要很长时间才能获取到数据的调用
try {
Thread.sleep(3000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}null ? "":name) +" This is the answer.";
answer = (name == return answer;
}
} }
同时,同步的调用,变成了异步的调用。
生产者消费者模式
生产者消费者模式是经典的线程间共享数据(线程协作)的模式。
但是生产者消费者模式不太符合 松散耦合的原则(系统尽可能保持松散,多个线程之间最好不要知道对方是存在的,知道的东西越少越好,将来改了其他线程其他东西也不会有影响 )
生产者消费者模式中,通常分为两类线程,即,若干个生产者线程和若干个消费者线程。
生产者线程负责提交用户请求(任务),消费者线程负责处理生产者提交的任务。
生产者和消费者之间通过共享内存缓冲区进行通信。
生产者和消费者(线程),彼此之间不要知道对方的存在,往公共缓冲区(一般是BlockingQueue)存、取数据。

一个基本实现

demo(有点问题,terminate时,最后一个在执行中的任务,可能会被直接关掉)
import java.io.PrintStream;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
ExecutorService consumers = Executors.newCachedThreadPool();
ExecutorService producers = Executors.newCachedThreadPool();
submit(new Consumer());
consumers.submit(new Consumer());
consumers.submit(new Producer());
producers.submit(new Producer());
producers.
Scanner scanner = new Scanner(System.in);
PrintStream ps = new PrintStream(System.out);
println("usage:");
ps.println("start\t开启生产者线程");
ps.println("stop\t关闭生产者线程");
ps.println("terminate\t关闭生产者和消费者线程(队列中的已有任务执行完才会关闭),并结束程序");
ps.while (true) {
String s = scanner.next();
if ("start".equals(s)) {
if(Producer.isRunning == true) continue;
//volatile类型保证修改实时可见。只有此处赋值,且是boolean类型,自然是原子的。不必再加锁了
isRunning = true;
Producer.submit(new Producer());
producers.submit(new Producer());
producers.
}else if ("stop".equals(s)) {
isRunning = false;
Producer.
}else if ("terminate".equals(s)) {
//停生产者
isRunning = false;
Producer.//关生产者
shutdown();
producers.//如果队列中还有没完成的任务,先处理完再说
while (!messageQueue.isEmpty()) {
println("任务队列还有未完成的任务");
ps.//停1s后再次检查
try {
Thread.sleep(1000);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}//关消费者
shutdown();
consumers.try {
if (!consumers.awaitTermination(2, TimeUnit.SECONDS)) {
shutdownNow();
consumers.
}catch (InterruptedException e) {
} shutdownNow();
consumers.// throw new RuntimeException(e);
}//跳出循环
break;
}else {
//忽略
}
}
}
static BlockingDeque<String> messageQueue = new LinkedBlockingDeque<>();
static class Producer implements Runnable {
volatile static boolean isRunning = true;
@Override
public void run() {
while (isRunning) {
//任务生产慢一点
int taskTime = new Random().nextInt(5) * 1000;
try {
Thread.sleep(taskTime);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}String msg = "message:" + taskTime;
System.out.println(Thread.currentThread().getName() + ":生产者准备发出:" + msg);
try {
if (!messageQueue.offer(msg, 2, TimeUnit.SECONDS)) {
System.out.println("生产者提交任务失败");
}catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
String taskContent = messageQueue.take();
System.out.println(Thread.currentThread().getName() + ":消费者接收到:" + taskContent);
catch (InterruptedException e) {
} throw new RuntimeException(e);
}
}
}
} }
发表回复