Java多线程的等待通知机制 - 极悦

Java多线程编程

全部教程

×

Java多线程的等待通知机制

什么是等待通知机制

在单线程编程中,要执行的操作需要满足一定的条件才能执行,可以把这个操作放在if语句块中。

在多线程编程中,可能A线程的条件没有满足只是暂时的, 稍后其他的线程B可能会更新条件使得A线程的条件得到满足. 可以将A线程暂停,直到它的条件得到满足后再将A线程唤醒.它的伪代码:

atomics{		//原子操作
while( 条件不成立 ){
等待
}
当前线程被唤醒条件满足后,继续执行下面的操作
}

等待/通知机制的实现

Object类中的wait()方法可以使执行当前代码的线程等待,暂停执行,直到接到通知或被中断为止。

注意:

● wait()方法只能 在同步代码块中由锁对象调用。

● 调用wait()方法,当前线程会释放锁。

其伪代码如下:

//在调用wait()方法前获得对象的内部锁
synchronized( 锁对象 ){
while( 条件不成立 ){
//通过锁对象调用 wait()方法暂停线程,会释放锁对象
锁对象.wait();
}
//线程的条件满足了继续向下执行
}

Object类的notify()可以唤醒线程,该方法也必须在同步代码块中由锁对象调用. 没有使用锁对象调用 wait()/notify()会抛出IlegalMonitorStateExeption异常. 如果有多个等待的线程,notify()方法只能唤醒其中的一个. 在同步代码块中调用notify()方法后,并不会立即释放锁对象,需要等当前同步代码块执行完后才会释放锁对象,一般将notify()方法放在同步代码块的最后. 它的伪代码如下:

synchronized( 锁对象 ){
//执行修改保护条件 的代码
//唤醒其他线程
锁对象.notify();
}
package com.wkcto.wait;

/**
 * 需要通过notify()唤醒等待的线程
 * 北京极悦老崔
 */
public class Test03 {
    public static void main(String[] args) throws InterruptedException {
        String lock = "wkcto";      //定义一个字符串作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    System.out.println("线程1开始等待: " + System.currentTimeMillis());
                    try {
                        lock.wait();        //线程等待,会释放锁对象,当前线程转入blocked阻塞状态
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("线程1结束等待:" + System.currentTimeMillis());
                }
            }
        });

        //定义第二个线程,在第二个线程中唤醒第一个线程
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                //notify()方法也需要在同步代码块中,由锁对象调用
                synchronized (lock){
                    System.out.println("线程2开始唤醒 : " + System.currentTimeMillis());
                    lock.notify();      //唤醒在lock锁对象上等待的某一个线程
                    System.out.println("线程2结束唤醒 : " + System.currentTimeMillis());
                }
            }
        });

        t1.start();         //开启t1线程,t1线程等待

        Thread.sleep(3000);     //main线程睡眠3秒,确保t1入睡

        t2.start();         //t1线程开启3秒后,再开启t2线程唤醒t1线程
    }
}

notify()方法后不会立即释放锁对象

package com.wkcto.wait;

import java.util.ArrayList;
import java.util.List;

/**
 * notify()不会立即释放锁对象
 * 北京极悦老崔
 */
public class Test04 {
    public static void main(String[] args) throws InterruptedException {
        //定义一个List集合存储String数据
        List<String> list = new ArrayList<>();

        //定义第一个线程,当list集合中元素的数量不等于5时线程等待
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (list){
                    if ( list.size() != 5 ){
                        System.out.println("线程1开始等待: " + System.currentTimeMillis());
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("线程1被唤醒:" + System.currentTimeMillis());
                    }
                }
            }
        });

        //定义第二个线程,向list集合中添加元素
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (list){
                    for (int i = 0; i < 10; i++) {
                        list.add("data--" + i);
                        System.out.println("线程2添加了第" + (i+1) + "个数据");

                        //判断元素的数量是否满足唤醒线程1
                        if (list.size() == 5 ){
                            list.notify();      //唤醒 线程, 不会立即释放锁对象,需要等到当前同步代码块都执行完后才能释放锁对象
                            System.out.println("线程2已经发现唤醒通知");
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });

        t1.start();
        //为了确保t2在t1之后开启,即让t1线程先睡眠
        Thread.sleep(500);
        t2.start();
    }
}

interrupt()方法会中断wait()

当线程处于wait()等待状态时, 调用线程对象的interrupt()方法会中断线程的等待状态, 会产生InterruptedException异常。

