flink sql源码分析一之执行流程梳理

向量数据库大模型人工智能与算法

前言

我们在梳理flink sql 执行流程时以sql解析、sql校验、sql转化及sql优化的顺序来展开,本篇主要是对过程的梳理,不会涉及过多的代码部分,后面会针对各环节进行逐一分析。

Parser

picture.image

Validate

picture.image

这里以SqlQuery操作的convert过程为例:

picture.image

转换过程

picture.image

上面是TableEnvironmentImpl中的translate方法入口,我们来具体分析下planner.translate方法在PlannerBase转换过程如下:

picture.image

我们来看一下具体的optimize过程:

picture.image

将FlinkPhysicalRel DAG转换成ExecNode DAG


          
@VisibleForTesting
          
private[flink] def translateToExecNodePlan(
          
                                            optimizedRelNodes: Seq[RelNode]): util.List[ExecNode[_, _]] = {
          
  require(optimizedRelNodes.forall(_.isInstanceOf[FlinkPhysicalRel]))
          
  // Rewrite same rel object to different rel objects
          
  // in order to get the correct dag (dag reuse is based on object not digest)
          
  val shuttle = new SameRelObjectShuttle()
          
  val relsWithoutSameObj = optimizedRelNodes.map(_.accept(shuttle))
          
  // reuse subplan
          
  val reusedPlan = SubplanReuser.reuseDuplicatedSubplan(relsWithoutSameObj, config)
          
  // convert FlinkPhysicalRel DAG to ExecNode DAG
          
  reusedPlan.map(_.asInstanceOf[ExecNode[_, _]])
          
}
      

在translateToExecNodePlan方法中将FlinkPhysicalRel DAG转换成ExecNode DAG并尝试复用重复的子计划。

ExecNode转换成Transformation

下面代码是将ExecNode列表转成Transformation列表的入口:


          
  override protected def translateToPlan(
          
      execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
          
    val planner = createDummyPlanner()
          
    planner.overrideEnvParallelism()
          

          
    execNodes.map {
          
          // 将execNode转成Transformation
          
      case node: StreamExecNode[_] => node.translateToPlan(planner)
          
      case _ =>
          
        throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
          
          "This is a bug and should not happen. Please file an issue.")
          
    }
          
  }
      

结语

本篇主要梳理sql执行的流程中涉及到的各个步骤,针对内部调用apache calcite的api进行flink sql的优化及使用javacc 进行代码生成的部分在后续的篇幅中会逐一进行分析。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
字节跳动大数据容器化构建与落地实践
随着字节跳动旗下业务的快速发展,数据急剧膨胀,原有的大数据架构在面临日趋复杂的业务需求时逐渐显现疲态。而伴随着大数据架构向云原生演进的行业趋势,字节跳动也对大数据体系进行了云原生改造。本次分享将详细介绍字节跳动大数据容器化的演进与实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论