# Dubbo Filter 加载机制

本次分析的版本号为2.7.7

# 入口

org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper

public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    /**
     * 构造调用链
     * 
     * @param invoker invoke
     * @param key   固定值,service|reference.filter
     * @param group provider
     * @param <T>
     * @return
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;

        // 基于SPI机制获取已经激活的Filter,并按order排序
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

        if (!filters.isEmpty()) {
            // 遍历所有filter
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                // 装饰器模式,将invoker用filter逐层进行包装
                // 这也就引出了一个问题,基于filter包装invoker,如果没有filter,那么会无法进行请求调用
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result asyncResult;
                        try {
                            asyncResult = filter.invoke(next, invocation);
                        } catch (Exception e) {
                            // 异常处理1,如果filter继承了抽象类ListenableFilter,就调用抽象类实现的onError方法
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                try {
                                    Filter.Listener listener = listenableFilter.listener(invocation);
                                    if (listener != null) {
                                        listener.onError(e, invoker, invocation);
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                                // 异常处理2,如果filter实现了接口Filter.Listener,就调用抽象类实现的onError方法
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                listener.onError(e, invoker, invocation);
                            }
                            throw e;
                        } finally {

                        }
                        return asyncResult.whenCompleteWithContext((r, t) -> {
                            if (filter instanceof ListenableFilter) {
                                ListenableFilter listenableFilter = ((ListenableFilter) filter);
                                Filter.Listener listener = listenableFilter.listener(invocation);
                                try {
                                    if (listener != null) {
                                        if (t == null) {
                                            listener.onResponse(r, invoker, invocation);
                                        } else {
                                            listener.onError(t, invoker, invocation);
                                        }
                                    }
                                } finally {
                                    listenableFilter.removeListener(invocation);
                                }
                            } else if (filter instanceof Filter.Listener) {
                                Filter.Listener listener = (Filter.Listener) filter;
                                if (t == null) {
                                    listener.onResponse(r, invoker, invocation);
                                } else {
                                    listener.onError(t, invoker, invocation);
                                }
                            }
                        });
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }

        return last;
    }

    @Override
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * 服务端暴露服务
     * 
     * @param invoker
     * @param <T>
     * @return
     * @throws RpcException
     */
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }

    /**
     * 客户端引用服务
     *
     * @param invoker
     * @param <T>
     * @return
     * @throws RpcException
     */
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (UrlUtils.isRegistry(url)) {
            return protocol.refer(type, url);
        }
        return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
    }

    @Override
    public void destroy() {
        protocol.destroy();
    }

    @Override
    public List<ProtocolServer> getServers() {
        return protocol.getServers();
    }

}

# 流程分析

# 基于SPI机制加载Filter

# ExtensionLoader.getActivateExtension(URL url, String key, String group)

  1. key为url中的service|reference.filter参数,即配置文件中的dubbo.provider|consumer.filter
  2. 调用方法ExtensionLoader.getActivateExtension(URL url, String[] values, String group)
    • 参数1:调用的url 参数2:key参数以逗号分隔 参数3:分组,参数为:provider|consumer
public class ExtensionLoader<T> {
   // other method
   public List<T> getActivateExtension(URL url, String key, String group) {
       // 获取url中的service.filter参数
       String value = url.getParameter(key);
       // 参数1:调用的url   参数2:key参数以逗号分隔 参数3:分组,参数为:provider|consumer
       return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);
   }
   // other method
}