package com.wkcto.wait;

/**
 * Interrupt()会中断线程的wait()等待
 * 北京极悦老崔
 */
public class Test05 {
    public static void main(String[] args) throws InterruptedException {
        SubThread t = new SubThread();
        t.start();

        Thread.sleep(2000);     //主线程睡眠2秒, 确保子线程处于Wait等待状态
        t.interrupt();
    }

    private  static final  Object LOCK = new Object();      //定义常量作为锁对象
    static  class  SubThread extends  Thread{
        @Override
        public void run() {
            synchronized (LOCK){
                try {
                    System.out.println("begin wait...");
                    LOCK.wait();
                    System.out.println("end wait..");
                } catch (InterruptedException e) {
                    System.out.println("wait等待被中断了****");
                }
            }
        }
    }
}

notify()与notifyAll()

notify()一次只能唤醒一个线程,如果有多个等待的线程,只能随机唤醒其中的某一个; 想要唤醒所有等待线程,需要调用notifyAll()。

package com.wkcto.wait;

/**
 * notify()与notifyAll()
 * 北京极悦老崔
 */
public class Test06 {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();         //定义一个对象作为子线程的锁对象
        SubThread t1 = new SubThread(lock);
        SubThread t2 = new SubThread(lock);
        SubThread t3 = new SubThread(lock);
        t1.setName("t1");
        t2.setName("t2");
        t3.setName("t3");
        t1.start();
        t2.start();
        t3.start();

