Linux安全网 - Linux操作系统_Linux 命令_Linux教程_Linux黑客

会员投稿 投稿指南 本期推荐:
搜索:
您的位置: Linux安全网 > Linux集群 > Architecture > » 正文

dubbo源代码阅读

来源: attend 分享至:

一,自定义的spring配置

基于sping 扩展schma 利用 DubboNamespaceHandler 实现对自定义schema的解析。见配置文件:spring.handlers spring.schemas

二,Consumer对于服务接口的透明调用

基于Javassist的动态代理模式,自动生成代理类。

通过InvokerInvocationHandlerinvoker调用:

return invoker.invoke(new RpcInvocation(method, args)).recreate();

invoker RPC通信,基于minanetty等。

三,dubbo扩展机制

实现方式类似sunspi模式,实现自身的可扩展性。简单实现了接口的注入。

1Extension 注解 value=组件的名字

具体实现见ExtensionLoader

2,主要方法:

loadExtensionClasses 加载所有实现了META-INF/services目录下文件中的类,文件名为接口名。根据Extension注解的名字为keyCLASSVALUE放到缓存的MAP中。

getAdaptiveExtension 利用代码生成创建一下接口的适配器类:

Protocol

Cluster

ProxyFactory

等等

这个适配器类以Adaptive注解声明的值或者接口名为KEY,从URL中的参数或者URL getProtocol() 作为key的值,

然后ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(key ) 获得相应的实例。

getExtension(STR)方法获得接口实例,第一次取时创建,并按顺序实例化包装类,并把最后一个包装类返回。

Class<?> clazz = getExtensionClasses().get(name);

if (clazz == null) {

throw new IllegalStateException("No such extension " + type.getName() + " by name " + name);

}

try {

//实例化并自动注入一些接口的适配器

T instance = injectExtension((T) clazz.newInstance());

Set<Class<?>> wrapperClasses = cachedWrapperClasses;

if (wrapperClasses != null && wrapperClasses.size() > 0) {

for (Class<?> wrapperClass : wrapperClasses) {

instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));

//injectExtension 实例化包装类,并注入接口的适配器, 注意这个地方返回的是最后一个包装类。

}

}

return instance;

} catch (Throwable t) {

throw new IllegalStateException("Extension instance(name: " + name + ", class: " +

type + ") could not be instantiated: " + t.getMessage(), t);

}

比如获得Protocol接口的实例 private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); 过程如下:

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension() 利用javassist动态生成一个Protocol$Adaptive的实例,

生成的class反编译后代码如下:

//消费者调用这个方法获得Invoker

public Invoker refer(Class paramClass, URL paramURL)

throws RpcException

{

if (paramURL == null)

throw new IllegalArgumentException("url == null");

URL localURL = paramURL;

//默认的协议dubbo

String str = (localURL.getProtocol() == null) ? "dubbo" : localURL.getProtocol();

if (str == null)

throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + localURL.toString() + ") use keys([protocol])");

/*注意这个地方获得的Protocol实例是:

*com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper 包装类。主要是为了实现

*invoker的包装。

*com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper->

*com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper->

*com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol

*层层包装创建ListenerInvokerWrapper,实现InvokerListener调用;创建InvokerChain 实现Filter的调用。

*默认的客户端过滤链 "consumercontext", "compatible", "deprecated", "collect", "genericimpl", "activelimit", "monitor", "future"

*com.alibaba.dubbo.rpc.RpcConstants.DEFAULT_REFERENCE_FILTERS

*/

Protocol localProtocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(str);

return localProtocol.refer(paramClass, paramURL);

}

//服务提供者调用这个方法,发布服务。

public Exporter export(Invoker paramInvoker)

throws RpcException

{

if (paramInvoker == null)

throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");

if (paramInvoker.getUrl() == null)

throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");

URL localURL = paramInvoker.getUrl();

String str = (localURL.getProtocol() == null) ? "dubbo" : localURL.getProtocol();

if (str == null)

throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + localURL.toString() + ") use keys([protocol])");

/*这个地方获得的Protocol实例同样是

*com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper 包装类,实现对于对

*invoker的包装

*加载的服务端过滤链,默认是 "context", "token", "exception", "echo", "generic", "accesslog", "trace", "classloader", "executelimit", "monitor" ,"timeout"

*com.alibaba.dubbo.rpc.RpcConstants.DEFAULT_SERVICE_FILTERS

*/

Protocol localProtocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(str);

return localProtocol.export(paramInvoker);

}

之所以默认获得的Protocol实例是com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper,因为在配置文件中

META-INF/services/com.alibaba.dubbo.rpc.Protocol文件内容如下:

com.alibaba.dubbo.registry.support.RegistryProtocol

com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper

com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper --最后一个包装类

com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol

com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol

com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol

com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol

四,Wrapper帮助类

提高调用某一个类的某一个方法的性能(避免反射调用)

使用javassist动态生成一个Wrapper的子类,实现抽象方法invokeMethod

/**

* invoke method.

*

* @param instance instance.

* @param mn method name.

* @param types

* @param args argument array.

* @return return value.

*/

