线程间的通信


在Java中线程之间通信一般使用共享变量的方式,用同步锁来实现。在Golang中不推荐使用共享变量的方式,而是更多使用管道来实现信息同步。本文总结一下在Java中线程间通信的方式

要实现线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。

使用 volatile 关键字

基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式

public class TestSync {
    // 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知
    static volatile boolean notice = false;

    public static void main(String[] args) {
        List<String>  list = new ArrayList<>();

        // 实现线程A
        Thread threadA = new Thread(() -> {
            running = true;
        });

        // 实现线程B
        Thread threadB = new Thread(() -> {
             // 开关
            while(!notice){
                Thread.sleep(1000);
            }
            // 执行线程任务
            System.out.println("线程B收到通知,开始执行自己的业务...");
            doSometing();
        });
        // 需要先启动线程B
        Thread.sleep(1000);
        // 再启动线程A
        threadA.start();
    }
}

等待/通知机制

等待/通知机制的实现由Java完成,由Object类提供了线程间通信的方法:wait()notify()notifyaAl()。对于wait()和notify()的理解,还是要从JDK官方文档中开始,在Object类方法中有:

  • void notify()
    Wakes up a single thread that is waiting on this object’s monitor.
    译:唤醒在此对象监视器上等待的单个线程

  • void notifyAll()
    Wakes up all threads that are waiting on this object’s monitor.
    译:唤醒在此对象监视器上等待的所有线程

  • void wait( )
    Causes the current thread to wait until another thread invokes the notify() method or the notifyAll( ) method for this object.
    译:导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法

  • void wait(long timeout)
    Causes the current thread to wait until either another thread invokes the notify( ) method or the notifyAll( ) method for this object, or a specified amount of time has elapsed.
    译:导致当前的线程等待,直到其他线程调用此对象的notify() 方法或 notifyAll() 方法,或者指定的时间过完。

  • void wait(long timeout, int nanos)
    Causes the current thread to wait until another thread invokes the notify( ) method or the notifyAll( ) method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed.
    译:导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法,或者其他线程打断了当前线程,或者指定的时间过完。

需要注意的是:不管是等待线程还是唤醒线程都在同步块里调用wait()和notify()。这是强制性的!一个线程如果没有持有对象锁,将不能调用wait(),notify()或者notifyAll()。否则,会抛出IllegalMonitorStateException异常。下面根据官方文档总结一下:

  1. wait( ),notify( ),notifyAll() 是Object基础类中的方法,因为每个对象都有锁,当然操作锁的方法也是最基础的方法了。
  2. 当需要调用以上的方法的时候,一定要对竞争资源进行加锁,如果不加锁的话,则会报 IllegalMonitorStateException 异常
  3. 调用obj.wait()方法会释放了obj的锁,否则其他线程也无法获得obj的锁,也就无法在synchronized(obj){ obj.notify() } 代码段内唤醒A。
  4. notify()方法只会通知等待队列中的第一个相关线程(不会通知优先级比较高的线程)
  5. notifyAll()通知所有等待该竞争资源的线程(也不会按照线程的优先级来执行)
  6. 锁对象obj.notify()/notifyAll()只能唤醒由锁对象obj wait的线程
  7. 假设有三个线程执行了obj.wait( ),那么obj.notifyAll()则能全部唤醒tread1,thread2,thread3,但是要继续执行obj.wait() 的下一条语句,必须获得obj锁。因此,tread1,thread2,thread3只有一个有机会获得锁继续执行,例如tread1,其余的需要等待thread1释放obj锁之后才能继续执行。
  8. 当调用obj.notify()/notifyAll()后,调用线程依旧持有obj锁,因此,thread1,thread2,thread3虽被唤醒,但是仍无法获得obj锁。直到调用线程退出synchronized块,释放obj锁后,thread1,thread2,thread3中的一个才有机会获得锁继续执行
public class TestSync {
    public static void main(String[] args) {
        // 定义一个锁对象
        Object lock = new Object();

        // 实现线程A
        Thread threadA = new Thread(() -> {
            synchronized (lock) {
                lock.notify();// 唤醒B线程
            }
        });
        // 实现线程B
        Thread threadB = new Thread(() -> {
                synchronized (lock) {
                    lock.wait();
                    System.out.println("线程B收到通知,开始执行自己的业务...");
                }
        });
        // 需要先启动线程B
        threadB.start();
        Thread.sleep(1000);
        // 再启动线程A
        threadA.start();
    }
}

使用LockSupport

