`

运用spring注解实现netty服务器端udp应用程序

 
阅读更多

netty是JBOSS针对网络开发的一套应用框架,它也是在NIO的基础上发展起来的。netty基于异步的事件驱动,具有高性能、高扩展性等特性,它提供了统一的底层协议接口,使得开发者从底层的网络协议(比如 TCP/IP、UDP)中解脱出来。就使用来说,开发者只要参考 Netty提供的若干例子和它的指南文档,就可以放手开发基于Netty的服务端程序了。

   netty有几个比较重要的概念,在此,仅做介绍,详细可以参考netty文档或源码。

   1). channelBuffer: Netty 的一个基本数据结构,这个数据结构存储了一个字节序列。 类似于 NIO ByteBuffer,但操作起来比ByteBuffer更为简单方便。

   2). ChannelFactory 是一个创建和管理 Channel 通道及其相关资源的工厂接口,它处理所有的 I/O 请求并产生相应的 I/O ChannelEvent 通道事件。

  3).ChannelHandler是所有I/O ChannelEvent事件的响应类,所有消息,包括netty通信异常事件,均在该响应类内处理。

  4).*** Bootstrap 是一个设置服务的帮助类。你甚至可以在这个服务中直接设置一个 Channel 通道。

  现在以实现一个UDP协议下的服务器应用程序为例,演示netty通过spring注解开发服务器端。(在此以maven工具管理项目开发)

  首先,是导入关联jar的POM文件:

 

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3.   <modelVersion>4.0.0</modelVersion>  
  4.   <groupId>com.byd.example</groupId>  
  5.   <artifactId>nettyTest</artifactId>  
  6.   <version>0.0.1-SNAPSHOT</version>  
  7.   <packaging>jar</packaging>  
  8.   <name>nettyTest</name>  
  9.   <url>http://maven.apache.org</url>  
  10.   <properties>  
  11.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  12.   </properties>  
  13.   <dependencies>  
  14.     <dependency>  
  15.       <groupId>junit</groupId>  
  16.       <artifactId>junit</artifactId>  
  17.       <version>3.8.1</version>  
  18.       <scope>test</scope>  
  19.     </dependency>  
  20.     <dependency>  
  21.      <groupId>slf4j</groupId>  
  22.      <artifactId>slf4j-jdk14</artifactId>     
  23.      <version>1.6.1</version>  
  24.     </dependency>  
  25.     <dependency>  
  26.      <groupId>log4j</groupId>  
  27.      <artifactId>log4j</artifactId>    
  28.      <version>1.2.9</version>   
  29.     </dependency>  
  30.     <dependency>  
  31.      <groupId>org.jboss.netty</groupId>     
  32.      <artifactId>netty</artifactId>  
  33.      <version>3.2.0.BETA1</version>  
  34.     </dependency>  
  35.     <dependency>  
  36.      <groupId>org.springframework</groupId>   
  37.      <artifactId>spring-context</artifactId>    
  38.      <version>3.0.2.RELEASE</version>  
  39.     </dependency>  
  40.     <dependency>  
  41.      <groupId>org.springframework</groupId>   
  42.      <artifactId>spring-expression</artifactId>    
  43.      <version>3.0.2.RELEASE</version>  
  44.     </dependency>  
  45.     <dependency>  
  46.      <groupId>org.springframework</groupId>   
  47.      <artifactId>spring-tx</artifactId>    
  48.      <version>3.0.2.RELEASE</version>  
  49.     </dependency>  
  50.     <dependency>  
  51.      <groupId>org.springframework</groupId>   
  52.      <artifactId>spring-context</artifactId>    
  53.      <version>3.0.2.RELEASE</version>  
  54.     </dependency>  
  55.   </dependencies>  
  56.   <repositories>    
  57.      <repository>    
  58.        <id>repository.jboss.org</id>    
  59.           <url>http://repository.jboss.org/nexus/content/groups/public/</url>    
  60.          <snapshots>    
  61.            <enabled>false</enabled>    
  62.          </snapshots>    
  63.        </repository>    
  64.     </repositories>    
  65.     <build>  
  66.      
  67.     </build>  
  68. </project>  

接下来是应用程序代码,在本人机器上可以运行并正常工作,如有需要可以试试。

I、Server接口:

 

  1. package com.byd.example.netty;  
  2. public interface IServer   
  3. {  
  4.     /** 
  5.      * 启动服务器 
  6.      */  
  7.     public void start();  
  8.     /** 
  9.      * 重启程序 
  10.      */  
  11.     public void restart();  
  12.       
  13.     /** 
  14.      * 停止程序运行 
  15.      */  
  16.     public void stop();  
  17. }   

 II、ChannelHandler扩张类(继承SimpleChannelHandler):

 

  1. package com.byd.example.netty;  
  2. import java.util.Random;  
  3. import org.apache.log4j.Logger;  
  4. import org.jboss.netty.buffer.ChannelBuffer;  
  5. import org.jboss.netty.buffer.DynamicChannelBuffer;  
  6. import org.jboss.netty.channel.ChannelFutureListener;  
  7. import org.jboss.netty.channel.ChannelHandlerContext;  
  8. import org.jboss.netty.channel.ExceptionEvent;  
  9. import org.jboss.netty.channel.MessageEvent;  
  10. import org.jboss.netty.channel.SimpleChannelHandler;  
  11. import org.springframework.stereotype.Component;  
  12. @Component("receiverHandler")  
  13. public class ReceiverHandler extends SimpleChannelHandler  
  14. {  
  15.     private static final Logger     logger=Logger.getLogger(ReceiverHandler.class.getName());  
  16.       
  17.     @Override  
  18.     public void messageReceived(ChannelHandlerContext ctx,MessageEvent e) throws Exception  
  19.     {  
  20.         ChannelBuffer buffer=(ChannelBuffer)e.getMessage();  
  21.         byte[] recByte=buffer.copy().toByteBuffer().array();  
  22.         String recMsg=new String(recByte);  
  23.         logger.info("server received:"+recMsg.trim());  
  24.         Random random=new Random();  
  25.         int backWord=random.nextInt(10000);  
  26.         ChannelBuffer responseBuffer=new DynamicChannelBuffer(4);  
  27.         responseBuffer.readBytes(backWord);  
  28.         e.getChannel().write(responseBuffer);  
  29.     }  
  30.       
  31.     @Override  
  32.     public void exceptionCaught(ChannelHandlerContext ctx,ExceptionEvent e)  
  33.     {  
  34.         logger.error(e.getCause());  
  35.         if(e.getChannel() !=null)  
  36.         {  
  37.             e.getChannel().close().addListener(ChannelFutureListener.CLOSE);  
  38.         }  
  39.     }  
  40. }   

III、ChannelPipelineFactory实现类,包装ChannelHandler,处理I/O事件。

 

  1. package com.byd.example.netty;  
  2. import org.jboss.netty.channel.ChannelPipeline;  
  3. import org.jboss.netty.channel.ChannelPipelineFactory;  
  4. import org.jboss.netty.channel.Channels;  
  5. import org.springframework.beans.factory.annotation.Autowired;  
  6. import org.springframework.beans.factory.annotation.Qualifier;  
  7. import org.springframework.stereotype.Component;  
  8. @Component("serverChannelPipelineFactory")  
  9. public class ServerChannelPipelineFactory implements ChannelPipelineFactory  
  10. {  
  11.     @Autowired  
  12.     @Qualifier("receiverHandler")  
  13.     private ReceiverHandler handler;  
  14.     @Override  
  15.     public ChannelPipeline getPipeline() throws Exception   
  16.     {  
  17.         ChannelPipeline pipeline=Channels.pipeline();  
  18.         pipeline.addLast("handler"this.handler);  
  19.         return pipeline;  
  20.     }  
  21.     public void setHandler(ReceiverHandler handler) {  
  22.         this.handler = handler;  
  23.     }  
  24. }  

IV、Iserver接口的实现类。

 

  1. package com.byd.example.netty;  
  2. import java.net.InetSocketAddress;  
  3. import java.net.SocketAddress;  
  4. import java.util.concurrent.Executors;  
  5. import org.apache.log4j.Logger;  
  6. import org.jboss.netty.bootstrap.ConnectionlessBootstrap;  
  7. import org.jboss.netty.channel.Channel;  
  8. import org.jboss.netty.channel.ChannelFutureListener;  
  9. import org.jboss.netty.channel.socket.DatagramChannelFactory;  
  10. import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;  
  11. import org.springframework.beans.factory.annotation.Autowired;  
  12. import org.springframework.beans.factory.annotation.Qualifier;  
  13. import org.springframework.stereotype.Component;  
  14. @Component("serverNettyImpl")  
  15. public class ServerNettyImpl implements IServer {  
  16.     @Autowired  
  17.     @Qualifier("serverChannelPipelineFactory")  
  18.     private ServerChannelPipelineFactory pipelineFactory;  
  19.     private Channel channel;  
  20.     private static final Logger logger = Logger.getLogger(ServerNettyImpl.class  
  21.             .getName());  
  22.     @Override  
  23.     public void start() {  
  24.         DatagramChannelFactory udpChannelFactory = new NioDatagramChannelFactory(  
  25.                 Executors.newCachedThreadPool());  
  26.         ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(udpChannelFactory);  
  27.         bootstrap.setOption("reuseAddress"false);  
  28.         bootstrap.setOption("child.reuseAddress"false);  
  29.         bootstrap.setOption("readBufferSize"1024);  
  30.         bootstrap.setOption("writeBufferSize"1024);  
  31.         bootstrap.setPipelineFactory(this.pipelineFactory);  
  32.         SocketAddress serverAddress = new InetSocketAddress(5000);  
  33.         this.channel = bootstrap.bind(serverAddress);  
  34.         logger.info("server start on " + serverAddress);  
  35.     }  
  36.     @Override  
  37.     public void restart() {  
  38.         this.stop();  
  39.         this.start();  
  40.     }  
  41.     @Override  
  42.     public void stop() {  
  43.         if (this.channel != null) {  
  44.             this.channel.close().addListener(ChannelFutureListener.CLOSE);  
  45.         }  
  46.     }  
  47. }   

V、应用程序入口:

 

  1. package com.byd.example;  
  2. import org.springframework.context.ApplicationContext;  
  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  4. import com.byd.example.netty.IServer;  
  5. public class NettyTestRun   
  6. {  
  7.     public static void main( String[] args )  
  8.     {  
  9.         ApplicationContext context=new ClassPathXmlApplicationContext("classpath*:nettyTest-context.xml");  
  10.         IServer server=(IServer)context.getBean("serverNettyImpl");  
  11.         server.start();  
  12.     }  
  13. }  

VI、context配置文件:

 

  1. <?xml version="1.0" encoding="UTF-8" ?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3.     xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop"  
  4.     xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans  
  6.     http://www.springframework.org/schema/beans/spring-beans.xsd  
  7.     http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd  
  8.     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd  
  9.     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">  
  10.     <context:component-scan base-package="com.byd.example"/>  
  11.     <context:annotation-config/>  
  12. </beans>  

 VII、日志配置文件:

 

  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >  
  3. <log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/' >        
  4.     <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">  
  5.     <param name="target" value="System.out"/>  
  6.     <layout class="org.apache.log4j.PatternLayout">  
  7.         <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%m%n" />  
  8.     </layout>  
  9.     <!--过滤器设置输出的级别-->  
  10.     <filter class="org.apache.log4j.varia.LevelRangeFilter">  
  11.         <param name="levelMin" value="debug" />  
  12.         <param name="levelMax" value="warn" />  
  13.         <param name="AcceptOnMatch" value="true" />  
  14.     </filter>  
  15. </appender>             
  16.     <appender name="FILE" class="org.apache.log4j.RollingFileAppender">     
  17.         <!-- 设置日志输出文件名 -->        
  18.         <param name="File" value="./target/output.log" />    
  19.         <!-- 设置是否在重新启动服务时,在原有日志的基础添加新日志 -->      
  20.         <param name="Append" value="true" />      
  21.         <param name="MaxBackupIndex" value="10" />      
  22.         <param name="encoding" value="UTF-8"/>  
  23.         <layout class="org.apache.log4j.PatternLayout">      
  24.             <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%m%n" />      
  25.         </layout>      
  26.     </appender>      
  27.     <appender name="activexAppender" class="org.apache.log4j.DailyRollingFileAppender">      
  28.         <param name="File" value="./target/activex.log" />        
  29.         <param name="DatePattern" value="'.'yyyy-MM-dd'.log'" />        
  30.         <layout class="org.apache.log4j.PatternLayout">      
  31.          <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss}-%m%n" />      
  32.         </layout>        
  33.     </appender>      
  34.             
  35. <!--  
  36.     <logger name="com.runway.bssp.activeXdemo" additivity="false">      
  37.         <priority value ="info"/>   
  38.         <param name="Level" value="DEBUG"/>       
  39.         <appender-ref ref="activexAppender" />        
  40.     </logger> -->  
  41.      
  42.       
  43.     <!-- 根logger的设置-->      
  44.     <root>      
  45.         <priority value ="debug"/>      
  46.         <appender-ref ref="CONSOLE"/>      
  47.         <appender-ref ref="FILE"/>         
  48.     </root>      
  49. </log4j:configuration>     

整理上面的代码,运行起来,可以看到服务器侦听在5000端口并接受客户端的信息并返回一个int随机数。

后记:netty当然也可以应用在TCP和客户端的程序,具体的使用可以参考netty API文档。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics