在呼应式编程中,多线程异步性成为天然的内涵,多线程之间的切换也成为原生的,在处理一个数据流Flux/Mono时,根本无法知道是运转在哪个线程上或哪个线程池里,能够说,每一个操作符operator以及内部的函数都或许运转在不同的线程上。这就意味着,以前用ThreadLocal来作为办法间通明传递同享变量的办法不再行得通。为此,Reactor供给了Context来代替ThreadLocal完成一个跨线程的同享变量的通明办法
本文会从以下几个方面来介绍Context的相关知识:

  1. context的根本用法
  2. 从源码上解读context的用法
  3. 用log的MDC案例介绍怎么用context完成与threadlocal的桥接
  4. 总结下context以及现在的一些局限性

一、运用介绍

static String KEY = "TEST_CONTEXT_KEY";
static String KEY2 = "TEST_CONTEXT_KEY2";
public static void main(String[] args) {
	Flux<String> flux = convert("hello", Flux.just(1, 2, 3));
	flux
    	.subscriberContext(Context.of(KEY, "Outside"))
    	.subscribe(v -> System.out.println(v));
}
public static Flux<String> convert(String prefix, Flux<Integer> publisher) {
    return publisher.map(v -> prefix + " " + v)
        .subscriberContext(Context.of(KEY, "NotUsed"))
        .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v))
        .subscriberContext(context -> context.put(KEY2, "Inside"))
        .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + v));
}

上面是context的运用计划介绍,其输出如下:

Outside Outside Inside hello 1
Outside Outside Inside hello 2
Outside Outside Inside hello 3

上面的运用案例展示了一个运用context的常见比如。经过在外部办法里传入context,如flux.subscriberContext(Context.of(KEY, "Outside")),使得内部办法convert能够获取外界环境的context,一起内部办法还能够增加自己的context数据,如subscriberContext(context -> context.put(KEY2, "Inside")),结合之后,在让内部的办法(flatMap里的办法)感知到整个上下文context的数据内容。

关于context的运用,首要分为几个部分: 1. context的创立 2. context的写入(传入)与读取 3. 履行次序

1. context —— 不可变目标

由于reactor天然是跨线程的,所以context设计为了不可变的目标,即每次的更新都是创立一个新的目标。每次的put/putAll操作,都是先把旧目标的值仿制到新目标,然后再进行put/putAll等更新操作。

2. context的写入与读取

context写入是运用subscriberContext办法,其入参有两种办法:传值办法subscriberContext(ctx)与lambda函数办法 —— subscriberContext(ctx -> ctx.put(key,value))。
context的读取是运用Mono的静态办法subscriberContext()来获取,由于其回来的是一个Mono, 所以一般与flatMap结合运用。

3. 履行次序

context的传入是发生在subscribe()订阅阶段的,所以其写入的次序是从下往上的,即在示例中,先履行subscriberContext(Context.of(KEY, "Outside")),再履行subscriberContext(context -> context.put(KEY2, "Inside")), 最后履行subscriberContext(Context.of(KEY, "NotUsed"))
在订阅阶段履行完后,进入运转阶段,数据流从上往下履行,每次读取context的时分Mono.subscriberContext()都是读取下一个的context。所以”NotUsed”的context并没有生效。
此外,context.put()操作是仿制旧的再update新的目标,所以Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)这个阶段仍能读取前一个context关于KEY的内容。

总结

  1. context是不可变目标,每次更新都是新的context
  2. context是存在于subscriber的内部的,一个context是绑定在当时subscriber上的,如FluxContextStart的目标
  3. context的写入次序是从下而上的,读取的时分是从上而下的,只能读取之后的subscriber里的context。
  4. 每个subscriber中的context都是独有的,运转阶段的时分,无法改动其他subscriber的context。

