服务加载机制SPI服务发现SPI机制优势SPI机制实现类加载类组装客户端服务请求完整流程服务代理发起请求同步请求流程异步请求流程发起请求入口线程模型请求编码服务端服务响应请求解码服务调用客户端获取结果响应解码唤醒消费者
SPI(Service Provider Interface)
机制通过将接口实现类的全限定名配置在META-INF/dubbo
文件中,并由服务加载器读取配置文件,加载接口实现类,动态为接口替换实现类,增强扩展能力。
dubbo
使用增强的Dubbo SPI
机制,可以配置和按需加载指定的实现类。同时具备IOC、AOP
能力:
具备IOC能力:如果某扩展类属性依赖其他对象,则会自动完成该依赖对象的注入;
具备AOP能力:自动发现扩展类的包装类,完成包装类的构造,使用包装类替代原始类别返回。
SPI机制实例:首先定义接口及其实现类
xxxxxxxxxx
221// 服务接口
2// org.apache.dubbo.springboot.demo
3
4public interface Payment {
5 void pay(double amount);
6}
7// 具体实现alipay
8// org.apache.dubbo.springboot.demo.impl.AlipayPayment
9public class .AlipayPayment implements Payment {
10
11 public void pay(double amount) {
12 System.out.println("Paying " + amount + " using Alipay");
13 }
14}
15// 具体实现wechatpay
16// org.apache.dubbo.springboot.demo.impl.WeChatPayPayment
17public class WeChatPayPayment implements Payment {
18
19 public void pay(double amount) {
20 System.out.println("Paying " + amount + " using WeChatPay");
21 }
22}
之后将实现类信息配置在META-INF/dubbo
目录下,以接口全限定名命名的文件中
xxxxxxxxxx
21alipay=org.apache.dubbo.springboot.demo.consumer.impl.AlipayPayment
2wechatpay=org.apache.dubbo.springboot.demo.consumer.impl.WeChatPayPayment
在使用时手动指定需要加载的实现类,完成服务调用
xxxxxxxxxx
91public class PaymentClient {
2 public static void main(String[] args) {
3 ExtensionLoader<Payment> loader = ExtensionLoader.getExtensionLoader(Payment.class);
4 Payment alipay = loader.getExtension("alipay");
5 alipay.pay(100);
6 Payment wechatPay = loader.getExtension("wechatpay");
7 wechatPay.pay(100);
8 }
9}
相较于使用CGLIB
或JDK Proxy
生成服务的动态代理机制,
SPI
方式可以根据需要,通过修改配置文件,自动动态加载不同的服务实现,切换实现类无需修改代码;
自动完成扩展对象的实例化和属性自动装配,无需手动处理;
通过ExtensionLoader
统一管理所有扩展点,更加灵活和易于管理。
通过ExtensionLoader#getExtension(name)
方法获取服务实列时,将触发类加载和实例化,得到服务实例。在获取实例对象时,由于服务实例是单例的,双检查通过后,触发类加载和实例化。
xxxxxxxxxx
191// ExtensionLoader#getExtension
2// 根据服务名,获取配置文件中该服务名对应单例服务对象
3public T getExtension(String name, boolean wrap) {
4 // 持有目标对象
5 final Holder<Object> holder = getOrCreateHolder(cacheKey);
6 Object instance = holder.get();
7 // 双检测通过后,触发实例创建
8 if (instance == null) {
9 synchronized (holder) {
10 instance = holder.get();
11 if (instance == null) {
12 // 创建拓展实例
13 instance = createExtension(name, wrap);
14 holder.set(instance);
15 }
16 }
17 }
18 return (T) instance;
19}
创建实例时,首先触发类加载,到配置文件META-INF/dubbo
读取服务实例列表,根据服务实例名称获得对应的Class
对象。为了避免同一个类被重复加载,同样使用了双检查机制。
xxxxxxxxxx
371// ExtensionLoader#getExtensionClasses
2// 根据配置文件解析出服务名称到服务类全限定类名映射关系表
3private Map<String, Class<?>> getExtensionClasses() {
4 Map<String, Class<?>> classes = cachedClasses.get();
5 // 双重检查,防止重复加载
6 if (classes == null) {
7 synchronized (cachedClasses) {
8 classes = cachedClasses.get();
9 if (classes == null) {
10 // 触发加载服务Class对象
11 classes = loadExtensionClasses();
12 cachedClasses.set(classes);
13 }
14 }
15 }
16 return classes;
17}
18
19// ExtensionLoader#loadResource
20// 根据配置文件,获取服务对象Class对象,本方法位于loadExtensionClasses调用链路下游
21private void loadResource(ClassLoader classLoader,java.net.URL resourceURL) {
22 // 获取配置文件内容
23 List<String> newContentList = getResourceContent(resourceURL);
24 String clazz;
25 for (String line : newContentList) {
26 String name = null;
27 int i = line.indexOf('=');
28 // 实例名称
29 name = line.substring(0, i).trim();
30 // 实例全限定类名
31 clazz = line.substring(i + 1).trim();
32 // 加载服务实例class,并通过loadClass方法对类进行缓存
33 loadClass(classLoader,extensionClasses,resourceURL,Class.forName(clazz, true, classLoader),name,overridden);
34 }
35 }
36 }
37}
获得服务实例Class
对象后,将通过反射创建对象,并执行前置、后置处理器,再通过反射获取所有setXXX
方法,自动注入依赖,实现IOC
特性支持,最后完成包装类创建,并注入当前服务实例,完成AOP
特性支持,最终得到指定的服务实例。
xxxxxxxxxx
301// ExtensionLoader#createExtension
2// 实例化、初始化服务对象
3private T createExtension(String name, boolean wrap) {
4 // 从配置文件中加载所有的拓展类,得到“配置项名称”到“配置类”的映射关系表
5 Class<?> clazz = getExtensionClasses().get(name);
6 // 尝试根据Class信息获取服务实例对象,如果不存在则通过反射获取构造器创建对象
7 extensionInstances.putIfAbsent(clazz, createExtensionInstance(clazz));
8 instance = (T) extensionInstances.get(clazz);
9
10 // 进行前置初始化,类似Spring的postProcessBeforeInitialization,默认无操作
11 instance = postProcessBeforeInitialization(instance, name);
12 // IOC特性:通过反射获取"setXXX"方法,再通过反射调用setter方法设置依赖
13 injectExtension(instance);
14 // 进行后置初始化,类似Spring的postProcessAfterInitialization,默认无操作
15 instance = postProcessAfterInitialization(instance, name);
16
17 // AOP特性:如果实列存在包装类,初始化包装类并返回
18 for (Class<?> wrapperClass : wrapperClassesList) {
19 Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
20 boolean match = xxx;
21 if (match) {
22 // 将当前instance作为参数传给Wrapper构造方法,通过反射创建Wrapper实例,然后向Wrapper实例中注入依赖
23 instance = injectExtension(
24 (T) wrapperClass.getConstructor(type).newInstance(instance));
25 // 后置处理器
26 instance = postProcessAfterInitialization(instance, name);
27 }
28 }
29 return instance;
30}
当消费者通过dubbo远程调用生产者提供服务时,整个远程服务调用可分为:
->消费者通过代理对象Proxy
发起远程调用;
->网络客户端Client
将编码后的请求发送给服务提供方Server
;
->Server
收到请求后对数据包解码,将解码后的请求发送至分发器Dispatcher
,将请求派发到指定的线程池;
->在线程池中执行请求的服务。
假设服务接口定义为
xxxxxxxxxx
31public interface HelloService {
2 public String sayHello(String name);
3}
服务端拥有该接口的实现,并通过@DubboService
注解,将服务注册到nacos
中
xxxxxxxxxx
71
2public class HelloServiceImpl implements HelloService {
3
4 public String sayHello(String name) {
5 return "Hello " + name;
6 }
7}
客户端通过注解@DubboReference
,从nacos
中获得远程服务的代理,可以通过该代理完成远程服务调用
xxxxxxxxxx
41public class Task {
2
3 private HelloService helloService;
4}
dubbo
使用Javassist
框架为服务接口生成动态代理类,用以和消费者交互,以HelloService
为例,通过arthas
反编译得到生成的代理类
xxxxxxxxxx
181public class HelloServiceDubboProxy0 implements ClassGenerator.DC,EchoService,Destroyable,HelloService {
2 // 接口的方法数组
3 public static Method[] methods;
4 // 请求处理执行方
5 private InvocationHandler handler;
6 public HelloServiceDubboProxy0(InvocationHandler invocationHandler) {
7 this.handler = invocationHandler;
8 }
9
10 // 执行服务调用
11 public String sayHello(String string) {
12 Object[] objectArray = new Object[]{string};
13 // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果
14 Object object = this.handler.invoke(this, methods[0], objectArray);
15 return (String)object;
16 }
17 // somecode
18}
InvocationHandler#invoke
及进入InvokerInvocationHandler#invoke
方法,将请求的服务名称、方法名称、方法参数类型、方法实参构建RpcInvocation
对象,进入请求的发送流程
xxxxxxxxxx
141// InvokerInvocationHandler#invoke
2// 包装远程调用请求,发往下游进行请求发送
3public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
4 // 包装远程调用请求
5 RpcInvocation rpcInvocation = new RpcInvocation(
6 serviceModel,
7 method.getName(),
8 invoker.getInterface().getName(),
9 protocolServiceKey,
10 method.getParameterTypes(),
11 args);
12 // 发往下游进行请求发送
13 return InvocationUtil.invoke(invoker, rpcInvocation);
14}
dubbo
支持同步请求和异步请求。对于同步请求,消费者线程间柱塞,直至请求的响应到来;对于异步请求,发起请求后,消费者线程将不会柱塞,响应到来后支持线程将处理响应数据。同步请求和异步请求的区别在于,执行等待响应及响应数据反序列化的线程不同。
消费者发出请求,立即拿到CompletableFuture
对象,不会在发送方法处阻塞;
消费者线程自动调用ThreadlessExecutor.waitAndDrain()
,在线程池任务队列上等待任务到来,此时消费者线程阻塞;
当收到响应时,IO线程完成响应头反序列化,并生成响应数据部分反序列化任务,填充到ThreadlessExecutor
任务队列中;
线程池中任务将由消费者线程执行,得到业务结果之后,调用Future.set()
方法进行设置,之后waitAndDrain()
方法返回;
消费者线程从Future
中拿到结果值。
消费者发出请求,立即拿到CompletableFuture
对象,不会在发送方法处阻塞;
直接返回CompletableFuture
对象,此时消费者线程不会阻塞;
当收到响应时,IO 线程完成响应头反序列化,并生成响应数据部分反序列化任务提交到该请求对应的共享线程池中;
数据部分反序列化任务完成后,调用Future.set()
方法进行设置,并唤醒等待在CompletableFuture
上线程,执行回调;
消费者线程从Future
中拿到结果值。
发起请求将由AbstractInvoker#invoke
方法开始执行,该发送方法为异步发送,在发出请求后,立即拿到AsyncRpcResult
对象,表示未完成的异步请求结果,消费者线程间不会在发送方法处阻塞。
之后根据请求是否是同步请求,决定是否在AsyncRpcResult
上等待响应到来。如果是同步请求,消费者线程将阻塞,直至响应到来;如果是异步请求,将直接返回,不必等待。
1// AbstractInvoker#invoke
2// 执行发送请求流程
3public Result invoke(Invocation inv) throws RpcException {
4 RpcInvocation invocation = (RpcInvocation) inv;
5 prepareInvocation(invocation);
6 // 发送请求并获得异步返回值
7 AsyncRpcResult asyncResult = doInvokeAndReturn(invocation);
8 // 如果是同步请求,将等待响应到来再返回
9 waitForResultIfSync(asyncResult, invocation);
10 return asyncResult;
11}
12// AbstractInvoker#doInvokeAndReturn
13// 发送请求并获得异步返回值
14 private AsyncRpcResult doInvokeAndReturn(RpcInvocation invocation) {
15 AsyncRpcResult asyncResult = (AsyncRpcResult) doInvoke(invocation);
16 return asyncResult;
17 }
发送请求之后的AbstractInvoker#doInvokeAndReturn
方法,将调用AbstractInvoker#doInvoke
抽象方法,该方法具体由子类实现,不同的RPC协议对应不同的AbstractInvoker
实现类。下面以dobbo
协议的DubboInvoker
为例:
如果是单向通信,直接将Invocation
包装成oneway
类型的请求发送,并立即返回内容为空的RpcResult
,将不会有线程等待调用返回;
如果是双向通信,将获取用于执行等待响应到来任务的线程池executor
,并创建DefaultFuture
,表示异步响应,executor
中线程在DefaultFuture
上等待响应返回,并执行回调等逻辑。然后DefaultFuture
会被封装成AsyncRpcResult
并立即返回,发送过程异步实现,不会在发送处柱塞。
x
1// Dubbo协议对应的AbstractInvoker实现类
2public class DubboInvoker<T> extends AbstractInvoker<T> {
3 // 异步发送请求,并获得异步返回值包装类
4 protected Result doInvoke(final Invocation invocation) throws Throwable {
5 RpcInvocation inv = (RpcInvocation) invocation;
6 // 是否单向通信,不关心调用是否成功与返回值
7 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
8 // 异步无返回值
9 if (isOneway) {
10 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
11 request.setTwoWay(false);
12 // 直接将Invocation包装成oneway类型的Request发送出去,不会创建DefaultFuture
13 currentClient.send(request, isSent);
14 // 返回异步默认RpcResult
15 return AsyncRpcResult.newDefaultAsyncResult(invocation);
16 } else {
17 // 异步有返回值
18 request.setTwoWay(true);
19 // 设置执行等待响应到来任务的线程池,同步和异步请求的线程池将不同
20 ExecutorService executor = getCallbackExecutor(getUrl(), inv);
21 // 创建请求的DefaultFuture对象,executor中线程将在DefaultFuture上等待响应返回,并执行回调
22 CompletableFuture<AppResponse> appResponseFuture =
23 currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
24 // 将AppResponse封装成AsyncRpcResult返回
25 AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
26 result.setExecutor(executor);
27 return result;
28 }
29 }
30}
DefaultFuture
继承JDK中的CompletableFuture
,示未完成的请求结果。
内部维护CHANNELS
,用于管理请求与Channe
之间关联,以及FUTURES
用于管理请求与DefaultFuture
之间关联。
初始化对象时,将完成请求信息存储,并创建定时到期检测任务。创建DefaultFuture
对象时,可从传入的Request
对象中,获取调用编号,并将 <reqId, DefaultFuture>
映射关系存入到静态FUTURES
中,便于在异步发送模式下,后续响应到来后,通过调用编号,找到请求对应的DefaultFuture
。
xxxxxxxxxx
141public class DefaultFuture extends CompletableFuture<Object> {
2 // 请求与`Channe`之间关联
3 private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();
4 // 请求与`DefaultFuture`之间关联
5 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
6 // 请求ID与请求本身
7 private final Long id;
8 private final Request request;
9 private final Channel channel;
10 // 请求关联的线程池
11 private ExecutorService executor;
12 // 定时到期检测任务
13 private Timeout timeoutCheckTask;
14}
发送请求的doInvoke
方法异步执行,但消费者可以选择同步或者异步发送请求。dubbo
通过使用不同的用于等待响应到来任务的线程池参数,实现消费者端的同步和异步发送。
doInvoke
中获取线程池的方法getCallbackExecutor
的InvokeMode
可分为: SYNC
(默认调用模式)、ASYNC
和FUTURE
。SYNC
模式下将返回ThreadlessExecutor
线程池,另外两种调用模式属于异步调用模式,会根据URL选择对应的共享线程池。
xxxxxxxxxx
111// AbstractInvoker#getCallbackExecutor
2// 根据请求模式获取请求执行等待及回调的线程池
3protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
4 // 同步调用
5 if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
6 return new ThreadlessExecutor();
7 }
8 // 异步调用
9 return ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
10 .getExecutor(url);
11}
其中ThreadlessExecutor
是一种特殊线程池,其内部不管理任何线程。当执行execute
方法时,只是将任务存储在任务队列中,不会被调度到任何线程执行。当其他线程调用ThreadlessExecutor.waitAndDrain()
方法等待并执行任务时,将由调用waitAndDrain()
方法的线程执行队列中任务。
xxxxxxxxxx
361public class ThreadlessExecutor extends AbstractExecutorService {
2 // 存储提交的任务
3 private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
4 // 保存正在等待任务的线程
5 private final AtomicReference<Object> waiter = new AtomicReference<>();
6
7 // 在指定的时间内等待任务的到来,并执行队列中的所有任务
8 public void waitAndDrain(long deadline) throws InterruptedException {
9 Runnable runnable = queue.poll();
10 // 等待任务到来或超时
11 while ((runnable = queue.poll()) == null && waiter.get() == Thread.currentThread()) {
12 long restTime = deadline - System.nanoTime();
13 if (restTime <= 0) {
14 return;
15 }
16 // 让线程等待一段时间,直到任务到来或超时
17 LockSupport.parkNanos(this, restTime);
18 }
19 // 循环执行队列中的任务,直到队列为空
20 do {
21 if (runnable != null) {
22 runnable.run();
23 }
24 } while ((runnable = queue.poll()) != null);
25 }
26
27 // 提交任务到任务队列
28
29 public void execute(Runnable runnable) {
30 // 将提交的任务封装成RunnableWrapper对象,并添加到队列
31 RunnableWrapper run = new RunnableWrapper(runnable);
32 queue.add(run);
33 // 唤醒正在等待任务的线程
34 LockSupport.unpark((Thread) waiter.get());
35 }
36}
当完成请求发送后,得到异步请求结果AsyncRpcResult
,表示未完成的RPC请求。之后将返回AbstractInvoker#invoke
继续执行AbstractInvoker#waitForResultIfSync
方法,等待请求到来。
如果是异步请求,将直接返回,不必等待;如果是同步请求,由消费者线程直接调用AsyncRpcResult#get
方式柱塞消费者线程,直至获得请求结果。
1// AbstractInvoker#waitForResultIfSync
2// 等待请求到来
3private void waitForResultIfSync(AsyncRpcResult asyncResult, RpcInvocation invocation) {
4 // 如果是异步请求,将直接返回,不必等待
5 if (InvokeMode.SYNC != invocation.getInvokeMode()) {
6 return;
7 }
8 Object timeoutKey = invocation.getObjectAttachmentWithoutConvert(TIMEOUT_KEY);
9 long timeout = RpcUtils.convertToNumber(timeoutKey, Integer.MAX_VALUE);
10 // 如果是同步请求,将在Future上等待响应到来再返回
11 asyncResult.get(timeout, TimeUnit.MILLISECONDS);
12 }
13}
同步请求在发送请求后,将在AsyncRpcResult#get
中使用ThreadlessExecutor
执行等待响应到来的任务,即是消费者线程执行等待任务,消费者线程将被阻塞,直至响应到来,之后在CompletableFuture
上获取响应结果并返回;
异步请求时,一般由消费者使用CompletableFuture
设置回调,由线程池线程执行AsyncRpcResult#get
方法,消费者线程不会阻塞。
xxxxxxxxxx
181public class AsyncRpcResult implements Result {
2 // 等待请求返回及执行回调的线程池
3 private Executor executor;
4 // 阻塞当前线程,等待响应到来
5 public Result get() throws InterruptedException, ExecutionException {
6 // 同步调用方式等待响应到来
7 if (executor instanceof ThreadlessExecutor) {
8 ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
9 // 检测调用结果是否返回
10 while (!responseFuture.isDone() && !threadlessExecutor.isShutdown()) {
11 // ThreadlessExecutor.waitAndDrain()方法将柱塞当前线程,同步等待
12 threadlessExecutor.waitAndDrain(Long.MAX_VALUE);
13 }
14 }
15 // 结束阻塞或者异步请求,获取响应
16 return responseFuture.get();
17 }
18}
发送请求的ExchangeClient#request
方法执行消息编码,在编码后再发送出去。
数据包分为消息头和消息体。消息头用于存储元信息:魔数、数据包类型(Request/Response)、调用方式(单向调用/双向调用)、序列化器、请求编号、消息体长度;消息体中用于存储具体的调用消息:服务名、服务版本、方法名、参数列表。
消息头将通过ExchangeCodec#encodeRequest
方法编码后以NIO的方式写入请求数据,消息实参通过DubboCodec#encodeRequestData
方法使用第三方序列化组件完成编码。
xxxxxxxxxx
261// DubboCodec#encodeRequestData
2// 对消息头和消息体编码
3protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version)
4 throws IOException {
5 RpcInvocation inv = (RpcInvocation) data;
6 // 要请求的服务名和版本
7 out.writeUTF(version);
8 String serviceName = inv.getAttachment(INTERFACE_KEY);
9 if (serviceName == null) {
10 serviceName = inv.getAttachment(PATH_KEY);
11 }
12 out.writeUTF(serviceName);
13 out.writeUTF(inv.getAttachment(VERSION_KEY));
14 // 服务的方法和参数列表
15 out.writeUTF(inv.getMethodName());
16 out.writeUTF(inv.getParameterTypesDesc());
17 Object[] args = inv.getArguments();
18 if (args != null) {
19 for (int i = 0; i < args.length; i++) {
20 // 使用序列化组件对消息实参进行序列化
21 out.writeObject(callbackServiceCodec.encodeInvocationArgument(channel, inv, i));
22 }
23 }
24 out.writeAttachments(inv.getObjectAttachments());
25}
26
服务端收到请求后,在ExchangeCodec#decode
中,将通过检测消息头中的魔数是否与规定的魔数相等,以及收到数据长度是否和请求头中说明的请求头相等,提前拦截掉非常规数据包。
最后调DecodeableRpcInvocation#decode
进行后续的解码工作,得到服务方法名、调用参数列表,最终获得完整请求request
对象。
xxxxxxxxxx
211// DecodeableRpcInvocation#decode
2// 对请求解码
3public Object decode(Channel channel, InputStream input) throws IOException {
4 // 获得服务路径、版本、方法、参数类型
5 String path = in.readUTF();
6 setAttachment(PATH_KEY, path);
7 String version = in.readUTF();
8 setAttachment(VERSION_KEY, version);
9 String keyWithoutGroup = keyWithoutGroup(path, version);
10 checkPayload(keyWithoutGroup);
11 setMethodName(in.readUTF());
12 String desc = in.readUTF();
13 setParameterTypesDesc(desc);
14 // somecode
15 // 设置参数类型数组
16 setParameterTypes(pts);
17 // 解析运行时参数
18 decodeArgument(channel, pts, args);
19 return this;
20}
21
AllChannelHandler
作为响应请求的入口,将消费者请求对象Request
封装到ChannelEventRunnable
中,dubbo
根据不同任务分发策略,依据请求类型,决定ChannelEventRunnable
由任务提交线程即当前IO线程执行,还是提交到线程池执行。
策略 | 用途 |
---|---|
all | 所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等。默认分发策略 |
direct | 所有消息都不派发到线程池,全部在 IO 线程上直接执行 |
message | 只有请求和响应消息派发到线程池,其它消息均在 IO 线程上执行 |
execution | 只有请求消息派发到线程池,不含响应。其它消息均在 IO 线程上执行 |
connection | 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池 |
在ChannelEventRunnable
在被线程池中被调度执行时,先完成对请求数据的解码,然后执行HeaderExchangeHandler#received
,执行后续服务调用逻辑。
xxxxxxxxxx
361// HeaderExchangeHandler#received
2// 处理服务调用
3public void received(Channel channel, Object message) throws RemotingException {
4 final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
5 if (message instanceof Request) {
6 // 处理请求对象
7 Request request = (Request) message;
8 // 双向通信
9 if (request.isTwoWay()) {
10 // 向后调用服务,并返回调用结果,关注调用结果并形成 Response 返回给客户端
11 handleRequest(exchangeChannel, request);
12 } else {
13 // 单向通信,直接交给上层的 DubboProtocol.requestHandler,不会返回任何 Response。
14 handler.received(exchangeChannel, request.getData());
15 }
16 }
17}
18// HeaderExchangeHandler#handleRequest
19// 处理服务调用
20void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
21 // 响应对象
22 Response res = new Response(req.getId(), req.getVersion());
23 // 获取 RpcInvocation 对象
24 Object msg = req.getData();
25 // 异步执行结果
26 CompletionStage<Object> future = handler.reply(channel, msg);
27 // 请求处理完成后回调
28 future.whenComplete((appResult, t) -> {
29 // 设置请求处理状态
30 res.setStatus(Response.OK);
31 // 设置调用结果
32 res.setResult(appResult);
33 // 发送请求响应
34 channel.send(res);
35 });
36}
服务端服务实例对象被封装成DubboInvoker
对象,DubboInvoker
是AbstractInvoker
的实现类,实现自己的doInvoke
方法。当服务端收到请求时,从导出服务容器exporterMap
中取出请求的DubboInvoker
,并调用invoke()
处理请求。
1// DubboProtocol#ExchangeHandlerAdapter#reply
2// 回复请求
3public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
4 Invocation inv = (Invocation) message;
5 // 从导出服务容器`exporterMap `中获取 Invoker 实例
6 Invoker<?> invoker = inv.getInvoker() == null ? getInvoker(channel, inv) : inv.getInvoker();
7 RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());
8 // 通过 Invoker 调用具体的服务
9 Result result = invoker.invoke(inv);
10 return result.thenApply(Function.identity());
11}
12
13// DubboProtocol#getInvoker
14// 获取请求服务对用的Invoker
15Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
16 // 计算 service key,格式为 groupName/serviceName:serviceVersion:port
17 String serviceKey = serviceKey(port, path, (String) inv.getObjectAttachmentWithoutConvert(VERSION_KEY), (String)
18 inv.getObjectAttachmentWithoutConvert(GROUP_KEY));
19 // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
20 DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
21 Invoker<?> invoker = exporter.getInvoker();
22 inv.setServiceModel(invoker.getUrl().getServiceModel());
23 return invoker;
24}
执行请求响应的Invoker#invoke
方法定义在 AbstractProxyInvoker
中,执行服务调用,并构建异步请求响应。
xxxxxxxxxx
141// AbstractProxyInvoker#invoke
2// 执行服务调用,并构建有异步请求响应
3public Result invoke(Invocation invocation) throws RpcException {
4 // 调用 doInvoke 执行后续的调用
5 Object value = doInvoke(
6 proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments());
7 // 构建有异步请求响应
8 CompletableFuture<Object> future = wrapWithFuture(value, invocation);
9 CompletableFuture<AppResponse> appResponseFuture = future.handle((obj, t) -> {
10 result.setValue(obj);
11 return result;
12 });
13 return new AsyncRpcResult(appResponseFuture, invocation);
14 }
AbstractProxyInvoker#doInvoke
是一个抽象方法,由具体的Invoker
实例实现,而Invoker
实例是在运行时通过 JavassistProxyFactory
动态创建。在getInvoker
方法中将创建具备doInvoke
方法的Invoker
。
1// JavassistProxyFactory#getInvoker
2// 创建具备`doInvoke`方法的`Invoker`
3public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
4 // 创建匿名类对象
5 final Wrapper wrapper =
6 Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
7 return new AbstractProxyInvoker<T>(proxy, type, url) {
8
9 protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments)
10 throws Throwable {
11 // 调用 wrapper.invokeMethod 方法进行真正的服务调用
12 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
13 }
14 };
15 }
16}
doInvoke
方法中实际上使用了Wrapper.invokeMethod
去真正执行服务调用,Wrapper
是一个抽象类,其中invokeMethod
是一个抽象方法。Dubbo
会在运行时通过Javassist
框架为Wrapper
生成实现类,并实现invokeMethod
方法,在方法中根据请求信息找到并调用具体的服务。以`HelloServiceImpl
为例,Javassist
为其生成的Wrapper#invokeMethod
如下
xxxxxxxxxx
121// Wrapper#invokeMethod
2// 根据请求信息找到并调用具体的服务
3public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
4 // 具体要被调用的服务
5 HelloService helloService;
6 helloService = (HelloService)object;
7 // 根据方法名调用指定的方法
8 if ("sayHello".equals(string) && arrclass.length == 1) {
9 // 真正的调用服务
10 return helloService.sayHello((String)arrobject[0]);
11 }
12}
服务提供方调用指定服务后,会调用结果封装到Response
对象中,并将该对象返回给服务消费方。服务消费方在收到响应数据后,首先对响应数据的数据头进行解码,并将待解码的数据体转换得到DecodeableRpcResult
对象(不在IO线程完成数据体的解码是为了防止IO线程被柱塞),向下游传递Response
对象。
xxxxxxxxxx
111// DubboCodec
2protected Object decodeBody(Channel channel, InputStream is, byte[] header){
3 // 解码响应数据头,得到请求编号
4 long id = Bytes.bytes2long(header, 4);
5 Response res = new Response(id);
6 // 响应数据部分待解码,由线程池中由线程池中线程完结解码,防止IO线程被柱塞
7 DecodeableRpcResult result=xxx;
8 data = result;
9 res.setResult(data);
10 return res;
11 }
当响应到来时,消费者侧管道Channel
就绪,可以进行数据读取,触发AllChannelHandler#received
方法,将后续ChannelHandler.received
方法的调用封装成任务,提交到与该请求对应的DefaultFuture
内部的线程池中。
在后续处理中由该响应对应的线程池,完成响应数据的解码,并将结果放入该响应对应的DefaultFuture
。
x
1// AllChannelHandler#received
2// 将后续`ChannelHandler.received`方法的调用封装成任务提交到线程池
3public void received(Channel channel, Object message) throws RemotingException {
4 ExecutorService executor = getPreferredExecutorService(message);
5 executor.execute(new ChannelEventRunnable(channel, handler,
6 }
7}
8// WrappedChannelHandler#getPreferredExecutorService
9// 获取执行ChannelHandler.received任务的线程池
10public ExecutorService getPreferredExecutorService(Object msg) {
11 if (msg instanceof Response) {
12 Response response = (Response) msg;
13 // 获取当前响应对应请求的DefaultFuture,返回其内部的ExecutorService
14 DefaultFuture responseFuture = DefaultFuture.getFuture(response.getId());
15 ExecutorService executor = responseFuture.getExecutor();
16 return executor;
17 } else {
18 return getSharedExecutorService(msg);
19 }
20}
发送请求时,得到DefaultFuture
对象表示未完成的请求结果,DefaultFuture
对象可从传入的Request
对象中,获取调用编号,并将 <reqId, DefaultFuture>
映射关系存入到静态Map
中。
在收到Response
对象后,根据Response
对象中的调用编号到FUTURES
集合中取出相应的DefaultFuture
对象,唤醒柱塞在DefaultFuture#get
上的线程。
xxxxxxxxxx
121public class DefaultFuture extends CompletableFuture<Object> {
2 private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
3 // 获得响应对应的请求,唤醒等待线程
4 public static void received(Channel channel, Response response, boolean timeout) {
5 // 根据调用编号从 FUTURES 集合中查找指定的 DefaultFuture 对象
6 // 服务消费方的线程池会收到多个响应对象,通过编号,将响应发送给正确的请求方
7 DefaultFuture future = FUTURES.remove(response.getId());
8 // 唤醒柱塞在`DefaultFuture#get`上的线程
9 future.doReceived(response);
10 shutdownExecutorIfNeeded(future);
11 }
12}
DefaultFuture
继承JDK中的CompletableFuture
,DefaultFuture#doReceived
方法将调用CompletableFuture#postComplete
用于唤醒阻塞线程。
postComplete
工作流程较为复杂,可以简单概述其流程为:
将任务间依赖关系构建为一颗多叉树,CompletableFuture
中的stack
字段,保存依赖该任务完成的后续任务,可以将其理解为当前节点的子节点。
使用DFS
方法遍历该多叉树,首先构建任务栈STACK
,其被初始化为根节点,节点x
出栈表示该任务前置依赖已完成,可以执行。
之后将x
的全部子节点入栈,继续执行节点遍历,直至栈为空。
xxxxxxxxxx
311public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
2 volatile Object result;
3 // 等待当前CompletableFuture完成的后置任务
4 volatile Completion stack;
5
6 // DFS方式唤醒依赖依赖图中,等待当前CompletableFuture完成的后置任务
7 final void postComplete() {
8 // f=当前处理的任务;h=依赖f完成的后置任务
9 CompletableFuture<?> f = this; Completion h;
10 while ((h = f.stack) != null ||
11 (f != this && (h = (f = this).stack) != null)) {
12 // d=依赖f完成的某个后置任务; t=stack中d的下一个后置任务
13 CompletableFuture<?> d; Completion t;
14 // 将 f 的栈顶任务 h 出栈
15 if (STACK.compareAndSet(f, h, t = h.next)) {
16 // h 还不是 f 栈中的最后一个任务
17 if (t != null) {
18 if (f != this) {
19 // 如何已切换到后置任务,将f的栈顶任务h压入this栈中
20 pushStack(h);
21 continue;
22 }
23 // 将STACK的NEXT从指向h,改为指向t
24 NEXT.compareAndSet(h, t, null);
25 }
26 // 直接执行任务 h,并尝试让f指向依赖h任务的节点,如果不存在则再次指向自身
27 f = (d = h.tryFire(NESTED)) == null ? this : d;
28 }
29 }
30 }
31}
当任务执行时,将唤醒等待在CompletableFuture
上的线程,对于同步调用场景,消费者线程将从DefaultFuture#get
返回,并获得响应值;对于异步调用场景,设置在DefaultFuture
上面的回调将被触发,并可以获取到响应结果,至此完成一次完整调用。