netty4 已经发布有一段时间了。今天因为压测的需要,写了一个 netty4 的客户端(版本 4.0.6.Final),现在分享出来给大家:
netty4 配置类(客户端启动类):
package nettyClient4;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.log4j.xml.DOMConfigurator;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Properties;

/**
 * Stress-test client: each instance opens one connection to {@code host:port}
 * and keeps sending a small length-prefixed frame until the channel goes down.
 *
 * <p>{@link #main(String[])} reads the number of clients, the host and the port
 * from {@code res/client.properties} and starts one thread per client.
 *
 * @author : 陈磊
 * Date: 13-3-11
 * Time: 4:08 PM
 * connectMethod:13638363871@163.com
 */
public class clientImpl implements Runnable {

    /** Size in bytes of the length prefix at the start of every frame. */
    private static final int LENGTH_FIELD_SIZE = 2;

    /** Pause between two frames sent by one client, in milliseconds. */
    private static final long SEND_INTERVAL_MILLIS = 200L;

    /** Charset used to encode String parameters, so the bytes do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    private String host = "";

    /**
     * Port stored as a 16-bit pattern. Ports above 32767 appear negative here and
     * are converted back with {@code port & 0xFFFF} before connecting.
     */
    private short port;

    /**
     * Starts the configured number of clients, one thread each.
     *
     * <p>Reads {@code num}, {@code host} and {@code port} from
     * {@code res/client.properties}. Any failure is reported on stderr instead
     * of being silently discarded.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            DOMConfigurator.configure("res/log4j.xml");
            Properties properties = getProPertis("res/client.properties");
            if (properties == null) {
                System.err.println("Unable to load res/client.properties; no client started.");
                return;
            }
            int clientNum = Integer.parseInt(requireProperty(properties, "num"));
            String host = requireProperty(properties, "host");
            // Parse as int so that the full 1..65535 range is accepted; Short.valueOf
            // rejected every port above 32767.
            int portNumber = Integer.parseInt(requireProperty(properties, "port"));
            if (portNumber < 1 || portNumber > 0xFFFF) {
                throw new IllegalArgumentException("port out of range (1-65535): " + portNumber);
            }
            short port = (short) portNumber;
            // '<' instead of '!=' so that a negative count starts nothing rather than looping on.
            for (int i = 0; i < clientNum; ++i) {
                clientImpl client = new clientImpl();
                client.setHost(host);
                client.setPort(port);
                Thread thread = new Thread(client);
                thread.start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the trimmed value of a mandatory property.
     *
     * @param properties the loaded settings
     * @param key        the property name
     * @return the trimmed, non-empty value
     * @throws IllegalArgumentException if the property is missing or blank
     */
    private static String requireProperty(Properties properties, String key) {
        String value = properties.getProperty(key);
        if (value == null || value.trim().length() == 0) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return value.trim();
    }

    /**
     * Loads a properties file.
     *
     * @param filePath path of the properties file
     * @return the loaded properties, or {@code null} if the file could not be read
     *         (the stack trace is printed in that case)
     */
    public static Properties getProPertis(String filePath) {
        InputStream fileInputStream = null;
        try {
            fileInputStream = new FileInputStream(new File(filePath));
            Properties serverSettings = new Properties();
            serverSettings.load(fileInputStream);
            return serverSettings;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        } finally {
            // Close in finally so the stream is not leaked when load() throws.
            if (fileInputStream != null) {
                try {
                    fileInputStream.close();
                } catch (IOException ignored) {
                    // Nothing useful can be done if closing a read-only stream fails.
                }
            }
        }
    }

