大家好,我是小趴菜,接下来我会从0到1手写一个RPC结构,该专题包括以下专题,有爱好的小伙伴就跟着我一同学习吧

本章源码地址:gitee.com/baojh123/se…

自定义注解 -> opt-01
服务提供者收发音讯根底完结 -> opt-01
自定义网络传输协议的完结 -> opt-02
自定义编解码完结 -> opt-03
服务提供者调用实在方法完结 -> opt-04
完善服务顾客发送音讯根底功用 -> opt-05
注册中心根底功用完结 -> opt-06
服务提供者整合注册中心 -> opt-07
服务顾客整合注册中心 -> opt-08
完善服务顾客接纳照应成果 -> opt-09
服务顾客,服务提供者整合SpringBoot -> opt-10
动态署理屏蔽RPC服务调用底层细节 -> opt-10
SPI机制根底功用完结 -> opt-11
SPI机制扩展随机负载均衡战略 -> opt-12
SPI机制扩展轮询负载均衡战略 -> opt-13
SPI机制扩展JDK序列化 -> opt-14
SPI机制扩展JSON序列化 -> opt-15
SPI机制扩展protustuff序列化 -> opt-16

前语

在之前的章节中,我们现已完结了服务提供者的收发音讯功用,并且服务提供者可以调用实在方法并回来照应成果。

可是我们发现,发送音讯是在服务顾客注册的时分,发送了一条音讯,这明显不符合我们的需求。

这一章我们完善下服务顾客发送音讯的根底功用

完结

修正服务顾客:com.xpc.rpc.consumer.RpcConsumer

package com.xpc.rpc.consumer;
import com.xpc.rpc.codec.RpcDecoder;
import com.xpc.rpc.codec.RpcEncoder;
import com.xpc.rpc.consumer.handler.RpcConsumerHandler;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.request.RpcRequest;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class RpcConsumer {
     private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumer.class);
     private Bootstrap bootstrap;
     private EventLoopGroup eventLoopGroup;
     private Map<String,RpcConsumerHandler> handlerMap = new ConcurrentHashMap<>();
     public RpcConsumer() {
         eventLoopGroup = new NioEventLoopGroup();
         bootstrap = new Bootstrap();
         bootstrap.group(eventLoopGroup)
                 .channel(NioSocketChannel.class)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     protected void initChannel(SocketChannel channel) throws Exception {
                         ChannelPipeline pipeline = channel.pipeline();
                         //我们自己的编解码器
                         pipeline.addLast(new RpcDecoder());
                         pipeline.addLast(new RpcEncoder());
                         pipeline.addLast(new RpcConsumerHandler());
                     }
                 });
     }
     public void sendRequest(ProtocolMessage<RpcRequest> requestProtocolMessage) {
         //先写死,后续会通过注册中心获取
         String host = "127.0.0.1";
         int port = 21778;
         RpcRequest request = requestProtocolMessage.getT();
         String key = buildHandlerMapKey(request.getClassName(), request.getMethodName());
         RpcConsumerHandler consumerHandler;
         if(handlerMap.containsKey(key)) {
             consumerHandler = handlerMap.get(key);
             Channel channel = consumerHandler.getChannel();
             if(!channel.isOpen() || !channel.isActive()) {
                 consumerHandler = getConsumerHandler(key, host, port);
                 handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
             }
         }else {
             consumerHandler = getConsumerHandler(key, host, port);
             if(consumerHandler == null) {
                 throw new RuntimeException("");
             }
             handlerMap.put(buildHandlerMapKey(request.getClassName(),request.getMethodName()),consumerHandler);
         }
         //发送音讯
         consumerHandler.sendRequest(requestProtocolMessage);
     }
    private RpcConsumerHandler getConsumerHandler(String key,String host, int port) {
        try {
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if(future.isSuccess()) {
                        LOGGER.info("客户端衔接成功........");
                    }else {
                        LOGGER.info("客户端衔接失利........");
                    }
                }
            });
            RpcConsumerHandler rpcConsumerHandler = channelFuture.channel().pipeline().get(RpcConsumerHandler.class);
            handlerMap.put(key,rpcConsumerHandler);
            return rpcConsumerHandler;
        } catch (InterruptedException e) {
            return null;
        }
    }
    public void close() {
         if(eventLoopGroup != null) {
             eventLoopGroup.shutdownGracefully();
         }
    }
    private String buildHandlerMapKey(String className,String methodName) {
         StringBuilder key = new StringBuilder();
         key.append(className).append("#").append(methodName);
         return key.toString();
     }
}
  • 首先是修正了结构方法,现在没有直接去获取衔接了,只是作为初始化
  • 新增sendRequest()方法,这个就是服务顾客发送音讯的方法了
  • 继而会调用RpcConsumerHandler.sendRequest()方法去实在的发送音讯给服务提供者

