`

Netty 简单样例分析

 
阅读更多

Netty 是JBoss旗下的io传输的框架,他利用java里面的nio来实现高效,稳定的io传输。

作为io传输,就会有client和server,下面我们看看用netty怎样写client和server

Client:
需要做的事情:
1.配置client启动类
  ClientBootstrap bootstrap = new ClientBootstrap(..)

2.根据不同的协议或者模式为client启动类设置pipelineFactory。
这里telnet pipline Factory 在netty中已经存在,所有直接用
  bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
 也可以自己定义
 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(
                        new DiscardClientHandler(firstMessageSize));
            }
        });       
 这里DiscardClientHandler 就是自己定义的handler,他需要
 public class DiscardServerHandler extends SimpleChannelUpstreamHandler
 继承SimpleChannelUpstreamHandler  来实现自己的handler。这里DiscardClientHandler
 是处理自己的client端的channel,他的
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        // Server is supposed to send nothing.  Therefore, do nothing.
    }
  可以看到Discard client不需要接受任何信息
 
3.连接server
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

这里解释一下channelFuture:

在Netty中所有的io操作都是异步的,这也就是意味任何io访问,那么就立即返回处理,并且不能确保
返回的数据全部完成。因此就出现了channelFuture,channelFuture在传输数据时候包括数据和状态两个
部分。他只有Uncompleted和Completed

 
 

                                      +---------------------------+
                                      | Completed successfully    |
                                      +---------------------------+
                                 +---->      isDone() = true      |
 +--------------------------+    |    |   isSuccess() = true      |
 |        Uncompleted       |    |    +===========================+
 +--------------------------+    |    | Completed with failure    |
 |      isDone() = false    |    |    +---------------------------+
 |   isSuccess() = false    |----+---->   isDone() = true         |
 | isCancelled() = false    |    |    | getCause() = non-null     |
 |    getCause() = null     |    |    +===========================+
 +--------------------------+    |    | Completed by cancellation |
                                 |    +---------------------------+
                                 +---->      isDone() = true      |
                                      | isCancelled() = true      |
                                      +---------------------------+
 

 
 既然netty io是异步的,那么如何知道channel传送完成有两种方式,一种添加监听器
 addListener(ChannelFutureListener) 还有一种直接调用await()方法,这两种方式
 有下面的区别
 监听器:是以事件模式的,因此代码就需要用事件模式的样式去写,相当复杂,但他是non-blocking模式的
 性能方面要比await方法好,而且不会产生死锁情况
 
 await(): 直接方法调用,使用简单,但是他是blocking模式,性能方面要弱而且会产生死锁情况
 
 不要在ChannelHandler 里面调用await(),这是因为通常在channelHandler里的event method是被i/o线程调用的
 (除非ChannelPipeline里面有个ExecutionHandler),那么如果这个时候用await就容易产生死锁。
 
 

// BAD - NEVER DO THIS
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     if (e.getMessage() instanceof GoodByeMessage) {
         ChannelFuture future = e.getChannel().close();
         future.awaitUninterruptibly();
         // Perform post-closure operation
         // ...
     }
 }

 // GOOD
 @Override
 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
     if (e.getMessage() instanceof GoodByeMessage) {
         ChannelFuture future = e.getChannel().close();
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future) {
                 // Perform post-closure operation
                 // ...
             }
         });
     }
 }

 
 虽然await调用比较危险,但是你确保不是在一个i/o 线程中调用该方法,毕竟await方法还是很简洁方便的,如果
 调用该方法是在一个i/o 线程,那么就会抛出 IllegalStateException
 
 await的timeout和i/o timeout区别
 需要注意的是这两个timeout是不一样的, #await(long),#await(long, TimeUnit), #awaitUninterruptibly(long),
 #awaitUninterruptibly(long, TimeUnit) 这里面的timeout也i/o timeout 没有任何关系,如果io timeout,那么
 channelFuture 将被标记为completed with failure,而await的timeout 与future完全没有关系,只是await动作的
 timeout。
 

 // BAD - NEVER DO THIS
 ClientBootstrap b = ...;
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly(10, TimeUnit.SECONDS);
 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     // You might get a NullPointerException here because the future
     // might not be completed yet.
     f.getCause().printStackTrace();
 } else {
     // Connection established successfully
 }

 // GOOD
 ClientBootstrap b = ...;
 // Configure the connect timeout option.
 b.setOption("connectTimeoutMillis", 10000);
 ChannelFuture f = b.connect(...);
 f.awaitUninterruptibly();

 // Now we are sure the future is completed.
 assert f.isDone();

 if (f.isCancelled()) {
     // Connection attempt cancelled by user
 } else if (!f.isSuccess()) {
     f.getCause().printStackTrace();
 } else {
     // Connection established successfully
 }

 
4.等待或监听数据全部完成
如: future.getChannel().getCloseFuture().awaitUninterruptibly();

5.释放连接等资源
 bootstrap.releaseExternalResources();
 
Server:
1.配置server

ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
                       
2.设置pipeFactory
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new EchoServerHandler());
            }
        });
    或者
   bootstrap.setPipelineFactory(new HttpServerPipelineFactory());
  
3.绑定sever端端口
bootstrap.bind(new InetSocketAddress(8080));

 

======================netty个人理解分割线======================================

 

netty是个非常好用的nio框架,提供了安全、快捷的tcp/ip、udp/ip通讯方式。网上也有好多文章,netty3.6的官方文档是这个http://netty.io/3.6/guide/,写得很好。

下面就读着这篇文档,做下笔记:

先看两段代码:

一:DiscardServer,这是个服务端的代码,是服务端的启动程序(创建channel,指定通道处理程序)

 

Java代码 
  1. public class DiscardServer {  
  2.   
  3.     private final int port;  
  4.   
  5.     public DiscardServer(int port) {  
  6.         this.port = port;  
  7.     }  
  8.   
  9.     public void run() {  
  10.         // Configure the server.  
  11.         ServerBootstrap bootstrap = new ServerBootstrap(  
  12.                 new NioServerSocketChannelFactory(  
  13.                         Executors.newCachedThreadPool(),  
  14.                         Executors.newCachedThreadPool()));  
  15.   
  16.         // Set up the pipeline factory.  
  17.         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
  18.             public ChannelPipeline getPipeline() throws Exception {  
  19.                 return Channels.pipeline(new DiscardServerHandler());  
  20.             }  
  21.         });  
  22.   
  23.         // Bind and start to accept incoming connections.  
  24.         bootstrap.bind(new InetSocketAddress(port));  
  25.     }  
  26.   
  27.     public static void main(String[] args) throws Exception {  
  28.         int port;  
  29.         if (args.length > 0) {  
  30.             port = Integer.parseInt(args[0]);  
  31.         } else {  
  32.             port = 8080;  
  33.         }  
  34.         new DiscardServer(port).run();  
  35.     }  
  36. }  

 

 

既然是通信,那当然有通道,Channel就是通道,通道是谁创建的呢,当然是ChannelFactory

代码中的NioServerSocketChannelFactory就是服务端的NIO的Socket的通道工厂,名字起得很直白吧。

创建这个工厂需要两个线程池——Executor(Executors.newCachedThreadPool()返回的ExecutorService,ExecutorService继承自Executor),这两个Executor一个用来执行boss线程,一个用来执行I/O工人线程。

工厂设置好之后, 就该创建Channel了,创建Channel是个复杂的过程,netty提供了一个帮助类ServerBootstrap,使用它可以便捷的执行这个创建过程。

有了通道后,当然会有相关的通讯处理,这些处理就是ChannelHandler(DiscardServerHandler就是自定义的handler,代码见二:DiscardServerHandler),ChannelHandler使用ChannelPipeline来设置(添加、移除、获取),ChannelPipelineFactory当然是用来创建ChannelPipeline的了。

最后,绑定个端口号,开始启动服务吧

二:DiscardServerHandler,通道处理程序

 

Java代码 
  1. public class DiscardServerHandler extends SimpleChannelUpstreamHandler {  
  2.   
  3.     private static final Logger logger = Logger.getLogger(  
  4.             DiscardServerHandler.class.getName());  
  5.   
  6.     private long transferredBytes;  
  7.   
  8.     public long getTransferredBytes() {  
  9.         return transferredBytes;  
  10.     }  
  11.   
  12.     @Override  
  13.     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {  
  14.         if (e instanceof ChannelStateEvent) {  
  15.             logger.info(e.toString());  
  16.         }  
  17.   
  18.         // Let SimpleChannelHandler call actual event handler methods below.  
  19.         super.handleUpstream(ctx, e);  
  20.     }  
  21.   
  22.     @Override  
  23.     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
  24.         // Discard received data silently by doing nothing.  
  25.        transferredBytes += ((ChannelBuffer) e.getMessage()).readableBytes();  
  26.     }  
  27.   
  28.     @Override  
  29.     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
  30.         // Close the connection when an exception is raised.  
  31.         logger.log(  
  32.                 Level.WARNING,  
  33.                 "Unexpected exception from downstream.",  
  34.                 e.getCause());  
  35.         e.getChannel().close();  
  36.     }  
  37. }  

 这个通道处理程序继承自SimpleChannelUpstreamHandler,最终实现了ChannelHandler,netty是基于事件驱动的,所以DiscardServerHandler实现的方法中都会有相关的事件

 

messageReceived:当消息到达时执行

exceptionCaught:当发生异常时执行

messageReceived有两个参数,一个是ChannelHandlerContext,一个是MessageEvent,messageEvent就是消息事件,从消息事件中可以取出消息(getMessage())

服务端的代码搞定了,下面上客户端的代码,客户端的代码跟服务端的大体一致:

一:DiscardClient

Java代码 
  1. public class DiscardClient {  
  2.   
  3.     private final String host;  
  4.     private final int port;  
  5.     private final int firstMessageSize;  
  6.   
  7.     public DiscardClient(String host, int port, int firstMessageSize) {  
  8.         this.host = host;  
  9.         this.port = port;  
  10.         this.firstMessageSize = firstMessageSize;  
  11.     }  
  12.   
  13.     public void run() {  
  14.         // Configure the client.  
  15.         ClientBootstrap bootstrap = new ClientBootstrap(  
  16.                 new NioClientSocketChannelFactory(  
  17.                         Executors.newCachedThreadPool(),  
  18.                         Executors.newCachedThreadPool()));  
  19.   
  20.         // Set up the pipeline factory.  
  21.         bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  
  22.             public ChannelPipeline getPipeline() throws Exception {  
  23.                 return Channels.pipeline(  
  24.                         new DiscardClientHandler(firstMessageSize));  
  25.             }  
  26.         });  
  27.   
  28.         // Start the connection attempt.  
  29.         ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));  
  30.   
  31.         // Wait until the connection is closed or the connection attempt fails.  
  32.         future.getChannel().getCloseFuture().awaitUninterruptibly();  
  33.   
  34.         // Shut down thread pools to exit.  
  35.         bootstrap.releaseExternalResources();  
  36.     }  
  37.   
  38.     public static void main(String[] args) throws Exception {  
  39.         // Print usage if no argument is specified.  
  40.         if (args.length < 2 || args.length > 3) {  
  41.             System.err.println(  
  42.                     "Usage: " + DiscardClient.class.getSimpleName() +  
  43.                     " <host> <port> [<first message size>]");  
  44.             return;  
  45.         }  
  46.   
  47.         // Parse options.  
  48.         final String host = args[0];  
  49.         final int port = Integer.parseInt(args[1]);  
  50.         final int firstMessageSize;  
  51.         if (args.length == 3) {  
  52.             firstMessageSize = Integer.parseInt(args[2]);  
  53.         } else {  
  54.             firstMessageSize = 256;  
  55.         }  
  56.   
  57.         new DiscardClient(host, port, firstMessageSize).run();  
  58.     }  
  59. }  

 二:

Java代码 
  1. public class DiscardClientHandler extends SimpleChannelUpstreamHandler {  
  2.   
  3.     private static final Logger logger = Logger.getLogger(  
  4.             DiscardClientHandler.class.getName());  
  5.   
  6.     private long transferredBytes;  
  7.     private final byte[] content;  
  8.   
  9.     public DiscardClientHandler(int messageSize) {  
  10.         if (messageSize <= 0) {  
  11.             throw new IllegalArgumentException(  
  12.                     "messageSize: " + messageSize);  
  13.         }  
  14.         content = new byte[messageSize];  
  15.     }  
  16.   
  17.     public long getTransferredBytes() {  
  18.         return transferredBytes;  
  19.     }  
  20.   
  21.     @Override  
  22.     public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {  
  23.         if (e instanceof ChannelStateEvent) {  
  24.             if (((ChannelStateEvent) e).getState() != ChannelState.INTEREST_OPS) {  
  25.                 logger.info(e.toString());  
  26.             }  
  27.         }  
  28.   
  29.         // Let SimpleChannelHandler call actual event handler methods below.  
  30.         super.handleUpstream(ctx, e);  
  31.     }  
  32.   
  33.     @Override  
  34.     public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {  
  35.         // Send the initial messages.  
  36.         generateTraffic(e);  
  37.     }  
  38.   
  39.     @Override  
  40.     public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {  
  41.         // Keep sending messages whenever the current socket buffer has room.  
  42.         generateTraffic(e);  
  43.     }  
  44.   
  45.     @Override  
  46.     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {  
  47.         // Server is supposed to send nothing.  Therefore, do nothing.  
  48.     }  
  49.   
  50.     @Override  
  51.     public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {  
  52.         transferredBytes += e.getWrittenAmount();  
  53.     }  
  54.   
  55.     @Override  
  56.     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {  
  57.         // Close the connection when an exception is raised.  
  58.         logger.log(  
  59.                 Level.WARNING,  
  60.                 "Unexpected exception from downstream.",  
  61.                 e.getCause());  
  62.         e.getChannel().close();  
  63.     }  
  64.   
  65.     private void generateTraffic(ChannelStateEvent e) {  
  66.         // Keep generating traffic until the channel is unwritable.  
  67.         // A channel becomes unwritable when its internal buffer is full.  
  68.         // If you keep writing messages ignoring this property,  
  69.         // you will end up with an OutOfMemoryError.  
  70.         Channel channel = e.getChannel();  
  71.         while (channel.isWritable()) {  
  72.             ChannelBuffer m = nextMessage();  
  73.             if (m == null) {  
  74.                 break;  
  75.             }  
  76.             channel.write(m);  
  77.         }  
  78.     }  
  79.   
  80.     private ChannelBuffer nextMessage() {  
  81.         return ChannelBuffers.wrappedBuffer(content);  
  82.     }  
  83. }  

 

 

分享到:
评论
1 楼 aerfaguihua 2016-02-24  
请问楼主 netty编写的客户端能否同步监听接收服务器传来的数据?

相关推荐

Global site tag (gtag.js) - Google Analytics