# ExtensionLoader.getActivateExtension(URL url, String[] values, String group)

  • 第一轮加载
  1. 判断values(将key以逗号分隔的数组)中是否有-default参数。
    1. 包含-default参数,不加载所有默认的filter,以及自定义的filter
    2. 不包含-default参数
      1. METE-INF/dubbo/internal/org.apache.dubbo.rpc.Filter读取内置的filter配置
      2. METE-INF/dubbo/com.alibaba.dubbo.rpc.Filter读取自定义filter配置
  2. 过滤掉没有写Activate注解的filter
  3. 匹配是提供者侧或消费者侧的filter(单次只加载一侧的filter)
  4. 过滤掉在配置文件中指定要激活的filter,比如配置了dubbo.provider.filter=xxxFilter,那么xxxFilter在这一轮不加载
  5. 过滤掉名称前带有'-'号的filter
  6. 过滤掉没有携带指定参数filter,比如org.apache.dubbo.validation.filter.ValidationFilter要求url中带有参数validation才进行激活
  7. 对加载出来的filter进行排序
  • 第二轮加载
  1. 加载在配置文件中指定要激活的filter
    • 加载指定名称的filter,若名称配置为true,加载所有默认的和自定义的filter
    • 如果配置配置了default,把default前面的自定义filter放在系统的filter前面执行,即如果配置了dubbo.provider.filter=xxxFilter,default,xxFilter2,那么先执行xxxFilter,然后是系统内置的和自定义默认激活的filter,然后是xxFilter2
public class ExtensionLoader<T> {
   // other method
    public List<T> getActivateExtension(URL url, String[] values, String group) {
       List<T> activateExtensions = new ArrayList<>();
       // name,配置文件的配置数据,以逗号分隔的数组
       // 比如dubbo.provider.filter=xxxFilter,xxxFilter2, 那么names = ["xxxFilter", "xxxFilter2"]
       List<String> names = values == null ? new ArrayList<>(0) : asList(values);
       
       // 第一轮加载
       // 如果service.filter有-default,那么不加载所有的filter,这一轮加载跳过
       if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
           // 加载所有默认的filter, 包含内置的和自定义的filter
           getExtensionClasses();
           for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
               String name = entry.getKey();
               Object activate = entry.getValue();

               String[] activateGroup, activateValue;

               if (activate instanceof Activate) {
                   activateGroup = ((Activate) activate).group();
                   activateValue = ((Activate) activate).value();
               } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                   activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                   activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
               } else {
                   // 1. 过滤掉没写注解的filter
                   continue;
               }
               // 分组配置,消费者侧或提供者侧
               if (isMatchGroup(group, activateGroup)
                       // 未在配置文件中指定激活,比如如果了配置dubbo.provider.filter=xxxFilter,那么xxxFilter不在这里加载
                       && !names.contains(name)
                       // 跳过名称前带了'-'号的的filter
                       && !names.contains(REMOVE_VALUE_PREFIX + name)
                       // 判断是否激活,指的是url中带了指定参数才激活filter
                       // 比如org.apache.dubbo.validation.filter.ValidationFilter要求url中带有参数validation才进行激活
                       && isActive(activateValue, url)) {
                   activateExtensions.add(getExtension(name));
               }
           }
           // 排序,order小的优先执行
           activateExtensions.sort(ActivateComparator.COMPARATOR);
       }
       
       // 第二轮加载
       List<T> loadedExtensions = new ArrayList<>();
       // 遍历配置文件的配置name
       for (int i = 0; i < names.size(); i++) {
           String name = names.get(i);
           // 名称以'-'开头的不加载
           if (!name.startsWith(REMOVE_VALUE_PREFIX)
                   // 也不在以'-'开头的名称中
                   && !names.contains(REMOVE_VALUE_PREFIX + name)) {
              // 如果定义在default前面的自定义过滤器,加在系统过滤器前面
               if (DEFAULT_KEY.equals(name)) {
                   if (!loadedExtensions.isEmpty()) {
                       activateExtensions.addAll(0, loadedExtensions);
                       loadedExtensions.clear();
                   }
               } else {
                   // 加载指定名称的filter
                   loadedExtensions.add(getExtension(name));
               }
           }
       }
       // 激活所有第二轮加载的filter,放在系统或自定义激活的过滤器后执行
       if (!loadedExtensions.isEmpty()) {
           activateExtensions.addAll(loadedExtensions);
       }
       return activateExtensions;
   }

   /**
    * 加载指定名称的filter
    * 
    * @param name 指定的名称
    * @return the filter
    */
   public T getExtension(String name) {
      if (StringUtils.isEmpty(name)) {
         throw new IllegalArgumentException("Extension name == null");
      }
      // 如果名称是true,那么加载所有内置的,和自定义的filter
      if ("true".equals(name)) {
         return getDefaultExtension();
      }
      // 加载指定名称的filter
      final Holder<Object> holder = getOrCreateHolder(name);
      Object instance = holder.get();
      if (instance == null) {
         synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
               instance = createExtension(name);
               holder.set(instance);
            }
         }
      }
      return (T) instance;
   }
   // other method
}

