欢迎咱们注重大众号「JAVA前哨」查看更多精彩共享文章,首要包括源码剖析、实践使用、架构思想、职场共享、产品考虑等等,一同欢迎咱们加我个人微信「java_front」一同交流学习


0 文章概述

在RPC场景中由于重试或许没有实现幂等机制而导致的重复数据问题,有必要引起咱们注重,有或许会形成例如一次购买创立多笔订单,一条通知信息被发送屡次等问题,这是技术人员有必要面对和解决的问题。

有人或许会说:当调用失利时程序并没有显现重试,为什么还会发生重复数据问题呢?这是由于即便没有显现重试,RPC框架在集群容错机制中自动进行了重试,这个问题有必要引起注重。

本文咱们以DUBBO框架为例剖析为什么重试,怎么做重试,怎么做幂等三个问题。


警惕看不见的重试机制:为什么使用RPC必须考虑幂等性


1 为什么重试

假如简单对一个RPC交互过程进行分类,咱们能够分为三类:呼应成功、呼应失利、没有呼应。


警惕看不见的重试机制:为什么使用RPC必须考虑幂等性


关于呼应成功和呼应失利这两种状况,顾客很好处理。由于呼应信息清晰,所以只需依据呼应信息,继续处理成功或许失利逻辑即可。可是没有呼应这种场景比较难处理,这是由于没有呼应或许包括以下状况:

(1) 生产者底子没有接收到恳求
(2) 生产者接收到恳求而且已处理成功,可是顾客没有接收到呼应
(3) 生产者接收到恳求而且已处理失利,可是顾客没有接收到呼应

假定你是一名RPC框架设计者,究竟是挑选重试仍是抛弃调用呢?其实终究怎么挑选取决于事务特性,有的事务自身就具有幂等性,可是有的事务不能允许重试否则会形成重复数据。

那么谁对事务特性最了解呢?答案是顾客,由于顾客作为调用方必定最了解自身事务,所以RPC框架只需供给一些战略供顾客挑选即可。


2 怎么做重试

2.1 集群容错战略

DUBBO作为一款优异RPC框架,供给了如下集群容错战略供顾客挑选:

Failover: 毛病搬运
Failfast: 快速失利
Failsafe: 安全失利
Failback: 异步重试
Forking:  并行调用
Broadcast:播送调用

(1) Failover

毛病搬运战略。作为默认战略当消费发生反常时经过负载均衡战略再挑选一个生产者节点进行调用,直到到达重试次数

(2) Failfast

快速失利战略。顾客只消费一次服务,当发生反常时则直接抛出

(3) Failsafe

安全失利战略。顾客只消费一次服务,假如消费失利则包装一个空成果,不抛出反常

(4) Failback

异步重试战略。当消费发生反常时回来一个空成果,失利恳求将会进行异步重试。假如重试超过最大重试次数还不成功,抛弃重试并不抛出反常

(5) Forking

并行调用战略。顾客经过线程池并发调用多个生产者,只需有一个成功就算成功

(6) Broadcast

播送调用战略。顾客遍历调用一切生产者节点,任何一个出现反常则抛出反常


2.2 源码剖析

2.2.1 Failover

Failover毛病搬运战略作为默认战略,当消费发生反常时经过负载均衡战略再挑选一个生产者节点进行调用,直到到达重试次数。即便事务代码没有显现重试,也有或许屡次履行消费逻辑从而形成重复数据:

public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    public FailoverClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 一切生产者Invokers
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 获取重试次数
        int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        RpcException le = null;
        // 现已调用过的生产者
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size());
        Set<String> providers = new HashSet<String>(len);
        // 重试直到到达最大次数
        for (int i = 0; i < len; i++) {
            if (i > 0) {
                // 假如当时实例被销毁则抛出反常
                checkWhetherDestroyed();
                // 依据路由战略选出可用生产者Invokers
                copyInvokers = list(invocation);
                // 从头查看
                checkInvokers(copyInvokers, invocation);
            }
            // 负载均衡挑选一个生产者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                // 服务消费建议长途调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le);
                }
                // 有成果则回来
                return result;
            } catch (RpcException e) {
                // 事务反常直接抛出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                // RpcException不抛出继续重试
                le = new RpcException(e.getMessage(), e);
            } finally {
                // 保存现已访问过的生产者
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le);
    }
}