        Thread.sleep(2000);
        //调用notify()唤醒 子线程
        synchronized (lock){
//            lock.notify();      //调用一次notify()只能唤醒其中的一个线程,其他等待的线程依然处于等待状态,对于处于等待状态的线程来说,错过了通知信号,这种现象也称为信号丢失
            lock.notifyAll();       //唤醒所有的线程
        }
    }

    static  class  SubThread extends  Thread{
        private Object lock;        //定义实例变量作为锁对象
        public SubThread(Object lock) {
            this.lock = lock;
        }

        @Override
        public void run() {
            synchronized (lock){
                try {
                    System.out.println(Thread.currentThread().getName() + " -- begin wait...");
                    lock.wait();
                    System.out.println( Thread.currentThread().getName() + " -- end wait...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

wait(long)的使用

wait(long)带有long类型参数的wait()等待,如果在参数指定的时间内没有被唤醒,超时后会自动唤醒。

package com.wkcto.wait;

/**
 * wait(long)
 * 北京极悦老崔
 */
public class Test07 {
    public static void main(String[] args) {
        final Object obj = new Object();
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized ( obj ){
                    try {
                        System.out.println("thread begin wait");
                        obj.wait(5000);         //如果5000毫秒内没有被唤醒 ,会自动唤醒
                        System.out.println("end wait....");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        t.start();
    }
}

通知过早

线程wait()等待后,可以调用notify()唤醒线程, 如果notify()唤醒的过早,在等待之前就调用了notify()可能会打乱程序正常的运行逻辑。

package com.wkcto.wait;

/**
 * notify()通知过早
 * 北京极悦老崔
 */
public class Test08 {
    public static void main(String[] args) {
        final  Object Lock = new Object();      //定义对象作为锁对象
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    try {
                        System.out.println("begin wait");
                        Lock.wait();
                        System.out.println("wait end...");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    System.out.println("begin notify");
                    Lock.notify();;
                    System.out.println("end nofity");
                }
            }
        });

        //如果先开启t1,再开启t2线程,大多数情况下, t1先等待,t1再把t1唤醒
//        t1.start();
//        t2.start();

        //如果先开启t2通知线程,再开启t1等待线程,可能会出现t1线程等待没有收到通知的情况,
        t2.start();
        t1.start();

    }
}
package com.wkcto.wait;

/**
 * notify()通知过早, 就不让线程等待了
 * 北京极悦老崔
 */
public class Test09 {
    static boolean isFirst = true;      //定义静态变量作为是否第一个运行的线程标志
    public static void main(String[] args) {
        final  Object Lock = new Object();      //定义对象作为锁对象

        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    while ( isFirst ) {         //当线程是第一个开启的线程就等待
                        try {
                            System.out.println("begin wait");
                            Lock.wait();
                            System.out.println("wait end...");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (Lock){
                    System.out.println("begin notify");
                    Lock.notify();;
                    System.out.println("end nofity");
                    isFirst = false;        //通知后,就把第一个线程标志修改为false
                }
            }
        });

        //如果先开启t1,再开启t2线程,大多数情况下, t1先等待,t1再把t1唤醒
//        t1.start();
//        t2.start();

        //如果先开启t2通知线程,再开启t1等待线程,可能会出现t1线程等待没有收到通知的情况,
        t2.start();
        t1.start();

        //实际上,调用start()就是告诉线程调度器,当前线程准备就绪,线程调度器在什么时候开启这个线程不确定,即调用start()方法的顺序,并不一定就是线程实际开启的顺序.
        //在当前示例中,t1等待后让t2线程唤醒 , 如果t2线程先唤醒了,就不让t1线程等待了


    }
}

wait等待条件发生了变化

在使用wait/nofity模式时,注意wait条件发生了变化,也可能会造成逻辑的混乱。

package com.wkcto.wait;

import java.util.ArrayList;
import java.util.List;

/**
 * wait条件发生变化
 *  定义一个集合
 *  定义一个线程向集合中添加数据,添加完数据后通知另外的线程从集合中取数据
 *  定义一个线程从集合中取数据,如果集合中没有数据就等待
 * 北京极悦老崔
 */
public class Test10 {
    public static void main(String[] args) {
        //定义添加数据的线程对象
        ThreadAdd threadAdd = new ThreadAdd();
        //定义取数据的线程对象
        ThreadSubtract threadSubtract = new ThreadSubtract();
        threadSubtract.setName("subtract 1 ");
        //测试一: 先开启添加数据的线程,再开启一个取数据的线程,大多数情况下会正常取数据
//        threadAdd.start();
//        threadSubtract.start();
        //测试二: 先开启取数据的线程,再开启添加数据的线程, 取数据的线程会先等待,等到添加数据之后 ,再取数据
//        threadSubtract.start();
//        threadAdd.start();

        //测试三: 开启两个 取数据的线程,再开启添加数据的线程
        ThreadSubtract threadSubtract2 = new ThreadSubtract();
        threadSubtract2.setName("subtract 2 ");
        threadSubtract.start();
        threadSubtract2.start();
        threadAdd.start();
        /*
            某一次执行结果如下:
                subtract 1  begin wait....
                subtract 2 从集合中取了data后,集合中数据的数量:0
                subtract 1  end wait..
                Exception in thread "subtract 1 " java.lang.IndexOutOfBoundsException:
           分析可能的执行顺序:
                threadSubtract线程先启动, 取数据时,集合中没有数据,wait()等待
                threadAdd线程获得CPU执行权, 添加数据 , 把threadSubtract线程唤醒,
                threadSubtract2线程开启后获得CPU执行权, 正常取数据
                threadSubtract线程获得CPU执行权, 打印 end wait..., 然后再执行list.remove(0)取数据时,现在list集合中已经没有数据了, 这时会产生java.lang.IndexOutOfBoundsException异常
           出现异常的原因是: 向list集合中添加了一个数据,remove()了两次
           如何解决?
                当等待的线程被唤醒后, 再判断一次集合中是否有数据可取. 即需要把sutract()方法中的if判断改为while
         */
    }
    //1)定义List集合
    static List list = new ArrayList<>();

    //2)定义方法从集合中取数据
    public static void subtract(){
        synchronized (list) {
//            if (list.size() == 0) {
            while (list.size() == 0) {
                try {
                    System.out.println(Thread.currentThread().getName() + " begin wait....");
                    list.wait();        //等待
                    System.out.println(Thread.currentThread().getName() + " end wait..");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Object data = list.remove(0);   //从集合中取出一个数据
            System.out.println( Thread.currentThread().getName() + "从集合中取了" + data + "后,集合中数据的数量:" + list.size());
        }
    }

    //3)定义方法向集合中添加数据后,通知等待的线程取数据
    public static void add(){
        synchronized (list){
            list.add("data");
            System.out.println( Thread.currentThread().getName() + "存储了一个数据");
            list.notifyAll();
        }
    }

    //4)定义线程类调用add()取数据的方法
    static class ThreadAdd extends  Thread{
        @Override
        public void run() {
            add();
        }
    }
    //定义线程类调用subtract()方法
    static class ThreadSubtract extends  Thread{
        @Override
        public void run() {
            subtract();
        }
    }
}

 

技术文档推荐

更多>>

视频教程推荐

更多>>