# Dubbo Admin Service加载机制
本次分析的版本号为0.3.0,支持dubbo 2.7.x,注册中心使用zookeeper。仅关注核心代码,即如何从注册中心读取服务和如何进行本地缓存。
# 图解
除了RegistryServerSync,其他都属于dubbo框架内部的实现。也就是说,如果要实现dubbo admin的service本地缓存,只需编写RegistryServerSync即可。
# 代码分析
# 开始订阅
基于事件ApplicationReadyEvent驱动,该事件表示application初始化完成,可以准备接收请求。
Event published as late as conceivably possible to indicate that the application is ready to service requests. The source of the event is the SpringApplication itself, but beware of modifying its internal state since all initialization steps will have been completed by then.
该事件会尽可能晚地发布,用于表明应用程序已准备好处理请求。事件的来源是SpringApplication本身,但要谨慎修改其内部状态,因为此时所有初始化步骤均已完成。
/**
 * Event carrying the {@link ConfigurableApplicationContext} of an application
 * that has finished starting up.
 */
public class ApplicationReadyEvent extends SpringApplicationEvent {

    /** Application context supplied when the event was created. */
    private final ConfigurableApplicationContext context;

    /**
     * Creates the event.
     *
     * @param application the application passed on to the parent event
     * @param args        the arguments passed on to the parent event
     * @param context     the application context exposed by this event
     */
    public ApplicationReadyEvent(SpringApplication application, String[] args,
                                 ConfigurableApplicationContext context) {
        super(application, args);
        this.context = context;
    }

    /**
     * @return the application context given at construction time
     */
    public ConfigurableApplicationContext getApplicationContext() {
        return context;
    }
}
# 触发订阅
在程序初始化完成并发出事件`ApplicationReadyEvent`后,`RegistryServerSync.startSubscribe`方法开始从注册中心读取Service。
@Component
public class RegistryServerSync implements DisposableBean, NotifyListener {

    private static final Logger logger = LoggerFactory.getLogger(RegistryServerSync.class);

    /**
     * URL used for the initial subscription: it matches any interface, group,
     * version and classifier, covers the providers, consumers, routers and
     * configurators categories, and sets the check flag to false.
     */
    private static final URL SUBSCRIBE = new URL(
            Constants.ADMIN_PROTOCOL, NetUtils.getLocalHost(), 0, "",
            Constants.INTERFACE_KEY, Constants.ANY_VALUE,
            Constants.GROUP_KEY, Constants.ANY_VALUE,
            Constants.VERSION_KEY, Constants.ANY_VALUE,
            Constants.CLASSIFIER_KEY, Constants.ANY_VALUE,
            Constants.CATEGORY_KEY, String.join(",",
                    Constants.PROVIDERS_CATEGORY,
                    Constants.CONSUMERS_CATEGORY,
                    Constants.ROUTERS_CATEGORY,
                    Constants.CONFIGURATORS_CATEGORY),
            Constants.ENABLED_KEY, Constants.ANY_VALUE,
            Constants.CHECK_KEY, String.valueOf(false));

    /**
     * Starts subscribing to the services in the registry. Invoked when an
     * {@code ApplicationReadyEvent} is published; this object is registered
     * as the listener for the subscription.
     */
    @EventListener(classes = ApplicationReadyEvent.class)
    public void startSubscribe() {
        logger.info("Init Dubbo Admin Sync Cache...");
        registry.subscribe(SUBSCRIBE, this);
    }
}
# 服务读取
/**
 * Excerpt of the ZooKeeper-backed registry showing how services are read from
 * the registry and how listeners are notified about the result.
 */
public class ZookeeperRegistry extends CacheableFailbackRegistry {

    private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

    private final static String DEFAULT_ROOT = "dubbo";