LockSupport 是一种非常灵活的实现线程间阻塞和唤醒的工具,使用它不用关注是等待线程先进行还是唤醒线程先运行,但是得知道线程的名字。

public class TestSync {
    public static void main(String[] args) {

         // 实现线程B
        final Thread threadB = new Thread(() -> {
            System.out.println("线程A准备park");
            LockSupport.park();
            System.out.println("线程B收到通知,开始执行自己的业务...");
        });
        // 实现线程A
        Thread threadA = new Thread(() -> {
            System.out.println("开始通知线程B");
            LockSupport.unpark(threadB);
            System.out.println("结束通知线程B");
        });
        // 需要先启动线程B
        threadB.start();
        Thread.sleep(2000);
        // 再启动线程A
        threadA.start();
    }
}

使用 join 方法

join能将并发执行的多条线程串行执行,join函数属于Thread类,通过一个thread对象调用。当在线程B中执行threadA.join()时,线程B将会被阻塞(底层调用wait方法),等到threadA线程运行结束后才会返回join方法。

public static void main(String[] args){
    // 开启一条线程
    Thread t = new Thread(new Runnable(){
        public void run(){
            // doSometing
        }
    }).start();
    // 主线程调用join,等待t线程执行完毕
    try{
        t.join();
    }catch(InterruptedException e){
        // 中断处理……
    }
}

管道流

管道流用于在两个线程之间进行字节流或字符流的传递。管道流的实现依靠PipedOutputStream、PipedInputStream、PipedWriter、PipedReader。分别对应字节流和字符流。但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output。实现步骤如下:

  1. 在一条线程中分别创建输入流和输出流;
  2. 将输入流和输出流连接起来;
  3. 将输入流和输出流分别传递给两条线程;
  4. 调用read和write方法就可以实现线程间通信。
// 创建输入流与输出流对象
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 连接输入输出流
out.connect(in);
// 创建写线程
class WriteThread extends Thread{
    private PipedWriter out;
    public WriteThread(PipedWriter out){
        this.out = out;
    }
    public void run(){
        out.write("hello concurrent world!");
    }
}
// 创建读线程
class ReaderThread extends Thread{
    private PipedReader in;
    public ReaderThread(PipedReader in){
        this.in = in;
    }
    public void run(){
        BufferedReader reader=new BufferedReader(in);
        String str = reader.readLine();
        System.out.println("收到消息:"+str);
    }
}

使用BlockingQueue

BlockingQueue阻塞队列定义的常用方法如下:

  • add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。
  • offer(Object):表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
  • put(Object):把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。
  • poll(time):获取并删除BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。当不传入time值时,立刻返回。
  • peek():立刻获取BlockingQueue里排在首位的对象,但不从队列里删除,如果队列为空,则返回null。
  • take():获取并删除BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。

BlockingQueue有四个具体的实现类

  • ArrayBlockingQueue:数组阻塞队列,规定大小,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
  • LinkedBlockingQueue:链表阻塞队列,大小不定,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。
  • PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
  • SynchronousQueue:特殊的BlockingQueue,它的内部同时只能够容纳单个元素,对其的操作必须是放和取交替完成的。
  • DelayQueue:延迟队列,注入其中的元素必须实现 java.util.concurrent.Delayed 接口

所有BlockingQueue的使用方式类似,以下例子一个线程写入,一个线程读取,操作的是同一个Queue。生产者消费者模型

public class BlockingQueueDemo {

    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        //读线程 消费者
        new Thread(() -> {
            // take方法会阻塞直到读到消息
            String item = queue.take();
        }).start();

        //写线程 生产者
        new Thread(() -> {
            queue.put("生产消息!");
        }).start();
    }
}

Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
图灵机与冯诺依曼模型 图灵机与冯诺依曼模型
图灵机是─种抽象的机器,一种抽象的计算模型。由数学家阿兰·图灵提出来的,尽管这个机器很简单,但它可以模拟计算机的任何算法,无论这个算法有多复杂。 当今的计算机都是图灵机的实现,要想知道程序执行的原理,我们可以先从「图灵机」的工作原理说起,图
2022-05-04
Next 
详细解读Raft共识算法 详细解读Raft共识算法
业界最著名的一致性算法就是大名鼎鼎的Paxos(Chubby的作者曾说过:世上只有一种一致性算法,就是Paxos)。但Paxos是出了名的难懂,而Raft正是为了探索一种更易于理解的一致性算法而产生的。Raft是分布式环境下的一致性算法,它
2022-05-01
  TOC