pigeon学习

pigeon介绍

pigeon是点评内部一个分布式服务治理框架,和阿里的Dubbo类似,提供高性能和透明得如同本地化调用的RPC调用服务。

pigeon提供除了服务调用的基本功能外,还有服务发现、服务注册注销、服务隔离、监控分析、服务限流、服务健康检测和负载均衡。

pigeon是一个强依赖spring的项目,通过spring,实现了初始化和bean的加载,以及RPC调用的透明和本地化。

下图是一个pigeon请求在框架中的大概流程:

通过上图我们可以看到,一个完整的piegon请求需要动态代理、序列化、服务注册中心和网络请求的支持。

code地址:http://code.dianpingoa.com/arch/pigeon2
github开源地址:https://github.com/dianping/pigeon
用户使用说明参看项目中的USER_GUIDE.md
具体使用可参考其中的pigeon-demo项目,里面有各个功能如何使用的代码

下文的介绍全基于你已经完整仔细且看明白了USER_GUIDE,会介绍以下各个方面:

  • pigeon架构设计
  • Pigeon模块结构及其设计
  • Pigeon功能模块如何实现
  • Pigeon的user-guide里面提到的配置和使用方式的作用和实现方式
  • Pigeon客户端和服务端的初始化过程
  • Pigeon服务调用和服务响应的过程
  • Pigeon后台线程的功能和实现

pigeon中用到的技术

ClassLoader

java应用环境中不同的class分别由不同的ClassLoader负责加载。
一个jvm中默认的classloader有Bootstrap ClassLoader、Extension ClassLoader、App ClassLoader,分别各司其职:

  • Bootstrap ClassLoader:负责加载java基础类,主要是 %JRE_HOME/lib/ 目录下的rt.jar、resources.jar、charsets.jar和class等
  • Extension ClassLoader:负责加载java扩展类,主要是 %JRE_HOME/lib/ext 目录下的jar和class
  • App ClassLoader:负责加载当前java应用的classpath中的所有类。

采用了双亲委派模型,还有个重要的概念,线程的ContextClassLoader。

具体资料参考以下:

Class.forName()

《Class.forName()作用与总结》

jdk动态代理

代理模式上,基本上有Subject角色,RealSubject角色,Proxy角色。其中:Subject角色负责定义RealSubject和Proxy角色应该实现的接口;RealSubject角色用来真正完成业务服务功能;Proxy角色负责将自身的Request请求,调用realsubject 对应的request功能来实现业务功能,自己不真正做业务。

静态代理类图

上面的这幅代理结构图是典型的静态的代理模式:

当在代码阶段规定这种代理关系,Proxy类通过编译器编译成class文件,当系统运行时,此class已经存在了。这种静态的代理模式固然在访问无法访问的资源,增强现有的接口业务功能方面有很大的优点,但是大量使用这种静态代理,会使我们系统内的类的规模增大,并且不易维护;并且由于Proxy和RealSubject的功能 本质上是相同的,Proxy只是起到了中介的作用,这种代理在系统中的存在,导致系统结构比较臃肿和松散。

为了解决这个问题,就有了动态地创建Proxy的想法:在运行状态中,需要代理的地方,根据Subject 和RealSubject,动态地创建一个Proxy,用完之后,就会销毁,这样就可以避免了Proxy 角色的class在系统中冗杂的问题了。

如果仍采用上面静态代理的类图,势必会在代理这样公用可抽象的代码里冗余进大量的业务逻辑,因此引入InvocationHandler这个角色。

InvocationHandler

仔细思考代理模式中的代理Proxy角色。Proxy角色在执行代理业务的时候,无非是在调用真正业务之前或者之后做一些“额外”业务。

有上图可以看出,代理类处理的逻辑很简单:在调用某个方法前及方法后做一些额外的业务。换一种思路就是:在触发(invoke)真实角色的方法之前或者之后做一些额外的业务。那么,为了构造出具有通用性和简单性的代理类,可以将所有的触发真实角色动作交给一个触发的管理器,让这个管理器统一地管理触发。这种管理器就是Invocation Handler。

作用:

  • 在静态代理中,代理Proxy中的方法,都指定了调用了特定的realSubject中的对应的方法。在上面的静态代理模式下,Proxy所做的事情,无非是调用在不同的request时,调用触发realSubject对应的方法;更抽象点看,可以把该对应的方法看成对象Method交付给Invocation Handler在一个合适的情况下触发(invoke)
  • 动态代理工作的基本模式就是将自己的方法功能的实现交给 InvocationHandler角色,外界对Proxy角色中的每一个方法的调用,Proxy角色都会交给InvocationHandler来处理,而InvocationHandler则调用具体对象角色的方法。如下图所示:

在这种模式之中:代理Proxy 和RealSubject应该实现相同的功能,这一点相当重要。(我这里说的功能,可以理解为某个类的public方法)

在面向对象的编程之中,如果我们想要约定Proxy 和RealSubject可以实现相同的功能,有两种方式:

  1. 一个比较直观的方式,就是定义一个功能接口,然后让Proxy 和RealSubject来实现这个接口
  2. 还有比较隐晦的方式,就是通过继承。因为如果Proxy 继承自RealSubject,这样Proxy则拥有了RealSubject的功能,Proxy还可以通过重写RealSubject中的方法,来实现多态。

其中JDK中提供的创建动态代理的机制,是以a 这种思路设计的,而cglib 则是以b思路设计的。

jdk动态代理创建机制

比如现在想为RealSubject这个类创建一个动态代理对象,JDK主要会做以下工作:

  1. 获取 RealSubject上的所有接口列表;
  2. 确定要生成的代理类的类名,默认为:com.sun.proxy.$ProxyXXXX;
  3. 根据需要实现的接口信息,在代码中动态创建 该Proxy类的字节码;
  4. 将对应的字节码转换为对应的class 对象;
  5. 创建InvocationHandler 实例handler,用来处理Proxy所有方法调用;
  6. Proxy的class对象 以创建的handler对象为参数,实例化一个proxy对象

下面这篇文章写得非常好,深入浅出讲解了动态代理,强烈推荐:
《java动态代理》

JDK通过 java.lang.reflect.Proxy包来支持动态代理,一般情况下,我们使用下面的newProxyInstance方法

1
2
//返回一个指定接口的代理类实例,该接口可以将方法调用指派到指定的调用处理程序。
static Object newProxyInstance(ClassLoader loader,Class<?>[] interfaces,InvocationHandler h)

SPI机制

SPI的思想

SPI的全名为Service Provider Interface普通开发人员可能不熟悉,因为这个是针对服务提供厂商与扩展框架功能的开发者的。在java.util.ServiceLoader的文档里有比较详细的介绍。究其思想,其实是和”Callback”差不多。“Callback”的思想是在我们调用API的时候,我们可以自己写一段逻辑代码,传入到API里面,API内部在合适的时候会调用它,从而实现某种程度的“定制”。

我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。

为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。java spi就是提供这样的一个机制:为某个接口寻找服务实现的机制。有点类似IOC的思想,就是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要。

JAVA SPI的约定

当服务的提供者,提供了服务接口的一种实现之后,在jar包的META-INF/services/目录里同时创建一个以服务接口命名的文件。该文件里就是实现该服务接口的具体实现类。而当外部程序装配这个模块的时候,就能通过该jar包META-INF/services/里的配置文件找到具体的实现类名,并装载实例化,完成模块的注入。服务厂商实现的jar包你需要放到classpath下。

基于这样一个约定就能很好的找到服务接口的实现类,而不需要再代码里制定。

jdk提供服务实现查找的一个工具类:java.util.ServiceLoader

SPI扩展阅读:

ServiceLoader

以pigeon为例,在pigeon-remoting项目中,文件名以服务接口类定义,文件路径如下:
META-INF/services/com.dianping.pigeon.provider.Server

文件内容要求很简单,每一行放一个这个接口的一个实现类的类型名称(包含包名和类名),比如:

1
2
com.dianping.pigeon.remoting.netty.provider.NettyServer
com.dianping.pigeon.remoting.http.provider.JettyHttpServer

准备好上述后,我们就可以调用ServiceLoader获取这些服务实例:

1
ServiceLoader<Server> servers = ServiceLoader.get(Server.class);

这时候ServiceLoader会帮我们去扫描classpath下所有的META-INF/services/目录,如果包含com.dianping.pigeon.provider.Server文件,他初始化配置文件中的服务提供方实例,每一条记录都会对应一个服务实例。

那么问题来了,如果不同的jar包下有相同的文件,ServiceLoader以哪个为准呢?
ServiceLoader是通过当前线程的classloader来加载资源文件的,返回的是一个set,但最终以最后一个为准

zookeeper

数据模型

zookeeper使用了一个类似文件系统的树结构,数据可以挂在某个节点上,可以对这个节点进行删改,当改动一个节点时,集群中活着的机器都会更新到一致的数据。

  • 每个节点在zookeeper中叫做znode,并且其有一个唯一的路径标识,如/SERVER2节点的标识就为/APP3/SERVER2;
  • znode可以有子znode,并且znode里可以存数据,但是EPHEMERAL类型的节点不能有子节点;
  • znode中的数据可以有多个版本,比如某一个路径下存有多个数据版本,那么查询这个路径下的数据就需要带上版本;
  • znode可以是临时节点,一旦创建这个znode的客户端与服务器失去联系,这个znode也将自动删除,Zookeeper的客户端和服务器通信采用长连接方式,每个客户端和 服务器通过心跳来保持连接,这个连接状态称为session,如果znode是临时节点,这个session失效,znode也就删除了。
  • znode的目录名可以自动编号,如App1已经存在,再创建的话,将会自动命名为App2;
  • znode可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等。

使用场景

配置管理

集中式的配置管理在应用集群中是非常常见的,一般商业公司内部都会实现一套集中的配置管理中心,应对不同的应用集群对于共享各自配置的需求,并且在配置变更时能够通知到集群中的每一个机器。

zookeeper很容易实现这种集中式的配置管理,比如将APP1的所有配置配置到/APP1 znode下,APP1所有机器一启动就对/APP1这个节点进行监控(zk.exist(“/APP1”,true)),并且实现回调方法Watcher,那么在zookeeper上/APP1 znode节点下数据发生变化的时候,每个机器都会收到通知,Watcher方法将会被执行,那么应用再取下数据即可(zk.getData(“/APP1”,false,null));

我们的lion,点评配置管理系统就是基于zookeeper来实现

集群管理

应用集群中,我们常常需要让每一个机器知道集群中(或依赖的其他某一个集群)哪些机器是活着的,并且在集群机器因为宕机,网络断链等原因能够不在人工介入的情况下迅速通知到每一个机器。

Zookeeper同样很容易实现这个功能,比如我在zookeeper服务器端有一个znode叫/APP1SERVERS,那么集群中每一个机器启动的时候都去这个节点下创建一个EPHEMERAL类型的节点,比如server1创建/APP1SERVERS/SERVER1(可以使用ip,保证不重复),server2创建/APP1SERVERS/SERVER2,然后SERVER1和SERVER2都watch /APP1SERVERS这个父节点,那么也就是这个父节点下数据或者子节点变化都会通知对该节点进行watch的客户端。因为EPHEMERAL类型节点有一个很重要的特性,就是客户端和服务器端连接断掉或者session过期就会使节点消失,那么在某一个机器挂掉或者断链的时候,其对应的节点就会消失,然后集群中所有对/APP1SERVERS进行watch的客户端都会收到通知,然后取得最新列表即可。

自定义Spring Xml Bean配置

PropertyPlaceholderConfigurer

基本使用

Spring中PropertyPlaceholderConfigurer这个类,它是用来解析Java .properties属性文件值,spring bean配置时的属性可以指定文件中的变量来赋值。我们可以通过它来做到在不同环境下配不同的值,动态替换值。

基本使用方法:

1
2
3
4
5
6
<bean id="propertyConfigurerForAnalysis" 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">

<property name="location">
<value>classpath:/spring/include/dbQuery.properties</value>
</property>
</bean>

或者多个Property文件的使用方法:

1
2
3
4
5
6
7
8
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<!--除了上面使用classpath外,还可以使用下述路径-->
<value>/WEB-INF/config/jdbc/jdbc.properties</value>
</list>
</property>
</bean>

如果想通过多个PropertyPlaceholderConfigurer来整合分散的.properties文件的话,可以申明多个bean,然后通过bean的order属性决定加载的顺序。如果没有设置就按照加载xml文件时的顺序。

原理

PropertyPlaceholderConfigurer的上层有BeanFactoryPostProcessor接口和PlaceholderConfigurerSupport、PropertyResourceConfigurer这两个类。相关定义如下:

1
2
3
4
5
public class PropertyPlaceholderConfigurer extends PlaceholderConfigurerSupport
public abstract class PlaceholderConfigurerSupport extends PropertyResourceConfigurer
implements BeanNameAware, BeanFactoryAware

public abstract class PropertyResourceConfigurer extends PropertiesLoaderSupport
implements BeanFactoryPostProcessor, PriorityOrdered

spring提供了的一种叫做BeanFactoryPostProcessor的容器扩展机制。它允许我们在容器实例化对象之前,对容器中的BeanDefinition中的信息做一定的修改(比如对某些字段的值进行修改,这就是占位符替换的根本)。于是就需要说下BeanFactoryPostProcessor接口了,以下BeanFactoryPostProcessor的定义:

1
2
3
public interface BeanFactoryPostProcessor {
void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException;
}

在web项目的spring上下文初始化中,spring在实例化bean之前,会先实例化出实现了BeanFactoryPostProcessor接口的bean,并调用postProcessBeanFactory方法,对BeanFactory中的BeanDefinition进行处理。

先看看PropertyResourceConfigurer的postProcessBeanFactory()方法

1
2
3
4
5
6
7
8
9
10
11
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
try {
Properties mergedProps = mergeProperties();
// Convert the merged properties, if necessary.
convertProperties(mergedProps);
// Let the subclass process the properties.
processProperties(beanFactory, mergedProps);
} catch (IOException ex) {
throw new BeanInitializationException("Could not load properties", ex);
}
}

这个方法整合好Properties,然后以BeanFactory和Properties作为参数调用PropertyPlaceholderConfigurer的processProperties方法。

接着看PropertyPlaceholderConfigurer的processProperties()方法

1
2
3
4
5
protected void processProperties(ConfigurableListableBeanFactory beanFactoryToProcess, Properties props)
throws BeansException {

StringValueResolver valueResolver = new PlaceholderResolvingStringValueResolver(props);
this.doProcessProperties(beanFactoryToProcess, valueResolver);
}

这个类就实例化了一个StringValueResolver对象,然后用BeanFactory和StringValueResolver对象调用PlaceholderConfigurerSupport#doProcessProperties()

再看PlaceholderConfigurerSupport的doProcessProperties()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void doProcessProperties(ConfigurableListableBeanFactory beanFactoryToProcess,
StringValueResolver valueResolver) {
BeanDefinitionVisitor visitor = new BeanDefinitionVisitor(valueResolver);
String[] beanNames = beanFactoryToProcess.getBeanDefinitionNames();
for (String curName : beanNames) {
// Check that we're not parsing our own bean definition,
// to avoid failing on unresolvable placeholders in properties file locations.
if (!(curName.equals(this.beanName) && beanFactoryToProcess.equals(this .beanFactory ))) {
BeanDefinition bd = beanFactoryToProcess.getBeanDefinition(curName);
try {
visitor.visitBeanDefinition(bd);
} catch (Exception ex) {
throw new BeanDefinitionStoreException(bd.getResourceDescription(), curName, ex.getMessage());
}
}
}

// New in Spring 2.5: resolve placeholders in alias target names and aliases as well.
beanFactoryToProcess.resolveAliases(valueResolver);

// New in Spring 3.0: resolve placeholders in embedded values such as annotation attributes.
beanFactoryToProcess.addEmbeddedValueResolver(valueResolver);
}

这个方法就会取出BeanFactory中的BeanDefinition,然后循环处理除了本身以外的bean的占位符替换,我们lion的替换和这个代码的处理就差不多。

通过上述流程,我们可以发现定制自己占位符的实现,最重要的其实是BeanFactoryPostProcessor接口,它是一个入口,让我们能拿到用了占位符的属性及其所在的类。Properties主要是占位符的key和value。

实现思路

定制占位符实现有以下几个思路:

  1. 实现PropertyPlaceholderConfigurer类的mergeProperties();
  2. 实现PropertyPlaceholderConfigurer类的loadProperties();
  3. 实现PropertyPlaceholderConfigurer类的resolvePlaceholder();

1和2其实本质都一样,都是通过改动Properties来实现,推荐第一种方法,美团waimai-api加载rabbitmq不同环境的配置用的方法2,个人觉得还是1好些。

点评的${}读的是lion的值,由lion-client这个jar包实现了spring初始化bean占位符的替换,lion的值修改时动态修改相关bean的属性(重要功能)。

点评占位符替换用的方法三,resolvePlaceholder()的含义其实是根据占位符内容来获取其属性,那么我们的思路可以是:抛弃Properties,自己实现一个map,存储了lion上所有的变量,从我们维护的map中来获取响应的值。

那么动态修改又是如何实现的呢?上面说了,BeanFactoryPostProcessor可以让我们获取到spring管理的bean,将占位符的key和bean的相关信息存到map里,在zookeeper因为Znode更改通知到时,从map里获取到该key,通过spring获取到该bean对象,再通过反射将值set进去。

参考:点评lion设计——com.dianping.lion.client.LionPlaceholderConfigurer

自定义spring标签

官方给出的实现
Creating new XML configuration extensions can be done by following these (relatively) simple steps:

  1. Authoring an XML schema to describe your custom element(s).
  2. Coding a custom NamespaceHandler implementation (this is an easy step, don’t worry).
  3. Coding one or more BeanDefinitionParser implementations (this is where the real work is done).
  4. Registering the above artifacts with Spring (this too is an easy step).

具体参考:

pigeon架构

pigeon内部模块结构

如下图所示,pigeon各模块之间依赖抽象,而不依赖具体实现。如系统配置,依赖的是抽象的pigeon-config模块,而不是具体的pigeon-lion模块。解耦是通过java的ServiceLoader来实现的
pigeon客户端内部模块结构

zookeeper的协议格式

每台机器的/data/webapps/config/appenv里会写上所处环境和zk地址,例如:

1
2
deployenv=qa
zkserver=10.66.13.144:2181,10.66.11.251:2181,10.66.13.167:2181,10.66.32.77:2181,10.66.33.203:2181

  • 服务地址配置(ps:下文的+为字符串连接符,并非代表):
    • /DP/SERVER/ + enscape(serviceName),举例:/DP/SERVER/http:^^service.dianping.com^takeawayApiService^takeawayElemeActivityService_1.0.0,为了不让serviceName里的”/“让zookeeper误认为是节点,因此将”/“替换为”^”。
    • 上述znode的值是服务ip加端口号,如192.168.8.120:4088
  • 服务权重配置
    • /DP/WEIGHT/ip:port
    • 值为1代表权重,如果为0代表这台机器暂时不提供服务,目前只有1和0两种值
  • 服务所属应用配置
    • /DP/APP/ip:port
    • 值为这个服务所属的应用名,这个应用名是读取本地classpath下META-INF/app.properties里的app.name值

pigeon客户端初始化

服务是通过spring的xml来配置,使用ProxyBeanFactory来定制实现一个类。

ProxyBeanFactory:实现了Spring的FactoryBean接口,用于在客户端建造Service类。通过getObject()返回动态代理后的类,初始化方法init()。

init()方法:

  1. 用ClassLoader装载接口类
  2. 构造该Service对应的InvokerConfig
  3. ServiceFactory调用ServiceProxy.getService(InvokerConfig):
    • 触发ServiceFactory的静态初始化
    • 调用ProviderBootStrap的init()
  4. ServiceProxy.getService(InvokerConfig)的内部流程:
    a. 调用InvokerBootStrap的初始化:初始化客户端的相关资源
    b. 通过DefaultSerializer(Serializer的默认实现)的proxyRequest(),构造一个动态代理请求的对象(ps:java的动态代理)。其业务逻辑是由ServiceInvocationProxy来实现
    c. LoadBalanceManager注册服务对应的LoadBalance策略类
    d. ClientManager初始化
    - 启动线程池执行HeartBeatListener任务;
    - 启动线程池执行ReConnectListener任务;
    - ClusterListenerManager添加DefaultClusterListener,HeartBeatListener,ReConnectListener;
    - RegistryEventListener(或者称之为RegistryEventListenerManager表意更合适些)添加ClientManager.InnerServiceProviderChangeListener,ClientManager.InnerRegistryConnectionListener
    
    e. ClientManager注册该service对应的所有client
    - 先通过注册中心RegistryManager找到对应服务地址,即ip:port,ip:port
    - 再根据地址,找到对应的权重
    - 将上述已知信息存储到RegistryManager的map中
    - 根据上述connect(即ip:port)信息来决定构造netty还是http client,并将该client存储到DefaultClusterListener的map中,HeartBeatListener,ReConnectListener均持有该map的引用
    
    f. 将InvokerConfig对应的动态代理的service存储到map中,InvokerConfig本身的hashcode是由其每个字段的值来确定的

InvokerBootStrap

stratup():

  1. 用线程池执行服务调用超时检测的任务
  2. InvokerProcessHandlerFactory的初始化:
    • 采用了责任链设计模式
    • 依次初始化了ClusterInvokeFilter,GatewayInvokeFilter,RemoteCallMonitorInvokeFilter,ContextPrepareInvokeFilter,RemoteCallInvokeFilter
    • 构造了一个匿名的ServiceInvocationHandler,它本身包装了一个Filter,并拥有下一个filter的引用(下一个即filter的初始化顺序)
    • 对外最终返回的产品是包装了ClusterInvokeFilter的匿名Handler
  3. 初始化SerializerFactory:
    • 提供产品:Serializer
  4. 使用ExtensionLoader加载Monitor对应的实现CatMonitor,并初始化

pigeon客户端设计实现

存储服务信息,即服务对应的机器、服务对应的应用信息,client的集合等等,那么pigeon内部如何管理这些信息,在发生变化时又如何通知呢(观察者模式,事件触发更新由zookeeper的watch来触发)?类图如下:
pigeon客户端类图

InvokerConfig

存储了服务接口类,服务名称,超时设置,调用方式,序列化方式等,相当于服务调用的配置

ClientManager

ClientManager是pigeon里面最重要的一个类,注册Client、获取Client。在注册和获取中有各种各样的功能分化到各个类来实现,比如RegistryManager,ClusterListenerManager等等。ClientManager是将这些功能聚合起来,并对外暴露这些接口。

RegistryManager

Registry主要用于服务的注册和发现。RegistryManager类似一个策略管理类,Registry即是策略,不同的策略实现由ExtensionLoader来加载。RegistryManager包装了Registry的服务,并决定使用了哪个策略来实现服务并对外暴露。

ExtensionLoader

根据pigeon的理念,我定义一个抽象的接口,具体的实现就像插件一样可替换。所以在pigeon中,具体的配置模块、监控模块和注册模块等,都是如同可插拔的插件。ExtensionLoader就是来加载这些插件用。使用map来存储接口类对应的实现,若map中没有,则通过ServiceLoader来加载。

插件分为两种,一种可替换的,非此即彼;还有一种,是可以同时存在的,类似过滤器这种概念。

因此ExtensionLoader中的map有两种,一种是存的接口对应的实现(哪个实现在前面哪个被加载),一种存的是接口对应的实现list。

SerializerFactory

  • 作用:根据序列化方式,返回对应的序列化类
  • 工厂类
  • 产品类接口Serializer:
    • 正反序列化客户端请求
    • 正反序列化服务器端返回值
    • 动态代理客户端请求

ClusterListenerManager

管理了三种listener,DefaultClusterListener,HeartBeatListener,ReConnectListener。

对外暴露主要以下几个方法,addConnect(),removeConnect(),均为遍历listener实现

DefaultClusterListener的addConnect():

  • 调用ClientSelector根据connect来决定到底是netty还是http client
  • 将client存储到本地map中

ClientSelector

抽象工厂模式:抽象工厂ClientFactory,两个实现类:HttpInvokerClientFactory,NettyClientFactory。产出的产品接口Client,分别两种产品HttpClient,NettyClient。

通过connect信息来决定到底使用哪个具体工厂

Pigeon客户端请求过程

ServiceInvocationProxy是实现了InvocationHandler(jdk) 的类,但其实际业务是由ServiceInvocationHandler——InvokerProcessHandlerFactory的产品来实现的。

InvokerProcessHandlerFactory

  • 产品是ServiceInvocationHandler
  • ServiceInvocationHandler包装了ServiceInvocationFilter
  • ServiceInvocationHandler采用了责任链的设计模式,指明了下一个处理的handler是哪个
  • 责任链的顺序分别是ClusterInvokeFilter,GatewayInvokeFilter,RemoteCallMonitorInvokeFilter,ContextPrepareInvokeFilter,RemoteCallInvokeFilter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public interface ServiceInvocationHandler {

/**
*
*
* @param invocationContext
* @return
* @throws Throwable
*/
InvocationResponse handle(InvocationContext invocationContext) throws Throwable;

}

public interface ServiceInvocationFilter<I extends InvocationContext> {

/**
*
*
* @param handler
* @param invocationContext
* @return
* @throws Throwable
*/
InvocationResponse invoke(ServiceInvocationHandler handler, I invocationContext) throws Throwable;

}

ClusterInvokeFilter

pigeon客户端集群策略模式有4种,分别是:快速失败failfast/失败转移failover/失败忽略failsafe/并发取最快返回forking

  • failfast:调用服务的一个节点失败后抛出异常返回,可以同时配置重试timeoutRetry和retries属性
  • failover:调用服务的一个节点失败后会尝试调用另外的一个节点,可以同时配置重试timeoutRetry和retries属性
  • failsafe:调用服务的一个节点失败后不会抛出异常,返回null,后续版本会考虑按配置默认值返回
  • forking:同时调用服务的所有可用节点,返回调用最快的节点结果数据

根据服务的配置来使用某个cluster,一般是failfast。连接服务一般有两种,http请求走tomcat容器或者netty走tcp连接。获取到该服务下client的list,通过RouterManager(内部主要LoadBalanceManager来获取当前合适的client,如WeightedLoadBalance是将调用量按从小到大排序,返回调用量最小的)获取到当前合适的client,并将该client设到context中,以便在handler中传递。调用下一个handler,若抛出异常,则重试。

GatewayInvokeFilter

调用InvokerStatisticsHolder的flowIn(),设置服务的app(即服务端项目的名称,配置在resources/META-INF/app.properties,服务端在注册服务到注册中心时,会将项目名称注册到zookeeper)对应的调用量,包括按天、按分、按秒的一些调用量统计。然后将请求传给下一个handler

上述调用量可以通过ip:4080/stats.json来查看。

RemoteCallMonitorInvokeFilter

pigeon的监控打点,打出了请求的信息和配置,对当前秒、请求的大小(请求大小做了散列)打点

对cat的Transaction提取了接口,cat的Transaction只是其中一实现。保证了pigeon-remotting该包没有依赖cat的具体实现

ContextPrepareInvokeFilter

将InvocationContext中的参数抽取出来设置到InvocationRequest中去,将用户自定义的参数和pigeon内部自定义的参数传到InvocationRequest的RequestValues、GlobalValues中去。

用户如何自定义参数:
1、简单的客户端A->服务端B的一级调用链路的参数传递
客户端:
String url = “http://service.dianping.com/com.dianping.pigeon.demo.EchoService“;
EchoService service = ServiceFactory.getService(url, EchoService.class);

ContextUtils.putRequestContext(“key1”, “1”);
System.out.println(“service result:” + service.echo(input));
服务端:
public String echo(String input) {
System.out.println(ContextUtils.getLocalContext(“key1”));
return “echo:” + input;
}
2、服务端B->客户端A的参数传回
服务端:
ContextUtils.putResponseContext(“key1”, “1”);
客户端:
ContextUtils.getResponseContext(“key1”);
3、全链路传递
如果需要在全链路传递对象,如A->B->C->D,需要使用以下接口:
在A发送请求端:ContextUtils.putGlobalContext(“key1”, “1”);
在D接收请求端:ContextUtils.getGlobalContext(“key1”);

RemoteCallInvokeFilter

实际网络调用发生的地方,在调用前后都有调beforeInvoke()和afterInvoke(),用户自定义的拦截器就是在这两个方法中被调用的

从代码中可以看到,一共有三种timeout,分别是:服务的timeout、方法的timeout和ThreadLocal的timeout,其优先级也是依次递增。不过方法的timeout配置并没有开放出来,我们有很多服务,其不同的方法超时是不一样的,客户端配服务的timeout时被最短的模板决定了。

从服务设计的角度来讲,不同重要性、服务响应时间的方法不应该放在一个服务中,但是pigeon给我们提供了对方法级别线程池的配置,以及自动的服务隔离措施,从这点上讲,pigeon保证了我们在代码设计时无需考虑这些额外因素,因为它把这些因素都引到了xml配置上。

通过client调用请求,会先调用AbstractClient的实现:

  • 将对应服务地址(ip:port)的调用量记下来,其每次加的调用量为1.0f/weight,这个调用量会影响到loadBalance
  • 调NettyClient的doWrite()实现:

NettyClient

使用的netty3,设置的pipeline:InvokerDecoder、NettyClientHandler、InvokerEncoder

InvokerEncoder:

  • 消息头固定7个字节:
    • 第1-3个字节固定为:57,58,2,其中57和58写死的,但2是指序列化的方式,hessian为2
    • 第4-7个字节:消息体长度,int,占4个字节,值为消息体长度(hessian序列化字节长度)
  • 消息体,从第8个字节开始:hessian序列化字节,对象类型为DefaultRequest和DefaultResponse

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public class DefaultRequest implements InvocationRequest {

    private byte serialize;//必填,序列化类型,hessian为2

    private long seq;//必填,消息sequence,long型,值请从0开始递增,每个消息的sequence都不同

    private int callType = 1;//必填,如果调用需要返回结果,固定为1

    private int timeout = 0;//必填,超时时间,单位毫秒

    private String serviceName;//必填,服务名称url,服务唯一的标识

    private String methodName;//必填,服务方法名称

    private Object[] parameters;//必填,服务方法的参数值

    private int messageType = 2;//必填,消息类型,服务调用固定为2,心跳为1

    private Object context;//老的avatar-tracker上下文传递内容,如果要传递对象,需设置

    private String app = "";//必填,调用者所属应用名称,在META-INF/app.properties里的app.name值

    private Map<String, Serializable> globalValues = null;//用于新的全局上下文传递,可不填

    private Map<String, Serializable> requestValues = null;//用于新的上下文传递,可不填

    }

    public class DefaultResponse implements InvocationResponse {

    private long seq;//返回的消息sequence,对应发送的消息sequence,long型

    private int messageType;//消息类型,服务调用为2,服务调用业务异常为4,服务框架异常为3,心跳为1

    private Object returnVal;//返回服务调用结果,如果是异常,返回类型为

    private Object context;//老的avatar-tracker上下文传递内容

    private Map<String, Serializable> responseValues = null;//用于新的上下文传递返回的结果

    }
  • 消息尾:固定11个字节

    • 前8个字节为消息sequence,long型,值请从0开始递增,每个消息的sequence都不同;
    • 后3个字节固定为:29,30,31

doWrite():调用Channel的write(),若抛出异常,则返回默认返回值

NettyClientHandler:获取服务端的返回值,交付NettyClient处理

NettyClient处理Response,会将Response里的sequence来和之前的请求对应上
处理Response用了有界线程池,corePoolSize 10,maxPoolSize 100,队列大小 800

下述代码是同步调用的,调用请求时将CallbackFuture和调用请求关联起来,请求发出去后,future处于blocking状态,直到被notify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
if (Constants.CALL_SYNC.equalsIgnoreCase(callType)) {
CallbackFuture future = new CallbackFuture();
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);
if (response == null) {
response = future.get(timeout);
}
}

public class CallbackFuture implements Callback, CallFuture {
private boolean done = false;

public void run() {
synchronized (this) {
this.done = true;
if (this.response.getMessageType() == Constants.MESSAGE_TYPE_SERVICE) {
this.success = true;
}
this.notifyAll();
}
}

public InvocationResponse get(long timeoutMillis) throws InterruptedException {
synchronized (this) {
//判断有没有超时,若没有超时,则wait
long start = request.getCreateMillisTime();
while (!this.done) {
long timeoutMillis_ = timeoutMillis - (System.currentTimeMillis() - start);
if (timeoutMillis_ <= 0) {
StringBuilder sb = new StringBuilder();
sb.append("request timeout, current time:").append(System.currentTimeMillis())
.append("\r\nrequest:").append(request).append("\r\nhost:").append(client.getHost())
.append(":").append(client.getPort());
ServiceStatisticsHolder.flowOut(request, client.getAddress());
NetTimeoutException e = new NetTimeoutException(sb.toString());
throw e;
} else {
this.wait(timeoutMillis_);
}
}
processContext();//处理从服务器端传过来的自定义参数

if (response.getMessageType() == Constants.MESSAGE_TYPE_EXCEPTION) {
RpcException cause = InvokerUtils.toRpcException(response);
StringBuilder sb = new StringBuilder();
sb.append("remote call exception\r\nrequest:").append(request).append("\r\nhost:")
.append(client.getHost()).append(":").append(client.getPort()).append("\r\nresponse:")
.append(response);
logger.error(sb.toString(), cause);
monitor.logError(sb.toString(), cause);
} else if (response.getMessageType() == Constants.MESSAGE_TYPE_SERVICE_EXCEPTION) {
if (Constants.INVOKER_LOG_APP_EXCEPTION) {
Throwable cause = InvokerUtils.toApplicationException(response);
StringBuilder sb = new StringBuilder();
sb.append("remote service exception\r\nrequest:").append(request).append("\r\nhost:")
.append(client.getHost()).append(":").append(client.getPort()).append("\r\nresponse:")
.append(response);
logger.error(sb.toString(), cause);
monitor.logError(sb.toString(), cause);
}
}
return this.response;
}
}
}

InvokerHelper

这是一个很重要的类,主要是用ThreadLocal存储本次请求的timeout,服务地址以及callback(回调)。优先级最高,即高于spring中的配置。可以用来hack做一些事情,不推荐线上使用,主要用于测试

Pigeon后台线程

心跳

目的是保持客户端与可用的服务端之间的连接,客户端发起,服务端响应,1.5s超时,每3s发送一次,连续5次不成功则考虑摘除该服务端,是否摘除是判断服务端有20%以上的节点可用就可以摘除该不用的节点,摘除后会放入重连线程

重连

目的是检测不可用的服务端,如果连续5次检测成功则恢复该服务端的调用

统计

Pigeon服务端

pigeon自定义服务标签

定义了schema验证文件,在pigeon-remotting项目的classpath下定义了pigeon-service-2.0.xsd

实现了CommonNamespaceHandler,其实现了NamespaceHandlerSupport(spring对NamespaceHandler的实现,通过它,我们只需要注册BeanDefinitionParser对象即可)

定义了pigeon:server、pieon-service标签,由ServerBeanDefinitionParser和ServiceBeanDefinitionParser来负责解析对应的xml标签的。这两个类均实现了BeanDefinitionParser接口

为了让spring加载xml时能够去加载我们的NamespaceHandler类和xsd schema文件,还需要两个配置文件

  • pigeon-config中配置的 META-INF/spring.handlers
    http\://code.dianping.com/schema/pigeon=com.dianping.pigeon.config.spring.CommonNamespaceHandler
  • pigeon-remotting中配置的 META-INF/spring.schemas
    http\://code.dianping.com/schema/pigeon/pigeon-service-2.0.xsd=com/dianping/pigeon/remoting/config/spring/pigeon-service-2.0.xsd

偶然发现还有pigeon-extension-2.0.xsd,猜测可能是开放出标签让用户选择使用哪个插件,比如配置选择是用zookeeper还是本地文件配置还是自定义的配置类。目前对于多个插件,如果是只能用一个的话,默认选配在第一个的标签。然并卵,相应的parser类还没有开发出来。

最终我们在使用自定义标签时,需要在beans里面加入我们上面配置的namespace,如下:

1
2
3
4
5
6
7
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:pigeon="http://code.dianping.com/schema/pigeon"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
http://code.dianping.com/schema/pigeon http://code.dianping.com/schema/pigeon/pigeon-service-2.0.xsd">

xmlns 默认命名空间
xmlns:xsi 声明XML Schema实例的名称空间,,并将xsi前缀与该名称空间绑定,这样模式处理器就可以识别xsi:schemaLocation属性。XML Schema实例名称空间的前缀通常使用xsi。
使用xsi:schemaLocation属性指定名称空间和对应的schema文件
xmlns:pigeon是将pigeon作为http://code.dianping.com/schema/pigeon的别名

pigeon的parser类在碰到value的前缀是${、后缀是}时,会把它作为lion值去lion读取,然而并没有动态更新,也就是说一但配置,lion值改了是没法更新配置的

Pigeon服务端初始化

pigeon初始化服务有三种方式:

  1. xml配置,通过ServiceRegistry注册服务,服务的属性都是默认的
  2. xml配置,通过来注册服务,并可以配置服务的属性
  3. 通过注解来注入service和引用service

服务端的配置分为两方面:服务和服务器的
服务的包括:url,version,useSharedPool
服务器的包括:corePoolSize,port,maxPoolSize,workQueueSize

当我们用方法1配置时,只能简单的将服务key和服务对象关联起来,服务和服务器的配置都是默认的:port:4080,corePoolSize:60,maxPoolSize:500,workQueueSize:1000,useSharedPool:true。使用方法2和3配置,均可以配置服务的属性,若不配置,则默认值同上。

方法2是通过自定义spring服务标签来实现的,上面一个章节已经阐述过实现。主要是通过RootBeanDifinition(spring对bean的包装)、ManagedList(spring对集合的包装)来实现的,通过BeanDefinitionRegistry来注入bean。pigeon:server标签解析出来的类是ServerBean,pigeon:service标签解析出来的类是SingleServiceBean,注入spring时指明了初始化方法init()。它的初始化和ServiceRegistry很相似,不同的是它多了对方法的配置。

方法3也定义了服务标签pigeon:annotation,主要是让用户来配置扫描哪些包下面的类,将配置了@Service标签的类载入管理,将配了@Reference的注入相应的服务类

服务初始化过程

  • 将服务的配置属性装配到ProviderConfig类中,每一个服务对应一个ProviderConfig
  • 将服务key和ProviderConfig关联起来,存储到map里
  • 如果service实现了InitializingService,则调用该初始化方法。功效有点类似spring的postConstruct(),但如果我们使用pigeon的注解,那么无疑通过实现该接口来初始化是比较好的选择。
  • 将服务key和服务下面的Method关联起来,放到map里(ps:过滤了Object类和Class类的方法,过滤Class类方法比较奇怪)
  • 在NettyServer中管理服务key对应的线程池以及服务方法key(服务key-methodname)对应的线程池。
    • 如果服务配置了线程池,且没有配置方法,则该服务key会有一个对应的线程池
    • 如果配了方法,则服务方法key会有一个对应的线程池。方法用户在配置时只能配置最大线程数量,corePoolSize是最大/3
    • 如果服务配置了线程池,配了部分方法的,那么该方法会走Server的共享线程池
  • 如果Server没有启动,则启动,同时启动线程检测请求处理是否超时,若超时记录次数,并中断
  • 通过反射获取service提供的方法,除去了Object自带的方法
  • ProviderBootStrap初始化
  • 启动NettyServer
  • 发布管理的服务到注册中心

ProviderBootStrap

init():

  1. 使用ExtensionLoader加载ConfigManager对应的实现LionConfigManager;
  2. 初始化ProviderProcessHandlerFactory:
    • 提供产品:ServiceInvocationHandler
  3. 初始化SerializerFactory:
    • 提供产品:Serializer
  4. 使用ExtensionLoader加载Monitor对应的实现CatMonitor,并初始化
  5. 启动一守护线程,用于jvm关闭时,销毁资源
  6. 初始化RegistryManager(注册中心)
  7. 初始化JettyServer,并启动

服务请求超时设计

处理客户端请求是通过线程池执行的,任务提交到线程池会返回一个Future,这个future传递给了RequestTimeoutListener,这个listener睡眠5s后遍历当前在处理的请求,将超过时间的,通过future来cancel

Pigeon服务端请求过程

在客户端通过netty把请求发过来时,先挑选该请求或方法对应的线程池,将调用service处理请求的任务提交到线程池执行。

选择请求的线程池的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private ThreadPool selectThreadPool(final InvocationRequest request) {
ThreadPool pool = null;
if (!CollectionUtils.isEmpty(methodThreadPools)) {
pool = methodThreadPools.get(request.getServiceName() + "#" + request.getMethodName());
}
if (!CollectionUtils.isEmpty(serviceThreadPools)) {
pool = serviceThreadPools.get(request.getServiceName());
}
if (pool == null) {
if (enableSlowPool && requestTimeoutListener.isSlowRequest(request)) {
pool = slowRequestProcessThreadPool;
} else {
if ("server".equals(poolStrategy)) {
pool = requestProcessThreadPool;
} else {
pool = sharedRequestProcessThreadPool;
}
}
}
return pool;
}

处理请求的过程和客户端相似,也是一个责任链处理请求的模式。一共有以下filter(handler包裹filter):MonitorProcessFilter,WriteResponseProcessFilter,ContextTransferProcessFilter,ExceptionProcessFilter,GatewayProcessFilter,BusinessProcessFilter

MonitorProcessFilter: cat打点,并将请求和返回的数据打进去,清除ThreadLocal的数据

WriteResponseProcessFilter:将返回写入到channel,并调用用户注册的拦截器做返回后的处理

ContextTransferProcessFilter:将请求里面带过来的额外参数,即自定义参数放入ThreadLocal。

ExceptionProcessFilter:包装服务调用抛出的异常

GatewayProcessFilter:对客户端应用做服务限流,比如配置pigeon.provider.applimit=takeaway-mapi:10000

BusinessProcessFilter:调用用户注册的拦截器做返回前的处理,将请求里服务的方法和服务提供的方法作匹配,返回匹配度最高的那个方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private int matching(ServiceMethod method, String[] paramClassNames, boolean cast)
throws InvocationFailureException {
int k = 0;
for (int i = 0; i < paramClassNames.length; i++) {
if (paramClassNames[i].equals(Constants.TRANSFER_NULL)) {
continue;
}
Class<?> paramClass = null;
try {
paramClass = ClassUtils.loadClass(paramClassNames[i]);
} catch (ClassNotFoundException e) {
throw new InvocationFailureException("no class found for parameter:" + paramClassNames[i]);
}
if (paramClass == method.getParameterClasses()[i]) {
k++;
} else if (cast) {
if (paramClassNames[i].equals(Double.class.getName())) {
paramClass = Float.class;
} else if (paramClassNames[i].equals(Integer.class.getName())) {
paramClass = Short.class;
}
if (paramClass == method.getParameterClasses()[i]) {
k++;
}
}
if (!method.getParameterClasses()[i].isAssignableFrom(paramClass)) {
return -1;
}
}
return k;
}

pigeon如何提供http形式的服务调用

pigeon除了启动nettyserver,默认还启动JettyServer,绑定了一个DispatcherServlet,根据http请求中参数明确的服务和方法以及序列化方式来给服务的返回值,默认的请求需要按照架构的规范来表明一些参数,你也可以自定义HttpAdapter,来自定义请求参数

总结:

  • 初始化最好加一个全局变量来控制,防止初始化函数被误调用

依次初始化正常业务请求、心跳、健康监测的ServiceInvocationFilter和ServiceInvocationHandler;

有些监听修改的listener,是放在一些类中,而不是这个类实现了监听修改

weight 控制了预热 0是怎么弄的

pigeon代码很多写的也比较有问题 前后不一致 设计了后没有按设计去实现

pigeon三种配置文件
config/applicationContext.properties
config/pigeon.properties
/data/webapps/config/pigeon.properties
优先级越来越高
针对测试环境,自动注册服务的问题,建议每人修改本地文件/data/webapps/config/pigeon.properties,将自动注册关闭

文章目录
  1. 1. pigeon介绍
  2. 2. pigeon中用到的技术
    1. 2.1. ClassLoader
    2. 2.2. Class.forName()
    3. 2.3. jdk动态代理
      1. 2.3.1. InvocationHandler
      2. 2.3.2. jdk动态代理创建机制
    4. 2.4. SPI机制
      1. 2.4.1. SPI的思想
      2. 2.4.2. JAVA SPI的约定
      3. 2.4.3. ServiceLoader
    5. 2.5. zookeeper
      1. 2.5.1. 数据模型
      2. 2.5.2. 使用场景
        1. 2.5.2.1. 配置管理
      3. 2.5.3. 集群管理
  3. 3. 自定义Spring Xml Bean配置
    1. 3.1. PropertyPlaceholderConfigurer
      1. 3.1.1. 基本使用
      2. 3.1.2. 原理
      3. 3.1.3. 实现思路
    2. 3.2. 自定义spring标签
  4. 4. pigeon架构
    1. 4.1. pigeon内部模块结构
    2. 4.2. zookeeper的协议格式
  5. 5. pigeon客户端初始化
    1. 5.1. InvokerBootStrap
  6. 6. pigeon客户端设计实现
    1. 6.1. InvokerConfig
    2. 6.2. ClientManager
    3. 6.3. RegistryManager
    4. 6.4. ExtensionLoader
    5. 6.5. SerializerFactory
    6. 6.6. ClusterListenerManager
    7. 6.7. ClientSelector
  7. 7. Pigeon客户端请求过程
    1. 7.1. InvokerProcessHandlerFactory
    2. 7.2. ClusterInvokeFilter
    3. 7.3. GatewayInvokeFilter
    4. 7.4. RemoteCallMonitorInvokeFilter
    5. 7.5. ContextPrepareInvokeFilter
    6. 7.6. RemoteCallInvokeFilter
      1. 7.6.1. NettyClient
    7. 7.7. InvokerHelper
  8. 8. Pigeon后台线程
    1. 8.1. 心跳
    2. 8.2. 重连
    3. 8.3. 统计
  9. 9. Pigeon服务端
    1. 9.1. pigeon自定义服务标签
    2. 9.2. Pigeon服务端初始化
      1. 9.2.1. 服务初始化过程
      2. 9.2.2. ProviderBootStrap
    3. 9.3. 服务请求超时设计
    4. 9.4. Pigeon服务端请求过程
    5. 9.5. pigeon如何提供http形式的服务调用
,