Netty是由JBOSS供给的一个java开源结构。Netty供给异步的、根据事情驱动的网络使用程序结构,可以简略快速的开发高性能、高可靠性的网络IO使用, 是主流的NIO结构之一,Netty在互联网范畴、分布式核算范畴、通讯等范畴都有广泛的使用,知名的Elasticsearch、Dubbo结构等内部都采用了Netty。

Netty简化和流线化了网络使用的编程开发过程,例如本文主讲的根据Netty-UDP的socket音讯播送与监控。

本文主讲Netty UDP的音讯播送与监控的代码完成, 为了让读者更易了解,本文按如下顺序阐述

  1. 前置的常识与概念:主要阐述 四层和七层协议TCP和UDP 的概念
  2. Netty中UDP播送相关接口与完成类 做一个简要的阐明
  3. 实战代码部分:分为服务端与客户服,含代码完成功用的解析

前置常识

Netty官网: netty.io
HTTP协议-02:四层和七层协议

什么是TCP和UDP

TCP是面向衔接的传输,是指管理了两个端点之间的衔接的建立,在衔接的生命周期内的有序和可靠的音讯传输及有序的终止。

UDP属于无衔接协议,并无持久化衔接的概念,每个音讯(一个UDP数据包)都是一个独自的传输单元。UDP也无TCP的纠错机制,每个节点都将承认他们所接收到的包,而没有被承认的包将会被发送方重新传输。

UDP适用、优势与不足的分析

  • 有限制,但UDP高速于TCP;适用于那些可以处理或容忍音讯丢失的使用程序(金融类的交易一定是不合适的)
  • 单播: 发送音讯给一个由仅有地址所标识的单一的网络目的地,面向衔接的协议和无衔接协议都支撑
  • 多播: 传输到一个预订的主机组
  • 播送: 传到网络(或子网)上所有的主机
  • 发布与订阅: 类似于syslog的使用程序将被归类为发布与订阅(一个生产者,多个接收者订阅音讯)

Netty中UDP播送相关接口与完成类

  • interface AddressedEnvelope<M,A extends SocketAddress>: 定义一个音讯,其包装了另一个音讯并带有发送者和接收者地址。其间M是音讯类型, A是地址类型
  • class DefaultAddressEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A: 供给了AddressedEnvelope默认完成
  • interface DatagramChannel extends Channe: 扩展了Netty的Channel抽象类以支撑UDP的多播组管理
  • class NioDatagramChannel extends AbstractNioMessageChannel: 定义一个能发送或接收AddressedEnvelope音讯的Channel类型
  • class DatagramPacket extends DefaultAddressEnvelope<ByteBuf,InetSocketAddress> implements ByteBufHolder

    • 扩展了DefaultAddressEnvelope以使用ByteBuf作为音讯数据容器
    • DatagramPacket是一个简略的音讯容器,DatagramChannel完成用它来和长途节点通讯

实战代码

功用描绘

  • 播送端(服务端):读取一个文件,将文件中的每一行当成一个音讯播送到指定端口(注:该程序无身份认证、验证或加密,请读者自行增加)
  • 接收端(客户端):接收并处理音讯

ChannelPipeline事情流

  • 本地: ChannelPipeline处理流程: LogEvent -> LogEventEncoder -> DataGramPacket
  • 播送多个长途节点:长途节点1,长途节点2,长途节点3….

服务端代码

LogEvent — 定义音讯组件

public final class LogEvent {
    public static final byte SEPARATOR=(byte)':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg ){
      this(null,logfile,msg,-1);
    }
    public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource() {
        return source;
    }
    public String getLogfile() {
        return logfile;
    }
    public String getMsg() {
        return msg;
    }
      public long getReceivedTimestamp(){
            return received;
      }
}

LogEventEncoder – 音讯封装