# 排序

排序所有已加载的filter,order小的优先执行。

Activate.before, Activate.after已被官方加上过期声明注解@Deprecated,本次不讨论


public class ActivateComparator implements Comparator<Object> {
    // other method
   public int compare(Object o1, Object o2) {
      // doSomething
      ActivateInfo a1 = parseActivate(o1.getClass());
      ActivateInfo a2 = parseActivate(o2.getClass());

      // doSomething
      int n1 = a1 == null ? 0 : a1.order;
      int n2 = a2 == null ? 0 : a2.order;
      // never return 0 even if n1 equals n2, otherwise, o1 and o2 will override each other in collection like HashSet
      return n1 > n2 ? 1 : -1;
   }

   private ActivateInfo parseActivate(Class<?> clazz) {
      ActivateInfo info = new ActivateInfo();
      if (clazz.isAnnotationPresent(Activate.class)) {
         Activate activate = clazz.getAnnotation(Activate.class);
         info.before = activate.before();
         info.after = activate.after();
         info.order = activate.order();
      } else {
         com.alibaba.dubbo.common.extension.Activate activate = clazz.getAnnotation(
                 com.alibaba.dubbo.common.extension.Activate.class);
         info.before = activate.before();
         info.after = activate.after();
         info.order = activate.order();
      }
      return info;
   }
   // other method
}

# 遍历filter,使用装饰者模式将invoker进行装饰

# 执行请求:filter.invoke

# 异常回调

  1. 如果Filter继承了抽象类org.apache.dubbo.rpc.ListenableFilter,调用继承类的onError方法
  2. 如果Filter实现了接口org.apache.dubbo.rpc.Filter.Listener,调用实现类的onError方法

# 结论

# 第一轮加载

  1. 不加任何配置的情况下,系统默认加载所有内置的和自定义的filter,自定义的filter需要在METE-INF/dubbo/com.alibaba.dubbo.rpc.Filter中定义,即可生效
  2. 若需不使用某个filter,在配置文件中加上-xxxFilter,如不使用全部的filter,在配置文件中加上-default
  3. 基于order排序,序号小的靠前

# 第二轮加载

  1. 遍历配置文件的配置,若配置了filter的名称为true, 则不管第一轮是否进行加载,也会去加载所有内置的和自定义默认的filter,不管是否激活,不建议使用该配置
  2. 加载指定名称的filter,内置的,自定义的都可以加载, 不判断是否添加了Activate注解
  3. 第二轮加载的filter不进行排序,按照定义顺序,default前的放在第一轮加载的filter前执行,default后的或者没有配置default的放在第一轮加载的filter后执行

# 应用场景

  1. 对于需要默认激活的filter,添加Activate注解,用户通过配置dubbo.provider.filter=-xxxFilter按需移除
  2. 对于不需要默认激活的filter,不添加Activate注解,用户通过配置dubbo.provider.filter=xxxFilter按需使用
    1. 这就引出了一个场景,如果需要添加指定依赖才能生效的filter, 让用户自行选择是否添加依赖,使用该filter
  3. 如果要执行某一个filter优先于所有filter执行dubbo.provider.filter=xxxFilter,default,xxxFilter2
    1. 加载xxxFilter
    2. 加载内置的和其他自定义的filter,按order排序
    3. 加载xxxFilter2
    4. 按加载顺序执行
Last Updated: 11/30/2021, 8:33:18 PM