修正服务顾客的自定义处理器:com.xpc.rpc.consumer.handler.RpcConsumerHandler

package com.xpc.rpc.consumer.handler;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import com.xpc.rpc.protocol.response.RpcResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcConsumerHandler extends SimpleChannelInboundHandler<ProtocolMessage<RpcResponse>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcConsumerHandler.class);
    private Channel channel;
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        this.channel = ctx.channel();
    }
    //获取照应成果
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolMessage<RpcResponse> protocolMessage) throws Exception {
        LOGGER.info("code: {}",protocolMessage.getT().getCode());
        LOGGER.info("data: {}",protocolMessage.getT().getData());
    }
    //发送央求到服务提供者
    public void sendRequest(ProtocolMessage<RpcRequest> requestProtocolMessage) {
        channel.writeAndFlush(requestProtocolMessage);
    }
    public Channel getChannel() {
        return channel;
    }
    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}

到此代码修正就完结了,接下来就是检验了

检验

发动服务提供者:com.xpc.test.netty.providerTest

package com.xpc.test.netty;
import com.xpc.rpc.provider.server.base.BaseServer;
import org.junit.Test;
public class ProviderTest {
    @Test
    public void startNetty() {
        BaseServer baseServer = new BaseServer("com.xpc");
        baseServer.startNettyServer();
    }
}

修正并发动服务顾客 com.xpc.test.netty.ConsumerTest

package com.xpc.test.netty;
import com.xpc.rpc.common.enums.RpcMsgType;
import com.xpc.rpc.consumer.RpcConsumer;
import com.xpc.rpc.protocol.ProtocolMessage;
import com.xpc.rpc.protocol.header.RpcHeader;
import com.xpc.rpc.protocol.request.RpcRequest;
import org.junit.Test;
public class ConsumerTest {
    @Test
    public void startConsumer() throws Exception{
        RpcConsumer rpcConsumer = new RpcConsumer();
        //发送央求
        rpcConsumer.sendRequest(getRequest());
        Thread.sleep(5000);
        rpcConsumer.close();
    }
    //结构央求
    private ProtocolMessage<RpcRequest> getRequest() {
        ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<RpcRequest>();
        RpcHeader rpcHeader = new RpcHeader();
        rpcHeader.setMsgType(RpcMsgType.REQUEST.getType());
        rpcHeader.setRequestId(1L);
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName("com.xpc.test.scanner.DemoService");
        rpcRequest.setMethodName("hello");
        rpcRequest.setParameterTypes(new Class[]{String.class});
        rpcRequest.setParameters(new Object[]{"coco"});
        protocolMessage.setRpcHeader(rpcHeader);
        protocolMessage.setT(rpcRequest);
        return protocolMessage;
    }
}

服务提供者日志:

手撸RPC结构 - 完善顾客发送音讯根底功用

服务顾客日志:

手撸RPC结构 - 完善顾客发送音讯根底功用