多多色-多人伦交性欧美在线观看-多人伦精品一区二区三区视频-多色视频-免费黄色视屏网站-免费黄色在线

國內(nèi)最全I(xiàn)T社區(qū)平臺(tái) 聯(lián)系我們 | 收藏本站
阿里云優(yōu)惠2
您當(dāng)前位置:首頁 > php框架 > 框架設(shè)計(jì) > Java之NIO(二)selector socketChannel

Java之NIO(二)selector socketChannel

來源:程序員人生   發(fā)布時(shí)間:2015-06-19 08:25:49 閱讀次數(shù):6023次

上篇文章對(duì)NIO進(jìn)行了簡介,對(duì)Channel和Buffer接口的使用進(jìn)行了說明,并舉了1個(gè)簡單的例子來講明其使用方法。

本篇?jiǎng)t重點(diǎn)說明selector,Selector(選擇器)是Java NIO中能夠檢測(cè)1到多個(gè)NIO通道,并能夠知曉通道是不是為諸如讀寫事件做好準(zhǔn)備的組件。這樣,1個(gè)單獨(dú)的線程可以管理多個(gè)channel,從而管理多個(gè)網(wǎng)絡(luò)連接。

與selector聯(lián)系緊密的是ServerSocketChannel和SocketChannel,他們的使用與上篇文章描寫的FileChannel的使用方法類似,然后與ServerSocket和Socket也有1些聯(lián)系。

本篇首先簡單的進(jìn)selector進(jìn)行說明,然后1個(gè)簡單的示例程序,來演示即時(shí)通訊。

Selector

使用傳統(tǒng)IO進(jìn)行網(wǎng)絡(luò)編程,以下圖所示:


每個(gè)到服務(wù)真?zhèn)€連接,都需要1個(gè)單獨(dú)的線程(或線程池)來處理其對(duì)應(yīng)的socket,當(dāng)連接數(shù)多的時(shí)候,對(duì)服務(wù)真?zhèn)€壓力極大。并使用socket的getInputStream。Read方法來不斷的輪訓(xùn)每一個(gè)socket,效力可想而知。

而selector則可以在同1個(gè)線程中監(jiān)聽多個(gè)channel的狀態(tài),當(dāng)某個(gè)channel有selector感興趣的事情發(fā)現(xiàn),selector則被激活。即不會(huì)主動(dòng)去輪詢。以下圖所示:

 

Selector使用以下示意:

public static void main(String[] args) throws IOException { Selector selector = Selector.open();//聲明selector ServerSocketChannel sc = ServerSocketChannel.open(); sc.configureBlocking(false);//必須設(shè)置為異步 sc.socket().bind(new InetSocketAddress(8081));//綁定端口 //把channel 注冊(cè)到 selector上 sc.register(selector, SelectionKey.OP_ACCEPT|SelectionKey.OP_CONNECT|SelectionKey.OP_READ|SelectionKey.OP_WRITE); while(true){ selector.select();//阻塞,直到注冊(cè)的channel上某個(gè)感興趣的事情產(chǎn)生 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if(key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } keyIterator.remove(); } } }

 

極簡即時(shí)通訊

本例子是是1個(gè)極其簡單的例子,很多地方都不完善,但是例子可以很好的說明selector的使用方法。

本例子包括服務(wù)端和客戶端兩個(gè)部份,其中服務(wù)端采取兩個(gè)selector,用來建立連接和數(shù)據(jù)的讀寫。兩個(gè)selector在兩個(gè)線程中。

服務(wù)端

/** * 簡單的即時(shí)通訊服務(wù)端,采取建立連接 selector和數(shù)據(jù) selector分離。很不完善 * */ public class ServerSocketChannelTest { private static final int SERVER_PORT = 8081; private ServerSocketChannel server; private volatile Boolean isStop = false; //負(fù)責(zé)建立連接的selector private Selector conn_Sel; //負(fù)責(zé)數(shù)據(jù)讀寫的selector private Selector read_Sel; // private ExecutorService sendService = Executors.newFixedThreadPool(3); //鎖,用來在建立連接后,喚醒read_Sel時(shí)使用的同步 private Object lock = new Object(); //注冊(cè)的用戶 private Map<String, ClientInfo> clents = new HashMap<String, ClientInfo>(); /** * 初始化,綁定端口 */ public void init() throws IOException { //創(chuàng)建ServerSocketChannel server = ServerSocketChannel.open(); //綁定端口 server.socket().bind(new InetSocketAddress(SERVER_PORT)); server.configureBlocking(false); //定義兩個(gè)selector conn_Sel = Selector.open(); read_Sel = Selector.open(); //把channel注冊(cè)到selector上,第2個(gè)參數(shù)為興趣的事件 server.register(conn_Sel, SelectionKey.OP_ACCEPT); } // 負(fù)責(zé)建立連接。 private void beginListen() { System.out.println("--------開始監(jiān)聽----------"); while (!isStop) { try { conn_Sel.select(); } catch (IOException e) { e.printStackTrace(); continue; } Iterator<SelectionKey> it = conn_Sel.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey con = it.next(); it.remove(); if (con.isAcceptable()) { try { SocketChannel newConn = ((ServerSocketChannel) con .channel()).accept(); handdleNewInConn(newConn); } catch (IOException e) { e.printStackTrace(); continue; } } else if (con.isReadable()) {//廢代碼,履行不到。 try { handleData((SocketChannel) con.channel()); } catch (IOException e) { e.printStackTrace(); } } } } } /** * 負(fù)責(zé)接收數(shù)據(jù) */ private void beginReceive(){ System.out.println("---------begin receiver data-------"); while (true) { synchronized (lock) { } try { read_Sel.select(); } catch (IOException e) { e.printStackTrace(); continue; } Iterator<SelectionKey> it = read_Sel.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey con = it.next(); it.remove(); if (con.isReadable()) { try { handleData((SocketChannel) con.channel()); } catch (IOException e) { e.printStackTrace(); } } } } } private void handdleNewInConn(SocketChannel newConn) throws IOException { newConn.configureBlocking(false); //這里必須先喚醒read_Sel,然后加鎖,避免讀寫線程的中select方法再次鎖定。 synchronized (lock) { read_Sel.wakeup(); newConn.register(read_Sel, SelectionKey.OP_READ); } //newConn.register(conn_Sel, SelectionKey.OP_READ); } private void handleData(final SocketChannel data) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(512); try { int size= data.read(buffer); if (size==⑴) { System.out.println("-------連接斷開-----"); //這里暫時(shí)不處理,這里可以移除已注冊(cè)的客戶端 } } catch (IOException e) { e.printStackTrace(); return; } buffer.flip(); byte[] msgByte = new byte[buffer.limit()]; buffer.get(msgByte); Message msg = Message.getMsg(new String(msgByte)); //這里讀完數(shù)據(jù)其實(shí)已可以另開線程了下1步的處理,理想情況下,根據(jù)不同的消息類型,建立不同的隊(duì)列,把待發(fā)送的消息放進(jìn)隊(duì)列 //固然也能夠持久化。如果在數(shù)據(jù)沒有讀取前,另開線程的話,讀寫線程中 read_Sel.select(),會(huì)立刻返回。可以把 if (msg.getType().equals("0")) {// 注冊(cè) ClientInfo info = new ClientInfo(msg.getFrom(), data); clents.put(info.getClentID(), info); System.out.println(msg.getFrom() + "注冊(cè)成功"); } else {// 轉(zhuǎn)發(fā) System.out.println("收到"+msg.getFrom()+"發(fā)給"+msg.getTo()+"的消息"); ClientInfo to = clents.get(msg.getTo()); buffer.rewind(); if (to != null) { SocketChannel sendChannel = to.getChannel(); try { while (buffer.hasRemaining()) { sendChannel.write(buffer); } } catch (Exception e) { } finally { buffer.clear(); } } } } public static void main(String[] args) throws IOException { final ServerSocketChannelTest a = new ServerSocketChannelTest(); a.init(); new Thread("receive..."){ public void run() { a.beginReceive(); }; }.start(); a.beginListen(); } }


客戶端


/** * new 次對(duì)象,然后調(diào)用start方法,其中self 是自己id * * to 是接收人id * */ public class Client { /** * 自己的ID */ private String self; /** * 接收人ID */ private String to; //通道管理器 private Selector selector; private ByteBuffer writeBuffer = ByteBuffer.allocate(512); private SocketChannel channel; private Object lock = new Object(); private volatile boolean isInit = false; public Client(String self, String to) { super(); this.self = self; this.to = to; } /** * 取得1個(gè)Socket通道,并對(duì)該通道做1些初始化的工作 * @param ip 連接的服務(wù)器的ip * @param port 連接的服務(wù)器的端口號(hào) * @throws IOException */ public void initClient(String ip,int port) throws IOException { // 取得1個(gè)Socket通道 channel = SocketChannel.open(); // 設(shè)置通道為非阻塞 channel.configureBlocking(false); // 取得1個(gè)通道管理器 this.selector = Selector.open(); // 客戶端連接服務(wù)器,其實(shí)方法履行并沒有實(shí)現(xiàn)連接,需要在listen()方法中調(diào) //用channel.finishConnect();才能完成連接 channel.connect(new InetSocketAddress(ip,port)); //將通道管理器和該通道綁定,并為該通道注冊(cè)SelectionKey.OP_CONNECT事件。 channel.register(selector, SelectionKey.OP_CONNECT); } /** * 采取輪詢的方式監(jiān)聽selector上是不是有需要處理的事件,如果有,則進(jìn)行處理 * @throws IOException */ @SuppressWarnings("unchecked") public void listen() throws IOException { // 輪詢?cè)L問selector while (true) { synchronized (lock) { } selector.select(); // 取得selector當(dāng)選中的項(xiàng)的迭代器 Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = ite.next(); // 刪除已選的key,以防重復(fù)處理 ite.remove(); // 連接事件產(chǎn)生 if (key.isConnectable()) { SocketChannel channel = (SocketChannel) key .channel(); // 如果正在連接,則完成連接 if(channel.isConnectionPending()){ channel.finishConnect(); } // 設(shè)置成非阻塞 channel.configureBlocking(false); //在和服務(wù)端連接成功以后,為了可以接收到服務(wù)真?zhèn)€信息,需要給通道設(shè)置讀的權(quán)限。 channel.register(this.selector, SelectionKey.OP_READ); isInit = true; // 取得了可讀的事件 } else if (key.isReadable()) { read(key); } } } } /** * 處理讀取服務(wù)端發(fā)來的信息的事件 * @param key * @throws IOException */ public void read(SelectionKey key) throws IOException{ SocketChannel data = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(512) ; try { data.read(buffer ); } catch (IOException e) { e.printStackTrace(); data.close(); return; } buffer.flip(); byte[] msgByte = new byte[buffer.limit()]; buffer.get(msgByte); Message msg = Message.getMsg(new String(msgByte)); System.out.println("---收到消息--"+msg+" 來自 "+msg.getFrom()); } private void sendMsg(String content){ writeBuffer.put(content.getBytes()); writeBuffer.flip(); try { while (writeBuffer.hasRemaining()) { channel.write(writeBuffer); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } writeBuffer.clear(); } /** * 啟動(dòng)客戶端測(cè)試 * @throws IOException */ public void start() throws IOException { initClient("localhost",8081); new Thread("reading"){ public void run() { try { listen(); } catch (IOException e) { e.printStackTrace(); } }; }.start(); int time3 = 0; while(!isInit&&time3<3){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } time3 ++; } System.out.println("--------開始注冊(cè)------"); Message re = new Message("", self, ""); sendMsg(re.toString()); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("-----注冊(cè)成功----"); String content =""; System.out.println("---- 請(qǐng)輸入要發(fā)送的消息,按回車發(fā)送,輸入 123 退出----------"); Scanner s = new Scanner(System.in); while (!content.equals("123")&&s.hasNext()) { content = s.next(); Message msg = new Message(content, self, to); msg.setType("1"); sendMsg(msg.toString()); if (content.equals("123")) { break; } System.out.println("---發(fā)送成功---"); } channel.close(); } }


客戶端測(cè)試

public class TestClient1 { public static void main(String[] args) throws IOException { Client c1 =new Client("1", "2"); c1.start(); } } public class TestClient2 { public static void main(String[] args) throws IOException { Client c2 =new Client("2", "1"); c2.start(); } }

結(jié)束

本文的例子極其簡單,但是都經(jīng)過測(cè)試。在編碼的進(jìn)程中,遇到的問題主要有兩點(diǎn):

1.     channel.register()方法阻塞

2.     使用線程池遇到問題。本文最后在服務(wù)真?zhèn)€讀寫線程中,沒有使用線程池,緣由注釋說的比較明白,也說明了使用線程池的1種假想。

 

另外在本文編碼進(jìn)程中,遇到了1些問題,去網(wǎng)上尋求答案,遇到了1些不錯(cuò)的文章,本文某些部份由參考。

selector的講授,官方文檔翻譯 http://ifeve.com/selectors/

NIO就緒的OP_write http://blog.csdn.net/zhouhl_cn/article/details/6582435

此文不錯(cuò):http://blog.csdn.net/jjzhk/article/details/39553613

http://www.2cto.com/kf/201312/267592.html

 

另外還有兩個(gè)反面教材:

http://www.oschina.net/code/snippet_860673_22507毛病很大

http://www.oschina.net/code/snippet_246601_22883代碼本身是正確的,但底下的評(píng)論人沒有好好看書。

 

 

生活不易,碼農(nóng)辛苦
如果您覺得本網(wǎng)站對(duì)您的學(xué)習(xí)有所幫助,可以手機(jī)掃描二維碼進(jìn)行捐贈(zèng)
程序員人生
------分隔線----------------------------
分享到:
------分隔線----------------------------
關(guān)閉
程序員人生
主站蜘蛛池模板: 中文字幕人成不卡一区 | 欧美精品综合一区二区三区 | 欧美性xxxx | 美女啪啪网站 | 伊人久久网站 | 国产精品欧美亚洲区 | 一本视频在线 | 国产高清一区二区三区视频 | 精品国产成a人在线观看 | 成人综合在线视频 | 国产高清视频免费在线观看 | 视频一区精品 | 中文字幕曰韩一区二区不卡 | 好看欧美视频高清va | 精品国产三级v | 日本成人一区 | 激情视频在线观看网站 | 欧美日本一区视频免费 | 国产丝袜福利视频在线播放 | 老王午夜69精品影院 | 美女免费网站在线视频 | 久久视频在线看 | 国产v在线播放 | 成人欧美一区二区三区视频不卡 | 依人在线免费视频 | www.大香| 亚洲精品久久片久久 | 欧美一级做一a做片性视频 欧美一级做一级爱a做片性 | 色综合久久久久久久久五月 | 国产精品久久久久无码av | 成人天堂在线 | 欧美添下面视频免费观看 | 国产成人精品视频频 | 亚洲精品一区二区三区四区 | 特级淫片aaaaa片毛片 | 91日韩| 亚州色片| 欧美freesex黑人又粗又 | 最新国产福利片在线观看 | 精品国产爱久久 | 免费一级毛片免费播放 |