在Java中线程之间通信一般使用共享变量的方式,用同步锁来实现。在Golang中不推荐使用共享变量的方式,而是更多使用管道来实现信息同步。本文总结一下在Java中线程间通信的方式
要实现线程间通信的模型有两种:共享内存和消息传递,以下方式都是基本这两种模型来实现的。
使用 volatile 关键字
基于 volatile 关键字来实现线程间相互通信是使用共享内存的思想,大致意思就是多个线程同时监听一个变量,当这个变量发生变化的时候 ,线程能够感知并执行相应的业务。这也是最简单的一种实现方式
public class TestSync {
// 定义一个共享变量来实现通信,它需要是volatile修饰,否则线程不能及时感知
static volatile boolean notice = false;
public static void main(String[] args) {
List<String> list = new ArrayList<>();
// 实现线程A
Thread threadA = new Thread(() -> {
running = true;
});
// 实现线程B
Thread threadB = new Thread(() -> {
// 开关
while(!notice){
Thread.sleep(1000);
}
// 执行线程任务
System.out.println("线程B收到通知,开始执行自己的业务...");
doSometing();
});
// 需要先启动线程B
Thread.sleep(1000);
// 再启动线程A
threadA.start();
}
}
等待/通知机制
等待/通知机制的实现由Java完成,由Object类提供了线程间通信的方法:wait()、notify()、notifyaAl()。对于wait()和notify()的理解,还是要从JDK官方文档中开始,在Object类方法中有:
void notify()
Wakes up a single thread that is waiting on this object’s monitor.
译:唤醒在此对象监视器上等待的单个线程void notifyAll()
Wakes up all threads that are waiting on this object’s monitor.
译:唤醒在此对象监视器上等待的所有线程void wait( )
Causes the current thread to wait until another thread invokes the notify() method or the notifyAll( ) method for this object.
译:导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法void wait(long timeout)
Causes the current thread to wait until either another thread invokes the notify( ) method or the notifyAll( ) method for this object, or a specified amount of time has elapsed.
译:导致当前的线程等待,直到其他线程调用此对象的notify() 方法或 notifyAll() 方法,或者指定的时间过完。void wait(long timeout, int nanos)
Causes the current thread to wait until another thread invokes the notify( ) method or the notifyAll( ) method for this object, or some other thread interrupts the current thread, or a certain amount of real time has elapsed.
译:导致当前的线程等待,直到其他线程调用此对象的notify( ) 方法或 notifyAll( ) 方法,或者其他线程打断了当前线程,或者指定的时间过完。
需要注意的是:不管是等待线程还是唤醒线程都在同步块里调用wait()和notify()。这是强制性的!一个线程如果没有持有对象锁,将不能调用wait(),notify()或者notifyAll()。否则,会抛出IllegalMonitorStateException异常。下面根据官方文档总结一下:
- wait( ),notify( ),notifyAll() 是Object基础类中的方法,因为每个对象都有锁,当然操作锁的方法也是最基础的方法了。
- 当需要调用以上的方法的时候,一定要对竞争资源进行加锁,如果不加锁的话,则会报 IllegalMonitorStateException 异常
- 调用obj.wait()方法会释放了obj的锁,否则其他线程也无法获得obj的锁,也就无法在synchronized(obj){ obj.notify() } 代码段内唤醒A。
- notify()方法只会通知等待队列中的第一个相关线程(不会通知优先级比较高的线程)
- notifyAll()通知所有等待该竞争资源的线程(也不会按照线程的优先级来执行)
- 锁对象obj.notify()/notifyAll()只能唤醒由锁对象obj wait的线程
- 假设有三个线程执行了obj.wait( ),那么obj.notifyAll()则能全部唤醒tread1,thread2,thread3,但是要继续执行obj.wait() 的下一条语句,必须获得obj锁。因此,tread1,thread2,thread3只有一个有机会获得锁继续执行,例如tread1,其余的需要等待thread1释放obj锁之后才能继续执行。
- 当调用obj.notify()/notifyAll()后,调用线程依旧持有obj锁,因此,thread1,thread2,thread3虽被唤醒,但是仍无法获得obj锁。直到调用线程退出synchronized块,释放obj锁后,thread1,thread2,thread3中的一个才有机会获得锁继续执行
public class TestSync {
public static void main(String[] args) {
// 定义一个锁对象
Object lock = new Object();
// 实现线程A
Thread threadA = new Thread(() -> {
synchronized (lock) {
lock.notify();// 唤醒B线程
}
});
// 实现线程B
Thread threadB = new Thread(() -> {
synchronized (lock) {
lock.wait();
System.out.println("线程B收到通知,开始执行自己的业务...");
}
});
// 需要先启动线程B
threadB.start();
Thread.sleep(1000);
// 再启动线程A
threadA.start();
}
}
使用LockSupport
LockSupport 是一种非常灵活的实现线程间阻塞和唤醒的工具,使用它不用关注是等待线程先进行还是唤醒线程先运行,但是得知道线程的名字。
public class TestSync {
public static void main(String[] args) {
// 实现线程B
final Thread threadB = new Thread(() -> {
System.out.println("线程A准备park");
LockSupport.park();
System.out.println("线程B收到通知,开始执行自己的业务...");
});
// 实现线程A
Thread threadA = new Thread(() -> {
System.out.println("开始通知线程B");
LockSupport.unpark(threadB);
System.out.println("结束通知线程B");
});
// 需要先启动线程B
threadB.start();
Thread.sleep(2000);
// 再启动线程A
threadA.start();
}
}
使用 join 方法
join能将并发执行的多条线程串行执行,join函数属于Thread类,通过一个thread对象调用。当在线程B中执行threadA.join()时,线程B将会被阻塞(底层调用wait方法),等到threadA线程运行结束后才会返回join方法。
public static void main(String[] args){
// 开启一条线程
Thread t = new Thread(new Runnable(){
public void run(){
// doSometing
}
}).start();
// 主线程调用join,等待t线程执行完毕
try{
t.join();
}catch(InterruptedException e){
// 中断处理……
}
}
管道流
管道流用于在两个线程之间进行字节流或字符流的传递。管道流的实现依靠PipedOutputStream、PipedInputStream、PipedWriter、PipedReader。分别对应字节流和字符流。但是Java中的Stream是单向的,所以在两个线程中分别建了一个input和output。实现步骤如下:
- 在一条线程中分别创建输入流和输出流;
- 将输入流和输出流连接起来;
- 将输入流和输出流分别传递给两条线程;
- 调用read和write方法就可以实现线程间通信。
// 创建输入流与输出流对象
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 连接输入输出流
out.connect(in);
// 创建写线程
class WriteThread extends Thread{
private PipedWriter out;
public WriteThread(PipedWriter out){
this.out = out;
}
public void run(){
out.write("hello concurrent world!");
}
}
// 创建读线程
class ReaderThread extends Thread{
private PipedReader in;
public ReaderThread(PipedReader in){
this.in = in;
}
public void run(){
BufferedReader reader=new BufferedReader(in);
String str = reader.readLine();
System.out.println("收到消息:"+str);
}
}
使用BlockingQueue
BlockingQueue阻塞队列定义的常用方法如下:
- add(Object):把Object加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常。
- offer(Object):表示如果可能的话,将Object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false。
- put(Object):把Object加到BlockingQueue里,如果BlockingQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里有空间再继续。
- poll(time):获取并删除BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null。当不传入time值时,立刻返回。
- peek():立刻获取BlockingQueue里排在首位的对象,但不从队列里删除,如果队列为空,则返回null。
- take():获取并删除BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的对象被加入为止。
BlockingQueue有四个具体的实现类:
- ArrayBlockingQueue:数组阻塞队列,规定大小,其构造函数必须带一个int参数来指明其大小。其所含的对象是以FIFO(先入先出)顺序排序的。
- LinkedBlockingQueue:链表阻塞队列,大小不定,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。其所含的对象是以FIFO顺序排序的。
- PriorityBlockingQueue:类似于LinkedBlockingQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序。
- SynchronousQueue:特殊的BlockingQueue,它的内部同时只能够容纳单个元素,对其的操作必须是放和取交替完成的。
- DelayQueue:延迟队列,注入其中的元素必须实现 java.util.concurrent.Delayed 接口
所有BlockingQueue的使用方式类似,以下例子一个线程写入,一个线程读取,操作的是同一个Queue。生产者消费者模型
public class BlockingQueueDemo {
public static void main(String[] args) {
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
//读线程 消费者
new Thread(() -> {
// take方法会阻塞直到读到消息
String item = queue.take();
}).start();
//写线程 生产者
new Thread(() -> {
queue.put("生产消息!");
}).start();
}
}