博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
reactor模式:单线程的reactor模式
阅读量:4680 次
发布时间:2019-06-09

本文共 10541 字,大约阅读时间需要 35 分钟。

reactor模式称之为响应器模式,常用于nio的网络通信框架,其服务架构图如下

不同于传统IO的串行调度方式,NIO把整个服务请求分为五个阶段

read:接收到请求,读取数据

decode:解码数据

compute:业务逻辑处理

encode:返回数据编码

send:发送数据

其中,以read和send阶段IO最为频繁

 

 

代码实现

1  // Reactor線程   2     package server;   3        4     import java.io.IOException;   5     import java.net.InetSocketAddress;   6     import java.nio.channels.SelectionKey;   7     import java.nio.channels.Selector;   8     import java.nio.channels.ServerSocketChannel;   9     import java.util.Iterator;  10     import java.util.Set;  11       12     public class TCPReactor implements Runnable {  13       14         private final ServerSocketChannel ssc;  15         private final Selector selector;  16       17         public TCPReactor(int port) throws IOException {  18             selector = Selector.open();  19             ssc = ServerSocketChannel.open();  20             InetSocketAddress addr = new InetSocketAddress(port);  21             ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口  22             ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞  23             SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key  24             sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象  25         }  26       27         @Override  28         public void run() {  29             while (!Thread.interrupted()) { // 在線程被中斷前持續運行  30                 System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  31                 try {  32                     if (selector.select() == 0) // 若沒有事件就緒則不往下執行  33                         continue;  34                 } catch (IOException e) {  35                     // TODO Auto-generated catch block  36                     e.printStackTrace();  37                 }  38                 Set
selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合 39 Iterator
it = selectedKeys.iterator(); 40 while (it.hasNext()) { 41 dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度 42 it.remove(); 43 } 44 } 45 } 46 47 /* 48 * name: dispatch(SelectionKey key) 49 * description: 調度方法,根據事件綁定的對象開新線程 50 */ 51 private void dispatch(SelectionKey key) { 52 Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程 53 if (r != null) 54 r.run(); 55 } 56 57 }

 

