硬核技术分析帖:Presto之调度模块源码解析

易观 2018-10-10 301

手把手教你分析源代码,Presto调度模块源码解析过程大复盘。

导读:Presto适合PB级海量数据复杂分析,交互式SQL查询,持跨数据源查询。本文就将详细解析Client提交查询请求到PrestoServer端响应的细节,以及还原Presto资源组校验的实现过程。


▌Presto调度模块源码解析-Client提交请求

 

当客户端以控制台Console、脚本--execute等方式提交SQL作业时,Presto的Client会将作业相关的参数和脚本封装成一个Restful请求,提交给PrestoServer端。然后再进行后续的诸如解析执行计划、拆分Stage、调度task到Worker端执行等操作。下面就介绍一下Client提交作业到服务端部分的源码。


Main函数


客户端提交作业的代码是从Presto的main函数开始的:

 

在Console的run方法中,如果入参中含有--execute,会直接将值取出作为待执行的SQL语句。否则认为是通过--file指定了SQL文件,此时会通过文件IO读取该文件中的SQL脚本。这种情况对应着通过脚本提交作业的情况,而如果--execute和--file都没有指定,则认为是通过控制台Console的方式提交SQL。


通过脚本提交作业


通过脚本的方式会直接执行Console类的executeCommand方法。按照“;”切分出SQL语句,并依次调用Console类的process方法来提交作业。


通过控制台提交作业


这种情况相对麻烦一些,他会执行Console.runConsole方法处理客户提交的请求。Presto为这种方式设置了一个AtomicBoolean existing变量来判断Client是否存在,如果不存在则不再提交后续的SQL(对应在控制台中输入了多条sql语句,并用;间隔,当前边的语句正在执行时退出Console,此时后续的sql就不会被提交了)。

 

