Reactor模式的理解


Reactor模式在服务端中被广泛使用,Reactor是一个设计模式。Reactor把服务端的请求处理分为几个部分处理,提高了CPU的利用率。同时Reactor需要和NIO一起使用,才能使效率最高。

Class Reactor:

/** 
 * 
 * 经典的网络服务在每个线程中完成对数据的处理:
 * 但这种模式在用户负载增加时,性能将下降非常的快。
 * 系统运行的性能瓶颈通常在I/O读写,包括对端口和文件的操作上,过去,在打 开一个I/O通道后,
 * read()将一直等待在端口一边读取字节内容,如果没有内容进来,read()也是傻傻的等,
 * 这会影响我们程序继续做其他事情,那 么改进做法就是开设线程,让线程去等待,但是这样做也是相当耗费资源(传统socket通讯服务器设计模式) 的。
 * 
 * Java NIO非堵塞技术实际是采取Reactor模式,或者说是Observer模式为我们监察I/O端口,
 * 如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。 
 * NIO 有一个主要的类Selector,这个类似一个观察者 ,只要我们把需要探知的 socketchannel告诉Selector,
 * 我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些 Key,就会获得我们刚刚注册过的socketchannel,
 * 然后,我们从这个Channel中读取数据,放心,包准能够读到,接着我们可以处理这些数据。 
 * Selector内部原理实际是在做一个对所注册的channel的轮询访问 ,不断的轮询(目前就这一个算法),一旦轮询到一个channel有所注册的事情发生,
 * 比如数据来了,他就会站起来报告,交出一把钥匙,
 * 让我们通过这把钥匙(SelectionKey 表示 SelectableChannel 在 Selector 中的注册的标记。 )来读取这个channel的内容。 
 * 
 * 反应器模式 
 * 用于解决多用户访问并发问题 
 * 举个例子:餐厅服务问题 
 * 传统线程池做法:来一个客人(请求)去一个服务员(线程) 
 * 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员” 
 */  
public class Reactor implements Runnable{ 
    //同步事件分离器,阻塞等待Handles中的事件发生
    public final Selector selector;  
    public final ServerSocketChannel serverSocketChannel;  

    public Reactor(int port) throws IOException{  
        selector=Selector.open();  
        serverSocketChannel=ServerSocketChannel.open();  
        InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);  
        serverSocketChannel.socket().bind(inetSocketAddress);  
        /*
         * ServerSocketChannel可以设置成非阻塞模式。
         * 在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。
         * 因此,需要检查返回的SocketChannel是否是null.
         */
        serverSocketChannel.configureBlocking(false);  

        /*
         * 向selector注册该serverSocketChannel
         * SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了
         * SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功
         * SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了)
         * SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作)
         * 这里 注意,下面两种,SelectionKey.OP_READ ,SelectionKey.OP_WRITE ,
         * 1.当向通道中注册SelectionKey.OP_READ事件后,如果客户端有向缓存中write数据,下次轮询时,则会 isReadable()=true;
         * 2.当向通道中注册SelectionKey.OP_WRITE事件后,这时你会发现当前轮询线程中isWritable()一直为ture,如果不设置为其他事件
         */
        SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  

