本文共 5773 字,大约阅读时间需要 19 分钟。
import java.io.IOException;import java.net.InetAddress;import java.net.InetSocketAddress;import java.net.UnknownHostException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class NioClient{ /*研究NIO的各种事件的触发条件*/ private SocketChannel clientChannel = null; private Selector selector = null; private InetAddress localAddress = null; private int localPort = 0; private InetAddress remoteAddress = null; private int remotePort = 0; private Thread task = null; public NioClient(InetAddress remoteAddress,int remotePort) { /*初始化本地的地址*/ this.remoteAddress = remoteAddress; this.remotePort = remotePort; try { localAddress = InetAddress.getByName("127.0.0.1"); localPort = 9999; } catch (UnknownHostException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void initChannel() { try { clientChannel = SocketChannel.open(); /*Create and connect*/ clientChannel.configureBlocking(false); /*注册一个connect事件*/ clientChannel.register(selector, SelectionKey.OP_CONNECT); /*连接将要完成时,或者有错误发生时*/ clientChannel.connect(new InetSocketAddress(remoteAddress, remotePort)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void initSelector() { try { selector = Selector.open(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void initOperations() { initSelector(); initChannel(); } private void startTask() { Thread task = new Thread(new ClientTask()); task.start(); } public void launch() { /*初始化*/ initOperations(); /*启动client的任务线程*/ startTask(); } private class ClientTask implements Runnable { SetokSet = null; ByteBuffer sendbuffer = ByteBuffer.allocate(256); ByteBuffer recvbuffer = ByteBuffer.allocate(256); String echomessage = new String("I am client!"); private void sendEchoRequest(SelectionKey key,ByteBuffer buf) { /*将数据加入到发送缓存,bytebuffer*/ sendbuffer.clear(); sendbuffer.put(echomessage.getBytes()); sendbuffer.flip(); /*注册writable事件,selector会关注writable事件,一般情况下channel正常状态都是随时可writable的,因为channel与socket一样,是线程安全的, * write,read操作应该都是原子操作,且全双工。所以仅当有数据要发送时注册writable事件,否则writable会一直被触发。*/ key.interestOps(SelectionKey.OP_WRITE); } private void handleConnectable(SelectionKey key) { System.out.println("Client:Connectable"); /*收到此事件,表示基于此SocketChannel的连接(TCP)要么有错误发生,要么已经完成*/ SocketChannel client = (SocketChannel)key.channel(); try { if( client.isConnectionPending() ) { System.out.println("SocketChannel status:is open="+client.isOpen()); client.finishConnect(); /*如果connect失败,这一步会自动close掉SocketChannel*/ System.out.println("Client:Connection finished successfully!Connect to IP="+((InetSocketAddress)client.getRemoteAddress()).getHostName()+":port="+((InetSocketAddress)client.getRemoteAddress()).getPort()); sendEchoRequest(key,sendbuffer); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.out.println("SocketChannel status:is open="+client.isOpen()+",connection failed!"); /*连接失败,之前的Channel已经被close,这里重新创建一个Channel,重新进行连接。*/ initChannel(); } } private void handleWritable(SelectionKey key) { try { if( sendbuffer.remaining() > 0 ) { System.out.println("Client:send to server. msg="+new String(sendbuffer.array(),0,sendbuffer.remaining())); ((SocketChannel)key.channel()).write(sendbuffer); /*write会改变buffer的position的位置*/ key.interestOps(SelectionKey.OP_READ); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); try { key.channel().close(); key.cancel(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } private void handleReadable(SelectionKey key) { try { recvbuffer.clear(); ((SocketChannel)key.channel()).read(recvbuffer); recvbuffer.flip(); if( recvbuffer.remaining() > 0 ) { System.out.println("Client:recv echo msg from server="+new String(recvbuffer.array(),0,recvbuffer.remaining())); } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); try { key.channel().close(); key.cancel(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } private void handleSelectionKey(SelectionKey key) { if( key.isConnectable() ) { handleConnectable(key); } else if( key.isReadable() ) { System.out.println("Client:Readable"); handleReadable(key); } else if( key.isWritable() ) { System.out.println("Client:Writable"); handleWritable(key); } else { System.out.println("Client:Not handle event="+key.readyOps()); } } @Override public void run() { // TODO Auto-generated method stub while(!Thread.currentThread().isInterrupted()) { try { int num = selector.select(); /*select will block until something happen*/ if( num > 0 ) { okSet = selector.selectedKeys(); /*为了简单起见,且只有一个SocketChannel,这里对SelectionKey使用一个循环来处理。如果有多个SocketChannel注册到此 * Selector,则可能会有多个SocketChannel并发产生事件,这里需要用线程池。*/ Iterator ite = okSet.iterator(); while(ite.hasNext()) { handleSelectionKey((SelectionKey)ite.next()); ite.remove(); /*这一步千万不能少,否则select将暂时不响应此channel的这个事件。猜想select会判断某个channel的某个事件在 * selectedKeySet中是否有事件正在处理(未删除),没有则继续响应,否则暂时屏蔽,直到其被删除。*/ } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }}/*1、client处理selectedKeys用的是同步方式,不分发线程,因此不会出现同一个channel的readable或writable事件被分发给多个线程的情况; * 2、*/
转载地址:http://bthii.baihongyu.com/