    /**
     * Core method that reads services from the registry.
     *
     * <p>When the URL's service interface is the wildcard, every child of the
     * root path is treated as a service and subscribed to individually;
     * otherwise the category paths of the single service are read and the
     * listener is notified with the resulting URLs.
     *
     * @param url      the subscription URL
     * @param listener the listener to notify
     * @throws RpcException if anything fails while subscribing
     */
    @Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url,
                        k -> new ConcurrentHashMap<>());
                // Child listener on the root path: subscribes to every service
                // name that has not been seen before.
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child)
                                    .addParameters(INTERFACE_KEY, child, Constants.CHECK_KEY,
                                            String.valueOf(false)), k);
                        }
                    }
                });
                zkClient.create(root, false);
                // Read all services under the root path (/dubbo).
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        // Build a per-service URL from the scanned service name, e.g.
                        // admin://192.168.145.221/${serviceKey}?category=providers,consumers,routers,configurators
                        //     &check=false&classifier=*&enabled=*&group=*&interface=${serviceKey}&version=*
                        // where serviceKey can be read as the service name registered in ZooKeeper
                        // and check is the CHECK_KEY parameter added here with the value false.
                        // subscribe(...) is then called recursively for that URL.
                        subscribe(url.setPath(service)
                                        .addParameters(INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)),
                                listener);
                    }
                }
            } else {
                CountDownLatch latch = new CountDownLatch(1);
                List<URL> urls = new ArrayList<>();
                // Each service is scanned once per category path (providers, consumers,
                // routers, configurators); paths look like /dubbo/{serviceName}/{categoryName}.
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url,
                            k -> new ConcurrentHashMap<>());
                    ChildListener zkListener = listeners.computeIfAbsent(listener,
                            k -> new RegistryChildListenerImpl(url, path,
                                    k, latch));
                    // An already-registered listener must wait on the latch of this call.
                    if (zkListener instanceof RegistryChildListenerImpl) {
                        ((RegistryChildListenerImpl) zkListener).setLatch(latch);
                    }
                    zkClient.create(path, false);
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        // toUrlsWithEmpty yields the matching provider URLs, or a single
                        // empty-protocol URL (used to evict the cache) when there is none.
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // Call chain as described by the original notes:
                // 1. org.apache.dubbo.registry.support.FailbackRegistry#notify
                // 2. org.apache.dubbo.registry.support.FailbackRegistry#doNotify
                // 3. org.apache.dubbo.registry.support.AbstractRegistry#notify
                // 4. iterate over the urls, calling RegistryServerSync.notify along the way
                notify(url, listener, urls);
                // tells the listener to run only after the sync notification of main thread finishes.
                latch.countDown();
            }
        } catch (Throwable e) {
            throw new RpcException(
                    "Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

    /**
     * @return the root followed by the path separator, or the root itself
     *         when it already equals the separator
     */
    private String toRootDir() {
        if (root.equals(PATH_SEPARATOR)) {
            return root;
        }
        return root + PATH_SEPARATOR;
    }

    /**
     * @return the configured root path
     */
    private String toRootPath() {
        return root;
    }

    /**
     * @param url the URL whose service interface is used
     * @return the root path for the wildcard interface, otherwise the root
     *         directory followed by the encoded interface name
     */
    private String toServicePath(URL url) {
        String name = url.getServiceInterface();
        if (ANY_VALUE.equals(name)) {
            return toRootPath();
        }
        return toRootDir() + URL.encode(name);
    }

    /**
     * Builds one path per category of the URL. A wildcard category expands to
     * providers, consumers, routers and configurators.
     *
     * @param url the subscription URL
     * @return the category paths below the service path
     */
    private String[] toCategoriesPath(URL url) {
        String[] categories;
        if (ANY_VALUE.equals(url.getCategory())) {
            categories = new String[]{PROVIDERS_CATEGORY, CONSUMERS_CATEGORY, ROUTERS_CATEGORY, CONFIGURATORS_CATEGORY};
        } else {
            categories = url.getCategory(new String[]{DEFAULT_CATEGORY});
        }
        String[] paths = new String[categories.length];
        for (int i = 0; i < categories.length; i++) {
            paths[i] = toServicePath(url) + PATH_SEPARATOR + categories[i];
        }
        return paths;
    }

    /**
     * @param url the URL whose category (or the default category) is used
     * @return the service path followed by the category
     */
    private String toCategoryPath(URL url) {
        return toServicePath(url) + PATH_SEPARATOR + url.getCategory(DEFAULT_CATEGORY);
    }

    /**
     * @param url the URL to encode
     * @return the category path followed by the encoded full URL string
     */
    private String toUrlPath(URL url) {
        return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
    }

    /**
     * Builds the URLs of the providers that match the consumer URL.
     * Per the original notes, a provider matches when the interface is the
     * same, the group is the same (or unspecified), the version is the same
     * (or unspecified) and the classifier is the same; the actual decision is
     * delegated to {@code UrlUtils.isMatch}.
     *
     * @param consumer  the subscribing URL
     * @param providers encoded provider URL strings, may be empty
     * @return the matching URLs, never {@code null}
     */
    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
        List<URL> urls = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(providers)) {
            for (String provider : providers) {
                if (provider.contains(PROTOCOL_SEPARATOR_ENCODED)) {
                    URL url = URLStrParser.parseEncodedStr(provider);
                    if (UrlUtils.isMatch(consumer, url)) {
                        urls.add(url);
                    }
                }
            }
        }
        return urls;
    }

    /**
     * Like {@link #toUrlsWithoutEmpty(URL, List)}, but when no provider
     * matches, returns a single empty-protocol URL whose category is the last
     * segment of {@code path}; that URL is used to evict the cache.
     *
     * @param consumer  the subscribing URL
     * @param path      the category path that was read
     * @param providers encoded provider URL strings found under the path
     * @return the matching URLs, or a list holding one empty-protocol URL
     */
    private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
        // toUrlsWithoutEmpty never returns null, so only emptiness has to be checked.
        if (urls.isEmpty()) {
            int i = path.lastIndexOf(PATH_SEPARATOR);
            String category = i < 0 ? path : path.substring(i + 1);
            URL empty = URLBuilder.from(consumer)
                    .setProtocol(EMPTY_PROTOCOL)
                    .addParameter(CATEGORY_KEY, category)
                    .build();
            urls.add(empty);
        }
        return urls;
    }

    @Override
    protected boolean isMatch(URL subscribeUrl, URL providerUrl) {
        return UrlUtils.isMatch(subscribeUrl, providerUrl);
    }

    /**
     * Cancels and forgets a pending failed-subscribe retry for the pair, then
     * does the same for failed unsubscribes. Per the original note this
     * method comes from FailbackRegistry.
     *
     * @param url      the subscription URL
     * @param listener the listener of the subscription
     */
    public void removeFailedSubscribed(URL url, NotifyListener listener) {
        Holder h = new Holder(url, listener);
        FailedSubscribedTask f = failedSubscribed.remove(h);
        if (f != null) {
            f.cancel();
        }
        removeFailedUnsubscribed(url, listener);
    }

    /**
     * Subscribes to the URL; on failure falls back to the cached URLs when
     * available, otherwise either fails fast or only logs, and in every
     * non-throwing failure case records the subscription for retry. Per the
     * original note this method comes from FailbackRegistry.
     *
     * @param url      the subscription URL
     * @param listener the listener to notify
     * @throws IllegalStateException if subscribing fails, no cached URLs exist
     *                               and checking is enabled or the failure
     *                               asks to skip failback
     */
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;
            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: "
                        + getUrl().getParameter(FILE_KEY,
                        System.getProperty("user.home") + "/dubbo-registry-"
                                + url.getHost() + ".cache") + ", cause: "
                        + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check =
                        getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }
            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

    /**
     * Child listener for one category path. It waits for the latch of the
     * subscribing call before forwarding child changes to the notifier.
     */
    private class RegistryChildListenerImpl implements ChildListener {

        private final RegistryNotifier notifier;

        private long lastExecuteTime;

        private volatile CountDownLatch latch;

        /**
         * @param consumerUrl the subscribing URL
         * @param path        the category path being watched
         * @param listener    the listener to notify
         * @param latch       latch to wait on before handling child changes
         */
        public RegistryChildListenerImpl(URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) {
            this.latch = latch;
            notifier = new RegistryNotifier(ZookeeperRegistry.this.getDelay()) {
                @Override
                public void notify(Object rawAddresses) {
                    long delayTime = getDelayTime();
                    if (delayTime <= 0) {
                        this.doNotify(rawAddresses);
                        return;
                    }
                    long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime);
                    boolean interrupted = false;
                    if (interval > 0) {
                        try {
                            Thread.sleep(interval);
                        } catch (InterruptedException e) {
                            // Still deliver the notification, but remember the interrupt
                            // so it is not lost for the calling thread.
                            interrupted = true;
                        }
                    }
                    try {
                        lastExecuteTime = System.currentTimeMillis();
                        this.doNotify(rawAddresses);
                    } finally {
                        if (interrupted) {
                            // Restored only after delivery so the notification itself
                            // runs exactly as it did before.
                            Thread.currentThread().interrupt();
                        }
                    }
                }

                // childChanged always hands over a List<String>.
                @SuppressWarnings("unchecked")
                @Override
                protected void doNotify(Object rawAddresses) {
                    ZookeeperRegistry.this.notify(consumerUrl, listener,
                            ZookeeperRegistry.this.toUrlsWithEmpty(consumerUrl, path,
                                    (List<String>) rawAddresses));
                }
            };
        }

        /**
         * @param latch the latch subsequent child changes must wait on
         */
        public void setLatch(CountDownLatch latch) {
            this.latch = latch;
        }

        /**
         * Waits for the current latch, then forwards the children to the
         * notifier. If the wait is interrupted, the notification is still
         * forwarded and the interrupt status is restored afterwards.
         *
         * @param path     the path whose children changed
         * @param children the current children of the path
         */
        @Override
        public void childChanged(String path, List<String> children) {
            boolean interrupted = false;
            try {
                latch.await();
            } catch (InterruptedException e) {
                interrupted = true;
                logger.warn(
                        "Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread.");
            }
            try {
                notifier.notify(children);
            } finally {
                if (interrupted) {
                    // Do not swallow the interrupt: hand it back to the caller once
                    // the notification has been delivered.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
# 缓存结果
`ZookeeperRegistry`从注册中心读取完Service后,调用`RegistryServerSync.notify`方法将服务缓存到本地。
/**
 * Excerpt of the registry synchronizer showing how notified URLs are kept in
 * a local cache grouped by category and service key.
 */
@Component
public class RegistryServerSync implements DisposableBean, NotifyListener {

    private static final Logger logger = LoggerFactory.getLogger(RegistryServerSync.class);

    /**
     * Maps {@code URL.toFullString()} to the ID produced by
     * {@code CoderUtil.MD5_16bit}, so an unchanged URL always gets the same
     * ID; the ID is also what the cache uses as its innermost key.
     */
    private final ConcurrentHashMap<String, String> URL_IDS_MAPPER = new ConcurrentHashMap<>();

    /**
     * Local cache: {@code ConcurrentMap<Category, ConcurrentMap<ServiceKey, Map<ID, URL>>>}.
     */
    private final ConcurrentMap<String, ConcurrentMap<String, Map<String, URL>>> registryCache = new ConcurrentHashMap<>();

    @Autowired
    private Registry registry;

    /**
     * Returns the local service cache.
     *
     * @return the live cache map (not a copy)
     */
    public ConcurrentMap<String, ConcurrentMap<String, Map<String, URL>>> getRegistryCache() {
        return registryCache;
    }

    /**
     * Applies a notification to the local cache. Empty-protocol URLs evict
     * cached services; all other URLs are grouped by category and service key
     * and then merged into the cache, dropping cached keys of the same
     * interface that are no longer present.
     *
     * @param urls the URLs of one service, possibly spanning several
     *             categories; {@code null} or empty is ignored
     */
    @Override
    public void notify(List<URL> urls) {
        if (urls == null || urls.isEmpty()) {
            return;
        }
        // Map<Category, Map<ServiceKey, Map<ID, URL>>> of additions and changes.
        final Map<String, Map<String, Map<String, URL>>> categories = new HashMap<>();
        String interfaceName = null;
        for (URL url : urls) {
            String category = url.getParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY);
            if (Constants.EMPTY_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                // An empty-protocol URL means "nothing registered": evict from the cache.
                evict(category, url);
            } else {
                // Anything else is an addition or a change.
                if (StringUtils.isEmpty(interfaceName)) {
                    interfaceName = url.getServiceInterface();
                }
                Map<String, Map<String, URL>> services =
                        categories.computeIfAbsent(category, k -> new HashMap<>());
                Map<String, URL> ids =
                        services.computeIfAbsent(url.getServiceKey(), k -> new HashMap<>());
                // Make sure the same URL always maps to the same ID.
                String id = URL_IDS_MAPPER.computeIfAbsent(url.toFullString(),
                        fullString -> CoderUtil.MD5_16bit(fullString));
                ids.put(id, url);
            }
        }
        // Nothing was added or changed, so there is nothing to merge.
        if (categories.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Map<String, Map<String, URL>>> categoryEntry : categories.entrySet()) {
            String category = categoryEntry.getKey();
            Map<String, Map<String, URL>> updated = categoryEntry.getValue();
            ConcurrentMap<String, Map<String, URL>> services = registryCache.get(category);
            if (services == null) {
                services = new ConcurrentHashMap<String, Map<String, URL>>();
                registryCache.put(category, services);
            } else {
                // Fix map can not be cleared when service is unregistered: when a unique
                // "group/service:version" service is unregistered, but we still have the same
                // services with different version or group, so empty protocols can not be invoked.
                Set<String> keys = new HashSet<String>(services.keySet());
                for (String key : keys) {
                    // Drop only the keys of this interface that are absent from the update.
                    // The lookup must be by key: entrySet().contains(key) compares a String
                    // with Map.Entry objects and is always false, which removed every key of
                    // the interface and left live services missing until putAll ran.
                    if (Tool.getInterface(key).equals(interfaceName) && !updated.containsKey(key)) {
                        services.remove(key);
                    }
                }
            }
            // Add the new and changed services to the cache.
            services.putAll(updated);
        }
    }

    /**
     * Removes from the given category every cached service addressed by an
     * empty-protocol URL.
     */
    private void evict(String category, URL url) {
        ConcurrentMap<String, Map<String, URL>> services = registryCache.get(category);
        if (services == null) {
            return;
        }
        String group = url.getParameter(Constants.GROUP_KEY);
        String version = url.getParameter(Constants.VERSION_KEY);
        // NOTE: group and version in empty protocol is *
        if (!Constants.ANY_VALUE.equals(group) && !Constants.ANY_VALUE.equals(version)) {
            services.remove(url.getServiceKey());
            return;
        }
        for (Map.Entry<String, Map<String, URL>> serviceEntry : services.entrySet()) {
            String service = serviceEntry.getKey();
            if (Tool.getInterface(service).equals(url.getServiceInterface())
                    && (Constants.ANY_VALUE.equals(group) || StringUtils.isEquals(group, Tool.getGroup(service)))
                    && (Constants.ANY_VALUE.equals(version) || StringUtils.isEquals(version, Tool.getVersion(service)))) {
                services.remove(service);
            }
        }
    }
}