abstract public Object invokeMethod(Object instance, String mn, Class<?>[] types, Object[] args) throws NoSuchMethodException, InvocationTargetException;

生成的class的代码类似

public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)

throws InvocationTargetException

{

RegistryService localRegistryService;

try

{

localRegistryService = (RegistryService)paramObject;

}

catch (Throwable localThrowable1)

{

throw new IllegalArgumentException(localThrowable1);

}

try

{

if ("register".equals(paramString))

{

localRegistryService.register((URL)paramArrayOfObject[0]);

return null;

}

if ("subscribe".equals(paramString))

{

localRegistryService.subscribe((URL)paramArrayOfObject[0], (NotifyListener)paramArrayOfObject[1]);

return null;

}

if ("unregister".equals(paramString))

{

localRegistryService.unregister((URL)paramArrayOfObject[0]);

return null;

}

if ("unsubscribe".equals(paramString))

{

localRegistryService.unsubscribe((URL)paramArrayOfObject[0], (NotifyListener)paramArrayOfObject[1]);

return null;

}

}

catch (Throwable localThrowable2)

{

throw new InvocationTargetException(localThrowable2);

}

throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.registry.RegistryService.");

}

直接动态生成了一个子类,没有通过反射调用。

Consumer 服务消费方分析

dubbo-demo-simple-consumer的源码为分析的起点,解析Consumer的运行流程。

1,配置文件的解析

<!-- 引用远程服务配置 -->

<dubbo:reference id="demoService" interface="com.alibaba.dubbo.demo.DemoService"/>

首先dubbo基于springschma扩展机制实现了自定义的命名空间定义和配置的解析。

源码见:com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler

com.alibaba.dubbo.config.spring.schema.DubboBeanDefinitionParser

registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));

DubboBeanDefinitionParserReferenceBean实现对dubbo:reference的解析。

2,获得服务代理

DemoService demoService = (DemoService)context.getBean("demoService"); ReferenceBean作为FactoryBean实现DemoService接口的代理对象的创建,见源码:

com.alibaba.dubbo.config.spring.ReferenceBean.getObject()
com.alibaba.dubbo.config.ReferenceConfig.get()
com.alibaba.dubbo.config.ReferenceConfig.init()

ReferenceConfig.java

获得protocol,cluster,proxyFactory接口的实例,实际调用如下:

protocol --Protocol$Adaptive- com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper--

cluster --com.alibaba.dubbo.rpc.cluster.support.FailoverCluster

proxyFactory--com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory

createProxy 方法解析:

I,获得URL,从dubbo:reference url属性或者从loadRegistries();通过注册中心配置拼装URL。设置URLprotocolConstants.REGISTRY_PROTOCOL registry

II获得 invoker = protocol.refer(interfaceClass, urls.get(0));

Protocol$Adaptive-->根据URL的协议获得 com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper->com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper-->com.alibaba.dubbo.registry.support.RegistryProtocol

refer(interfaceClass, urls.get(0)) -> com.alibaba.dubbo.registry.support.RegistryProtocol.refer->

1RegistryFactory--getRegistry RegistryFactory根据url配置可能是DubboRegistryFactoryMulticastRegistryFactoryZookeeperRegistryFactory

2new RegistryDirectory 创建注册中心目录服务

3registry.subscribe 订阅服务,--rpc远程访问registryService,RegistryDirectory作为 NotifyListener

-->RegistryDirectory.notify(urls)-->urls-toInvokers //此时生成了接口及接口的方法对应的invoker列表

4,cluster.merge(directory) 默认FailoverCluster生成FailoverClusterInvoker(RegistryDirectory)

创建invoker完成

III,创建代理(T) proxyFactory.getProxy(invoker) --JavassistProxyFactory-(T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));

使用Javassist创建了两个CLASS

public class Proxy1 extends Proxy

implements ClassGenerator.DC

{

public Object newInstance(InvocationHandler paramInvocationHandler)

{

return new proxy1(paramInvocationHandler);

}

}

public class proxy1

implements ClassGenerator.DC, EchoService, DemoService

{

public static Method[] methods;

private InvocationHandler handler;

public String sayHello(String paramString)

{

Object[] arrayOfObject = new Object[1];

arrayOfObject[0] = paramString;

Object localObject = this.jdField_handler_of_type_JavaLangReflectInvocationHandler.invoke(this, jdField_methods_of_type_ArrayOfJavaLangReflectMethod[0], arrayOfObject);

return ((String)localObject);

}

public Object $echo(Object paramObject)

{

Object[] arrayOfObject = new Object[1];

arrayOfObject[0] = paramObject;

Object localObject = this.jdField_handler_of_type_JavaLangReflectInvocationHandler.invoke(this, jdField_methods_of_type_ArrayOfJavaLangReflectMethod[1], arrayOfObject);

return ((Object)localObject);

}

public proxy1(InvocationHandler paramInvocationHandler)

{

this.jdField_handler_of_type_JavaLangReflectInvocationHandler = paramInvocationHandler;

}

}

返回proxy1xxx接口的子类。

4,动态代理背后的故事,以dubbo协议的通信为例

