即 Guarded Suspension,用在一个线程等待另一个线程的执行结果
// 增加超时效果
class GuardedObject {
// 结果
private Object response;
// 获取结果
// timeout 表示要等待多久 2000
public Object get(long timeout) {
synchronized (this) {
// 开始时间 15:00:00
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
while (response == null) {
// 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
// 经历的时间超过了最大等待时间时,退出循环
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虚假唤醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求得经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
解耦等待和生产:
package cn.itcast.test;
import cn.itcast.n2.util.Sleeper;
import lombok.extern.slf4j.Slf4j;
import java.util.Hashtable;
import java.util.Map;
import java.util.Set;
@Slf4j(topic = "c.Test20")
public class Test20 {
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
Sleeper.sleep(1);
for (Integer id : Mailboxes.getIds()) {
new Postman(id, "内容" + id).start();
}
}
}
@Slf4j(topic = "c.People")
class People extends Thread{
@Override
public void run() {
// 收信
GuardedObject guardedObject = Mailboxes.createGuardedObject();
log.debug("开始收信 id:{}", guardedObject.getId());
Object mail = guardedObject.get(5000);
log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
}
}
@Slf4j(topic = "c.Postman")
class Postman extends Thread {
private int id;
private String mail;
public Postman(int id, String mail) {
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
log.debug("送信 id:{}, 内容:{}", id, mail);
guardedObject.complete(mail);
}
}
class Mailboxes {
private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
private static int id = 1;
// 产生唯一 id
private static synchronized int generateId() {
return id++;
}
public static GuardedObject getGuardedObject(int id) {
return boxes.remove(id);
}
public static GuardedObject createGuardedObject() {
GuardedObject go = new GuardedObject(generateId());
boxes.put(go.getId(), go);
return go;
}
public static Set<Integer> getIds() {
return boxes.keySet();
}
}
// 增加超时效果
class GuardedObject {
// 标识 Guarded Object
private int id;
public GuardedObject(int id) {
this.id = id;
}
public int getId() {
return id;
}
// 结果
private Object response;
// 获取结果
// timeout 表示要等待多久 2000
public Object get(long timeout) {
synchronized (this) {
// 开始时间 15:00:00
long begin = System.currentTimeMillis();
// 经历的时间
long passedTime = 0;
while (response == null) {
// 这一轮循环应该等待的时间
long waitTime = timeout - passedTime;
// 经历的时间超过了最大等待时间时,退出循环
if (timeout - passedTime <= 0) {
break;
}
try {
this.wait(waitTime); // 虚假唤醒 15:00:01
} catch (InterruptedException e) {
e.printStackTrace();
}
// 求得经历时间
passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s
}
return response;
}
}
// 产生结果
public void complete(Object response) {
synchronized (this) {
// 给结果成员变量赋值
this.response = response;
this.notifyAll();
}
}
}
package cn.itcast.test;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import static cn.itcast.n2.util.Sleeper.sleep;
@Slf4j(topic = "c.Test21")
public class Test21 {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id , "值"+id));
}, "生产者" + i).start();
}
new Thread(() -> {
while(true) {
sleep(1);
Message message = queue.take();
}
}, "消费者").start();
}
}
// 消息队列类 , java 线程之间通信
@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
// 消息的队列集合
private LinkedList<Message> list = new LinkedList<>();
// 队列容量
private int capcity;
public MessageQueue(int capcity) {
this.capcity = capcity;
}
// 获取消息
public Message take() {
// 检查队列是否为空
synchronized (list) {
while(list.isEmpty()) {
try {
log.debug("队列为空, 消费者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从队列头部获取消息并返回
Message message = list.removeFirst();
log.debug("已消费消息 {}", message);
list.notifyAll();
return message;
}
}
// 存入消息
public void put(Message message) {
synchronized (list) {
// 检查对象是否已满
while(list.size() == capcity) {
try {
log.debug("队列已满, 生产者线程等待");
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 将消息加入队列尾部
list.addLast(message);
log.debug("已生产消息 {}", message);
list.notifyAll();
}
}
}
final class Message {
private int id;
private Object value;
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
'}';
}
}