你好,我是何辉。今天我们深入研究Dubbo源码的第十篇,调用流程。
在消费方这样的代码你一定见过很多了。
1 2 @DubboReference private DemoFacade demoFacade;
在 Spring Bean 中定义 Dubbo 接口为成员变量时,用 @DubboReference 注解修饰 DemoFacade。
你有没有好奇过,我们现在看到的这个 demoFacade 变量,在内存运行时,值类型还属于 DemoFacade 这个类型么?如果不是,那拿着 demoFacade 变量去调用里面的方法时,在消费方到底会经历怎样的调用流程呢?
这个问题,我以前也探索过,不过因为源码太复杂,最后都徒劳无功。直到有一天,遇到了一位资深大佬,指点了我看源码的 12 字方针:不钻细节:只看流程;不看过程:只看结论;再看细节:再看过程 。参考他的经验,我经过一番简单的调试后,就轻松梳理出了调用流程的大体框架。
今天我就带你按照这12字方针的思路,感受一下高手是如何研究源码流程的。每个环节,我会用图片总结,加强你对流程的形象化理解。今天的内容稍微多一些,做好准备,我们马上开始。
sayHello 调试 先来看下我们的消费方调用代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Component public class InvokeDemoService { @DubboReference private DemoFacade demoFacade; public String invokeDemo () { return demoFacade.sayHello("Geek" ); } }
代码在这个类中定义了一个 invokeDemo 的方法,然后,在方法体中对下游 DemoFacade 接口的 sayHello 方法发起了远程调用。
是不是发现消费方真的非常简单且普通。那怎么展开探索呢?我们以 Debug 的方式调试消费方,在 sayHello 方法打个断点,通过断点调试,钻进调用流程的各个环节,同时参考12字方针,深入研究调用流程的底层逻辑。
1. JDK代理 我们在调用 sayHello 方法的这行打上一个断点,先运行提供方,再 Debug 运行消费方,很快断点就到来了。
还记得前面提出的一个疑问么,demoFacade 在内存运行时,值类型是什么?
从图中,我们可以清楚地看到,demoFacade 的类型,是一个随机生成的代理类名,不再属于 DemoFacade 这个类型了,而且结合 $Proxy 类名、代理类中的 h 成员变量属于 JdkDynamicAopProxy 类型,综合判断,这是采用 JDK 代理动态,生成了一个继承 Proxy 的代理类。
如果你再展开 h 属性看看它里面的成员变量。
有 2 个比较重要的字段,targetSource 和 interfaces,我们挨个看下。
先来看看 targetSource 变量(ReferenceBean$DubboReferenceLazyInitTargetSource)。
看到它的类名,有没有似曾相识的感觉,类名上包含了 DubboReference 关键字,想必跟订阅流程中的引用有关。而且类名的构成,由 $ 符号隔开了,可以说明 DubboReferenceLazyInitTargetSource 是 ReferenceBean 的一个内部类。
进去瞄一眼,看看猜想对不对。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private class DubboReferenceLazyInitTargetSource extends AbstractLazyCreationTargetSource { @Override protected Object createObject () throws Exception { return getCallProxy(); } @Override public synchronized Class<?> getTargetClass() { return getInterfaceClass(); } } ↓ private Object getCallProxy () throws Exception { if (referenceConfig == null ) { throw new IllegalStateException ("ReferenceBean is not ready yet, please make sure to call reference interface method after dubbo is started." ); } return referenceConfig.get(); }
进去之后才发现,原来内部类是一层壳,核心创建对象的逻辑还是 ReferenceConfig 的 get 方法。
我们再来看 interfacaces 变量。
这里除了有我们调用下游的接口 DemoFacade,还有一个回声测试接口(EchoService)和一个销毁引用的接口(Destroyable)。
这俩接口我们没见过,却被 Dubbo 在创建代理的过程中追加进来了,可以说明一点,我们可以拿着 demoFacade 变量强转为 EchoService,或者强转为 Destroyable,强转后就可以调用对应接口的方法,实现了一个代理类拥有多态功能的效果。
这个多态功能的支持,得归功于 JDK 的 Proxy 类,它在创建代理对象的 newProxyInstance 方法中,支持入参传入一个接口数组,Dubbo 框架在创建代理时,不但传入了 DemoFacade 接口,还传入了 EchoService 和 Destroyable 接口,生成的代理对象就有了另外两种接口的行为能力。
1 2 3 4 5 6 7 8 9 10 11 @CallerSensitive public static Object newProxyInstance (ClassLoader loader, Class<?>[] interfaces, InvocationHandler h) throws IllegalArgumentException { 省略部分实现代码... }
好,简单整理一下,刚开始断点探索,我们就接触到了“高深”的 JDK 代理对象,解开了开头的一个疑惑, demoFacade 在运行时并非我们所见的 DemoFacade 类型,而是由 JDK 动态代理生成的一个代理对象类型。
碰巧发现,在生成的代理对象中,targetSource 成员变量创建的底层核心逻辑还是 ReferenceConfig 的 get 方法,不得不说 ReferenceConfig 是消费方引用下游接口逻辑中非常重要的一个类。
同时还认识了 EchoService 和 Destroyable 两个接口,让我们使用 demoFacade 时不仅可以调用 sayHello 方法,还可以强转为这两个接口调用不同的方法,使得一个 demoFacade 变量拥有三能能力,这就是代理增强的魅力所在。
2. InvokerInvocationHandler 接下来,我们继续 Debug 进入 sayHello 方法中,发现了一个名字有点眼熟的 InvokerInvocationHandler 类,虽然不知道具体能做啥,但是发现它实现了 JDK 代理中的 InvocationHandler 接口,所以,我们可以认为,这个 InvokerInvocationHandler 类是 Dubbo 框架接收 JDK 代理触发调用的入口。
来看看 InvokerInvocationHandler 的 invoke 方法,验证一下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 0 ) { if ("toString" .equals(methodName)) { return invoker.toString(); } else if ("$destroy" .equals(methodName)) { invoker.destroy(); return null ; } else if ("hashCode" .equals(methodName)) { return invoker.hashCode(); } } else if (parameterTypes.length == 1 && "equals" .equals(methodName)) { return invoker.equals(args[0 ]); } RpcInvocation rpcInvocation = new RpcInvocation (serviceModel, method, invoker.getInterface().getName(), protocolServiceKey, args); if (serviceModel instanceof ConsumerModel) { rpcInvocation.put(Constants.CONSUMER_MODEL, serviceModel); rpcInvocation.put(Constants.METHOD_MODEL, ((ConsumerModel) serviceModel).getMethodModel(method)); } return InvocationUtil.invoke(invoker, rpcInvocation); }
按12字方针,从上往下大致浏览一遍。
从代码表面的流程看,一些 toString、$destroy、hashCode 等方法做了特殊的逻辑提前返回了,最终调用 InvocationUtil 的 invoke 方法进行了收口处理。 从方法的入参和返回值看,有方法对象、入参数据、代理对象,满足了反射调用的基本要素,调用后返回的数据,自然就是我们需要的结果。 再看方法实现体的一些细节,发现最终并没有使用 proxy 代理对象,而是使用了 invoker + rpcInvocation 传入 InvocationUtil 工具类,完成了逻辑收口。 再进入 InvocationUtil,忽略与入参无关的逻辑,最终发现把 rpcInvocation 对象传入了 invoker 的 invoke 方法中,继续走后续调用逻辑。
3. MigrationInvoker Debug 一直往后走,来到了一个 MigrationInvoker 的调用类,从类的名字上看是“迁移”的意思,有点 Dubbo2 与 Dubbo3 新老兼容迁移的味道。
那来看看 MigrationInvoker 的 invoke 方法代码逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Override public Result invoke (Invocation invocation) throws RpcException { if (currentAvailableInvoker != null ) { if (step == APPLICATION_FIRST) { if (promotion < 100 && ThreadLocalRandom.current().nextDouble(100 ) > promotion) { return invoker.invoke(invocation); } } return currentAvailableInvoker.invoke(invocation); } switch (step) { case APPLICATION_FIRST: if (checkInvokerAvailable(serviceDiscoveryInvoker)) { currentAvailableInvoker = serviceDiscoveryInvoker; } else if (checkInvokerAvailable(invoker)) { currentAvailableInvoker = invoker; } else { currentAvailableInvoker = serviceDiscoveryInvoker; } break ; case FORCE_APPLICATION: currentAvailableInvoker = serviceDiscoveryInvoker; break ; case FORCE_INTERFACE: default : currentAvailableInvoker = invoker; } return currentAvailableInvoker.invoke(invocation); }
继续按 12 字方针分析。
从代码流程看,从上到下都在针对 step 变量值,进行 if…else 判断走不走分支处理,说明这里针对 step 进行了逻辑分发处理。 从方法的入参和返回值来看,入参 invocation 是一个富含所有请求信息的接口,返参也是一个抽象的 Result 接口,意思很直白,根据参数拿到结果,只是入参和返参是接口,体现了抽象的概念。 再看看方法实现体的一些细节,通过针对 APPLICATION_FIRST、FORCE_APPLICATION、FORCE_INTERFACE 来进行分发处理,这不就是消费方设置 dubbo.application.service-discovery.migration 属性,进行新老订阅方案兼容的值么?根据不同的属性值,选择对应可用的 invoker 进行继续后续调用。
4. MockClusterInvoker 然后走进”step == APPLICATION_FIRST“的分支逻辑,进入 currentAvailableInvoker 的 invoke 方法,来到了 MockClusterInvoker 这个类,看名字是“模拟集群调用”的意思。
本意是干啥的呢,我们继续看它的 invoke 方法逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Override public Result invoke (Invocation invocation) throws RpcException { Result result; String value = getUrl().getMethodParameter(invocation.getMethodName(), "mock" , Boolean.FALSE.toString()).trim(); if (ConfigUtils.isEmpty(value)) { result = this .invoker.invoke(invocation); } else if (value.startsWith("force" )) { result = doMockInvoke(invocation, null ); } else { try { result = this .invoker.invoke(invocation); if (result.getException() != null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if (rpcException.isBiz()){ throw rpcException; }else { result = doMockInvoke(invocation, rpcException); } } } catch (RpcException e) { if (e.isBiz()) { throw e; } result = doMockInvoke(invocation, e); } } return result; }
照例按12字方针分析。
从代码流程看,整体上就三个 if 逻辑分支,no mock、force mock 和 fail mock,分别进行了不同逻辑的调用。 从方法的入参和返回值看,入参和返参和 MigrationInvoker 一样,都是重写了 invoke 方法,只是内部逻辑不一样,不同实现类做各自的事情。 再看看方法实现体的一些细节,mock 属性值(除了空值),当以 force 开头时,会直接执行 Mock 业务逻辑,其他情况还是先尝试正常后续调用,如果出现了非业务异常时,就尝试执行 Mock 业务逻辑。
5. 过滤器链 这里,看到 Cluster 这个关键字,想必你也想到了它是一个 SPI 接口,那在“发布流程”中我们也学过,远程导出和远程引用的时候,会用过滤器链把 invoker 层层包装起来。
那么我们就接着断点下去,发现确实进入 FutureFilter、MonitorFilter 等过滤器,这也证明了过滤器链包裹消费方 invoker 的存在。
6. FailoverClusterInvoker 断点一层层走完了所有的过滤器,接着又来到了 FailoverClusterInvoker 这个类,从名字上一看就是在“[温故知新]”中接触的故障转移策略,是不是有点好奇故障到底是怎么转移的?
我们不妨继续断点,进入它的 doInvoke 方法看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke (Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = calculateInvokeTimes(methodName); RpcException le = null ; List<Invoker<T>> invoked = new ArrayList <Invoker<T>>(copyInvokers.size()); Set<String> providers = new HashSet <String>(len); for (int i = 0 ; i < len; i++) { if (i > 0 ) { checkWhetherDestroyed(); copyInvokers = list(invocation); checkInvokers(copyInvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getServiceContext().setInvokers((List) invoked); boolean success = false ; try { Result result = invokeWithContext(invoker, invocation); success = true ; return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException (e.getMessage(), e); } finally { if (!success) { providers.add(invoker.getUrl().getAddress()); } } } throw new RpcException (le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le); }
代码流程分析也比较简单。
从代码流程看,主要就是一个大的 for 循环,循环体中进行了 select 操作,拿到了一个合适的 invoker,发起后续逻辑调用。 从方法的入参和返回值看,入参是 invocation、invokers、loadbalance 三个参数,猜测应该就是利用 loadbalance 负载均衡器,从 invokers 集合中,选择一个 invoker来发送 invocation 数据,发送完成后得到了返参的 Result 结果。 再看看方法实现体的一些细节,通过计算 retries 属性值得到重试次数并循环,每次循环都是利用负载均衡器选择一个进行调用,如果出现非业务异常,继续循环调用,直到所有次数循环完,还是没能拿到结果的话就会抛出 RpcException 异常了。
7. DubboInvoker 现在你应该知道“故障转移策略”的深层含义了吧,就是根据预先设置好的重试次数,当调用发生非业务异常时,按照负载均衡策略继续选择一个 invoker,再次发起调用。
了解完故障转移策略,我们继续 Debug,结果来到了 DubboInvoker 这个类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Override protected Result doInvoke (final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[0 ]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = calculateTimeout(invocation, methodName); invocation.setAttachment(TIMEOUT_KEY, timeout); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false ); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { ExecutorService executor = getCallbackExecutor(getUrl(), inv); CompletableFuture<AppResponse> appResponseFuture = currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj); FutureContext.getContext().setCompatibleFuture(appResponseFuture); AsyncRpcResult result = new AsyncRpcResult (appResponseFuture, inv); result.setExecutor(executor); return result; } } catch (TimeoutException e) { throw new RpcException (RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException (RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
从代码流程看,拿到了一个交换数据的客户端类,然后走两个发送数据的分支,一条分支逻辑单程调用不需要响应,一条有响应。 从方法的入参和返回值来看,没有什么特别,根据请求数据想办法发起调用,拿到 Result 结果。 但是当前的类名是处理 Dubbo 协议的调用?有点好奇,我们看看这个 doInvoke 有哪些实现类,发现了有 InjvmInvoker、GrpcInvoker 等实现类,也证实了当前的 DubboInvoker 是专门处理 Dubbo 协议调用的类。
再看看方法实现体的一些细节,计算了超时时间值,单程发送,不需要响应好说,直接调用交互数据客户端类的 send 方法就行,需要有响应的分支逻辑还定义了线程池,调用的是 request 方法进行发送。 不过,两条分支最终都返回了一个异步结果对象,可以看出当前的 DubboInvoker 旨在屏蔽协议之间的差异,让上层调用方不感知协议间的不同。
8. ReferenceCountExchangeClient 因为我们没有单独设置调用不需要响应,就继续断点进入 currentClient 的 request 方法,看看到底是怎么发送的,来到了 ReferenceCountExchangeClient 类。
1 2 3 4 5 6 7 8 9 @Override public CompletableFuture<Object> request (Object request, int timeout, ExecutorService executor) throws RemotingException { return client.request(request, timeout, executor); }
从代码流程来看,这个类中的 request 方法也很简单,把入参数据直接全部交给了当前类的成员变量 client 处理。 从方法的入参和返回值看,入参是请求对象、超时时间、线程池对象,返参是 CompletableFuture 对象。从线程池和 Future 对象,充分说明了这个 request 方法进行了同步转异步的操作。 再看看方法实现体的一些细节,从断点展示的变量值细节看,只是发现了 client 的类型属于 HeaderExchangeClient,也就意味着 HeaderExchangeClient 完成了同步转异步的逻辑操作。
9. NettyClient 仍然没有看到数据是怎么发送的,我们继续深入 HeaderExchangeClient 的 request 方法中,Debug 几步后发现,进入了抽象类 AbstractPeer 的 send 方法。
1 2 3 4 5 6 7 8 9 @Override public void send (Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false )); }
逻辑也挺简单。
从代码流程来看,没有过多的逻辑包装,直接把入参交给了另外一个重载的 send 方法。 从方法的入参和返回值来看,入参是请求对象,没有返参,并且方法上声明了 RemotingException 异常,说明这里离数据的发送应该很近了。至于没有返回值,主要是前面由同步转了异步,没有返回值也正常。 再看看方法实现体的一些细节,抽象类看不出什么,但是从堆栈变量的引用值可以发现,当前的抽象类是 NettyClient 的父类。 那我们从 NettyClient 的 send 方法细看,结果发现了最终调用了 NioSocketChannel 的 writeAndFlush 方法,这个不就是 Netty 网络通信框架的 API 么?
到这里,我们在代码层面解释了数据原来是通过 Netty 框架的 NioSocketChannel 发送出去的。
10. NettyCodecAdapter 一旦进入到 Netty 框架,再通过断点一步步跟踪数据就有点难了,因为 Netty 框架内部处处都是“线程+队列”的异步操作方式,这里我们走个捷径,进入 NettyClient 类,找找初始化 NettyClient 的相关 Netty 层面的配置。
你会找到一个 NettyCodecAdapter 类,对数据进行编解码,直接在类中的 encode 方法打个断点,等断点过来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private class InternalEncoder extends MessageToByteEncoder { @Override protected void encode (ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { ChannelBuffer buffer = new NettyBackedChannelBuffer (out); Channel ch = ctx.channel(); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); codec.encode(channel, buffer, msg); } }
可以看到,编码的方法简单干脆。
从代码流程看,调用了一个 codec 编码器的变量,对入参编码处理。 从方法的入参和返回值看,msg 是请求数据对象,out 变量是 ByteBuf 缓冲区,应该是将 msg 编码之后的数据流。 再看看方法实现体的一些细节,没什么特别之处,无非就是走后续编解码器的编码方法,编码完成后,再通过 Netty 框架把数据流发送出去。
调用流程的其他案例 经过漫长的十步断点,我们把消费方的调用流程大致串了起来,虽说过程有点曲折,但最终的艰辛是值得的,对于你理解框架调用的来龙去脉很重要,而且未来你会体会到,当调用环节发生了异常,你只要认真回忆一下异常在哪个环节,然后结合代码简单分析一下,很快就能分析出原因来。
消费方的调用流程就是这么一步步调试分析出来的,那还有哪些典型的流程,可以按这个思路一步步调试分析出调用流程呢?我就列举三个比较典型的,你课后可以练练手。
第一,Tomcat 容器接收请求流程,通过在 HttpServlet 的 service 方法打个断点,从调用堆栈的最下面开始,可以分析出 Tomcat 的整体架构。
第二,SpringMvc 处理请求的流程,通过在 Controller 的任意方法打个断点,就可以逐步分析出一个 SpringMvc 处理请求的大致流程框架。
第三,Spring 的 getBean 方法,通过这个方法的层层深入,可以分析出一个庞大的 Spring 对象生成体系,也能挖掘出非常多 Spring 各种操控对象的扩展机制。
总结 今天,从一个被 @DubboReference 标识的常规成员变量开始,我们抛出了两个问题,变量的值类型在运行时是否是见到的类型,在消费方发起调用时会经过怎样的调用流程体系。
带着这两个问题,我们用最普通的 Debug 调试方式,借鉴 12 字方针深入跟踪调用流程的源码,边分析、边绘图,步步为营,最终梳理出了消费方调用流程的大体框架。你还可以和“源码框架”中学过的分层模块联系到一起复习。
总结一下跟踪源码的 12 字方针。
“不钻细节:只看流程。”代码虽然多,但我们不必研究每个细节,要先捡方法中的重要分支流程,一些提前返回的边边角角的流程一概忽略不看。 “不看过程:只看结论。”每个方法的代码逻辑可长可短,我们可以重点研究这个方法需要什么入参,又能给出什么返参,以此推测方法在干什么,到底要完成一件什么样的事情,搞清楚并得出结论。 “再看细节:再看过程。”当你按照前两点认真调试后,大概的调用流程体系就清楚了。在此基础之上,再来细看被遗忘的边角料代码,有助于你进一步丰富调用流程图,体会源码细节中那些缜密的思维逻辑。 思考题 留个作业,消费方的普通调用会经历哪些流程你已经十分熟悉了,可以研究消费方进行泛化调用时会经历哪些流程,以及泛化调用的底层是怎么实现的?动手练习一下。
期待看到你的回答,如果觉得今天的内容对你有帮助,也欢迎分享给身边的朋友一起讨论。我们下一讲见。
20 思考题参考 上一期留了个作业,研究下消费方接收 zookeeper 服务端事件变更的地方在哪里。
想要解答这个问题,我们得从添加监听的起源看起,只有知道添加监听干了些什么事情,预留了什么回调,我们才能进一步找到接收的地方。
我们直接进入 doSubscribe 方法中看看,到底添加了什么监听?监听对应的实现类是哪个?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Override public void doSubscribe (final URL url, final NotifyListener listener) { try { checkDestroyed(); if ("*" .equals(url.getServiceInterface())) { } else { CountDownLatch latch = new CountDownLatch (1 ); try { List<URL> urls = new ArrayList <>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap <>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl (url, path, k, latch)); if (zkListener instanceof RegistryChildListenerImpl) { ((RegistryChildListenerImpl) zkListener).setLatch(latch); } zkClient.create(path, false ); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null ) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } finally { latch.countDown(); } } } catch (Throwable e) { throw new RpcException ("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
再次回到了这段代码中,这次我们是有针对性地寻找监听器类了,眼尖的你,相信很快就看到了 RegistryChildListenerImpl 这个类,从名字上看,是子节点的监听实现类,既然能在这段实现全流程逻辑,那毫无疑问得进入 RegistryChildListenerImpl 这个类去看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 private class RegistryChildListenerImpl implements ChildListener { private RegistryNotifier notifier; private long lastExecuteTime; private volatile CountDownLatch latch; public RegistryChildListenerImpl (URL consumerUrl, String path, NotifyListener listener, CountDownLatch latch) { this .latch = latch; notifier = new RegistryNotifier (getUrl(), ZookeeperRegistry.this .getDelay()) { @Override public void notify (Object rawAddresses) { long delayTime = getDelayTime(); if (delayTime <= 0 ) { this .doNotify(rawAddresses); } else { long interval = delayTime - (System.currentTimeMillis() - lastExecuteTime); if (interval > 0 ) { try { Thread.sleep(interval); } catch (InterruptedException e) { } } lastExecuteTime = System.currentTimeMillis(); this .doNotify(rawAddresses); } } @Override protected void doNotify (Object rawAddresses) { ZookeeperRegistry.this .notify(consumerUrl, listener, ZookeeperRegistry.this .toUrlsWithEmpty(consumerUrl, path, (List<String>) rawAddresses)); } }; } public void setLatch (CountDownLatch latch) { this .latch = latch; } @Override public void childChanged (String path, List<String> children) { try { latch.await(); } catch (InterruptedException e) { logger.warn("Zookeeper children listener thread was interrupted unexpectedly, may cause race condition with the main thread." ); } notifier.notify(children); } }
通过这段代码我们看到了若 childChanged 方法发生了回调,那么最终会调用到 ZookeeperRegistry 的 notify 方法。虽然我们找到了 Dubbo 针对 zookeeper 目录监听回调的地方,但是到底是谁调用了这个 childChanged 方法呢?
这个时候,又得使出我们的 Debug 绝招了,你可以按步骤操作。
步骤一,启动提供方。 步骤二,启动消费方。 步骤三,在 RegistryChildListenerImpl 类中的 childChanged 方法打个断点。 步骤四,关闭提供方。 步骤五,启动提供方。 就能看到断点的到来了。
通过对断点调用栈的分析,真相也就清晰了,原来 dubbo 框架引用了 curator 插件来与 zookeeper 进行交互,curator 插件会实时感知 zookeeper 服务端发送的事件变更,然后 curator 再通过一定的回调处理,就调用到了 RegistryChildListenerImpl 类中的 childChanged 方法。