Dubbo2.7源码分析-如何发布服务

Dubbo的服务发布逻辑是比较复杂的,我还是以Dubbo自带的示例讲解,这样更方便和容易理解。

Provider配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
 
    <!-- 提供方应用信息,用于计算依赖关系 -->
    <dubbo:application name="hello-world-app"  />
 
    <!-- 使用multicast广播注册中心暴露服务地址 -->
    <dubbo:registry address="multicast://224.5.6.7:1234" />
 
    <!-- 用dubbo协议在20880端口暴露服务 -->
    <dubbo:protocol name="dubbo" port="20880" />
 
    <!-- 声明需要暴露的服务接口 -->
    <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />
 
    <!-- 和本地bean一样实现服务 -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl" />
</beans>

ApplicationContext

ClassPathXmlApplicationContext父类AbstractApplicationContext的方法refresh()在实例化bean之后的最后一步finishRefresh()中,此方法作用是发布相应的事件。

	protected void finishRefresh() {
		//省略LifeCycleProcessor刷新代码

		// Publish the final event.
		publishEvent(new ContextRefreshedEvent(this));

		// 省略注册到 LiveBeansView MBean代码
	}

可以看到发布了一个ContextRefreshedEvent事件。

	protected void publishEvent(Object event,ResolvableType eventType) {
		//省略部分代码
	  getApplicationEventMulticaster().multicastEvent(applicationEvent,eventType);
            //省略部分代码

首先获取ApplicationEvent事件广播对象,然后广播事件。

ApplicationEvent事件广播对象默认是SimpleApplicationEventMulticaster,这个对象是在AbstractApplicationContext的方法initApplicationEventMulticaster()初始化的,如果需要自定义,可以实现接口ApplicationEventMulticaster,并将bean的名字命名为applicationEventMulticaster

接下来看看SimpleApplicationEventMulticaster类的multicastEvent方法。

	@Override
	public void multicastEvent(final ApplicationEvent event,ResolvableType eventType) {
                //事件类型
		ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
               
               //applicationListener
		for (final ApplicationListener<?> listener : getApplicationListeners(event,type)) {
                       //异常执行
			Executor executor = getTaskExecutor();
			if (executor != null) {
				executor.execute(new Runnable() {
					@Override
					public void run() {
						invokeListener(listener,event);
					}
				});
			}
			else {
				invokeListener(listener,event);
			}
		}
	}

可以看到此方法会调用applicationListener的方法,对于Dubbo而言,就是ServiceBean.

怎么样获取到ServiceBean的呢?

ServiceBean实现了好几个接口,其中有两个接口ApplicationContextAwareApplicationListener,其中ApplicationContextAware使ServiceBean具有获取ApplicationContext的能力(了解bean的生命周期),而ApplicationListener使ServiceBean具有响应事件响应的能力。dubbo实现ApplicationContextAware的目的是通过反射把自己添加到ApplicationContext的ApplicationListener列表中,即使不实现ApplicationContextAware接口,spring也会将实现了ApplicationListener接口的bean添加到其listener列表中的,dubbo这样做估计是向后兼容。

接着看invokeListener(listener,event);方法

protected void invokeListener(ApplicationListener<?> listener,ApplicationEvent event) {
		ErrorHandler errorHandler = getErrorHandler();
		if (errorHandler != null) {
			try {
				doInvokeListener(listener,event);
			}
			catch (Throwable err) {
				errorHandler.handleError(err);
			}
		}
		else {
			doInvokeListener(listener,event);
		}
	}

	private void doInvokeListener(ApplicationListener listener,ApplicationEvent event) {
		try {
			listener.onApplicationEvent(event);
		}
		catch (ClassCastException ex) {
			//省略异常处理
		}
	}

invokeListener方法内部调用了doInvokeListener方法,而doInvokeListener方法调用了listener(ServiceBean)的onApplicationEvent方法.

ServiceBean

    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (isDelay() && !isExported() && !isUnexported()) {
            if (logger.isInfoEnabled()) {
                logger.info("The service ready on spring started. service: " + getInterface());
            }
            export();
        }
    }