注意

  1. subscriberContext(Context.of("Outside")subscriberContext(context -> Context.of("Outside"))是有区别,前者是会结合复用前面的context,而后者是直接回来一个新的context并不会复用前面的context。
    其原因是,subscriberContext(Context.of("Outside")) 其实内部调用的是subscriberContext(context -> context.putAll(Context.of("Outside")),其入参的context便是前面的context,putAll办法会复用前面的context。而 subscriberContext(context -> Context.of(“Outside”))不复用的原因便是由于抛弃了入参的context。所以,能够运用这种办法来抛弃之前的context,当然不鼓舞这么做,由于你不清楚之前context会不会影响后续的程序。

  2. 本文章的代码用的事reactor 3.3的版别,自3.5之后,subscriberContext办法改为contextWrite,读取的办法改为deferContextual

源码解读

现在咱们从源代码上看看,context写入为什么是自下而上的,读取的时分又是依附于下一个subscriber并且自上而下的。

public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) {
	return new FluxContextStart<>(this, doOnContext);
}
FluxContextStart(Flux<? extends T> source, Function<Context, Context> doOnContext) {
	super(source);
	this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
}
@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
    Context c = doOnContext.apply(actual.currentContext());
    return new ContextStartSubscriber<>(actual, c);
}
ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {
    this.actual = actual;
    this.context = context;
    if (actual instanceof ConditionalSubscriber) {
        this.actualConditional = (ConditionalSubscriber<? super T>) actual;
    }
    else {
        this.actualConditional = null;
    }
}
@Override
public Context currentContext() {
    return this.context;
}

上面截取了subscriberContext办法的源代码,能够看到subscriberContext办法最终会创立ContextStartSubscriber的目标,并将生成的context赋值Context c = doOnContext.apply(actual.currentContext()),所以context是随同subscriberContext办法对应的subscriber里的。
由于context赋值操作Context c = doOnContext.apply(actual.currentContext())是发生在subscribeOrReturn办法里,即发生在subscribe()订阅阶段,所以整个履行的次序是自下而上的(沿着整个flow自下而上至源头的publisher)
那读取context的时分为什么是自上而下的呢?咱们来看下读取操作Mono.subscribeContext()的源码。

public static Mono<Context> subscriberContext() {
    return onAssembly(MonoCurrentContext.INSTANCE);
}
final class MonoCurrentContext extends Mono<Context>
		implements Fuseable, Scannable {
	static final MonoCurrentContext INSTANCE = new MonoCurrentContext();
	public void subscribe(CoreSubscriber<? super Context> actual) {
		Context ctx = actual.currentContext();
		actual.onSubscribe(Operators.scalarSubscription(actual, ctx));
	}
}
interface InnerOperator<I, O>
		extends InnerConsumer<I>, InnerProducer<O> {
	@Override
	default Context currentContext() {
		return actual().currentContext();
	}
}

Mono.subscribeContext()办法回来的是一个MonoCurrentContext的静态目标,在订阅subscribe时期,就会去读取当时的context,即Context ctx = actual.currentContext()。而关于一个InnerOperator的接口而言,其currentContext()办法会不断寻找下一个subscriber的context,即 actual().currentContext(),直到有哪个subscriber覆写了currentContext办法,如先前的ContextStartSubscriber目标。关于InnerOperator接口,是大多数subscriber都会完成的接口,例如map、filter、flatmap这些,都会完成这个接口。
在找到context之后,经过Operators.scalarSubscription(actual, ctx)写入,这个办法其实也是Mono.just()的完成,所以相当于把context作为value,生成了一个Mono.just(ctx)来完成了context读取。
所以,context读取的是从当时操作operator之后的那个最接近的subscriber的context。这也解释了前面运用案例中,subscriberContext(Context.of(KEY, "NotUsed")),没有作用的原因。

三、怎么桥接现有的ThreadLocal系统

虽然reactor供给了context来代替ThreadLocal的运用,但现在大多数的代码库依然是指令式编程的,运用的办法依然是根据ThreadLocal的,如Logger里的MDC。本末节以Logger中的MDC来介绍,怎么运用context完成与旧系统中的根据ThreadLocal办法的打通。
咱们假设有这样的一个场景,每一次的Http恳求都有一个trace id,咱们称为request id,并经过Http Header “X-Request-Id”来命名,打印日志的时分,期望每条日志里都包括恳求id,这样方便盯梢整个恳求链路的状况。
为此,咱们把日志配置里的pattern设置为:[%X{X-Request-Id}] [%thread] %-5level - %msg %n
能够在SpringBootapplication.yml里设置,如:

logging.pattern.level: "[%X{X-Request-Id}] [%thread] %-5level - %msg %n"

