线程通信
时间:2021-03-03 12:10:36
收藏:0
阅读:0
线程通信
- 等待:
public final void wait();
public final void wait(long timeout);
//必须在对obj加锁的同步代码块中,在一个线程中,调用obj.wait()时,此线程会释放其拥有的所有锁标记,同时此线程在无限期等待的状态中,释放锁,进入等待队列。
- 通知:
public final void notify();
public final void notifyAll();
//必须在对obj加锁的同步代码块中,从obj的Waiting中释放一个或全部线程。对自身没有任何影响。
例如:
public class getClassTest {
//全局锁对象
static final Object lock = new Object();
public static void main(String[] args) {
//创建线程1
new Thread(() -> {
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "执行");
}
}, "t1").start();
//创建线程2
new Thread(() -> {
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + "执行");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.notify();
}
}, "t2").start();
}
}
//运行结果:
//> t2执行
// 两秒后...
//> t1执行
sleep(long n)和wait(long n)的区别
- sleep是Thread方法,而wait是Object的方法
- sleep不需要强制和synchronized配合使用,但wait需要和synchronized一起用
- sleep在会面的同时,不会释放锁对象,但wait在等待的时候会释放锁对象
- 他们的状态都是TIMED_WAITING
wait/notify原理

- Owner线程发现条件不满足,调用wait方法,即可进入WaitSet变为WAITING状态
- BLOCKED和WAITING的线程都处于阻塞状态,不占用CPU时间片
- BLOCKED线程会在Owner线程释放锁时唤醒
- WAITING线程会在Owner线程条用notify或notifyAll时唤醒,但唤醒后并不意味着立刻获得锁,仍需进入EntryList重新竞争
wait/notify正确使用姿势
//线程1
synchronized(lock){
while(条件不成立){
lock.wait();
}
// 执行接下来的操作
}
//另一个线程
synchronized(lock){
//用notifyAll()来唤醒,防止虚假唤醒
//当一个条件满足时,很多线程都被唤醒了,但是只有其中部分是有用的唤醒,其它的唤醒都是无用功
//1.比如说买货,如果商品本来没有货物,突然进了一件商品,这是所有的线程都被唤醒了,但是只能一个人买,所以其他人都是假唤醒,获取不到对象的锁
lock.notifyAll();
}
同步模式之保护性暂停