onApplicationEvent方法调用了export方法,export方法首先判断是否已经发布了服务,发布了则直接返回,没有发布则会判断是否需要延迟发布,如果需要延迟,则将发布服务做为一个任务添加到ScheduledThreadPoolExecutor线程池中,如果不延迟,则调用doExport方法立即发布服务。
doExport方法中会获取application/registries/monitor/module/protocols,并做一些检查和属性填充,然后调用doExportUrls();发布服务。doExportUrls()首先调用loadRegistries方法得到要注册的url,然后发布相关Protocol的服务。

简单叙述一下获取url的过程,url通过map组装参数和对应的值,参数有ApplicationConfigRegistryConfig对象的属性以及pathdubbotimestamppidprotocolregistry

本示例applicationConfig是:

<dubbo:application name="demo-provider" qosPort="22222" id="demo-provider" />

registryURL
registryConfig是:

<dubbo:registry address="multicast://224.5.6.7:1234" id="org.apache.dubbo.config.RegistryConfig" />

最终map组装结果是:

url parameters

最后得到registryURL是:

registry://224.5.6.7:1234/org.apache.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&pid=4892&qos.port=22222&registry=multicast&timestamp=1536112339884

然后调用doExportUrlsFor1Protocol方法发布服务,此方法开始部分是构造发布的服务URL,然后再发布url。

服务URL
URL包括以下几部分:服务端还是客户端标识Dubbo版本,时间戳,Pid,服务的方法名tokenApplicationConfig,MoudleConfig,ProviderConfig,ProtocolConfig,*MethodConfig对象的相关属性等。
例如本示例的url:

dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider&timestamp=1536114090787

我们来着重看一下在构造URL过程中port的获取过程。

//protocolConfig是配置的<dubbo:protocol />生成的对象
//name是protocol的name,本示例为"dubbo"
//map保存了url的键值对
Integer port = this.findConfigedPorts(protocolConfig,name,map);

findConfigedPorts顾名思义是查找配置的port,从哪查呢,先从系统环境变量查,如果没找到,再查找名字为name的protocol协义。

    private Integer findConfigedPorts(ProtocolConfig protocolConfig,String name,Map<String,String> map) {
        Integer portToBind = null;

        // 从环境变量从查找绑定的port
        String port = getValueFromConfig(protocolConfig,Constants.DUBBO_PORT_TO_BIND);
        portToBind = parsePort(port);

        // 如果没有从环境变量从查到,则从名称为name的protocol查找
        if (portToBind == null) {
            portToBind = protocolConfig.getPort();
            if (provider != null && (portToBind == null || portToBind == 0)) {
                portToBind = provider.getPort();
            }
           //这一句是关键,示例中name值是"dubbo",所以会实例化DubboProtocol,得到默认的port:20880
            final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
            if (portToBind == null || portToBind == 0) {
                portToBind = defaultPort;
            }
            if (portToBind == null || portToBind <= 0) {
                portToBind = getRandomPort(name);
                if (portToBind == null || portToBind < 0) {
                    portToBind = getAvailablePort(defaultPort);
                    putRandomPort(name,portToBind);
                }
                logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
            }
        }

        //保存port到map中,以便后面url使用
        map.put(Constants.BIND_PORT_KEY,String.valueOf(portToBind));

        // 从环境变量中查找注册的port,如果没有找到,则等于绑定的Port.
        String portToRegistryStr = getValueFromConfig(protocolConfig,Constants.DUBBO_PORT_TO_REGISTRY);
        Integer portToRegistry = parsePort(portToRegistryStr);
        if (portToRegistry == null) {
            portToRegistry = portToBind;
        }
        return portToRegistry;
    }

有人或许有疑问,ServiceConfig在实例化时,不是已经加载过Protocol了吗?为什么还要使用ExtensionLoader加载一次呢?

final int defaultPort =ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();

答: ServiceConfig实例化时,加载的Protocol是自适应的Protocol,是动态生成的,类名是Protocol$Adaptive(见Dubbo源码分析-SPI的应用中有分析)。而这里获取Port时加载的也是Protocol类,但指名了具体加载的是哪个Protocol(本示例是名称为dubbo的Protocol,即DubboProtocol,此类默认的端口是20880)。

发布URL

发布本地服务

调用ServiceConfig类的exportLocal(URL url)发布本地服务。

    private void exportLocal(URL url) {
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
              //本地服务url
               URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            
            
           ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref,(Class) interfaceClass,local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }

本示例的本地服务 url是:

injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=3008&qos.port=22222&side=provider&timestamp=1536125473655

重点看这一句:

Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref,local));

其中涉及到ProxyFactory和Protocol,下面分别来看一看。

ProxyFactory

proxyFactory也是通过SPI加载的自适应类对象,类名为ProxyFactory$Adaptive,我们来看一下其class文件反编译后的源码。

package org.apache.dubbo.rpc;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements ProxyFactory {
    public ProxyFactory$Adaptive() {
    }

    public Invoker getInvoker(Object var1,Class var2,URL var3) throws RpcException {
        if (var3 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var5 = var3.getParameter("proxy","javassist");
            if (var5 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var6 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var5);
                return var6.getInvoker(var1,var2,var3);
            }
        }
    }

    public Object getProxy(Invoker var1,boolean var2) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var3 = var1.getUrl();
            String var4 = var3.getParameter("proxy","javassist");
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var3.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var5 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var4);
                return var5.getProxy(var1,var2);
            }
        }
    }

    public Object getProxy(Invoker var1) throws RpcException {
        if (var1 == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        } else if (var1.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        } else {
            URL var2 = var1.getUrl();
            String var3 = var2.getParameter("proxy","javassist");
            if (var3 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(" + var2.toString() + ") use keys([proxy])");
            } else {
                ProxyFactory var4 = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(var3);
                return var4.getProxy(var1);
            }
        }
    }
}

其中有三个方法,两个获取代理,一个获取Invoker。我们来看其中的getInvoker方法,默认获取名称为javassist的ProxyFactory。
由于本地服务URL中没有proxy参数,所以会调用JavassistProxyFactory的getInvoker(T proxy,Class type,URL url)方法,返回AbstractProxyInvoker的匿名类对象,此对象代理了服务对象(本示例中为DemoServiceImpl对象)。

其实(ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");获取到的并不是JavassistProxyFactory对象,而是StubProxyFactoryWrapper对象,为什么呢?我们可以看下ExtensionLoader的getExtension(String name)方法

   public T getExtension(String name) {
       //检查name是否合法
       if (name == null || name.length() == 0)
           throw new IllegalArgumentException("Extension name == null");
      //如果name等于true,则加载SPI的默认插件
     if ("true".equals(name)) {
         return getDefaultExtension();
      }
      //从当前插件类的缓存实例对象中获取
      Holder<Object> holder = cachedInstances.get(name);
      if (holder == null) {
         cachedInstances.putIfAbsent(name,new Holder<Object>());
         holder = cachedInstances.get(name);
     }
     Object instance = holder.get();
     if (instance == null) {
       synchronized (holder) {
             instance = holder.get();
             if (instance == null) {
                 //创建插件实例
                 instance = createExtension(name);
                 holder.set(instance);
             }
         }
     }
     return (T) instance;
 }

   private T createExtension(String name) {
      //从文件目录中加载插件类
      Class<?> clazz = getExtensionClasses().get(name);
      if (clazz == null) {
         throw findException(name);
    }
   
    //从已加载的所有插件实例集合中获取
     try {
         T instance = (T) EXTENSION_INSTANCES.get(clazz);
         if (instance == null) {
             //实例化插件实例,并放入集合
             EXTENSION_INSTANCES.putIfAbsent(clazz,clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
         
        //注入属性
        injectExtension(instance);

        //插件的包裹类
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
           for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ",class: " +
                type + ")  could not be instantiated: " + t.getMessage(),t);
    }
 }

重点的地方就在于插件的包裹类,StubProxyFactoryWrapper就是JavassistProxyFactory的包裹类,为什么这么说呢,因为StubProxyFactoryWrapper有一个带ProxyFactory参数的构造函数而且实现了ProxyFactory接口,具体可以看Extension的loadExtensionClasses方法源码(装饰者模式)。

Protocol

protocol对象也是一个自适应插件类,类名为Protocol$Adaptive,在上一篇文章中已有讲解。这个类会根据url的协义取得对应转义的插件类,没有的话,默认为dubbo协义,本地服务url协义为injvm,所以会加载InjvmProtocol,但是在加载InjvmProtocol并实例化后,发现InjvmProtocol还有对应的包裹类即(其实是所有Protocol的包裹类):ProtocolFilterWrapper和ProtocolListenerWrapper。ProtocolFilterWrapper类的作用是添加一些过滤器,ProtocolListenerWrapper的作用是添加ExporterListener。InjvmProtocol的export方法仅仅创建一个InjvmExporter实例,没有开启服务。

发布远程服务

如果注册url不为空,调用proxyFactory得到服务对象的代理类,然后使用protocol发布服务。由于注册url的协义是registry,在使用ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension("registry");会加载RegistryProtocol类并实例化,而且会添加其包裹类:ProtocolFilterWrapper和ProtocolListenerWrapper。而在这两个包裹类的export方法的首行,都会对registry协义进行单独处理。

RegistryProtocol
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }

经过这两个包裹类后,最终会调用RegistryProtocol的export方法。

    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //发布服务
       //originInvoker中包含了代理服务对象的代理类
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);

        //注册相关代码省略

        //订阅相关代码省略
    }

       private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
       
         //key为发布的服务url
        String key = getCacheKey(originInvoker);
        //从map缓存中获取
        ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
      //double check
        if (exporter == null) {
            synchronized (bounds) {
                exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
                if (exporter == null) {
                    final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker,getProviderUrl(originInvoker));
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete),originInvoker);
                    bounds.put(key,exporter);
                }
            }
        }
        return exporter;
    }

最重要的是这一句:

exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete),originInvoker);

其中protocol也是Protocol$Adaptive对象,而invokerDelegete的URL是服务的url.

本示例中为:

dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8468&qos.port=22222&side=provider&timestamp=1536138127517

DubboProtocol

Protocol$Adaptive在解析URL的时得到dubbo,所以会加载DubboProtocol并实例化(DubboProtocol实际在前面获取默认接口时已经实例化并缓存起来了,此处取的是缓存的实例),并调用了DubboProtocol的export方法(与上面一样,在得到DubboProtocol实例后,仍然会在外面包裹一下)。

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 服务名:本例中为org.apache.dubbo.demo.DemoService:20880
        String key = serviceKey(url);
       //exporter 控制服务打开与关闭
        DubboExporter<T> exporter = new DubboExporter<T>(invoker,key,exporterMap);
        exporterMap.put(key,exporter);

        //省略发布子服务的相关代码
       
       //打开服务
        openServer(url);
       //优化序列化处理
        optimizeSerialization(url);
        return exporter;
    }

经过层层探索,曲折迂回,终于到openServer了,进去看看。

    private void openServer(URL url) {
        // 服务ip:端口号
        String key = url.getAddress();
        
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        serverMap.put(key,createServer(url));
                    }
                }
            } else {
                // 服务支持重置
                server.reset(url);
            }
        }
    }

可以看到其中有一个重要方法createServer(url)。

    private ExchangeServer createServer(URL url) {
        // 当服务关闭时,默认启动发送只读事件
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,Boolean.TRUE.toString());
        // 默认启动心跳
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY,String.valueOf(Constants.DEFAULT_HEARTBEAT));

        //str默认为netty
        String str = url.getParameter(Constants.SERVER_KEY,Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ",url: " + url);
        
        //添加编解码器
        url = url.addParameter(Constants.CODEC_KEY,DubboCodec.NAME);
        ExchangeServer server;
        try {
            //启动服务,并传入请求处理器
            server = Exchangers.bind(url,requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(),e);
        }
       
         //判断客户端使用的是网络传输层框架是否支持服务端的网络传输层。
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
Exchangers

进入Exchangers.bind方法一探究竟。

    public static ExchangeServer bind(URL url,ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        //如果编码码器没有,则添加参数exchange
        url = url.addParameterIfAbsent(Constants.CODEC_KEY,"exchange");
        return getExchanger(url).bind(url,handler);
    }

getExchanger(url)默认得到的是HeaderExchanger,可通过exchanger参数配置。
到HeaderExchanger中看看bind方法

    public ExchangeServer bind(URL url,ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url,new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
Transporter

看下Transporters的bind方法。

    public static Server bind(URL url,ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
        return getTransporter().bind(url,handler);
    }

通过getTransporter方法获取一个自适应的Transporter,类名为Transporter$Adaptive,我们来看一下其源码:

package org.apache.dubbo.remoting;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

public class Transporter$Adaptive implements Transporter {
    public Transporter$Adaptive() {
    }

    public Client connect(URL var1,ChannelHandler var2) throws RemotingException {
        if (var1 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var1.getParameter("client",var1.getParameter("transporter","netty"));
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.remoting.Transporter) name from url(" + var1.toString() + ") use keys([client,transporter])");
            } else {
                Transporter var5 = (Transporter)ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(var4);
                return var5.connect(var1,var2);
            }
        }
    }

    public Server bind(URL var1,ChannelHandler var2) throws RemotingException {
        if (var1 == null) {
            throw new IllegalArgumentException("url == null");
        } else {
            String var4 = var1.getParameter("server","netty"));
            if (var4 == null) {
                throw new IllegalStateException("Fail to get extension(org.apache.dubbo.remoting.Transporter) name from url(" + var1.toString() + ") use keys([server,transporter])");
            } else {
                Transporter var5 = (Transporter)ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(var4);
                return var5.bind(var1,var2);
            }
        }
    }
}

可以看到Transporter$Adaptive通过判断URL中是否有transporter参数,如果没有,就默认为netty。

示例中服务的URL为

dubbo://192.168.124.1:20880/org.apache.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.124.1&bind.port=20880&dubbo=2.0.2&generic=false&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&pid=8004&qos.port=22222&side=provider&timestamp=1536114090787

其中没有transporter参数,所以就使用netty。然后dubbo就去查找netty对应的是哪个Transporter,结果找到是NettyTransporter。

package org.apache.dubbo.remoting.transport.netty4;

//省略导入部分

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    @Override
    public Server bind(URL url,ChannelHandler listener) throws RemotingException {
        return new NettyServer(url,listener);
    }

    @Override
    public Client connect(URL url,ChannelHandler listener) throws RemotingException {
        return new NettyClient(url,listener);
    }

}

NettyTransporter很简单,只有两个方法,一个用于开启服务,一个用于连接服务。到这里已经明白了Dubbo是如何发布一个服务的。

我们再进一步看下NettyServer的构造函数

    public NettyServer(URL url,ChannelHandler handler) throws RemotingException {
        super(url,ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url,SERVER_THREAD_POOL_NAME)));
    }

可以看出其调用父类的构造函数,并传入url和handler的包裹类。handler的包裹类有哪些呢,进去看一看。

    public static ChannelHandler wrap(ChannelHandler handler,URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler,url);
    }
    
     protected ChannelHandler wrapInternal(ChannelHandler handler,URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler,url)));
    }

注意到有一个接口Dispatcher,其自适应插件类是AllDispatcher,AllDispatcher的dispatch方法返回AllChannelHandler实例(此实例会将所有请求做为任务放入线程池中处理),在此实例基础上又包裹了HeartbeatHandlerMultiMessageHandler
NettyServer会将MultiMessageHandler层层往上传到其父类AbstractPeer

我们来回忆一下正向流程

从ServiceConfig发布registryURL开始(见doExportUrlsFor1Protocol方法)
1.ServiceConfig生成服务实例的代理工厂类JavassistProxyFactory(ProxyFactory SPI默认代理工厂类)并包裹到DelegateProviderMetaDataInvoker(此类记录代理工厂类和服务信息ServiceBean(<dubbo:service />标签对应的类))
2.由于registryURL的protocol协义是registry,所以会加载RegistryProtocol(Protocol类的外面都包裹了ProtocolFilterWrapper和ProtocolListenerWrapper,下面不再特殊说明),并传入上一步的invoker。
3.RegistryProtocol又找到DubboProtocol,也会带上Invoker(此时的Invoker包含上一次的Invoker并带有服务地址(dubbo://IP:端口/服务接口全称?参数=xxx))。

所以requestHandler又会调用正向传过来的Invoker,经过ProtocolFilterWrapper和ProtocolListenerWrapper,最终调用到服务实现类相应的方法。

最后以一张图总结:

标识为SPI的类,是可以动态加载的。图片看不清楚的话,请查看原图

再简单说下接收到请求后的处理流程:NettyServer接收到请求后,交给NettyServerHandler处理,NettyServerHandler转交给NettyServer的父类AbstractPeer处理,AbstractPeer又交给MultiMessageHandler处理,这样就开始了handler链的处理,handler的终点是HeaderExchangerHandler,HeaderExchangerHandler调用DubboProtocol传过来的成员变量requestHandler调用相应的服务类方法,然后得到结果,调用NettyServerHandler传过来的NettyChannel发送结果到Client。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


在网络请求时,总会有各种异常情况出现,我们需要提前处理这种情况。在完善的rpc组件dubbo中,自然是不会少了这一层东西的。我们只需要通过一些简单的配置就可以达到超时限制的作用了。dubbo的设计理念是,客户端控制优先,服务端控制兜底。 1.超时机制的实现思路要想实
作者:宇曾背景软件技术的发展历史,从单体的应用,逐渐演进到分布式应用,特别是微服务理念的兴起,让大规模、高并发、低延迟的分布式应用成为可能。云原生时代下,微服务框架本身也在不断地进化和迭代演进。微服务框架一般会涉及到以下几个知识点:本文我们着重探讨以下三大微服务框架:
hello,大家好呀,我是小楼。最近一个技术群有同学at我,问我是否熟悉Dubbo,这我熟啊~他说遇到了一个Dubbo异步调用的问题,怀疑是个BUG,提到BUG我可就不困了,说不定可以水,哦不...写一篇文章。问题复现遇到问题,尤其不是自己遇到的,必须要复现出来才好排查,截一个当时的聊天记录:他的问题
 一个软件开发人员,工作到了一定的年限(一般是3、4年左右),如果他还没学会阅读源码,那么他就会遇到瓶颈。因为到了这个时候的开发,他应该不仅仅只会做那些CURD的业务逻辑,而应该会根据公司的实际情况去写框架。而基本上没有谁能像天才一样从零写出一个框架,很多人写框架其实
当一个服务调用另一个远程服务出现错误时的外观Dubbo提供了多种容错方案,默认值为failover(重试)1)、FailoverCluster(默认)失败自动切换,当出现失败,重试其他服务器,通常用于读操作,但重试会带来更长延迟,可以通过属性retries来设置重试次数(不含第一次)2)、FailfastC
最近在看阿里开源RPC框架Dubbo的源码,顺带梳理了一下其中用到的设计模式。下面将逐个列举其中的设计模式,并根据自己的理解分析这样设计的原因和优劣。责任链模式责任链模式在Dubbo中发挥的作用举足轻重,就像是Dubbo框架的骨架。Dubbo的调用链组织是用责任链模式串连起来的。责任链
在过去持续分享的几十期阿里Java面试题中,几乎每次都会问到Dubbo相关问题,比如:“如何从0到1设计一个Dubbo的RPC框架”,这个问题主要考察以下几个方面:你对RPC框架的底层原理掌握程度。考验你的整体RPC框架系统设计能力。具体,mike来为大家详解。RPC和RPC框架1.RPC(RemoteProcedure
Dubbo在启动时会检查服务提供者所提供的服务是否可用,默认为True。(1)、单个服务关闭启动时检查(check属性置为false)1)、基于xml文件配置方式1<!--3、声明需要调用的远程服务接口,生成远程服务代理,可以和本地Bean一样使用-->2<dubbo:referenceid="userService"i
(1)、新建一个普通Maven项目,用于存放一些公共服务接口及公共的Bean等。项目: 公共Bean:1packagecn.coreqi.entities;23importjava.io.Serializable;45publicclassUserimplementsSerializable{6privateIntegerid;7privateStringuserName;
1.安装java:yuminstalljava2.下载Tomcat:wgethttp://mirrors.shu.edu.cn/apacheomcatomcat-9/v9.0.14/bin/apache-tomcat-9.0.14-fulldocs.tar.gz3.解压Tomcat:tar-xvfapache-tomcat-9.0.14.tar.gz-C/usr/local/cd/usr/local/mvapache-tomcat-9.0.14//usr/local
工程结构:主pom<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.ap
微服务架构到底应该如何选择? 什么是微服务?微服务的概念最早是在2014年由MartinFowler和JamesLewis共同提出,他们定义了微服务是由单一应用程序构成的小服务,拥有自己的进程与轻量化处理,服务依业务功能设计,以全自动的方式部署,与其他服务使用HTTPAPI通讯。同时,服务会
(1)、dubbo-admin(管理控制台)1)、从https://github.com/apache/incubator-dubbo-ops下载解压2)、修改dubbo-admin配置文件中zookeeper的注册地址3)、使用Maven命令打包mvncleanpackage4)、使用java-jar dubbo-admin-0.0.1-SNAPSHOT.jar命令运行5)、访
Dubbo概述Dubbo的背景随着互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进。  单一应用架构当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。
前言跳槽时时刻刻都在发生,但是我建议大家跳槽之前,先想清楚为什么要跳槽。切不可跟风,看到同事一个个都走了,自己也盲目的开始面试起来,期间也没有准备充分,到底是因为技术原因,影响自己的发展,偏移自己规划的轨迹,还是钱给少了,不受重视。准备不充分的面试,完全是浪费时间,更是对自己的不负责
Dubbo是阿里巴巴内部使用的分布式业务框架,2012年由阿里巴巴开源。由于Dubbo在阿里内部经过广泛的业务验证,在很短时间内,Dubbo就被许多互联网公司所采用,并产生了许多衍生版本,如网易,京东,新浪,当当等等。由于阿里策略变化,2014年10月Dubbo停止维护。随后部分互联网公司公开了自行维护的Du
1.java.lang.NoSuchMethodError:org.jboss.resteasy.specimpl.BuiltResponse.getHeaders()Ljavax/wss/core/MultivaluedMap;解决:参考https://stackoverflow.com/questions/17618587/jetty-9-0-embedded-and-resteasy-3-0-keeps-throwing-nosuchmethoderror将依赖提到最前
服务消费者引用服务提供者的服务时可能由于网络原因导致长时间未返回相应,此时大量的线程将会阻塞,引起性能下降等问题。可以通过引入服务超时来解决该问题 服务超时指服务在给定的时间内未返回相应将立即终止该请求,一般配合retries(重试次数)使用。单位毫秒,默认值1000 
服务超时后重试次数【retries】,不包含第一次调用,0代表不重试*我们应该在幂等方法上设置重试次数【查询、删除、修改】,在非幂等方法上禁止设置重试次数。★幂等:指多次运行方法所产生的最终效果是一致的1<!--3、声明需要调用的远程服务接口,生成远程服务代
一、Web应用架构的演变​随着互联网的发展,网站应用的规模不断扩大,Web应用架构也在不断的演变​四个阶段:单一应用、垂直应用、分布式服务、流动计算1.单一应用架构​当网站访问量很小时,只需要一个应用程序,将所有的功能都部署在一起,以减少部署节点和成本​此时关键