线程通信方式有几种(实现线程间通信的几种方法)
-
-
类目:知识大全
-
联系人:
-
微信号:
-
Q Q 号:
-
手机号:
-
浏览量:
300
【商户信息】
【货源详情】
前言
在开发中,不可避免地会遇到需要向主线程通知所有子线程执行完成通知来处理特定逻辑的情况。
或者,线程a执行某个条件通知线程b中的操作。
可以通过以下方法实现。
等待通知机制
通知等待模式是Java中比较经典的线程通信方式。
两个线程通过对同一对象调用wait ()、notify和通知方法进行通信。
例如,两个线程交替打印奇偶校验数:
publicclasstwothreadwaitnotify {
private int start=1;
私有布尔标志=false;
publicstaticvoidmain (string [ ] args ) {
twothreadwaitnotifytwothread=newtwothreadwaitnotify (;
threadT1=newthread(newounum ) twothread );
T1.setname(a );
threadT2=newthread(newJinum ) twothread );
T2.setname(b );
t1.start (;
t2.start (;
}
publicstaticclassounumimplementsrunnable {
privatetwothreadwaitnotifynumber;
publicou num (twothreadwaitnotifynumber ) (
this.number=number;
}
@Override
公共语音运行(}
wile(number.start=100 ) {
同步(twothreadwaitnotify.class )。
System.out.println (偶数线程夺取了锁);
if(number.flag ) {
system.out.println (thread.current thread ).getName () -偶数' number.start );
number.start;
number.flag=false;
twothreadwaitnotify.class.notify (;
}else {
try {
TwoThreadWaitNotify.class.wait (;
} catch (交互扩展) )
e .打印任务跟踪(;
}
}
}
}
}
}
publicstaticclassjinumimplementsrunnable {
privatetwothreadwaitnotifynumber;
publicjinum { twothreadwaitnotifynumber }
this.number=number;
}
@Override
公共语音运行(}
wile(number.start=100 ) {
同步(twothreadwaitnotify.class )。
System.out.println ('奇数线程夺取了锁');
if (! number.flag ) {
system.out.println (thread.current thread ).getName () -奇数' number.start );
number.start;
number.flag=true;
twothreadwaitnotify.class.notify (;
}else {
try {
TwoThreadWaitNotify.class.wait (;
} catch (交互扩展) )
e .打印任务跟踪(;
}
}
}
}
}
}
}
输出结果:
t2 -奇数93
t1 -偶数94
t2 -奇数95
t1 -偶数96
t2 -奇数97
t1 -偶数98
t2 -奇数99
t1 -偶数100
这里的线程a和线程b对同一个对象TwoThreadWaitNotify.class获取锁定,a线程调用同步对象的wait )方法解除锁定,进入等待状态。
b线程调用了notify ) )方法,以便在a线程收到通知后可以从wait ) )方法返回。
这里利用TwoThreadWaitNotify.class对象完成了通信。
:有需要注意的东西
wait ()、notify (notify all ) )的调用都假定已获得对象的锁定)。
wait ) )方法时,线程将解锁并进入等待状态,并且线程也会移动到等待队列。
调用notify (方法时,等待队列的线程将移动到同步队列,并且线程的状态也会更新为BLOCKED
wait ) )从方法返回的前提是调用notify ) )方法的线程解除锁定,wait ) )方法的线程获得锁定。
等待通知有一个经典的范式:
作为消费者的线程a :
获取对象的锁定。
进入while (判断条件),调用wait )方法。
满足条件后,退出循环,执行具体的处理逻辑。
线程b作为生产者:
获取对象锁定。
更改与线程a共享的判断条件。
notify (调用方法。
伪代码是下一个:
//Thread A
同步(object ) {
while (条件)
Object.wait (;
}
//do something
}
//Thread B
同步(object ) {
条件=假; //改变条件
Object.notify (;
}
join() 方法
privatestaticvoidjoin { } throwsinterruptedexception {
threadT1=newthread(newrunnable () {
@Override
公共语音运行(}
Logger.info(running );
try {
thread.sleep(3000;
} catch (交互扩展) )
e .打印任务跟踪(;
}
}
);
threadT2=newthread(newrunnable () {
@Override
公共语音运行(}
Logger.info(running2);
try {
thread.sleep(4000;
} catch (交互扩展) )
e .打印任务跟踪(;
}
}
);
t1.start (;
t2.start (;
//等待线程1结束
t1.join (;
//等待线程2结束
t2.join (;
Logger.info(mainover );
}
输出结果:
2018-03-1620336021336030.967 [ thread-1 ] infoc.c.actual.thread communication-running 2
2018-03-1620336021336030.967 [ thread-0 ] infoc.c.actual.thread communication-running
2018-03-1620336021336034.972 [ main ] infoc.c.actual.thread communication-mainover
t1.join ) )中,由于在t1的执行完成之前会被屏蔽,所以最终主线程会等待t1和t2的线程的执行完成。
实际上,从源代码中可以看出,这是一种也使用join ()的等待通知的机制:
核心逻辑:
wile(isalive ) ) }
wait(0;
}
join线程完成后,将调用notifyAll ()方法,并在JVM实现中调用该方法,因此在此处不可见。
volatile 共享内存
由于Java通过共享存储器方式进行线程通信,所以可以用以下方法通过主线程关闭a线程:
publicclassvolatileimplementsrunnable {
privatestaticvolatilebooleanflag=true;
@Override
公共语音运行(}
while(flag ) {
system.out.println (thread.current thread ().getName ) )正在运行。 请参阅。 );
}
system.out.println (thread.current thread ().getName (执行完毕) );
}
publicstaticvoidmain (string [ ] args ) throws InterruptedException {。
Volatile aVolatile=new Volatile (;
newthread(avolatile,' thread A ' ).start );
system.out.println (主线程正在运行);
time unit.milliseconds.sleep (100;
aVolatile.stopThread (;
}
私有语音停止
flag=false;
}
}
输出结果:
趋势科技正在运行。
趋势科技正在运行。
趋势科技正在运行。
趋势科技正在运行。
thread A执行完成
由于这里的flag存储在主存储器中,所以主线程和线程a都可以看到。
flag采用volatile修饰主要是为了内存的可见性,更多的内容可以看到这里。
CountDownLatch 并发工具
CountDownLatch可以实现与join相同的功能,但更灵活。
privatestaticvoidcountdownlatch { } throws exception {
int thread=3;
long start=system.current time millis (;
finalcountdownlatchcountdown=newcountdownlatch (thread;
for(intI=0; ithread; I ) {
新标题(新运行) )。
@Override
公共语音运行(}
Logger.info(Threadrun );
try {
thread.sleep(2000;
countDown.countDown (;
Logger.info(Threadend );
} catch (交互扩展) )
e .打印任务跟踪(;
}
}
().start );
}
countDown.await (;
long stop=system.current time millis (;
logger.info (mainovertotaltime={ },stop-start );
}
输出结果:
2018-03-1620336019336044.126 [ thread-0 ] infoc.c.actual.thread communication-thread run
2018-03-1620336019336044.126 [ thread-2 ] infoc.c.actual.thread communication-thread run
2018-03-1620336019336044.126 [ thread-1 ] infoc.c.actual.thread communication-thread run
2018-03-1620336019336046.136 [ thread-2 ] infoc.c.actual.thread communication-thread end
2018-03-1620336019336046.136 [ thread-1 ] infoc.c.actual.thread communication-thread end
2018-03-1620336019336046.136 [ thread-0 ] infoc.c.actual.thread communication-thread end
2018-03-1620336019336046.136 [ main ] infoc.c.actual.thread communication-mainovertotaltime=2012
CountDownLatch也基于AQS(abstractqueuedsynchronizer )实现,更多的实现借鉴了ReentrantLock的实现原理
在初始化CountDownLatch时通知并发线程,并在处理每个线程后调用countDown ()方法。
此方法使用内置于AQS中的state状态-1。
最终,在主线程上调用await )方法,阻止并返回,直到state==0。
CyclicBarrier 并发工具
privatestaticvoidcyclicbarrier { } throws exception {
cyclicbarriercyclicbarrier=newcyclicbarrier (3;
新标题(新运行) )。
@Override
公共语音运行(}
Logger.info(Threadrun );
try {
cyclicBarrier.await (;
}catch(exceptione ) {
e .打印任务跟踪(;
}
logger.info (threadenddosomething );
}
().start );
新标题(新运行) )。
@Override
公共语音运行(}
Logger.info(Threadrun );
try {
cyclicBarrier.await (;
}catch(exceptione ) {
e .打印任务跟踪(;
}
logger.info (threadenddosomething );
}
().start );
新标题(新运行) )。
@Override
公共语音运行(}
Logger.info(Threadrun );
try {
thread.sleep(5000;
cyclicBarrier.await (;
}catch(exceptione ) {
e .打印任务跟踪(;
}
logger.info (threadenddosomething );
}
().start );
Logger.info(mainthread );
}
CyclicBarrier的名称称为屏障或屏障,也可用于线程间通信。
您可以等待所有n个线程都存在后再继续运行的效果。
首先初始化线程参与者。
调用await ()时,请等待所有参与者线程被调用。
所有参与者调用await ()后,所有线程都将从await ()返回并继续后续逻辑。
执行结果:
2018-03-1822336040336000.731 [ thread-0 ] infoc.c.actual.thread communication-thread run
2018-03-1822336040336000.731 [ thread-1 ] infoc.c.actual.thread communication-thread run
2018-03-1822336040336000.731 [ thread-2 ] infoc.c.actual.thread communication-thread run
2018-03-1822336040336000.731 [ main ] infoc.c.actual.thread communication-mainthread
2018-03-1822336040336005.741 [ thread-0 ] infoc.c.actual.thread communication-threadenddosomething
2018-03-1822336040336005.741 [ thread-1 ] infoc.c.actual.thread communication-threadenddosomething
2018-03-1822336040336005.741 [ thread-2 ] infoc.c.actual.thread communication-threadenddosomething
您可以看到,由于其中一个线程暂停了5秒钟,所有剩下的线程都必须等待此线程调用await ()。
该工具可以实现与CountDownLatch相同的功能,但必须更灵活。 reset ) )方法来重置并重新运行CyclicBarrier。 (您必须自己捕获BrokenBarrierException处理。
线程响应中断
publicclassstopthreadimplementsrunnable {
@Override
公共语音运行(}
while (! Thread.currentThread ().isInterrupted ) }
//线程执行具体逻辑
system.out.println (thread.current thread ().getName ) )正在运行。 请参阅。 );
() ) ) ) )。
system.out.println (thread.current thread ().getName ) )将退出。 请参阅。 );
() ) ) ) )。
publicstaticvoidmain (string [ ] args ) throws InterruptedException {。
thread thread=new thread (newstopthread (),' thread A );
thread.start (;
system.out.println (主线程正在运行);
timeunit.milliseconds.sleep(10;
thread.interrupt (;
() ) ) ) )。
() ) ) ) )。
输出结果:
thread A正在运行。 请参阅。
thread A正在运行。 请参阅。
thread A退出。 请参阅。
可以中断线程进行通信。 调用thread.interrupt ()方法是将thread的标志属性之一设置为true。
调用这个方法并不意味着线程可以中断,如果不响应这个标志,其实也没有什么用(这里是对这个标志的判断)。
但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false。
线程池 awaitTermination() 方法
如果使用线程池管理线程,则可以使用以下方法使主线程等待线程池中的所有任务完成:
privatestaticvoidexecutorservice () throws Exception{
blockingqueuerunnablequeue=newlinkedblockingqueue (10;
threadpoolexecutorpoolexecutor=newthreadpoolexecutor (5,5,1,TimeUnit.MILLISECONDS,queue );
pool executor.execute (new runnable ) )。
@Override
公共语音运行(}
Logger.info(running );
try {
thread.sleep(3000;
} catch (交互扩展) )
e .打印任务跟踪(;
() ) ) ) )。
() ) ) ) )。
);
pool executor.execute (new runnable ) )。
@Override
公共语音运行(}
Logger.info(running2);
try {
thread.sleep(2000;
} catch (交互扩展) )
e .打印任务跟踪(;
() ) ) ) )。
() ) ) ) )。
);
poolExecutor.shutdown (;
while (! pool executor.await termination (1,TimeUnit.SECONDS ) ) }
LOGGER.info (线程仍在运行。 请参阅。 请参阅。 );
() ) ) ) )。
Logger.info(mainover );
() ) ) ) )。
输出结果:
2018-03-1620336018336001.273 [ pool-1-thread-2 ] infoc.c.actual.thread communication-running 2
2018-03-1620336018336001.273 [ pool-1-thread-1 ] infoc.c.actual.thread communication-running
2018-03-1620336018336002.273 [ main ] infoc.c.actual.thread communication -线程仍在运行。 请参阅。 请参阅。
2018-03-1620336018336003.278 [ main ] infoc.c.actual.thread communication -线程仍在运行。 请参阅。 请参阅。
2018-03-1620336018336004.278 [ main ] infoc.c.actual.thread communication-mainover
要使用此awaitTermination () )方法,必须关闭线程池,就像调用shutdown ()方法一样。
调用shutdown ()时,线程池将停止接受新任务,并顺利关闭线程池中的现有任务。
管道通信
公共静态void piped () throws IOException { ) )。
//面向字符的PipedInputStream面向字节
pipedwriterwriter=newpipedwriter (;
pipedreaderreader=newpipedreader (;
//连接输入输出流
writer.connect(reader;
threadT1=newthread(newrunnable () {
@Override
公共语音运行(}
Logger.info(running );
try {
for(intI=0; i 10; I ) {
writer.write(I ' );
thread.sleep(10;
() ) ) ) )。
}catch(exceptione ) {
} finally {
try {
writer.close (;
}catch(ioexceptione ) {
e .打印任务跟踪(;
}
}
}
);
threadT2=newthread(newrunnable () {
@Override
公共语音运行(}
Logger.info(running2);
int msg=0;
try {
while((msg=reader.read ) )!=-1({
Logger.info(msg={},) char ) msg );
}
}catch(exceptione ) {
}
}
);
t1.start (;
t2.start (;
}
输出结果:
2018-03-1619336056336043.014 [ thread-0 ] infoc.c.actual.thread communication-running
2018-03-1619336056336043.014 [ thread-1 ] infoc.c.actual.thread communication-running 2
2018-03-1619336056336043.130 [ thread-1 ] infoc.c.actual.thread communication-msg=0
2018-03-1619336056336043.132 [ thread-1 ] infoc.c.actual.thread communication-msg=1
2018-03-1619336056336043.132 [ thread-1 ] infoc.c.actual.thread communication-msg=2
2018-03-1619336056336043.133 [ thread-1 ] infoc.c.actual.thread communication-msg=3
2018-03-1619336056336043.133 [ thread-1 ] infoc.c.actual.thread communication-msg=4
2018-03-1619336056336043.133 [ thread-1 ] infoc.c.actual.thread communication-msg=5
2018-03-1619336056336043.133 [ thread-1 ] infoc.c.actual.thread communication-msg=6
2018-03-1619336056336043.134 [ thread-1 ] infoc.c.actual.thread communication-msg=7
2018-03-1619336056336043.134 [ thread-1 ] infoc.c.actual.thread communication-msg=8
2018-03-1619336056336043.134 [ thread-1 ] infoc.c.actual.thread communication-msg=9
虽然Java是基于内存的通信,但也可以使用管道通信。
请注意,输入流和输出流必须首先建立连接。 这使得线程b可以接收来自线程a的消息。