# Dubbo Filter 加载机制
本次分析的版本号为2.7.7
# 入口
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
/**
* 构造调用链
*
* @param invoker invoke
* @param key 固定值,service|reference.filter
* @param group provider
* @param <T>
* @return
*/
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 基于SPI机制获取已经激活的Filter,并按order排序
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
// 遍历所有filter
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
// 装饰器模式,将invoker用filter逐层进行包装
// 这也就引出了一个问题,基于filter包装invoker,如果没有filter,那么会无法进行请求调用
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
Result asyncResult;
try {
asyncResult = filter.invoke(next, invocation);
} catch (Exception e) {
// 异常处理1,如果filter继承了抽象类ListenableFilter,就调用抽象类实现的onError方法
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
try {
Filter.Listener listener = listenableFilter.listener(invocation);
if (listener != null) {
listener.onError(e, invoker, invocation);
}
} finally {
listenableFilter.removeListener(invocation);
}
// 异常处理2,如果filter实现了接口Filter.Listener,就调用抽象类实现的onError方法
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
listener.onError(e, invoker, invocation);
}
throw e;
} finally {
}
return asyncResult.whenCompleteWithContext((r, t) -> {
if (filter instanceof ListenableFilter) {
ListenableFilter listenableFilter = ((ListenableFilter) filter);
Filter.Listener listener = listenableFilter.listener(invocation);
try {
if (listener != null) {
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
} finally {
listenableFilter.removeListener(invocation);
}
} else if (filter instanceof Filter.Listener) {
Filter.Listener listener = (Filter.Listener) filter;
if (t == null) {
listener.onResponse(r, invoker, invocation);
} else {
listener.onError(t, invoker, invocation);
}
}
});
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
@Override
public int getDefaultPort() {
return protocol.getDefaultPort();
}
/**
* 服务端暴露服务
*
* @param invoker
* @param <T>
* @return
* @throws RpcException
*/
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
/**
* 客户端引用服务
*
* @param invoker
* @param <T>
* @return
* @throws RpcException
*/
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (UrlUtils.isRegistry(url)) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
@Override
public void destroy() {
protocol.destroy();
}
@Override
public List<ProtocolServer> getServers() {
return protocol.getServers();
}
}
# 流程分析
# 基于SPI机制加载Filter
# ExtensionLoader.getActivateExtension(URL url, String key, String group)
- key为url中的
service|reference.filter
参数,即配置文件中的dubbo.provider|consumer.filter
- 调用方法
ExtensionLoader.getActivateExtension(URL url, String[] values, String group)
- 参数1:调用的url 参数2:key参数以逗号分隔 参数3:分组,参数为:
provider|consumer
- 参数1:调用的url 参数2:key参数以逗号分隔 参数3:分组,参数为:
public class ExtensionLoader<T> {
// other method
public List<T> getActivateExtension(URL url, String key, String group) {
// 获取url中的service.filter参数
String value = url.getParameter(key);
// 参数1:调用的url 参数2:key参数以逗号分隔 参数3:分组,参数为:provider|consumer
return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
}
// other method
}
# ExtensionLoader.getActivateExtension(URL url, String[] values, String group)
- 第一轮加载
- 判断
values
(将key以逗号分隔的数组)中是否有-default
参数。- 包含
-default
参数,不加载所有默认的filter
,以及自定义的filter
- 不包含
-default
参数- 在
METE-INF/dubbo/internal/org.apache.dubbo.rpc.Filter
读取内置的filter配置 - 在
METE-INF/dubbo/com.alibaba.dubbo.rpc.Filter
读取自定义filter配置
- 在
- 包含
- 过滤掉没有写Activate注解的filter
- 匹配是提供者侧或消费者侧的filter(单次只加载一侧的filter)
- 过滤掉在配置文件中指定要激活的filter,比如配置了
dubbo.provider.filter=xxxFilter
,那么xxxFilter
在这一轮不加载 - 过滤掉名称前带有'-'号的filter
- 过滤掉没有携带指定参数filter,比如
org.apache.dubbo.validation.filter.ValidationFilter
要求url中带有参数validation
才进行激活 - 对加载出来的filter进行排序
- 第二轮加载
- 加载在配置文件中指定要激活的filter
- 加载指定名称的filter,若名称配置为true,加载所有默认的和自定义的filter
- 如果配置配置了default,把default前面的自定义filter放在系统的filter前面执行,即如果配置了
dubbo.provider.filter=xxxFilter,default,xxFilter2
,那么先执行xxxFilter
,然后是系统内置的和自定义默认激活的filter,然后是xxFilter2
public class ExtensionLoader<T> {
// other method
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> activateExtensions = new ArrayList<>();
// name,配置文件的配置数据,以逗号分隔的数组
// 比如dubbo.provider.filter=xxxFilter,xxxFilter2, 那么names = ["xxxFilter", "xxxFilter2"]
List<String> names = values == null ? new ArrayList<>(0) : asList(values);
// 第一轮加载
// 如果service.filter有-default,那么不加载所有的filter,这一轮加载跳过
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
// 加载所有默认的filter, 包含内置的和自定义的filter
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
// 1. 过滤掉没写注解的filter
continue;
}
// 分组配置,消费者侧或提供者侧
if (isMatchGroup(group, activateGroup)
// 未在配置文件中指定激活,比如如果了配置dubbo.provider.filter=xxxFilter,那么xxxFilter不在这里加载
&& !names.contains(name)
// 跳过名称前带了'-'号的的filter
&& !names.contains(REMOVE_VALUE_PREFIX + name)
// 判断是否激活,指的是url中带了指定参数才激活filter
// 比如org.apache.dubbo.validation.filter.ValidationFilter要求url中带有参数validation才进行激活
&& isActive(activateValue, url)) {
activateExtensions.add(getExtension(name));
}
}
// 排序,order小的优先执行
activateExtensions.sort(ActivateComparator.COMPARATOR);
}
// 第二轮加载
List<T> loadedExtensions = new ArrayList<>();
// 遍历配置文件的配置name
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
// 名称以'-'开头的不加载
if (!name.startsWith(REMOVE_VALUE_PREFIX)
// 也不在以'-'开头的名称中
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
// 如果定义在default前面的自定义过滤器,加在系统过滤器前面
if (DEFAULT_KEY.equals(name)) {
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(0, loadedExtensions);
loadedExtensions.clear();
}
} else {
// 加载指定名称的filter
loadedExtensions.add(getExtension(name));
}
}
}
// 激活所有第二轮加载的filter,放在系统或自定义激活的过滤器后执行
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(loadedExtensions);
}
return activateExtensions;
}
/**
* 加载指定名称的filter
*
* @param name 指定的名称
* @return the filter
*/
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
// 如果名称是true,那么加载所有内置的,和自定义的filter
if ("true".equals(name)) {
return getDefaultExtension();
}
// 加载指定名称的filter
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
// other method
}
# 排序
排序所有已加载的filter,order小的优先执行。
Activate.before, Activate.after已被官方加上过期声明注解@Deprecated,本次不讨论
public class ActivateComparator implements Comparator<Object> {
// other method
public int compare(Object o1, Object o2) {
// doSomething
ActivateInfo a1 = parseActivate(o1.getClass());
ActivateInfo a2 = parseActivate(o2.getClass());
// doSomething
int n1 = a1 == null ? 0 : a1.order;
int n2 = a2 == null ? 0 : a2.order;
// never return 0 even if n1 equals n2, otherwise, o1 and o2 will override each other in collection like HashSet
return n1 > n2 ? 1 : -1;
}
private ActivateInfo parseActivate(Class<?> clazz) {
ActivateInfo info = new ActivateInfo();
if (clazz.isAnnotationPresent(Activate.class)) {
Activate activate = clazz.getAnnotation(Activate.class);
info.before = activate.before();
info.after = activate.after();
info.order = activate.order();
} else {
com.alibaba.dubbo.common.extension.Activate activate = clazz.getAnnotation(
com.alibaba.dubbo.common.extension.Activate.class);
info.before = activate.before();
info.after = activate.after();
info.order = activate.order();
}
return info;
}
// other method
}
# 遍历filter,使用装饰者模式将invoker进行装饰
# 执行请求:filter.invoke
# 异常回调
- 如果Filter继承了抽象类
org.apache.dubbo.rpc.ListenableFilter
,调用继承类的onError方法 - 如果Filter实现了接口
org.apache.dubbo.rpc.Filter.Listener
,调用实现类的onError方法
# 结论
# 第一轮加载
- 不加任何配置的情况下,系统默认加载所有内置的和自定义的filter,自定义的filter需要在
METE-INF/dubbo/com.alibaba.dubbo.rpc.Filter
中定义,即可生效 - 若需不使用某个filter,在配置文件中加上
-xxxFilter
,如不使用全部的filter,在配置文件中加上-default
- 基于order排序,序号小的靠前
# 第二轮加载
- 遍历配置文件的配置,若配置了filter的名称为
true
, 则不管第一轮是否进行加载,也会去加载所有内置的和自定义默认的filter,不管是否激活,不建议使用该配置 - 加载指定名称的filter,内置的,自定义的都可以加载, 不判断是否添加了Activate注解
- 第二轮加载的filter不进行排序,按照定义顺序,default前的放在第一轮加载的filter前执行,default后的或者没有配置default的放在第一轮加载的filter后执行
# 应用场景
- 对于需要默认激活的filter,添加Activate注解,用户通过配置
dubbo.provider.filter=-xxxFilter
按需移除 - 对于不需要默认激活的filter,不添加Activate注解,用户通过配置
dubbo.provider.filter=xxxFilter
按需使用- 这就引出了一个场景,如果需要添加指定依赖才能生效的filter, 让用户自行选择是否添加依赖,使用该filter
- 如果要执行某一个filter优先于所有filter执行
dubbo.provider.filter=xxxFilter,default,xxxFilter2
- 加载
xxxFilter
- 加载内置的和其他自定义的filter,按order排序
- 加载
xxxFilter2
- 按加载顺序执行
- 加载