1 // 接受連線請求線程   2     package server;   3        4     import java.io.IOException;   5     import java.nio.channels.SelectionKey;   6     import java.nio.channels.Selector;   7     import java.nio.channels.ServerSocketChannel;   8     import java.nio.channels.SocketChannel;   9       10     public class Acceptor implements Runnable {  11       12         private final ServerSocketChannel ssc;  13         private final Selector selector;  14           15         public Acceptor(Selector selector, ServerSocketChannel ssc) {  16             this.ssc=ssc;  17             this.selector=selector;  18         }  19           20         @Override  21         public void run() {  22             try {  23                 SocketChannel sc= ssc.accept(); // 接受client連線請求  24                 System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  25                   26                 if(sc!=null) {  27                     sc.configureBlocking(false); // 設置為非阻塞  28                     SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key  29                     selector.wakeup(); // 使一個阻塞住的selector操作立即返回  30                     sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象  31                 }  32                   33             } catch (IOException e) {  34                 // TODO Auto-generated catch block  35                 e.printStackTrace();  36             }  37         }  38       39           40     }
1 // Handler線程   2     package server;   3        4     import java.io.IOException;   5     import java.nio.ByteBuffer;   6     import java.nio.channels.SelectionKey;   7     import java.nio.channels.SocketChannel;   8     import java.util.concurrent.LinkedBlockingQueue;   9     import java.util.concurrent.ThreadPoolExecutor;  10     import java.util.concurrent.TimeUnit;  11       12     public class TCPHandler implements Runnable {  13       14         private final SelectionKey sk;  15         private final SocketChannel sc;  16       17         int state;   18       19         public TCPHandler(SelectionKey sk, SocketChannel sc) {  20             this.sk = sk;  21             this.sc = sc;  22             state = 0; // 初始狀態設定為READING  23         }  24       25         @Override  26         public void run() {  27             try {  28                 if (state == 0)  29                     read(); // 讀取網絡數據  30                 else  31                     send(); // 發送網絡數據  32       33             } catch (IOException e) {  34                 System.out.println("[Warning!] A client has been closed.");  35                 closeChannel();  36             }  37         }  38           39         private void closeChannel() {  40             try {  41                 sk.cancel();  42                 sc.close();  43             } catch (IOException e1) {  44                 e1.printStackTrace();  45             }  46         }  47       48         private synchronized void read() throws IOException {  49             // non-blocking下不可用Readers,因為Readers不支援non-blocking  50             byte[] arr = new byte[1024];  51             ByteBuffer buf = ByteBuffer.wrap(arr);  52               53             int numBytes = sc.read(buf); // 讀取字符串  54             if(numBytes == -1)  55             {  56                 System.out.println("[Warning!] A client has been closed.");  57                 closeChannel();  58                 return;  59             }  60             String str = new String(arr); // 將讀取到的byte內容轉為字符串型態  61             if ((str != null) && !str.equals(" ")) {  62                 process(str); // 邏輯處理  63                 System.out.println(sc.socket().getRemoteSocketAddress().toString()  64                         + " > " + str);  65                 state = 1; // 改變狀態  66                 sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件  67                 sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  68             }  69         }  70       71         private void send() throws IOException  {  72             // get message from message queue  73               74             String str = "Your message has sent to "  75                     + sc.socket().getLocalSocketAddress().toString() + "\r\n";  76             ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()  77       78             while (buf.hasRemaining()) {  79                 sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容  80             }  81               82             state = 0; // 改變狀態  83             sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件  84             sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  85         }  86           87         void process(String str) {  88             // do process(decode, logically process, encode)..  89             // ..  90         }  91     }
1 package server;   2        3     import java.io.IOException;   4        5     public class Main {   6        7            8         public static void main(String[] args) {   9             // TODO Auto-generated method stub  10             try {  11                 TCPReactor reactor = new TCPReactor(1333);  12                 reactor.run();  13             } catch (IOException e) {  14                 // TODO Auto-generated catch block  15                 e.printStackTrace();  16             }  17         }  18       19     }

 

客户端代码

 

1 package main.pkg;   2        3     import java.io.BufferedReader;   4     import java.io.IOException;   5     import java.io.InputStreamReader;   6     import java.io.PrintWriter;   7     import java.net.Socket;   8     import java.net.UnknownHostException;   9       10     public class Client {  11       12         /** 13          * @param args 14          */  15         public static void main(String[] args) {  16             // TODO Auto-generated method stub  17             String hostname=args[0];  18             int port = Integer.parseInt(args[1]);  19             //String hostname="127.0.0.1";  20             //int port=1333;  21               22             System.out.println("Connecting to "+ hostname +":"+port);  23             try {  24                 Socket client = new Socket(hostname, port); // 連接至目的地  25                 System.out.println("Connected to "+ hostname);  26                   27                 PrintWriter out = new PrintWriter(client.getOutputStream());  28                 BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));  29                 BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));  30                 String input;  31                   32                 while((input=stdIn.readLine()) != null) { // 讀取輸入  33                     out.println(input); // 發送輸入的字符串  34                     out.flush(); // 強制將緩衝區內的數據輸出  35                     if(input.equals("exit"))  36                     {  37                         break;  38                     }  39                     System.out.println("server: "+in.readLine());  40                 }  41                 client.close();  42                 System.out.println("client stop.");  43             } catch (UnknownHostException e) {  44                 // TODO Auto-generated catch block  45                 System.err.println("Don't know about host: " + hostname);  46             } catch (IOException e) {  47                 // TODO Auto-generated catch block  48                 System.err.println("Couldn't get I/O for the socket connection");  49             }  50               51         }  52       53     }

 

 

代码解读:

1.创建TCPReactor 类的实例,启动端口监听

2.Acceptor 类只用于处理接受请求的时候,后续的读写跟其无任何关系

3.TCPReactor.run( )一直在进行,后续selectionkey有变动,会监听到,一直执行dispatch方法

 

 

最后提醒一点,从性能来说,单线程的reactor没过多的提升,因为IO和CPU的速度还是严重不匹配

 

 

参考文章:

https://blog.csdn.net/yehjordan/article/details/51012833

转载于:https://www.cnblogs.com/billmiao/p/9872222.html

你可能感兴趣的文章
小组成员及其git链接
查看>>
SQL case when else
查看>>
JAVA学习之路(环境配置,)
查看>>
Task.WaitAll代替WaitHandle.WaitAll
查看>>
MVc Identity登陆锁定
查看>>
cdn连接失败是什么意思_关于CDN的原理、术语和应用场景那些事
查看>>
ultraedit26 运行的是试用模式_免费试用U盘数据恢复工具 – 轻松找回U盘丢失的各种数据!...
查看>>
怎么从转移特性曲线上看dibl_白话IVD中的流体——泵的流量特性与管路阻力特性...
查看>>
奈奎斯特与香农定理_通俗理解奈奎斯特带宽
查看>>
ercharts一个页面能放几个_谷歌优化排名网站内页,一般放置几个关键词?
查看>>
redirect路由配置 vue_Vue 动态生成路由结构
查看>>
maven仲裁机制_Maven 基础知识依赖机制
查看>>
canvas绘制四分之一圆_用canvas画太极图(一步步详解附带源代码)
查看>>
计算上个月的第一天和最后一天_20年的最后一场旅行,21年的第一场旅行
查看>>
抄表 软件_水表远程抄表方案 M-BUS NB-IOT LoRa有什么区别呢
查看>>
一般柱子与柱子的距离_建筑内部布置柱子 间距大概是多少?
查看>>
python比excel好在哪_在数据分析方面,比起python,excel的局限性在哪(python excle 图表)...
查看>>
python 语言爱好者_语言都是相通的,学好一门语言,再学第二门语言就很简单,记录一下我复习c语言的过程。...
查看>>
input层级高 小程序_关于微信小程序textarea层级过高问题解决
查看>>
主力吸筹猛攻指标源码_通达信主力吸筹副图指标公式,通达信主力追踪副图源码...
查看>>