在runConsole方法中可以看到,他会有一个while循环不断的循环处理LineReader对象读取到的命令,LineReader继承自jline.console.ConsoleReader,是一个专门处理控制台输入的Java类库(官方网址是https://jline.github.io/)。这个类每读取一行输入就会将值传递给一个名为buffer的StringBuilder对象,然后根据“;”和“\\G”来识别一个完整的SQL,并将SQL交给process方法进行调度。最后会将剩下不完整的语句赋值给重新初始化的buffer对象作为下一条SQL的开头。

 

构建请求并发送


下面我们看一下process方法,这个方法中最重要的部分是他会在try with resources中调用QueryRunner的startQuery方法,如下:

这个语法表示小括号中创建的对象如果实现了closable接口,则无论是否出现异常,都会在try catch结束后调用其close方法。


如下是startQuery方法:


然后我们一路点进去,经过QueryRunner的startInternalQuery、StatementClientFactory的newStatementClient方法之后,我们来到了StatementClientV1的构造函数:


在buildQueryRequest方法中,会构建一个目标Rest地址为/v1/statement的请求。


随后在JsonResponse.execute中会发起这个请求。然后我们搜索一下Rest地址/v1/statement,发现他的目标服务类为StatementResouce。

 

以上就是Client提交查询请求,到PrestoServer端响应的过程。


▌Presto调度模块源码解析-服务端响应-资源组选择(2)


当用户提交一个SQL作业时,Presto客户端会封装一个Request通过Restful接口将请求发送到服务端,下面就详细讲解一下服务端的处理过程。

 

Client端发送请求的地址是/v1/statement,对应到StatementResource的createQuery方法。在该方法中会调用Query的static方法create,在create方法中new了一个Query对象,然后会调用SqlQueryManager的createQuery方法。


在createQuery方法中首先会创建QueryId,生成规则是:


然后presto会判断集群是否有可用节点,其中isIncludeCoordinator变量对应config.properties配置文件中的node-scheduler.include-coordinator配置项,表示是否允许调度task到coordinator节点进行计算。

  

如果集群可用节点小于最小值1(参数query-manager.initialization-required-workers),则给出熟悉的报错信息“Cluster is still initializing……”。

 

除此之外presto还对sql长度做了限制,要求不能超过query.max-length(默认1_000_000_000,表示10亿)。


然后presto会根据提交作业的客户端信息选择资源组。

上图代码中selectGroup方法对应到InternalResourceGroupManager的selectGroup方法,其中configurationManager的类型是AtomicReference<ResourceGroupConfigurationManager>。selectGroup方法实现如下:

  

然后我们点进match方法,来到了ResourceGroupConfigurationManager接口中,我们看到这个方法的实现类有如下三个:

  

那么问题来了,当我们调用match方法时,执行的是这三个实现类中的哪一个呢?


我们首先看一下configurationManager初始化时的值,如下图所示初始化时其类型为LegacyResourceGroupConfigurationManager:

然后我们搜一下configurationManager的引用,发现在InternalResourceGroupManager类的setConfigurationManager方法中修改了他的值。如下图:

该方法在同一个类的loadConfigurationManager方法中被调用。loadConfigurationManager方法会判断常量RESOURCE_GROUPS_CONFIGURATION对应的etc/resource-groups.properties文件是否存在,如果存在会读取文件中配置的resource-groups.configuration-manager参数为Key值,到configurationManagerFactories中取出对应的ResourceGroupConfigurationManagerFactory对象,然后调用其create方法构造一个ResourceGroupConfigurationManager对象,最终赋值给configurationManager。方法的实现如下图:

 

而loadConfigurationManager方法又在PrestoServer类的初始化方法中被调用。

PS:ResourceGroupManager的实现类型是在CoordinatorModule这个类中被注入的:

 

也就是说,当PrestoServer通过其main方法调用run方法进行初始化时, 会读取etc/resource-groups.properties文件中的配置项resource-groups.configuration-manager,再以它为Key值读取configurationManagerFactories中对应的ResourceGroupConfigurationManagerFactory,然后调用读取出来的工厂类的create方法构建ResourceGroupConfigurationManager对象,最后赋值给InternalResourceGroupManager类的configurationManager。


另一个问题出现了,configurationManagerFactories这个Map<String,ResourceGroupConfigurationManagerFactory>类型的全局变量是在什么时候赋值的,里边都有哪些值呢?


我们还是搜索一下它的引用,发现在InternalResourceGroupManager的addConfigurationManagerFactory方法中对其进行了putIfAbsent操作(不存在则put)。

 

搜索引用发现,在PluginManager的installPlugin方法中调用了这个方法:

 

然后我们看一下plugin.getResourceGroupConfigurationManagerFactories方法的定义,发现他有两个实现类:

 

ResourceGroupManagerPlugin的实现如下:

 

H2ResourceGroupManagerPlugin的实现如下:

 

我们在addConfigurationManagerFactory方法中可以看到,添加到configurationManagerFactories这个Map中时,是以factory的name作为Key值,factory为Value的:

 

所以我们看一下这三个实现类对应的name值,也就是resource-groups.configuration-manager参数的可选值:

dbDbResourceGroupConfigurationManagerFactory

h2H2ResourceGroupConfigurationManagerFactory

fileFileResourceGroupConfigurationManagerFactory


然后,我们回过头来看一下PluginManager的installPlugin方法,该方法在同类的loadPlugin方法中被调用:

loadPlugin方法又在该类中再次被调用:

再往上是loadPlugins方法:

再次向上查找,原来loadPlugins方法是在PrestoServer的run方法中,先与loadConfigurationManager方法被调用的:

也就是说,Presto默认是按照LegacyResourceGroupConfigurationManager进行资源组管理的。


在PrestoServer调用run方法进行初始化时,首先会执行PluginManager的loadPlugins方法,向InternalResourceGroupManager中一个存放ResourceGroupManagerFactory类型元素的Map添加可用的资源组管理工厂类。


然后会调用InternalResourceGroupManager的loadConfigurationManager方法,判断是否配置了参数resource-groups.configuration-manager,如果配置了则会按照配置的manager类型从这个Map中根据ResourceGroupFactory的name取出相应的factory。


最后会根据取出的factory对象create一个ResourceGroupConfigurationManager,并将其赋值给configurationManager。

 

在Presto的官方文档中我们看到,presto只描述了一种name为file的ResourceGroupManagerFactory,对应FileResourceGroupConfigurationManagerFactory。看来这是官方比较推荐的类型。

接下来我们看一下FileResourceGroupConfigurationManager类的match方法,如下图:

入参SelectionCriteria是从session中取得的用户信息,如下图:

从match方法可以看到他会从selectors中找到跟session中用户信息相匹配的ResourceGroupSelector,如果得到的Optional对象不存在value,则给出熟悉的异常信息Query did not match any selection rule,如果存在value则作业继续向下执行。


selectors对象是从resource-groups.config-file配置项指定的文件中解析得到的ResourceGroup配置信息。其初始化的代码是在FileResourceGroupConfigurationManager的构造函数中,节选如下:

……

其中,config.getConfigFile方法对应配置项resource-groups.config-file:

在buildSelectors方法中可以看到selectors中添加的对象类型是StaticSelector,这样在match方法的lambda表达式s -> s.match中,s对象就是StaticSelector类型的了。

在StaticSelector的match方法中我们看到,它会根据json文件中读取到的信息与客户端信息依次做校验,如校验不通过则返回一个没有值的Optional对象,以便selectGroup方法抛出异常。如果全部校验通过,最终会封装一个SelectionContext类型的Optional对象返回。

以上就是Presto资源组校验的代码,后续将继续整理服务端响应作业提交请求的代码。

 

2018易观A10峰会

单日票新鲜出炉

26日通票279元

27日通票209元

随心组合

共享年度大数据巅峰盛典

这里,了解更多易观A10参会详情~