/**
 * LogEvent的编解码器
 * 在将logevent转为DataGramPackage之前必须先进行编码
 */
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;
    /**
     * 创建行将被发送到指定的InetSocketAddress的DatagramPacket的音讯
     * @param remoteAddress
     */
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = ctx.alloc().buffer(file.length msg.length 1);
        buf.writeBytes(file); //将文件写入到ByteBuf中
        buf.writeByte(LogEvent.SEPARATOR); //增加一个SEPARATOR
        buf.writeBytes(msg); //将日志音讯写入到ByteBuf中
        //将一个拥有数据和目的地的新DatagramPacket增加到出站音讯列表中
        out.add(new DatagramPacket(buf,remoteAddress));
    }
}

LogEventBroadcaster – 发动类

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;
    public LogEventBroadcaster(InetSocketAddress address, File file) {
        this.group = new NioEventLoopGroup();
        this.file = file;
        this.bootstrap = new Bootstrap();
        bootstrap.group(group)
            .channel(NioDatagramChannel.class)
            .option(ChannelOption.SO_BROADCAST,true) //设置SO_BROADCAST套接字选项
            .handler(new LogEventEncoder(address));
    }
    public void run() throws Exception{
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        for(;;){
            //发动自动循环
            long len = file.length();
            if (len<pointer){
                pointer = len; //将文件指针设置到该文件的最后一个字节
            }else if(len>pointer){
                RandomAccessFile raf = new RandomAccessFile(file,"r");
                raf.seek(pointer); //设置当前的文件指针,以保证没有任何旧日志被发送
                String line;
                while((line=raf.readLine())!=null){
                    ch.writeAndFlush(new LogEvent(null,file.getAbsolutePath(),line,-1));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try{
                Thread.sleep(1000); //1秒
            }catch (Exception e){
                //休眠1秒被中断,则退出循环,否则重新处理它
                Thread.interrupted();
                break;
            }
        }
    }
    public void stop(){
        group.shutdownGracefully();
    }
    /**
     * 第1个参数为端口
     * 第2个参数文件路径
     * @param args
     */
    public static void main(String[] args) throws Exception {
        if (args.length!=2){
            throw new IllegalArgumentException("请输入2个参数");
        }
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0]))
                ,new File(args[1]));
        try{
            broadcaster.run();
        }finally {
            broadcaster.stop();
        }
    }
}

客户端-监控端

ClientLogEventEncoder – LogEvent的编解码器


/**
 * LogEvent的编解码器
 * 在将logevent转为DataGramPackage之前必须先进行编码
 */
public class ClientLogEventEncoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
        ByteBuf data = packet.content();
        int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
        String filename = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取文件名
        String logMsg = data.slice(idx 1,data.readableBytes()).toString(CharsetUtil.UTF_8);
        LogEvent event = new LogEvent(packet.sender()
            ,filename,logMsg,System.currentTimeMillis()
        );
        out.add(event);
    }
}

ClientLogEventHandler – 音讯处理


public class ClientLogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append("[");
        builder.append(event.getSource());
        builder.append("][");
        builder.append(event.getLogfile());
        builder.append("]");
        builder.append(event.getMsg());
        System.out.println(builder.toString()); //打印logEvent的数据
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

LogEventMonitor — 发动程序


public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    public LogEventMonitor(InetSocketAddress address){
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST,true) //设置套接字SO_BROADCAST
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ClientLogEventEncoder());
                        pipeline.addLast(new ClientLogEventHandler());
                    }
                })
                .localAddress(address);
    }
    public Channel bind(){
        //绑定channel
        //DatagramChannel无衔接
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop(){
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws InterruptedException {
        if (args.length!=1){
            throw  new IllegalArgumentException("Usage: LogEventMonitor");
        }
        LogEventMonitor monitor = new LogEventMonitor(
                new InetSocketAddress(Integer.parseInt(args[0]))
        );
        try{
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync(); //堵塞等候服务端监听端口关闭。
        }finally {
            monitor.stop();
        }
    }
}

该文章是笔者两年前学习Netty时写的笔记文章,但段前的基础常识是本次新增的。

[参考]