因而,要使得每条日志里有request id,那就必须要MDC里有key为X-Request-Id的内容。下面来看下,reactor中是怎么完成的。

@SpringBootApplication
@Slf4j
@RestController
public class MdcApplication {
  public static void main(String[] args) {
    SpringApplication.run(MdcApplication.class, args);
  }
  private final static String X_REQUEST_ID_KEY = "X-Request-Id";
  @GetMapping("/")
  Flux<String> split(@RequestParam("value") String value, @RequestHeader(X_REQUEST_ID_KEY) String requestId) {
    return Flux.fromArray(value.split("_"))
        .doOnEach(logWithContext(ch -> log.info("handling one item: {}", ch)))
        .subscriberContext(Context.of(X_REQUEST_ID_KEY, requestId));
  }
  private static <T> Consumer<Signal<T>> logWithContext(Consumer<T> logStatement) {
    return signal -> {
      if (!signal.isOnNext()) {
        return;
      }
      String requestId = signal.getContext().get(X_REQUEST_ID_KEY);
      try (MDC.MDCCloseable closeable = MDC.putCloseable(X_REQUEST_ID_KEY, requestId)) {
        logStatement.accept(signal.get());
      }
    };
  }
}

这是一个简略的示例程序,关于恳求输入的value值经过”-“分割后,再一个个回来给客户端。首先运用subscriberContext办法,将http header里的X-Request-Id作为context来传入。然后运用doOnEach的办法获取signal。doOnEach的办法能够作业在onNext、onComplete、onError等所有事情,每一个信号signal里都包括有context,当为onNext则还包括value值,当为onError时,则还包括有exception。因而能够经过signal来获取context。
在从context获取X-Request-Id后,能够运用try-with-resource办法来更新MDC,其作用是在履行完try里面的程序后,将更新的value回退。等价于:

try {
	MDC.put(X_REQUEST_ID_KEY, requestId);
	logStatement.accept(signal.get());
} finally {
	MDC.remove(X_REQUEST_ID_KEY);
}

置于为什么需要操作完之后回退掉MDC中的更新,那是由于reactor中所有的操作都是异步履行在不同线程中的,假如不回退的话,很有或许造成污染,其原因仍是MDC内部是用ThreadLocal完成的,所以跨线程的时分,假如不把ThreadLocal值清理洁净,很容易造成互相污染。

用curl指令发送恳求:curl --header "X-Request-Id:12345" localhost:8080?value=a_b_c,回来的结果是abc,打印的日志如下:

[12345] [reactor-http-nio-2] INFO  - handling one item: a
[12345] [reactor-http-nio-2] INFO  - handling one item: b 
[12345] [reactor-http-nio-2] INFO  - handling one item: c 

其中12345便是从context里获取到的request id。

假如想要将request id持续贯穿后续恳求流程,如恳求第三方服务,能够在用webClient发送恳求的时分,把request id作为header加入到它的request恳求里,如:

Mono.subscriberContext().map(ctx -> {
    RequestHeadersSpec<?> request = webClient.get().uri(uri);       
    request = request.header("X-Request-ID", ctx.get(X_REQUEST_ID_KEY)); 
    // The rest of your request logic...    
});

四、总结

本文介绍了reactor中context的概念,并用代码示例的办法介绍了怎么运用。再然后,经过源码的解读来加深对context运用规则的了解:自下而上的context写入,以及与subscriber绑定后的自上而下的读取。 在这之后,用以传递并打印日志中包括request id的一个实际比如,来介绍怎么运用context与log的MDC一起运用。
虽然reactor自3.1开端供给了context来补偿无法运用ThreadLocal的缺乏,但与ThreaLocal相比,context依然有不少局限。比如运用上的不方便,要么运用Mono.subscribeContext().map并搭配flatmap来运用,要么需要将数据流转化成信号signal流来运用,总归远不如ThreadLocal来的简略易用。别的,context的不可变特性,虽然有助于thread safe,但使得不同办法之间无法传递更新,比如办法A内修改后再传递给办法B,由于context是只读的,但这在ThreadLocal上却是轻而易举就能完成。
好消息的是,reactor在3.5开端,供给了新的办法deferContextual来简化context的运用。以及提出了context view的概念来简化context传递问题,感兴趣的能够阅览reactor文档。