Today we continue with the second part left unfinished last time: once a request reaches the WebHandler, processing starts in handle.
SoulWebHandler
SoulWebHandler#handle
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
    MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
    Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
    return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
            .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
The handler uses the chain-of-responsibility pattern: new DefaultSoulPluginChain(plugins) builds the PluginChain, and its execute method is then invoked.
DefaultSoulPluginChain#execute
private static class DefaultSoulPluginChain implements SoulPluginChain {

    private int index;

    private final List<SoulPlugin> plugins;

    /**
     * Instantiates a new Default soul plugin chain.
     *
     * @param plugins the plugins
     */
    DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
        this.plugins = plugins;
    }

    /**
     * Delegate to the next {@code WebFilter} in the chain.
     *
     * @param exchange the current server exchange
     * @return {@code Mono<Void>} to indicate when request handling is complete
     */
    @Override
    public Mono<Void> execute(final ServerWebExchange exchange) {
        return Mono.defer(() -> {
            if (this.index < plugins.size()) {
                SoulPlugin plugin = plugins.get(this.index++);
                Boolean skip = plugin.skip(exchange);
                if (skip) {
                    return this.execute(exchange);
                }
                return plugin.execute(exchange, this);
            }
            return Mono.empty();
        });
    }
}
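To make the index-advancing behaviour easier to follow, here is a minimal synchronous sketch of the same idea. It deliberately leaves out Reactor (Mono.defer) and ServerWebExchange, and every name in it (PluginChainSketch, Plugin, Chain, recording, and the plugin names used in run()) is a hypothetical stand-in for illustration, not Soul's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, synchronous stand-in for the plugin chain; all names are hypothetical.
class PluginChainSketch {

    /** Simplified plugin contract (the real one works on ServerWebExchange and returns Mono<Void>). */
    interface Plugin {
        boolean skip(String request);

        void execute(String request, Chain chain);
    }

    /** Holds the plugin list plus a cursor that advances on every call to execute. */
    static final class Chain {
        private int index;
        private final List<Plugin> plugins;

        Chain(List<Plugin> plugins) {
            this.plugins = plugins;
        }

        void execute(String request) {
            if (index < plugins.size()) {
                Plugin plugin = plugins.get(index++);
                if (plugin.skip(request)) {
                    // Skipped plugins are bypassed by moving straight to the next one.
                    execute(request);
                    return;
                }
                // The plugin decides whether to continue by calling chain.execute itself.
                plugin.execute(request, this);
            }
        }
    }

    /** Builds a plugin that records its name and then hands the request to the rest of the chain. */
    static Plugin recording(String name, boolean skipped, List<String> trace) {
        return new Plugin() {
            @Override
            public boolean skip(String request) {
                return skipped;
            }

            @Override
            public void execute(String request, Chain chain) {
                trace.add(name);
                chain.execute(request);
            }
        };
    }

    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<Plugin> plugins = List.of(
                recording("global", false, trace),
                recording("sign", true, trace),
                recording("divide", false, trace));
        new Chain(plugins).execute("/order/findById");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [global, divide]
    }
}
```

Because the cursor lives in the chain instance, a fresh chain has to be created per request, which matches the new DefaultSoulPluginChain(plugins) call in handle.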
AbstractSoulPlugin#execute
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    String pluginName = named();
    final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
    if (pluginData != null && pluginData.getEnabled()) {
        final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
        if (CollectionUtils.isEmpty(selectors)) {
            return handleSelectorIsNull(pluginName, exchange, chain);
        }
        // find the matching selectorData
        final SelectorData selectorData = matchSelector(exchange, selectors);
        if (Objects.isNull(selectorData)) {
            return handleSelectorIsNull(pluginName, exchange, chain);
        }
        selectorLog(selectorData, pluginName);
        final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        if (CollectionUtils.isEmpty(rules)) {
            return handleRuleIsNull(pluginName, exchange, chain);
        }
        RuleData rule;
        if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
            //get last
            rule = rules.get(rules.size() - 1);
        } else {
            rule = matchRule(exchange, rules);
        }
        if (Objects.isNull(rule)) {
            return handleRuleIsNull(pluginName, exchange, chain);
        }
        ruleLog(rule, pluginName);
        return doExecute(exchange, chain, selectorData, rule);
    }
    return chain.execute(exchange);
}
AbstractSoulPlugin#matchSelector
private SelectorData matchSelector(final ServerWebExchange exchange, final Collection<SelectorData> selectors) {
    return selectors.stream()
            .filter(selector -> selector.getEnabled() && filterSelector(selector, exchange))
            .findFirst().orElse(null);
}
The overall request flow is as follows:
AbstractSoulPlugin#filterSelector
private Boolean filterSelector(final SelectorData selector, final ServerWebExchange exchange) {
    if (selector.getType() == SelectorTypeEnum.CUSTOM_FLOW.getCode()) {
        if (CollectionUtils.isEmpty(selector.getConditionList())) {
            return false;
        }
        return MatchStrategyUtils.match(selector.getMatchMode(), selector.getConditionList(), exchange);
    }
    return true;
}
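Taken together, matchSelector and filterSelector mean: the first enabled selector wins, a full-flow selector always passes, and a custom-flow selector with no conditions never passes. The sketch below replays that logic on plain strings. The type codes, the Selector class and the sample selectors are assumptions made for this example only, and conditions are combined with a plain AND for brevity.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of selector matching; not Soul's real classes.
class SelectorMatchSketch {

    // Type codes assumed for this sketch only.
    static final int FULL_FLOW = 0;
    static final int CUSTOM_FLOW = 1;

    static final class Selector {
        final String name;
        final int type;
        final boolean enabled;
        final List<Predicate<String>> conditions;

        Selector(String name, int type, boolean enabled, List<Predicate<String>> conditions) {
            this.name = name;
            this.type = type;
            this.enabled = enabled;
            this.conditions = conditions;
        }
    }

    /** Custom-flow selectors must have conditions that all hold; other selectors always pass. */
    static boolean filterSelector(Selector selector, String path) {
        if (selector.type == CUSTOM_FLOW) {
            if (selector.conditions.isEmpty()) {
                return false;
            }
            return selector.conditions.stream().allMatch(c -> c.test(path));
        }
        return true;
    }

    /** Returns the first enabled selector that passes the filter, or null if none does. */
    static Selector matchSelector(String path, List<Selector> selectors) {
        return selectors.stream()
                .filter(s -> s.enabled && filterSelector(s, path))
                .findFirst().orElse(null);
    }

    /** Runs the match against a small sample list and returns the winning selector's name. */
    static String matchedName(String path) {
        List<Predicate<String>> none = List.of();
        List<Predicate<String>> underOrder = List.of(p -> p.startsWith("/order/"));
        List<Selector> selectors = List.of(
                new Selector("disabled", FULL_FLOW, false, none),
                new Selector("empty-custom", CUSTOM_FLOW, true, none),
                new Selector("orders", CUSTOM_FLOW, true, underOrder),
                new Selector("catch-all", FULL_FLOW, true, none));
        Selector hit = matchSelector(path, selectors);
        return hit == null ? null : hit.name;
    }

    public static void main(String[] args) {
        System.out.println(matchedName("/order/findById")); // prints orders
        System.out.println(matchedName("/user/1"));         // prints catch-all
    }
}
```

Since findFirst stops at the first hit, the order of the selector collection decides which selector applies when several would match.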
MatchStrategyUtils#match
public static boolean match(final Integer strategy, final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
    String matchMode = MatchModeEnum.getMatchModeByCode(strategy);
    MatchStrategy matchStrategy = ExtensionLoader.getExtensionLoader(MatchStrategy.class).getJoin(matchMode);
    return matchStrategy.match(conditionDataList, exchange);
}
AndMatchStrategy#match
@Override
public Boolean match(final List<ConditionData> conditionDataList, final ServerWebExchange exchange) {
    return conditionDataList
            .stream()
            .allMatch(condition -> OperatorJudgeFactory.judge(condition, buildRealData(condition, exchange)));
}
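The strategy lookup in MatchStrategyUtils followed by allMatch in AndMatchStrategy boils down to "pick a combinator by mode name, then apply it to the conditions". Here is a minimal sketch of that shape. The map registry stands in for the SPI-based ExtensionLoader, and the "and" / "or" keys as well as the existence of an OR variant built on anyMatch are assumptions of this example.

```java
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

// Hypothetical sketch: a plain map replaces the SPI lookup used by the real code.
class MatchStrategySketch {

    // Keys are assumed mode names; values combine the conditions for one piece of request data.
    static final Map<String, BiPredicate<List<Predicate<String>>, String>> STRATEGIES = Map.of(
            "and", (conditions, data) -> conditions.stream().allMatch(c -> c.test(data)),
            "or", (conditions, data) -> conditions.stream().anyMatch(c -> c.test(data)));

    /** Looks up the strategy for the given mode and applies it. */
    static boolean match(String mode, List<Predicate<String>> conditions, String data) {
        return STRATEGIES.get(mode).test(conditions, data);
    }

    /** Two sample conditions on a request path. */
    static List<Predicate<String>> sampleConditions() {
        return List.of(
                path -> path.startsWith("/order/"),
                path -> path.endsWith("/findById"));
    }

    public static void main(String[] args) {
        List<Predicate<String>> conditions = sampleConditions();
        System.out.println(match("and", conditions, "/order/save")); // prints false
        System.out.println(match("or", conditions, "/order/save"));  // prints true
    }
}
```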
AbstractMatchStrategy#buildRealData
String buildRealData(final ConditionData condition, final ServerWebExchange exchange) {
    String realData = "";
    ParamTypeEnum paramTypeEnum = ParamTypeEnum.getParamTypeEnumByName(condition.getParamType());
    switch (paramTypeEnum) {
        case HEADER:
            final HttpHeaders headers = exchange.getRequest().getHeaders();
            final List<String> list = headers.get(condition.getParamName());
            if (CollectionUtils.isEmpty(list)) {
                return realData;
            }
            realData = Objects.requireNonNull(headers.get(condition.getParamName())).stream().findFirst().orElse("");
            break;
        case URI:
            realData = exchange.getRequest().getURI().getPath();
            break;
        case QUERY:
            final MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
            realData = queryParams.getFirst(condition.getParamName());
            break;
        case HOST:
            realData = HostAddressUtils.acquireHost(exchange);
            break;
        case IP:
            realData = HostAddressUtils.acquireIp(exchange);
            break;
        case POST:
            final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
            realData = (String) ReflectUtils.getFieldValue(soulContext, condition.getParamName());
            break;
        default:
            break;
    }
    return realData;
}
OperatorJudgeFactory#judge
public static Boolean judge(final ConditionData conditionData, final String realData) {
    if (Objects.isNull(conditionData) || StringUtils.isBlank(realData)) {
        return false;
    }
    return OPERATOR_JUDGE_MAP.get(conditionData.getOperator()).judge(conditionData, realData);
}
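OperatorJudgeFactory is essentially a lookup table from operator name to comparison, guarded by a blank check on the value extracted from the request. A small sketch of that pattern follows. The operator keys ("=", "like", "regEx") and their semantics are assumptions chosen for illustration, and unlike the excerpt above the sketch returns false for an unknown operator instead of failing on a missing map entry.

```java
import java.util.Map;
import java.util.function.BiPredicate;

// Hypothetical sketch of an operator lookup table; keys and semantics are assumed.
class OperatorJudgeSketch {

    // Each judge receives (expected value from the condition, real value from the request).
    static final Map<String, BiPredicate<String, String>> JUDGES = Map.of(
            "=", (expected, real) -> expected.trim().equals(real),
            "like", (expected, real) -> real.contains(expected.trim()),
            "regEx", (expected, real) -> real.matches(expected));

    static boolean judge(String operator, String expected, String real) {
        // A missing header or query parameter yields a blank value, which never matches.
        if (real == null || real.trim().isEmpty()) {
            return false;
        }
        BiPredicate<String, String> judge = JUDGES.get(operator);
        // Design choice of this sketch: unknown operators simply do not match.
        return judge != null && judge.test(expected, real);
    }

    public static void main(String[] args) {
        System.out.println(judge("=", "/order/findById", "/order/findById")); // prints true
        System.out.println(judge("like", "order", "/order/findById"));        // prints true
        System.out.println(judge("=", "tom", null));                          // prints false
    }
}
```

The blank check matters because buildRealData can return an empty string (missing header) or null (missing query parameter), and both cases should count as "no match" rather than raise an error.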
MatchOperatorJudge#judge
@Override
public Boolean judge(final ConditionData conditionData, final String realData) {
    if (Objects.equals(ParamTypeEnum.URI.getName(), conditionData.getParamType())) {
        return PathMatchUtils.match(conditionData.getParamValue().trim(), realData);
    }
    return realData.contains(conditionData.getParamValue().trim());
}
PathMatchUtils#match
public class PathMatchUtils {

    private static final AntPathMatcher MATCHER = new AntPathMatcher();

    /**
     * Match boolean.
     *
     * @param matchUrls the ignore urls
     * @param path the path
     * @return the boolean
     */
    public static boolean match(final String matchUrls, final String path) {
        return Splitter.on(",").omitEmptyStrings().trimResults().splitToList(matchUrls).stream().anyMatch(url -> reg(url, path));
    }

    private static boolean reg(final String pattern, final String path) {
        return MATCHER.match(pattern, path);
    }
}
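To get a feel for what "comma-separated Ant-style patterns, any one may match" means in practice, here is a rough, dependency-free approximation. It is not Spring's AntPathMatcher: the toRegex helper is a hypothetical simplification that only handles ?, * and **, and it differs from the real matcher in edge cases (for example, this version does not let /order/** match /order itself).

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Rough approximation of Ant-style matching using regular expressions; not Spring's implementation.
class PathMatchSketch {

    /** Translates a simplified Ant-style pattern into a regular expression. */
    static Pattern toRegex(String antPattern) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < antPattern.length(); i++) {
            char c = antPattern.charAt(i);
            if (c == '*') {
                boolean doubleStar = i + 1 < antPattern.length() && antPattern.charAt(i + 1) == '*';
                if (doubleStar) {
                    regex.append(".*");     // ** may span several path segments
                    i++;
                } else {
                    regex.append("[^/]*");  // * stays within one path segment
                }
            } else if (c == '?') {
                regex.append("[^/]");       // ? matches exactly one non-separator character
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    /** Splits on commas, trims, drops empty entries, and succeeds if any pattern matches the path. */
    static boolean match(String matchUrls, String path) {
        return Arrays.stream(matchUrls.split(","))
                .map(String::trim)
                .filter(url -> !url.isEmpty())
                .anyMatch(url -> toRegex(url).matcher(path).matches());
    }

    public static void main(String[] args) {
        String patterns = "/order/**, /user/*";
        System.out.println(match(patterns, "/order/a/b"));     // prints true
        System.out.println(match(patterns, "/user/1"));        // prints true
        System.out.println(match(patterns, "/user/1/detail")); // prints false
    }
}
```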
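As PathMatchUtils shows, URI matching follows the rules of AntPathMatcher. That is probably where I have to say good night for today; the rest will continue tomorrow!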
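Reactor notes
Before looking at the publishOn and subscribeOn methods, the concept of a Scheduler needs to be introduced. In Reactor, Scheduler is the abstraction that defines how scheduled tasks are executed. It can loosely be thought of as a thread pool, although it does more than that. Here is a brief look at two Scheduler implementations: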
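Schedulers.elastic(): the scheduler creates worker threads dynamically with no upper bound on the thread count, similar to Executors.newCachedThreadPool().
Schedulers.parallel(): creates a scheduler with a fixed number of threads; by default the thread count equals the number of CPU cores.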
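For readers more familiar with java.util.concurrent, the analogy above can be made concrete with plain JDK executors. This is only an analogy under stated assumptions: the sketch does not use Reactor at all, and the class name, helper methods and thread-name prefixes are made up for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// JDK-only analogy for the two scheduler flavours; Reactor itself is not used here.
class SchedulerAnalogySketch {

    /** Thread factory that names threads prefix-1, prefix-2, ... so the pool is visible in output. */
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** Runs one task on the pool, returns the name of the thread that ran it, then shuts the pool down. */
    static String threadNameOn(ExecutorService pool) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    /** Unbounded pool that grows on demand, comparable in spirit to an elastic scheduler. */
    static String elasticLike() {
        return threadNameOn(Executors.newCachedThreadPool(named("elastic-like")));
    }

    /** Fixed pool sized to the CPU core count, comparable in spirit to a parallel scheduler. */
    static String parallelLike() {
        int cores = Runtime.getRuntime().availableProcessors();
        return threadNameOn(Executors.newFixedThreadPool(cores, named("parallel-like")));
    }

    public static void main(String[] args) {
        System.out.println(elasticLike());  // prints elastic-like-1
        System.out.println(parallelLike()); // prints parallel-like-1
    }
}
```

The unbounded pool suits blocking calls that may pile up, while the fixed pool suits CPU-bound work where more threads than cores would not help.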
Reactor: subscribeOn
Flux<User> blockingRepositoryToFlux(BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
            .subscribeOn(Schedulers.elastic());
}
Here repository.findAll() (and Flux.fromIterable as well) runs on the thread pool defined by Schedulers.elastic(), because Flux.defer postpones the call until subscription time.
Reactor: publishOn
Mono<Void> fluxToBlockingRepository(Flux<User> flux,
                                    BlockingRepository<User> repository) {
    return flux
            .publishOn(Schedulers.elastic())
            .doOnNext(repository::save)
            .then();
}
After publishOn(Schedulers.elastic()), repository::save is executed on the thread pool defined by Schedulers.elastic().
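Difference between the two
publishOn changes the thread pool for the operators that come after it, whereas subscribeOn affects the whole pipeline starting from the source, up to the point where a publishOn switches threads. The scope of publishOn therefore depends on where it is placed, while the scope of subscribeOn does not.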
Flux.just("tom")
        .map(s -> {
            System.out.println("[map] Thread name: " + Thread.currentThread().getName());
            return s.concat("@mail.com");
        })
        .publishOn(Schedulers.newElastic("thread-publishOn"))
        .filter(s -> {
            System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
            return s.startsWith("t");
        })
        .subscribeOn(Schedulers.newElastic("thread-subscribeOn"))
        .subscribe(s -> {
            System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
            System.out.println(s);
        });
The output is as follows:
[map] Thread name: thread-subscribeOn-3
[filter] Thread name: thread-publishOn-4
[subscribe] Thread name: thread-publishOn-4
tom@mail.com
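With publishOn and subscribeOn, reactive code can isolate work onto separate thread pools, which to some extent keeps blocking code from dragging down the execution efficiency of the rest of the reactive pipeline.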