从生成的proxy1的代码我们可以看到 sayHello(String str)时调用了 InvokerInvocationHandler.invoke(Object proxy, Method method, Object[] args)

-----invoker.invoke(new RpcInvocation(method, args)).recreate();

这里的invoker是对 com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker 的层层包装,实现负载均衡、失败转移(FailoverClusterInvoker)、InvokerListenerInvokerChain的顺序调用。

Protocol.refer生成。

如:FailoverClusterInvoker-

public Result invoke(final Invocation invocation) throws RpcException {

if(destroyed){

throw new RpcException("Rpc invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()

+ " use dubbo version " + Version.getVersion()

+ " is not destroyed! Can not invoke any more.");

}

LoadBalance loadbalance;

List<Invoker<T>> invokers = directory.list(invocation);//从服务目录中找到所有的invoker,处理了router ,目前router只有ScriptRouter实现。

if (invokers != null && invokers.size() > 0) {

//加载负载均衡算法

loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()

.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));

} else {

loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);

}

return doInvoke(invocation, invokers, loadbalance);

}

public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {

if (invokers == null || invokers.size() == 0)

throw new RpcException("No provider available for service " + getInterface().getName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", Please check whether the service do exist or version is right firstly, and check the provider has started.");

int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;

if (len <= 0)

len = 1;

// retry loop.

RpcException le = null; // last exception.

List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(invokers.size()); // invoked invokers.

Set<URL> providers = new HashSet<URL>(len);

//挨个试,如果失败就继续。实现Failover

for (int i = 0; i < len; i++) {

//boolean pp = false; // is provider problem.

Invoker<T> invoker = select(loadbalance, invocation, invokers, invoked);

invoked.add(invoker);

providers.add(invoker.getUrl());

try {

return invoker.invoke(invocation);

} catch (RpcException e) {

if (e.isBiz()) throw e;

le = e;

//pp = true;

} catch (Throwable e) // biz exception.

{

throw new RpcException(e.getMessage(), e);

} finally {

//if (pp) // if provider problem, fail over.

// inv.setWeight(0);

}

}

List<URL> urls = new ArrayList<URL>(invokers.size());

for(Invoker<T> invoker : invokers){

if(invoker != null )

urls.add(invoker.getUrl());

}

throw new RpcException(le.getCode(),"Tried " + len + " times to invoke providers " + providers + " " + loadbalance.getClass().getAnnotation(Extension.class).value() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + (le != null ? le.getMessage() : ""), le);

}

}

再来具体看看实际执行远程调用DubboInvoker

DubboInvokercom.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol 创建-

public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {

// find client.

int channels = url.getPositiveParameter(Constants.CONNECTIONS_KEY, 1);

ExchangeClient[] clients = new ExchangeClient[channels];

if ( channels == 1){

clients[0] = getOrInitClient(url);

} else {

for (int i = 0; i < clients.length; i++) {

clients[i] = getOrInitClient(url); //默认初始化了一个LazyConnectExchangeClient-->init Exchangers.connect(url, requestHandler);

}

}

System.out.println(serviceType.getName()+":"+clients.length);

// create rpc invoker.

DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, clients);

invokers.add(invoker);

return invoker;

}

Protocol里创建了,ExchangeClient,及 requestHandler

DubboInvoker.doInvoke(final Invocation invocation) --> (Result) currentClient.request(inv, timeout).get(); 调用ExchangeClient发起请求

默认通过NETTY框架通信。

com.alibaba.dubbo.remoting.transport.netty.NettyClient

对协议的encode,decode实现:

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec 对于配置文件中的

<dubbo:method name="subscribe">

<dubbo:argument index="1" callback="true" />

</dubbo:method>

callbacktrue的,DubboCodec在服务端自动生成参数代理, 通过RPC远程调用消费者的方法。此时的invokerChannelWrappedInvoker

ChannelWrappedInvoker发起的请求,由DubboProtocolrequestHandler处理 received--

if (message instanceof Invocation) {

reply((ExchangeChannel) channel, message);

}

比如消费者 subscribe 时,com.alibaba.dubbo.registry.support.SimpleRegistryService 处理完成后要调用消费者的NotifyLisenter .notify(urls)

消费者在发送请求时,DubboCodec根据URL中配置的方法的某一个参数的callback属性是否为true自动发布服务,以接受服务端的callback

TODO

六、服务提供者分析

七、注册中心服务分析

总体上说,dubboRPC做了很好的封装,能够实现透明的远程调用,在消费端实现了对于服务端调用的负载均衡算法、支持服务的集群并且提供了监控接口,可通过WEB界面了解服务的情况(请求次数、有哪些服务、服务有几个提供者)等。 提供的注册中心支持服务的注册、取消注册,并通知消费端服务列表的变更。

没时间排版,有兴趣的凑合着看吧。

<!--EndFragment-->

Tags:
分享至:
最新图文资讯
1 2 3 4 5 6
验证码:点击我更换图片 理智评论文明上网,拒绝恶意谩骂 用户名:
关于我们 - 联系我们 - 广告服务 - 友情链接 - 网站地图 - 版权声明 - 发展历史