顾客调用生产者节点A发生RpcException反常时(例如超时反常),在未到达最大重试次数之前,顾客会经过负载均衡战略再次挑选其它生产者节点消费。试想假如生产者节点A其实现已处理成功了,可是没有及时将成功成果回来给顾客,那么再次重试或许就会形成重复数据问题。


2.2.2 Failfast

快速失利战略。顾客只消费一次服务,当发生反常时则直接抛出,不会进行重试:

public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    public FailfastClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 查看生产者Invokers是否合法
        checkInvokers(invokers, invocation);
        // 负载均衡挑选一个生产者Invoker
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            // 服务消费建议长途调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 服务消费失利不重试直接抛出反常
            if (e instanceof RpcException && ((RpcException) e).isBiz()) {
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0,
                                   "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName()
                                   + " select from all providers " + invokers + " for service " + getInterface().getName()
                                   + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost()
                                   + " use dubbo version " + Version.getVersion()
                                   + ", but no luck to perform the invocation. Last error is: " + e.getMessage(),
                                   e.getCause() != null ? e.getCause() : e);
        }
    }
}

2.2.3 Failsafe

安全失利战略。顾客只消费一次服务,假如消费失利则包装一个空成果,不抛出反常,不会进行重试:

public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
    public FailsafeClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            // 查看生产者Invokers是否合法
            checkInvokers(invokers, invocation);
            // 负载均衡挑选一个生产者Invoker
            Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
            // 服务消费建议长途调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            // 消费失利包装为一个空成果目标
            logger.error("Failsafe ignore exception: " + e.getMessage(), e);
            return new RpcResult();
        }
    }
}

2.2.4 Failback

异步重试战略。当消费发生反常时回来一个空成果,失利恳求将会进行异步重试。假如重试超过最大重试次数还不成功,抛弃重试并不抛出反常:

public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
    private static final long RETRY_FAILED_PERIOD = 5;
    private final int retries;
    private final int failbackTasks;
    private volatile Timer failTimer;
    public FailbackClusterInvoker(Directory<T> directory) {
        super(directory);
        int retriesConfig = getUrl().getParameter(Constants.RETRIES_KEY, Constants.DEFAULT_FAILBACK_TIMES);
        if (retriesConfig <= 0) {
            retriesConfig = Constants.DEFAULT_FAILBACK_TIMES;
        }
        int failbackTasksConfig = getUrl().getParameter(Constants.FAIL_BACK_TASKS_KEY, Constants.DEFAULT_FAILBACK_TASKS);
        if (failbackTasksConfig <= 0) {
            failbackTasksConfig = Constants.DEFAULT_FAILBACK_TASKS;
        }
        retries = retriesConfig;
        failbackTasks = failbackTasksConfig;
    }
    private void addFailed(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker) {
        if (failTimer == null) {
            synchronized (this) {
                if (failTimer == null) {
                    // 创立守时器
                    failTimer = new HashedWheelTimer(new NamedThreadFactory("failback-cluster-timer", true), 1, TimeUnit.SECONDS, 32, failbackTasks);
                }
            }
        }
        // 构造守时任务
        RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
        try {
            // 守时任务放入守时器等待履行
            failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
        } catch (Throwable e) {
            logger.error("Failback background works error,invocation->" + invocation + ", exception: " + e.getMessage());
        }
    }
    @Override
    protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        Invoker<T> invoker = null;
        try {
            // 查看生产者Invokers是否合法
            checkInvokers(invokers, invocation);
            // 担任均衡挑选一个生产者Invoker
            invoker = select(loadbalance, invocation, invokers, null);
            // 消费服务建议长途调用
            return invoker.invoke(invocation);
        } catch (Throwable e) {
            logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
            // 假如服务消费失利则记载失利恳求
            addFailed(loadbalance, invocation, invokers, invoker);
            // 回来空成果
            return new RpcResult();
        }
    }
    @Override
    public void destroy() {
        super.destroy();
        if (failTimer != null) {
            failTimer.stop();
        }
    }
    /**
     * RetryTimerTask
     */
    private class RetryTimerTask implements TimerTask {
        private final Invocation invocation;
        private final LoadBalance loadbalance;
        private final List<Invoker<T>> invokers;
        private final int retries;
        private final long tick;
        private Invoker<T> lastInvoker;
        private int retryTimes = 0;
        RetryTimerTask(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, Invoker<T> lastInvoker, int retries, long tick) {
            this.loadbalance = loadbalance;
            this.invocation = invocation;
            this.invokers = invokers;
            this.retries = retries;
            this.tick = tick;
            this.lastInvoker = lastInvoker;
        }
        @Override
        public void run(Timeout timeout) {
            try {
                // 负载均衡挑选一个生产者Invoker
                Invoker<T> retryInvoker = select(loadbalance, invocation, invokers, Collections.singletonList(lastInvoker));
                lastInvoker = retryInvoker;
                // 服务消费建议长途调用
                retryInvoker.invoke(invocation);
            } catch (Throwable e) {
                logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                // 超出最大重试次数记载日志不抛出反常
                if ((++retryTimes) >= retries) {
                    logger.error("Failed retry times exceed threshold (" + retries + "), We have to abandon, invocation->" + invocation);
                } else {
                    // 未超出最大重试次数从头放入守时器
                    rePut(timeout);
                }
            }
        }
        private void rePut(Timeout timeout) {
            if (timeout == null) {
                return;
            }
            Timer timer = timeout.timer();
            if (timer.isStop() || timeout.isCancelled()) {
                return;
            }
            timer.newTimeout(timeout.task(), tick, TimeUnit.SECONDS);
        }
    }
}

2.2.5 Forking

并行调用战略。顾客经过线程池并发调用多个生产者,只需有一个成功就算成功:

public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
    public ForkingClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        try {
            checkInvokers(invokers, invocation);
            final List<Invoker<T>> selected;
            // 获取配置参数
            final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
            final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            // 获取并行履行的Invoker列表
            if (forks <= 0 || forks >= invokers.size()) {
                selected = invokers;
            } else {
                selected = new ArrayList<>();
                for (int i = 0; i < forks; i++) {
                    // 挑选生产者
                    Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                    // 避免重复增加Invoker
                    if (!selected.contains(invoker)) {
                        selected.add(invoker);
                    }
                }
            }
            RpcContext.getContext().setInvokers((List) selected);
            final AtomicInteger count = new AtomicInteger();
            final BlockingQueue<Object> ref = new LinkedBlockingQueue<>();
            for (final Invoker<T> invoker : selected) {
                // 在线程池中并发履行
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 履行消费逻辑
                            Result result = invoker.invoke(invocation);
                            // 存储消费成果
                            ref.offer(result);
                        } catch (Throwable e) {
                            // 假如反常次数大于等于forks参数值阐明悉数调用失利,则把反常放入行列
                            int value = count.incrementAndGet();
                            if (value >= selected.size()) {
                                ref.offer(e);
                            }
                        }
                    }
                });
            }
            try {
                // 从行列获取成果
                Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                // 假如反常类型表示悉数调用失利则抛出反常
                if (ret instanceof Throwable) {
                    Throwable e = (Throwable) ret;
                    throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                }
                return (Result) ret;
            } catch (InterruptedException e) {
                throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
            }
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }
}

2.2.6 Broadcast

播送调用战略。顾客遍历调用一切生产者节点,任何一个出现反常则抛出反常:

public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
    public BroadcastClusterInvoker(Directory<T> directory) {
        super(directory);
    }
    @Override
    public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        RpcContext.getContext().setInvokers((List) invokers);
        RpcException exception = null;
        Result result = null;
        // 遍历调用一切生产者节点
        for (Invoker<T> invoker : invokers) {
            try {
                // 履行消费逻辑
                result = invoker.invoke(invocation);
            } catch (RpcException e) {
                exception = e;
                logger.warn(e.getMessage(), e);
            } catch (Throwable e) {
                exception = new RpcException(e.getMessage(), e);
                logger.warn(e.getMessage(), e);
            }
        }
        // 任何一个出现反常则抛出反常
        if (exception != null) {
            throw exception;
        }
        return result;
    }
}

3 怎么做幂等

经过上述剖析咱们知道,RPC框架自带的重试机制或许会形成数据重复问题,那么在使用中有必要考虑幂等性。幂等性是指一次操作与屡次操作发生成果相同,并不会由于屡次操作而发生不一致性。常见幂等计划有撤销重试、幂等表、数据库锁、状况机。


3.1 撤销重试

撤销重试有两种办法,第一是设置重试次数为零,第二是挑选不重试的集群容错战略。

<!-- 设置重试次数为零 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" retries="0" />
<!-- 挑选集群容错计划 -->
<dubbo:reference id="helloService" interface="com.java.front.dubbo.demo.provider.HelloService" cluster="failfast" />

3.2 幂等表

假定用户付出成功后,付出体系将付出成功音讯,发送至音讯行列。物流体系订阅到这个音讯,预备为这笔订单创立物流单。

可是音讯行列或许会重复推送,物流体系有或许接收到屡次这条音讯。咱们期望到达作用是:不管接收到多少条重复音讯,只能创立一笔物流单。

解决计划是幂等表计划。新建一张幂等表,该表便是用来做幂等,无其它事务含义,有一个字段名为key建有仅有索引,这个字段是幂等标准。

物流体系订阅到音讯后,首要尝试刺进幂等表,订单编号作为key字段。假如成功则继续创立物流单,假如订单编号现已存在则违反仅有性原则,无法刺进成功,阐明现已进行过事务处理,丢掉音讯。

这张表数据量会比较大,咱们能够经过守时任务对数据进行归档,例如只保留7天数据,其它数据存入归档表。

还有一种广义幂等表便是咱们能够用Redis代替数据库,在创立物流单之前,咱们能够查看Redis是否存在该订单编号数据,一同能够为这类数据设置7天过期时刻。


3.3 状况机

物流单创立成功后会发送音讯,订单体系订阅到音讯后更新状况为完成,假定变更是将订单状况0更新至状况1。订单体系也或许收到多条音讯,或许在状况现已被更新至状况1之后,仍然收到物流单创立成功音讯。

解决计划是状况机计划。首要制作状况机图,剖析状况流通形态。例如经过剖析状况1现已是终究态,那么即便接收到物流单创立成功音讯也不再处理,丢掉音讯。


3.4 数据库锁

数据库锁又能够分为悲观锁和达观锁两种类型,悲观锁是在获取数据时加锁:

select * from table where col='xxx' for update

达观锁是在更新时加锁,第一步首要查出数据,数据包括version字段。第二步进行更新操作,假如此刻记载现已被修改则version字段现已发生变化,无法更新成功:

update table set xxx,
version = #{version} + 1 
where id = #{id} 
and version = #{version}

4 文章总结

本文首要剖析了为什么重试这个问题,由于关于RPC交互无呼应场景,重试是一种重要挑选。然后剖析了DUBBO供给的六种集群容错战略,Failover作为默认战略供给了重试机制,在事务代码没有显现重试状况下,仍有或许建议屡次调用,这有必要引起注重。最终咱们剖析了几种常用幂等计划,期望本文对咱们有所协助。


欢迎咱们注重大众号「JAVA前哨」查看更多精彩共享文章,首要包括源码剖析、实践使用、架构思想、职场共享、产品考虑等等,一同欢迎咱们加我个人微信「java_front」一同交流学习