用在一个线程等待另一个线程的执行结果
要点:
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列
- JDK中,join的实现,Future的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
public static void main(String[] args) {
final guardedObject lock = new guardedObject();
new Thread(() -> {
String o = (String) lock.get();
System.out.println(o);
}, "t1").start();
new Thread(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.set("这是一个小秘密");
}, "t2").start();
}
}
class guardedObject {
//结果
private Object response;
//获取结果
public Object get() {
synchronized (this) {
//没有结果则等待
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
//设置一个结果,并唤醒线程
public void set(Object o) {
synchronized (this) {
this.response = o;
this.notifyAll();
}
}
}
生产者消费者问题
- 消费队列可以用来平衡生产和消费的线程资源
- 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
- 消息队列容量有限,满时不会再加入数据,空时不再消耗数据
- JDK中各种阻塞队列,采用的就是这种模式
举个栗子:
- 首先先定义我们要put和get的Message类
//设置成final的,不可被子类继承,防止子类继承篡改方法
//不提供set()方法,防止属性被修改
final class Message {
private int id;
//具体结果
private Object value;
@Override
public String toString() {
return "Message{" +
"id=" + id +
", value=" + value +
‘}‘;
}
public Message(int id, Object value) {
this.id = id;
this.value = value;
}
public int getId() {
return id;
}
public Object getValue() {
return value;
}
}
- 接下来实现我们的消息队列类
class MessageQueue {
//消息队列集合
final ConcurrentLinkedDeque<Message> queue = new ConcurrentLinkedDeque();
//消息队列最大容量
private int capacity;
//初始化方法
public MessageQueue(int capacity) {
this.capacity = capacity;
}
//存入消息
public void put(Message message) {
synchronized (queue) {
//如果说队列满了,则不能再放入消息
while (queue.size() == capacity) {
try {
System.out.println("队列满了...");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf(Thread.currentThread().getName());
System.out.println("->放入消息" + message);
queue.offer(message);
//唤醒正在等待消息的线程
queue.notifyAll();
}
}
//获取消息
public Message get() {
synchronized (queue) {
//如果说队列空了,就不能取消息了,调用wait()方法等待
while (queue.isEmpty()) {
try {
System.out.println("没有消息可以获取...");
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.poll();
System.out.printf(Thread.currentThread().getName());
System.out.println("->取出消息" + message);
//唤醒需要存入消息的线程
queue.notifyAll();
return message;
}
}
}
- 进行一下测试:
public class producerAndConsumer {
public static void main(String[] args) {
MessageQueue queue = new MessageQueue(2);
//创建3个生产者
for (int i = 0; i < 3; i++) {
int id = i;
new Thread(() -> {
queue.put(new Message(id, "值:" + id));
}, "生产者" + i).start();
}
//和一个不停get的消费者
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("一秒后..");
queue.get();
}
}, "消费者").start();
}
}
- 结果:
> 生产者2->放入消息Message{id=2, value=值:2}
> 生产者0->放入消息Message{id=0, value=值:0}
> 队列满了...
> 一秒后..
> 消费者->取出消息Message{id=2, value=值:2}
> 生产者1->放入消息Message{id=1, value=值:1}
> 一秒后..
> 消费者->取出消息Message{id=0, value=值:0}
> 一秒后..
> 消费者->取出消息Message{id=1, value=值:1}
> 一秒后..
> 没有消息可以获取...
park与unpark的使用
基本使用
它们是LockSupport类中的方法
//暂停当前线程
LockSupport.park();
//恢复某个线程的运行
LockSupport.unpark(暂停线程对象);
举个栗子:
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println("start...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("park....");
LockSupport.park();
System.out.println("resume...");
}, "t1");
t1.start();
new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("unpark...");
LockSupport.unpark(t1);
}, "t2").start();
}
结果
> start...
> unpark...
> park....
> resume...
特点:
与Object的wait¬ify相比
- wait¬ify必须配合Object Monitor一起使用 而park,unpark不必
- park&unpark是以线程为单位来阻塞和唤醒线程,而notify只能随机唤醒一个等待线程,notifyAll是唤醒所有等待线程,就不那么精确
- park&unpark可以先unpark,而wait¬ify不能先notify
park使用原理
- 当前线程调用
Unsafe.park()
方法 - 检查
_counter
,本情况为0,这时,获得_mutex
互斥锁 - 线程进入
_cond
条件变量阻塞 - 设置
_counter=0
unpark使用原理
- 调用
Unsafe.unpark(Thread-0)
方法,设置_counter
为1 - 唤醒
_cond
条件变量中的Thread_0
Thread_0
恢复运行- 设置
_counter
为0
活跃性
死锁:
- 当第一个线程拥有A对象锁标记,并等待B对象锁标记,同时第二个线程拥有B对象锁标记,并等待A对象锁标记时,产生死锁。
- 一个线程可以同时拥有多个对象的锁标记,当线程阻塞时,不会释放已经拥有的锁标记,由此可能造成死锁。
例如:
t1线程获得 A对象 锁,接下来想获取 B对象 的锁
t2 线程 获得 B对象 锁,接下来想获取 A对象 的锁
public class Test {
private static final Object A = new Object();
private static final Object B = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (A){
System.out.println(Thread.currentThread().getName() + "获得A锁");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B){
System.out.println(Thread.currentThread().getName() + "获得B锁");
System.out.println("其他操作");
}
}
}, "t1");
t1.start();
new Thread(() -> {
synchronized (B){
System.out.println(Thread.currentThread().getName() + "获得B锁");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (A){
System.out.println(Thread.currentThread().getName() + "获得A锁");
System.out.println("其他操作");
}
}
}, "t2").start();
}
}
活锁
活锁出现在两个先很互相改变对方的结束条件,最后谁也无法结束,例如:
public class getClassTest {
private static final Object A = new Object();
private static volatile int count = 10;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
while(count > 0){
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
count--;
System.out.println(Thread.currentThread().getName() + ":" + count);
}
}, "t1");
t1.start();
Thread t2 = new Thread(() -> {
while(count < 20){
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
System.out.println(Thread.currentThread().getName() + ":" + count);
}
}, "t2");
t2.start();
}
}
解决方法就是将 t1 或者 t2 的sleep时间设置成随机数,使两个线程交错开来
ReentrantLock
相较于synchronized 它具备如下特点
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量
与synchronized一样,都支持可重入
基本语法:
//获取锁
reentrantLock.lock();
try {
//临界区
} finally {
//释放锁
reentrantLock.unlock();
}
可重入
可重入是指同一个线程如果首次获得了这把锁,那么因为他是这把锁的拥有者,因此有权利再次获取这把锁
如果是不可重入锁,那么第二次获得锁时,自己也会被锁住
public class ThreadTest2 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try{
System.out.println("进入 main 方法");
m1();
}finally {
lock.unlock();
}
}
public static void m1(){
lock.lock();
try{
System.out.println("进入 m1 方法");
m2();
}finally {
lock.unlock();
}
}
public static void m2(){
lock.lock();
try{
System.out.println("进入 m2 方法");
}finally {
lock.unlock();
}
}
}
结果
> 进入 main 方法
> 进入 m1 方法
> 进入 m2 方法
可打断
public class ThreadTest2 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "尝试获得锁");
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + "获得锁失败");
return;
}
//测试不使用可打断锁
// System.out.println(Thread.currentThread().getName() + "尝试获得锁");
// lock.lock();
try{
System.out.println(Thread.currentThread().getName() + "获得到锁");
}finally {
lock.unlock();
}
},"t1");
//主线程获得锁
lock.lock();
System.out.println(Thread.currentThread().getName() + "获得到锁");
t1.start();
Thread.sleep(1000);
t1.interrupt();
}
}
锁超时
立刻失败
public class ThreadTest2 {
private static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
//测试不使用可打断锁
System.out.println(Thread.currentThread().getName() + "尝试获得锁");
try {
//带参数的tryLock()方法,等待指定时间后停止等待
if (!lock.tryLock(2, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + "获得锁失败");
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println(Thread.currentThread().getName() + "获得锁失败");
return;
}
//不带参数的tryLock()方法,立刻停止等待
// if (!lock.tryLock()) {
// System.out.println(Thread.currentThread().getName() + "获得锁失败");
// return;
// }
try {
System.out.println(Thread.currentThread().getName() + "获得到锁");
} finally {
lock.unlock();
}
}, "t1");
//主线程获得锁
lock.lock();
System.out.println(Thread.currentThread().getName() + "获得到锁");
t1.start();
try {
Thread.sleep(1000);
} finally {
lock.unlock();
}
}
}
条件变量
synchronized中也有条件变量,就是之前原理时的那个waitSet休息室,当条件不满足时进入waitSet等待
ReentrantLock的条件变量比synchronized强大之处在于,它是支持多个条件变量的,这就好比
- synchronized是那些不满足条件的线程都在一间休息室等消息
- 而ReentrantLock支持多间休息时
使用流程
- await前需要获得锁
- await执行后,会释放锁,进入conditionObject等待
- await的线程被唤醒(或打断或超时)去重新竞争lock锁
- 竞争lock锁成功后,从await后继续执行
顺序控制
如何控制t1线程保证在t2线程之后运行?
- 方法一:wait()¬ify()
public class ThreadTest2 {
private static final Object obj = new Object();
//表示t2是否运行过
static boolean state = false;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
synchronized (obj){
//如果说t2没有运行过,就等待
while (!state) {
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "执行");
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (obj){
System.out.println(Thread.currentThread().getName() + "执行");
state = true;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
obj.notify();
}
}, "t2");
t1.start();
t2.start();
}
}
- 方法二: LockSupport.park()&LockSupport.unpark(t1);
public class ThreadTest2 {
private static final Object obj = new Object();
//表示t2是否运行过
static boolean state = false;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
//如果说t2没有运行过,就等待
while (!state) {
LockSupport.park();
}
System.out.println(Thread.currentThread().getName() + "执行");
}, "t1");
Thread t2 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "执行");
state = true;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
LockSupport.unpark(t1);
}, "t2");
t1.start();
t2.start();
}
}
交替输出
线程1输出 A 5次,线程2输出 B 5次,线程3输出 C 5次,要求输出 ABC ABC ABC ABC ABC
- 方法一:wait()¬ify()
public class ThreadTest2 {
//表示三个线程的状态,1 代表t1可运行 2 代表t2可运行 3 代表t3可运行
static int state = 1;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (obj){
//交替打印5次的操作
for (int i = 0 ; i < 5 ; i ++) {
//因为有3个变量,所以用一个int变量来表示三种状态
while(state != 1){
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("A");
state = 2;
obj.notifyAll();
}
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (obj){
for (int i = 0 ; i < 5 ; i ++) {
while(state != 2){
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("B");
state = 3;
obj.notifyAll();
}
}
}, "t2");
Thread t3 = new Thread(() -> {
synchronized (obj){
for (int i = 0 ; i < 5 ; i ++) {
while(state != 3){
try {
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.printf("C\t");
state = 1;
obj.notifyAll();
}
}
}, "t3");
t1.start();
t2.start();
t3.start();
}
}
- 方法二:LockSupport.park()&LockSupport.unpark(t1);
public class ThreadTest2 {
private static Thread t1,t2,t3;
public static void main(String[] args) {
parkAndUnpark parkAndUnpark = new parkAndUnpark();
t1 = new Thread(() -> {
parkAndUnpark.Print("A",t2);
}, "t1");
t2 = new Thread(() -> {
parkAndUnpark.Print("B",t3);
}, "t2");
t3 = new Thread(() -> {
parkAndUnpark.Print("C\t",t1);
}, "t3");
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}
}
class parkAndUnpark{
private final int LoopNumber = 5;
public void Print(String s,Thread next){
for (int i = 0; i < LoopNumber; i++) {
LockSupport.park();
System.out.printf(s);
LockSupport.unpark(next);
}
}
}
评论(0)