    /**
     * Connects to the configured server and sends one frame every
     * {@value #SEND_INTERVAL_MILLIS} ms for as long as the channel is active.
     *
     * <p>Each frame is a 2-byte length prefix (the number of bytes that follow)
     * and then the two bytes {@code 1} and {@code 0}.
     *
     * @throws Exception if the connection attempt fails or the thread is interrupted
     */
    public void start() throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            // NioSocketChannel is used to create a client-side Channel.
            bootstrap.group(workerGroup).channel(NioSocketChannel.class);
            // option() rather than childOption(): a client-side SocketChannel has no parent.
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ClientChannelInitializer());

            // connect() is asynchronous: wait for it to finish (sync() rethrows a
            // connection failure) before writing anything to the channel.
            ChannelFuture channelFuture = bootstrap.connect(this.host, this.port & 0xFFFF).sync();
            Channel channel = channelFuture.channel();

            // Stop once the connection is gone instead of writing into a dead channel forever.
            while (channel.isActive()) {
                ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10);
                buffer.writeShort(Short.MIN_VALUE); // placeholder for the 2-byte frame length
                buffer.writeByte(1);
                buffer.writeByte(0);
                buffer.setShort(0, buffer.writerIndex() - LENGTH_FIELD_SIZE);
                channel.writeAndFlush(buffer);
                Thread.sleep(SEND_INTERVAL_MILLIS);
            }
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * @return the port as a raw 16-bit value; apply {@code & 0xFFFF} to obtain
     *         the unsigned port number when it is above 32767
     */
    public short getPort() {
        return port;
    }

    /**
     * @param port the port as a 16-bit value (ports above 32767 are passed as
     *             their {@code (short)} cast)
     */
    public void setPort(short port) {
        this.port = port;
    }

    /** @return the server host name or address */
    public String getHost() {
        return host;
    }

    /** @param host the server host name or address */
    public void setHost(String host) {
        this.host = host;
    }

    /**
     * Appends one length-prefixed frame to {@code buffer}.
     *
     * <p>Layout: 2-byte length, {@code arg1} as one byte, {@code arg2} as one byte
     * (only when non-zero), then each parameter in order. The length prefix counts
     * the bytes that follow it and is written at the position where this frame
     * starts, so the method also works on a buffer that already holds data.
     *
     * @param arg1   first type byte
     * @param arg2   second type byte; omitted from the frame when {@code 0}
     * @param buffer target buffer, or {@code null} to allocate a new pooled heap buffer
     * @param paras  payload values; {@code Byte} (1 byte), {@code Short} (2 bytes),
     *               {@code Integer} (4 bytes) and {@code String} (UTF-8 bytes, no
     *               length prefix) are written, any other type is ignored
     * @return the buffer containing the appended frame
     */
    private ByteBuf getWriteBuffer(int arg1, int arg2, ByteBuf buffer, Object... paras) {
        if (buffer == null) {
            buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10);
        }
        int lengthIndex = buffer.writerIndex();
        buffer.writeShort(Short.MIN_VALUE); // placeholder for the 2-byte frame length
        buffer.writeByte(arg1);
        if (arg2 != 0) {
            buffer.writeByte(arg2);
        }
        for (Object para : paras) {
            if (para instanceof Byte) {
                buffer.writeByte((Byte) para);       // 1 byte
            } else if (para instanceof String) {
                buffer.writeBytes(((String) para).getBytes(UTF_8));
            } else if (para instanceof Integer) {
                buffer.writeInt((Integer) para);     // 4 bytes
            } else if (para instanceof Short) {
                buffer.writeShort((Short) para);     // 2 bytes
            }
        }
        // Patch the real length into the placeholder written above.
        buffer.setShort(lengthIndex, buffer.writerIndex() - lengthIndex - LENGTH_FIELD_SIZE);
        return buffer;
    }

    /**
     * Runs {@link #start()} and reports any failure on stderr. An interruption
     * restores the thread's interrupt flag instead of being reported.
     */
    @Override
    public void run() {
        try {
            start();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ChannelInitializer 初始化类:
package nettyClient4;import Server.ExtComponents.utilsKit.ThreadUtils.PriorityThreadFactory;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.logging.LoggingHandler;import io.netty.util.concurrent.DefaultEventExecutorGroup;import io.netty.util.concurrent.EventExecutorGroup;/** * @author : 石头哥哥 * Project : LandlordsServer * Date: 13-8-7 * Time: 上午9:53 * Connect: 13638363871@163.com * packageName: nettyClient4 */@ChannelHandler.Sharablepublic class ClientChannelInitializer extends ChannelInitializer{ private static final LoggingHandler LOGGING_HANDLER=new LoggingHandler(); private static final EventExecutorGroup EVENT_EXECUTORS=new DefaultEventExecutorGroup(3,new PriorityThreadFactory("executionLogicHandlerThread+#", Thread.NORM_PRIORITY )); /** * This method will be called once the {@link io.netty.channel.Channel} was registered. After the method returns this instance * will be removed from the {@link io.netty.channel.ChannelPipeline} of the {@link io.netty.channel.Channel}. * * @param ch the {@link io.netty.channel.Channel} which was registered. * @throws Exception is thrown if an error occurs. In that case the {@link io.netty.channel.Channel} will be closed. */ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("LOGGING_HANDLER",LOGGING_HANDLER); pipeline.addLast("decoder",new clientDecoder(20000,0,2 ,0,2)); pipeline.addLast("handler",new clientHandler()); pipeline.addLast("encoder",new clientEncoder()); }}
decoder类:
package nettyClient4;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import io.netty.handler.codec.CorruptedFrameException;import io.netty.handler.codec.TooLongFrameException;import java.nio.ByteOrder;import java.util.List;/** * @author : 石头哥哥 * Project : LandlordsServer * Date: 13-8-7 * Time: 上午9:52 * Connect: 13638363871@163.com * packageName: nettyClient4 */public class clientDecoder extends ByteToMessageDecoder { private final ByteOrder byteOrder; private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthFieldEndOffset; private final int lengthAdjustment; private final int initialBytesToStrip; private final boolean failFast; private boolean discardingTooLongFrame; private long tooLongFrameLength; private long bytesToDiscard; /** * * @param maxFrameLength * @param lengthFieldOffset * @param lengthFieldLength * @param lengthAdjustment * @param initialBytesToStrip */ public clientDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) { this( maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true); } /** * * @param maxFrameLength * @param lengthFieldOffset * @param lengthFieldLength * @param lengthAdjustment * @param initialBytesToStrip * @param failFast */ public clientDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { this( ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); } /** * * @param byteOrder * @param maxFrameLength * @param lengthFieldOffset * @param lengthFieldLength * @param lengthAdjustment * @param initialBytesToStrip * @param failFast */ public clientDecoder( ByteOrder byteOrder, int 
maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { if (byteOrder == null) { throw new NullPointerException("byteOrder"); } if (maxFrameLength <= 0) { throw new IllegalArgumentException( "maxFrameLength must be a positive integer: " + maxFrameLength); } if (lengthFieldOffset < 0) { throw new IllegalArgumentException( "lengthFieldOffset must be a non-negative integer: " + lengthFieldOffset); } if (initialBytesToStrip < 0) { throw new IllegalArgumentException( "initialBytesToStrip must be a non-negative integer: " + initialBytesToStrip); } if (lengthFieldLength != 1 && lengthFieldLength != 2 && lengthFieldLength != 3 && lengthFieldLength != 4 && lengthFieldLength != 8) { throw new IllegalArgumentException( "lengthFieldLength must be either 1, 2, 3, 4, or 8: " + lengthFieldLength); } if (lengthFieldOffset > maxFrameLength - lengthFieldLength) { throw new IllegalArgumentException( "maxFrameLength (" + maxFrameLength + ") " + "must be equal to or greater than " + "lengthFieldOffset (" + lengthFieldOffset + ") + " + "lengthFieldLength (" + lengthFieldLength + ")."); } this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; } /** * decode message will be added the MessageListencoder类:
package nettyClient4;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;/** * @author : 石头哥哥 * Project : LandlordsServer * Date: 13-8-7 * Time: 上午9:52 * Connect: 13638363871@163.com * packageName: nettyClient4 */@ChannelHandler.Sharablepublic class clientEncoder extends MessageToByteEncoder{ public clientEncoder(){ super(false); } /** * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that can be handled * by this encoder. * * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link io.netty.handler.codec.MessageToByteEncoder} belongs to * @param msg the message to encode * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written * @throws Exception is thrown if an error accour */ @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { out.writeBytes(msg, msg.readerIndex(), msg.readableBytes()); }}
逻辑处理类handler:
package nettyClient4;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * @author : 石头哥哥 * Project : LandlordsServer * Date: 13-8-7 * Time: 上午9:52 * Connect: 13638363871@163.com * packageName: nettyClient4 */public class clientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf= (ByteBuf)msg; try { byte firstType=byteBuf.readByte(); //类型一 if (firstType>0){ byte secondType=byteBuf.readByte(); //类型二 }else { } } catch (Exception e) { } byteBuf.release(); ctx.fireChannelReadComplete(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { }}以上就是netty4的客户端代码,注意配置类里面的参数设置(如果要用的话稍微改改);有什么问题可以留言