        /*
         * 利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
         * 该selectionKey为serverSocketChannel的selectionKey
         * attach的为new Acceptor(this)
         * 用于void dispatch(SelectionKey key)中获取key.attachment()
         * 将被本类中的run()方法的selectionKeys.clear(); 清空
         * 第二次的selector.selectedKeys();获取到的将会是socketChannel注册的OP_READ的selectionKey(attach的为SocketReadHandler)
         */
        selectionKey.attach(new Acceptor(this));  
    }  

    @Override  
    public void run() {  
        try {  
            while(!Thread.interrupted()){  
                selector.select();  
                Set<SelectionKey> selectionKeys= selector.selectedKeys();  
                Iterator<SelectionKey> it=selectionKeys.iterator();  
                //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。  
                while(it.hasNext()){  
                    SelectionKey selectionKey=it.next();

                    /*
                     * 第一次触发此方法,获取(OP_ACCEPT)selectionKey.attachment()为new Acceptor(this)
                     * Acceptor run()方法里面为 new SocketReadHandler(reactor.selector, socketChannel);
                     * 在SocketReadHandler构造方法中将socketChannel register到Selector,返回selectionKey
                     * 再将该socketChannel的selectionKey attach(this); this represent new出来的SocketReadHandler
                     * 
                     * 第二次触发此方法,获取(OP_READ)selectionKey.attachment()为new出来的SocketReadHandler
                     * SocketReadHandler run()方法里面为 socketChannel.read(inputBuffer); 实际处理的逻辑代码
                     */
                    dispatch(selectionKey);

                    /*
                     * selectionKeys.clear();  将selectionKeys清空,
                     * Acceptor类中的run()>>>new SocketReadHandler()构造方法中的 selector.wakeup()>>>再次触发selector.select();
                     * Set<SelectionKey> selectionKeys= selector.selectedKeys();
                     * 第一次遍历的selectionKeys里面只有一个就是OP_ACCEPT的selectionKey,attachment为Acceptor对象
                     * 第二次遍历的selectionKeys里面只有一个就是OP_READ的selectionKey,attachment为SocketReadHandler对象
                     */
                    selectionKeys.clear();  
                }  
            }  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  

    /** 
     * 运行Acceptor或SocketReadHandler 
     */  
    void dispatch(SelectionKey key) {
        //本例第一次此方法执行key为serverSocketChannel注册的selectionKey,key.attachment()为Acceptor对象
        //本例第二次此方法执行key为socketChannel注册的selectionKey,key.attachment()为SocketReadHandler对象
        Runnable r = (Runnable)(key.attachment());    
        if (r != null){
            /*
             * 第一次执行Acceptor的run(),run()方法将调用SocketReadHandler构造方法
             * 在SocketReadHandler构造方法中将向selector注册socketChannel,并attach(SocketReadHandler对象)
             * 第二次执行SocketReadHandler的run(),处理具体逻辑代码
             */
            r.run();  
        }    
    }    

}  

Class Acceptor:

public class Acceptor implements Runnable{  
    private Reactor reactor;  
    public Acceptor(Reactor reactor){  
        this.reactor=reactor;  
    }  
    @Override  
    public void run() {  
        try {  
            /*
             * ServerSocketChannel可以设置成非阻塞模式。
             * 在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。
             * 因此,需要检查返回的SocketChannel是否是null.
             */
            SocketChannel socketChannel=reactor.serverSocketChannel.accept(); 

            /*
             * 调用Handler来处理channel
             * 在SocketReadHandler构造方法中将socketChannel register到Selector,返回selectionKey
             * 再将该socketChannel的selectionKey attach(this); this represent new出来的SocketReadHandler
             */
            if(socketChannel!=null) new SocketReadHandler(reactor.selector, socketChannel);  
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}

Class SocketReadHandler :

public class SocketReadHandler implements Runnable{  
    private SocketChannel socketChannel;  
    public SocketReadHandler(Selector selector,
            SocketChannel socketChannel) throws IOException{  
        this.socketChannel=socketChannel;  
        socketChannel.configureBlocking(false);  

        SelectionKey selectionKey=socketChannel.register(selector, 0);  

        //将该socketChannel注册的SelectionKey绑定为本SocketReadHandler 
        //下一步有事件触发时,将调用本类的run方法。    
        //参看dispatch(SelectionKey key)    
        selectionKey.attach(this);  

        //同时将SelectionKey标记为可读,以便读取。    
        selectionKey.interestOps(SelectionKey.OP_READ);    
        selector.wakeup();  
    }  

    /** 
     * 处理读取数据 
     */  
    @Override  
    public void run() {  
        ByteBuffer inputBuffer=ByteBuffer.allocate(1024);  
        inputBuffer.clear();  
        try {  
            socketChannel.read(inputBuffer);  
            //激活线程池 处理这些request  
            //requestHandle(new Request(socket,btt));   
        } catch (IOException e) {  
            e.printStackTrace();  
        }  
    }  
}

Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Current
Reactor模式的理解 Reactor模式的理解
Reactor模式在服务端中被广泛使用,Reactor是一个设计模式。Reactor把服务端的请求处理分为几个部分处理,提高了CPU的利用率。同时Reactor需要和NIO一起使用,才能使效率最高。 Class Reactor: /**
2024-08-25
Next 
SpringBoot启动过程和自动装配 SpringBoot启动过程和自动装配
SpringBoot看过很多次,但是一直理解不深,只停留在了解的阶段。直到最近公司项目做拆分,需要用到SpringBoot启动时的一些特性,于是把启动过程看了一遍。举个例子 测试对Spring启动原理的理解 Rpc框架和Spring的集成问
2024-07-27
  TOC