博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty4 客户端代码分享
阅读量:7037 次
发布时间:2019-06-28

本文共 19805 字,大约阅读时间需要 66 分钟。

hot3.png

netty4已经发布有段时间了,今天因为压测 写了一个netty4的客户端(版本4.0.6.Final)现在分享出来给大家:

netty4配置类

package nettyClient4;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.PooledByteBufAllocator;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelOption;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import org.apache.log4j.xml.DOMConfigurator;import java.io.File;import java.io.FileInputStream;import java.io.InputStream;import java.util.Properties;/** * @author : 陈磊 
* Date: 13-3-11
* Time: 下午4:08
* connectMethod:13638363871@163.com
*/public class clientImpl implements Runnable { private String host=""; private short port; public static void main(String[]args){ try { DOMConfigurator.configure("res/log4j.xml"); Properties properties=getProPertis("res/client.properties"); int clientNum= Integer.valueOf(properties.getProperty("num")); String host= properties.getProperty("host"); short port=Short.valueOf(properties.getProperty("port")); for (int i=0;i!=clientNum;++i){ clientImpl client=new clientImpl(); client.setHost(host); client.setPort(port); Thread thread=new Thread(client); thread.start(); } } catch (Exception e){ //do nothing } } public static Properties getProPertis(String filePath) { InputStream fileInputStream = null; try { fileInputStream = new FileInputStream(new File(filePath)); Properties serverSettings = new Properties(); serverSettings.load(fileInputStream); fileInputStream.close(); return serverSettings; } catch (Exception e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. return null; } } public void start() throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group( workerGroup).channel(NioSocketChannel.class) ; // NioSocketChannel is being used to create a client-side Channel. //Note that we do not use childOption() here unlike we did with // ServerBootstrap because the client-side SocketChannel does not have a parent. bootstrap.option(ChannelOption.SO_KEEPALIVE, true) ; bootstrap.handler(new ClientChannelInitializer()); // Bind and start to accept incoming connections. ChannelFuture channelFuture = bootstrap.connect(this.host,this.port); // Wait until the server socket is closed. // In this server, this does not happen, but you can do that to gracefully // shut down your CLIENT. Channel channel=channelFuture.channel(); while (true){ ByteBuf buffer= PooledByteBufAllocator.DEFAULT.heapBuffer(10); buffer.writeShort(Short.MIN_VALUE);//包长占2字节 buffer.writeByte(1); buffer.writeByte(0); buffer.setShort(0,buffer.writerIndex()-0x2); channel.writeAndFlush(buffer) ; Thread.sleep(200); } }finally { workerGroup.shutdownGracefully(); } } public short getPort() { return port; } public void setPort(short port) { this.port = port; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } private ByteBuf getWriteBuffer(int arg1,int arg2,ByteBuf buffer,Object...paras){ if (buffer==null){ buffer=PooledByteBufAllocator.DEFAULT.heapBuffer(10); } buffer.writeShort(Short.MIN_VALUE);//包长占2字节 buffer.writeByte(arg1); if (arg2!=0)buffer.writeByte(arg2); for (Object para:paras){ if (para instanceof Byte){ buffer.writeByte((Byte) para); // 占1字节 }else if ((para instanceof String)){ buffer.writeBytes(((String) para).getBytes()); } else if (para instanceof Integer){ buffer.writeInt((Integer)para); //占4字节 }else if (para instanceof Short){ buffer.writeShort((Short) para); //占2字节 } } /**包长占2字节,setShort()*/ buffer.setShort(0,buffer.writerIndex()-0x2); return buffer; } @Override public void run() { try { start(); } catch (Exception e) { e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. } }}

ChannelInitializer 初始化类:

package nettyClient4;import Server.ExtComponents.utilsKit.ThreadUtils.PriorityThreadFactory;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.socket.SocketChannel;import io.netty.handler.logging.LoggingHandler;import io.netty.util.concurrent.DefaultEventExecutorGroup;import io.netty.util.concurrent.EventExecutorGroup;/** * @author : 石头哥哥 *         Project : LandlordsServer *         Date: 13-8-7 *         Time: 上午9:53 *         Connect: 13638363871@163.com *         packageName: nettyClient4 */@ChannelHandler.Sharablepublic class ClientChannelInitializer extends ChannelInitializer
{ private static final LoggingHandler LOGGING_HANDLER=new LoggingHandler(); private static final EventExecutorGroup EVENT_EXECUTORS=new DefaultEventExecutorGroup(3,new PriorityThreadFactory("executionLogicHandlerThread+#", Thread.NORM_PRIORITY )); /** * This method will be called once the {@link io.netty.channel.Channel} was registered. After the method returns this instance * will be removed from the {@link io.netty.channel.ChannelPipeline} of the {@link io.netty.channel.Channel}. * * @param ch the {@link io.netty.channel.Channel} which was registered. * @throws Exception is thrown if an error occurs. In that case the {@link io.netty.channel.Channel} will be closed. */ @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("LOGGING_HANDLER",LOGGING_HANDLER); pipeline.addLast("decoder",new clientDecoder(20000,0,2 ,0,2)); pipeline.addLast("handler",new clientHandler()); pipeline.addLast("encoder",new clientEncoder()); }}

decoder类:
package nettyClient4;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.ByteToMessageDecoder;import io.netty.handler.codec.CorruptedFrameException;import io.netty.handler.codec.TooLongFrameException;import java.nio.ByteOrder;import java.util.List;/** * @author : 石头哥哥 *         Project : LandlordsServer *         Date: 13-8-7 *         Time: 上午9:52 *         Connect: 13638363871@163.com *         packageName: nettyClient4 */public class clientDecoder extends ByteToMessageDecoder {    private final ByteOrder byteOrder;    private final int maxFrameLength;    private final int lengthFieldOffset;    private final int lengthFieldLength;    private final int lengthFieldEndOffset;    private final int lengthAdjustment;    private final int initialBytesToStrip;    private final boolean failFast;    private boolean discardingTooLongFrame;    private long tooLongFrameLength;    private long bytesToDiscard;    /**     *     * @param maxFrameLength     * @param lengthFieldOffset     * @param lengthFieldLength     * @param lengthAdjustment     * @param initialBytesToStrip     */    public clientDecoder(            int maxFrameLength,            int lengthFieldOffset, int lengthFieldLength,            int lengthAdjustment, int initialBytesToStrip) {        this(                maxFrameLength,                lengthFieldOffset, lengthFieldLength, lengthAdjustment,                initialBytesToStrip, true);    }    /**     *     * @param maxFrameLength     * @param lengthFieldOffset     * @param lengthFieldLength     * @param lengthAdjustment     * @param initialBytesToStrip     * @param failFast     */    public clientDecoder(            int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {        this(                ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength,                lengthAdjustment, initialBytesToStrip, failFast);    }    /**     *     * @param byteOrder     * @param maxFrameLength     * @param lengthFieldOffset     * @param lengthFieldLength     * @param lengthAdjustment     * @param initialBytesToStrip     * @param failFast     */    public clientDecoder(            ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,            int lengthAdjustment, int initialBytesToStrip, boolean failFast) {        if (byteOrder == null) {            throw new NullPointerException("byteOrder");        }        if (maxFrameLength <= 0) {            throw new IllegalArgumentException(                    "maxFrameLength must be a positive integer: " +                            maxFrameLength);        }        if (lengthFieldOffset < 0) {            throw new IllegalArgumentException(                    "lengthFieldOffset must be a non-negative integer: " +                            lengthFieldOffset);        }        if (initialBytesToStrip < 0) {            throw new IllegalArgumentException(                    "initialBytesToStrip must be a non-negative integer: " +                            initialBytesToStrip);        }        if (lengthFieldLength != 1 && lengthFieldLength != 2 &&                lengthFieldLength != 3 && lengthFieldLength != 4 &&                lengthFieldLength != 8) {            throw new IllegalArgumentException(                    "lengthFieldLength must be either 1, 2, 3, 4, or 8: " +                            lengthFieldLength);        }        if (lengthFieldOffset > maxFrameLength - lengthFieldLength) {            throw new IllegalArgumentException(                    "maxFrameLength (" + maxFrameLength + ") " +                            "must be equal to or greater than " +                            "lengthFieldOffset (" + lengthFieldOffset + ") + " +                            "lengthFieldLength (" + lengthFieldLength + ").");        }        this.byteOrder = byteOrder;        this.maxFrameLength = maxFrameLength;        this.lengthFieldOffset = lengthFieldOffset;        this.lengthFieldLength = lengthFieldLength;        this.lengthAdjustment = lengthAdjustment;        lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;        this.initialBytesToStrip = initialBytesToStrip;        this.failFast = failFast;    }    /**     * decode message will be added the MessageList out  queue!     * @param ctx     * @param in     * @param out     * @throws Exception     *  针对每一个channel 都会有一个相应的OutputMessageBuf消息缓存队列     */    @Override    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        ByteBuf byteBuf = decode(ctx, in);        if (byteBuf != null) {            out.add(byteBuf);        }    }    /**     * return the  date of ByteBuf     * @param ctx     * @param in     * @return     * @throws Exception     */    private ByteBuf decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {        if (discardingTooLongFrame) {            long bytesToDiscard = this.bytesToDiscard;            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());            in.skipBytes(localBytesToDiscard);            bytesToDiscard-= localBytesToDiscard;            this.bytesToDiscard = bytesToDiscard;            failIfNecessary(ctx, false);            return null;        }        if (in.readableBytes() < lengthFieldEndOffset) {            return null;        }        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;        /**获取数据包长*///        long frameLength = (in.order(byteOrder)).getUnsignedByte(actualLengthFieldOffset);        long frameLength = (in.order(byteOrder)).getUnsignedShort(actualLengthFieldOffset);        if (frameLength < 0) {            in.skipBytes(lengthFieldEndOffset);            throw new CorruptedFrameException(                    "negative pre-adjustment length field: " + frameLength);        }        frameLength += lengthAdjustment + lengthFieldEndOffset;        if (frameLength < lengthFieldEndOffset) {            in.skipBytes(lengthFieldEndOffset);            throw new CorruptedFrameException(                    "Adjusted frame length (" + frameLength + ") is less " +                            "than lengthFieldEndOffset: " + lengthFieldEndOffset);        }        if (frameLength > maxFrameLength) {            // Enter the discard mode and discard everything received so far.            discardingTooLongFrame = true;            tooLongFrameLength = frameLength;            bytesToDiscard = frameLength - in.readableBytes();            in.skipBytes(in.readableBytes());            failIfNecessary(ctx, true);            return null;        }        // never overflows because it's less than maxFrameLength        int frameLengthInt = (int) frameLength;        if (in.readableBytes() < frameLengthInt) {            return null;        }        if (initialBytesToStrip > frameLengthInt) {            in.skipBytes(frameLengthInt);            throw new CorruptedFrameException(                    "Adjusted frame length (" + frameLength + ") is less " +                            "than initialBytesToStrip: " + initialBytesToStrip);        }        in.skipBytes(initialBytesToStrip);        // extract frame        int readerIndex = in.readerIndex();        int actualFrameLength = frameLengthInt - initialBytesToStrip;        ByteBuf frame = extractFrame(in, readerIndex, actualFrameLength,ctx);        in.readerIndex(readerIndex + actualFrameLength);        return frame;    }    /**     * 返回字节长度 byte  short     * return  the length of pack     * @param  in     * @param  actualLengthFieldOffset     * @return     */    @Deprecated    private long getFrameLength(ByteBuf in, int actualLengthFieldOffset) {        in = in.order(byteOrder);        return in.getUnsignedShort(actualLengthFieldOffset);//return  the length    }    /**     *     * @param  ctx     * @param  firstDetectionOfTooLongFrame     */    private void failIfNecessary(ChannelHandlerContext ctx, boolean firstDetectionOfTooLongFrame) {        if (bytesToDiscard == 0) {            // Reset to the initial state and tell the handlers that            // the frame was too large.            long tooLongFrameLength = this.tooLongFrameLength;            this.tooLongFrameLength = 0;            discardingTooLongFrame = false;            if (!failFast ||                    failFast && firstDetectionOfTooLongFrame) {                fail(tooLongFrameLength);            }        } else {            // Keep discarding and notify handlers if necessary.            if (failFast && firstDetectionOfTooLongFrame) {                fail(tooLongFrameLength);            }        }    }    /**     * Extract the sub-region of the specified buffer.     * 

* If you are sure that the frame and its content are not accessed after * the current {@link #decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)} * call returns, you can even avoid memory copy by returning the sliced * sub-region (i.e. return buffer.slice(index, length)). * It's often useful when you convert the extracted frame into an object. * Refer to the source code of {@link io.netty.handler.codec.serialization.ObjectDecoder} to see how this method * is overridden to avoid memory copy. */ protected ByteBuf extractFrame(ByteBuf buffer, int index, int length) { ByteBuf frame = Unpooled.buffer(length); frame.writeBytes(buffer, index, length); return frame; } /** * is overridden to avoid memory copy. * 使用Unpooled ByteBufs会造成沉重的分配与再分配问题, * 使用ChannelHandlerContext.alloc()或Channel.alloc() * 来获取ByteBufAllocator分配ByteBuf,从而减轻GC执行; * @param buffer * @param index * @param length * @param ctx ChannelHandlerContext * @return */ protected ByteBuf extractFrame(ByteBuf buffer, int index,int length,ChannelHandlerContext ctx) { ByteBuf frame =ctx.alloc().buffer(length); frame.writeBytes(buffer, index, length); return frame; } /** * * 解析数据,返回pack[]数组数据包 * @param buffer * @param index * @param length * @return byte[] */ private byte[] extractFrameByteArray(ByteBuf buffer, int index, int length) { byte[] pack=new byte[length]; buffer.readBytes(pack,index,length); return pack; } //logger fail private void fail(long frameLength) { if (frameLength > 0) { throw new TooLongFrameException( "Adjusted frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"); } else { throw new TooLongFrameException( "Adjusted frame length exceeds " + maxFrameLength + " - discarding"); } }}

encoder类:
package nettyClient4;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToByteEncoder;/** * @author : 石头哥哥 *         Project : LandlordsServer *         Date: 13-8-7 *         Time: 上午9:52 *         Connect: 13638363871@163.com *         packageName: nettyClient4 */@ChannelHandler.Sharablepublic class clientEncoder  extends MessageToByteEncoder
{ public clientEncoder(){ super(false); } /** * Encode a message into a {@link io.netty.buffer.ByteBuf}. This method will be called for each written message that can be handled * by this encoder. * * @param ctx the {@link io.netty.channel.ChannelHandlerContext} which this {@link io.netty.handler.codec.MessageToByteEncoder} belongs to * @param msg the message to encode * @param out the {@link io.netty.buffer.ByteBuf} into which the encoded message will be written * @throws Exception is thrown if an error accour */ @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { out.writeBytes(msg, msg.readerIndex(), msg.readableBytes()); }}

逻辑处理类handler:

package nettyClient4;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;/** * @author : 石头哥哥 *         Project : LandlordsServer *         Date: 13-8-7 *         Time: 上午9:52 *         Connect: 13638363871@163.com *         packageName: nettyClient4 */public class clientHandler  extends ChannelInboundHandlerAdapter {    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        ByteBuf byteBuf= (ByteBuf)msg;        try {            byte firstType=byteBuf.readByte(); //类型一            if (firstType>0){                byte secondType=byteBuf.readByte();  //类型二            }else {            }        } catch (Exception e) {        }        byteBuf.release();        ctx.fireChannelReadComplete();    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {    }}
以上就是netty4的客户端代码,注意配置类里面的参数设置(如果要用的话稍微改改);有什么问题可以留言

转载于:https://my.oschina.net/chenleijava/blog/151095

你可能感兴趣的文章
Andrew Ng 深度学习笔记-01-week3-课程
查看>>
Android获取通过XML设置的空间的高宽
查看>>
生活的苦逼
查看>>
在iptables防火墙下开启vsftpd的端口
查看>>
Mysql、MariaDB 新型主从集群配置GTID
查看>>
Linux HA Cluster的实例演示(2)
查看>>
Javascript Closure
查看>>
Delphi之word报表
查看>>
重要博客
查看>>
解析C#开发过程常见的编程模式
查看>>
java单例模式Singleton
查看>>
JsonUtils工具整理
查看>>
Python操作Redis
查看>>
【C++ Primer】第六章(分支语句和逻辑操作符)
查看>>
centsos7修改主机名 [root@st152 ~]# cat /etc/hostname
查看>>
软件工程(2018)团体第五次作业
查看>>
windows phone 7 系列教程索引
查看>>
委托的异步编程和同步编程的使用( Invoke 和BeginInvoke)
查看>>
转载 iphone 获取iPhone用户手机号
查看>>
简单springmvc在Eclipse的Tomcat上部署404error,直接在Tomcat上部署可以访问
查看>>