flink之udf执行流程分析篇

技术
flink UDX

1.UDF: 自定义标量函数(User Defined Scalar Function)。一行输入一行输出。 2.UDAF: 自定义聚合函数。多行输入一行输出。 3.UDTF: 自定义表函数。一行输入多行输出或一列输入多列输出。

sql 语句


          
select
          
            first_non_null(businessId) as id
          
          from
          
            test_new
          
          where
          
             eventType = '1'
          
          group by
          
            businessId
      
执行流程:

自定义udaf


          
public class FirstNonNull extends AggregateFunction<String[],ArrayList<String>> {
          

          
    @Override
          
    public ArrayList<String> createAccumulator() {
          
        return new ArrayList<>();
          
    }
          

          
    @Override
          
    public String[] getValue(ArrayList<String> data) {
          
        if (data == null || data.size() == 0) {
          
            return null;
          
        }
          
        return data.toArray(new String[data.size()]);
          
    }
          

          
    public void accumulate(ArrayList<String> src, String... input) {
          
        if (src.size() == 0) {
          
            addAll(src, input);
          
        } else {
          
            String curr_order_by_value = String.valueOf(input[0]);
          
            String src_order_by_value = String.valueOf(src.get(0));
          
            if (src_order_by_value.compareTo(curr_order_by_value) > 0) {
          
                addAll(src, input);
          
            } else if (src.contains(null)) {
          
                fillNull(src, input);
          
            }
          
        }
          
    }
          

          
    public void fillNull(ArrayList<String> src, String[] input) {
          
        int size = src.size();
          
        for (int i = 0; i < size; i++) {
          
            if (src.get(i) == null) {
          
                src.set(i, input[i] == null ? null : String.valueOf(input[i]));
          
            }
          
        }
          
    }
          

          
    public void addAll(ArrayList<String> src, String[] input) {
          
        for (int i = 0; i < input.length; i++) {
          
            Object value = input[i];
          
            if (i >= src.size()) {
          
                src.add(i, value == null ? null : String.valueOf(value));
          
            } else {
          
                if (value != null) {
          
                    src.set(i, String.valueOf(value));
          
                }
          
            }
          
        }
          
    }
          
}    
      

一个aggFunction必须要实现的方法有:

  • createAccumulator创建accumulator
  • accumulate(ACC accumulator, [user defined inputs])
  • getValue返回结果

一个aggFunction可选的方法有:

•retract: OVER窗口聚合时使用; •merge: 使用窗口操作时必须实现(SessionWindow)。用于优化hop的场景,详细说明见:https://www.zhihu.com/question/346639699; •resetAccumulator:used for data set grouping aggregates

重点说一下accumulate方法和retract方法

•accumulate方法


          
/**
          
* param: accumulator           the accumulator which contains the current aggregated results
          
* param: [user defined inputs] the input value (usually obtained from a new arrived data).
          
*/
          
public void accumulate(ACC accumulator, [user defined inputs])
          
}
      

它的输入的第一参数为包含所有结果集的accumulator(归集器); 第二个参数是当前到达的输入数据。这里是用于归集的逻辑。

•retract方法


          
 /**
          
 * param: accumulator           the accumulator which contains the current aggregated results
          
 * param: [user defined inputs] the input value (usually obtained from a new arrived data).
          
 */
          
  public void retract(ACC accumulator, [user defined inputs])
          
 }
      

它的输入的第一参数为包含所有结果集的accumulator(归集器); 第二个参数是当前到达的输入数据。这里是用于回撤的逻辑。

任务层面

org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:


          
protected void processInput(ActionContext context) throws Exception {
          
        if (!inputProcessor.processInput()) {
          
            context.allActionsCompleted();
          
        }
          
    }
      

在这里会使用inputProcessor来往下进行processInput操作。由于是单一的source源,所以这个inputProcessor对应的是StreamOneInputProcessor类型的,对应的processInput方法为org.apache.flink.streaming.runtime.io.StreamOneInputProcessor#processInput:


          
    @Override
          
    public boolean processInput() throws Exception {
          
        // 初始化输入的记录数量
          
        initializeNumRecordsIn();
          
        StreamElement recordOrMark = input.pollNextNullable();
          
        if (recordOrMark == null) {
          
            input.isAvailable().get();
          
            return !checkFinished();
          
        }
          
        // 从input中获取到对应的channel
          
        int channel = input.getLastChannel();
          
        checkState(channel != StreamTaskInput.UNSPECIFIED);
          
        // 处理对应channel的记录
          
        processElement(recordOrMark, channel);
          
        return true;
          
    }
      

这里是处理input的地方,我们主要关注下processElement方法。

算子层面

我们主要关注下org.apache.flink.streaming.runtime.io.StreamOneInputProcessor#processElement方法:


          
private void processElement(StreamElement recordOrMark, int channel) throws Exception {
          
        if (recordOrMark.isRecord()) {// 如果输入是记录
          
            // now we can do the actual processing
          
            StreamRecord<IN> record = recordOrMark.asRecord();
          
            synchronized (lock) {
          
                 // 增加输入的记录数
          
                numRecordsIn.inc();
          
                streamOperator.setKeyContextElement1(record);
          
                 // 使用算子处理record
          
                streamOperator.processElement(record);
          
            }
          
        }
          
        else if (recordOrMark.isWatermark()) {// 如果输入是水位信息
          
            // handle watermark
          
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), channel);
          
        } else if (recordOrMark.isStreamStatus()) {// 如果输入是stream的状态信息
          
            // handle stream status
          
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
          
        } else if (recordOrMark.isLatencyMarker()) {// 如果是延迟的水平
          
            // handle latency marker
          
            synchronized (lock) {
          
                streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
          
            }
          
        } else {// 不知道StreamElement的类型时抛出异常
          
            throw new UnsupportedOperationException("Unknown type of StreamElement");
          
        }
          
    }
      

这个方法处理的StreamElement类型比较多,我们主要看下它对普通记录的处理方法streamOperator.processElement(record),由于我们这里使用的是group by 操作,所以对应的算子为KeyedProcessOperator,方法为org.apache.flink.streaming.api.operators.KeyedProcessOperator#processElement:


          
    @Override
          
    public void processElement(StreamRecord<IN> element) throws Exception {
          
        // 设置时间戳
          
        collector.setTimestamp(element);
          
        context.element = element;
          
        // 使用用户定义的udf来处理元素
          
        userFunction.processElement(element.getValue(), context, collector);
          
        context.element = null;
          
    }
      

这里需要注意的一点是这个userFunction应当算是我们自定义的udf的一个代理,它会动态编译产生一个GroupAggsHandler类,在类内部的方法中处理时会回调我们自定义的udf中实现的方法(接口中约定好的那些方法)。

Agg层面

我们看下上面的userFunction的一些属性:

picture.image

这个genAggHandler是在哪里生成的呢?这里简单提一下,见下图:

picture.image

在flink解析sql生成streamGraph的过程中会调用org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate#translateToPlanInternal方法,在该方法中会创建aggsHandler对象。

也就是说GroupAggFunction中实际产生作用的是GroupAggsHandler对象,genAggHandler中动态编译产生的就是GroupAggsHandler,在genAggHandler中code的具体内容如下:


          
 public final class GroupAggsHandler$39 implements org.apache.flink.table.runtime.generated.AggsHandleFunction {
          

          
          private transient com.test.dream.flink.udf.aggfunctions.FirstNonNull function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633;
          
          private transient org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter converter$21;
          
          private transient org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter converter$24;
          
          private org.apache.flink.table.dataformat.BinaryGeneric agg0_acc_internal;
          
          private java.util.ArrayList agg0_acc_external;
          
          private transient org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter converter$27;
          
          private transient org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter converter$28;
          
          private transient org.apache.flink.table.runtime.typeutils.BinaryStringSerializer typeSerializer$31;
          
          private transient org.apache.flink.table.dataformat.DataFormatConverters.StringConverter converter$33;
          
          private transient org.apache.flink.table.dataformat.DataFormatConverters.ObjectArrayConverter converter$37;
          

          
          public GroupAggsHandler$39(java.lang.Object[] references) throws Exception {
          
            function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633 = (((com.test.dream.flink.udf.aggfunctions.FirstNonNull) references[0]));
          
            converter$21 = (((org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter) references[1]));
          
            converter$24 = (((org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter) references[2]));
          
            converter$27 = (((org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter) references[3]));
          
            converter$28 = (((org.apache.flink.table.dataformat.DataFormatConverters.GenericConverter) references[4]));
          
            typeSerializer$31 = (((org.apache.flink.table.runtime.typeutils.BinaryStringSerializer) references[5]));
          
            converter$33 = (((org.apache.flink.table.dataformat.DataFormatConverters.StringConverter) references[6]));
          
            converter$37 = (((org.apache.flink.table.dataformat.DataFormatConverters.ObjectArrayConverter) references[7]));
          
          }
          

          
          @Override
          
          public void open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws Exception {
          

          
            function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633.open(new org.apache.flink.table.functions.FunctionContext(store.getRuntimeContext()));
          

          
          }
          

          
          @Override
          
          public void accumulate(org.apache.flink.table.dataformat.BaseRow accInput) throws Exception {
          

          
            org.apache.flink.table.dataformat.BinaryString field$29;
          
            boolean isNull$29;
          
            isNull$29 = accInput.isNullAt(0);
          
            field$29 = org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8;
          
            if (!isNull$29) {
          
              field$29 = accInput.getString(0);
          
            }
          
            org.apache.flink.table.dataformat.BinaryString field$30 = field$29;
          
            if (!isNull$29) {
          
              field$30 = (org.apache.flink.table.dataformat.BinaryString) (typeSerializer$31.copy(field$30));
          
            }
          
            org.apache.flink.table.dataformat.BinaryString field$32 = field$30;
          
            if (!isNull$29) {
          
              field$32 = (org.apache.flink.table.dataformat.BinaryString) (typeSerializer$31.copy(field$32));
          
            }
          
            function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633.accumulate(agg0_acc_external, isNull$29 ? null : (java.lang.String) converter$33.toExternal((org.apache.flink.table.dataformat.BinaryString) field$32));
          
          }
          

          
          @Override
          
          public void retract(org.apache.flink.table.dataformat.BaseRow retractInput) throws Exception {
          

          
            throw new java.lang.RuntimeException("This function not require retract method, but the retract method is called.");
          
          }
          
          @Override
          
          public void merge(org.apache.flink.table.dataformat.BaseRow otherAcc) throws Exception {
          
            throw new java.lang.RuntimeException("This function not require merge method, but the merge method is called.");
          
          }
          

          
          @Override
          
          public void setAccumulators(org.apache.flink.table.dataformat.BaseRow acc) throws Exception {
          
            org.apache.flink.table.dataformat.BinaryGeneric field$26;
          
            boolean isNull$26;
          
            isNull$26 = acc.isNullAt(0);
          
            field$26 = null;
          
            if (!isNull$26) {
          
              field$26 = acc.getGeneric(0);
          
            }
          
            agg0_acc_internal = field$26;
          
            agg0_acc_external = (java.util.ArrayList) converter$27.toExternal((org.apache.flink.table.dataformat.BinaryGeneric) agg0_acc_internal);
          
          }
          
          @Override
          
          public void resetAccumulators() throws Exception {
          
            agg0_acc_external = (java.util.ArrayList) function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633.createAccumulator();
          
            agg0_acc_internal = (org.apache.flink.table.dataformat.BinaryGeneric) converter$28.toInternal((java.util.ArrayList) agg0_acc_external);
          
          }
          
          @Override
          
          public org.apache.flink.table.dataformat.BaseRow getAccumulators() throws Exception {
          
            final org.apache.flink.table.dataformat.GenericRow acc$25 = new org.apache.flink.table.dataformat.GenericRow(1);
          
            agg0_acc_internal = (org.apache.flink.table.dataformat.BinaryGeneric) converter$24.toInternal((java.util.ArrayList) agg0_acc_external);
          
            if (false) {
          
              acc$25.setNullAt(0);
          
            } else {
          
              acc$25.setField(0, agg0_acc_internal);;
          
            }
          
            return acc$25;
          
          }
          
          @Override
          
          public org.apache.flink.table.dataformat.BaseRow createAccumulators() throws Exception {
          
            final org.apache.flink.table.dataformat.GenericRow acc$23 = new org.apache.flink.table.dataformat.GenericRow(1);
          
            org.apache.flink.table.dataformat.BinaryGeneric acc_internal$22 = (org.apache.flink.table.dataformat.BinaryGeneric) (org.apache.flink.table.dataformat.BinaryGeneric) converter$21.toInternal((java.util.ArrayList) function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633.createAccumulator());
          
            if (false) {
          
              acc$23.setNullAt(0);
          
            } else {
          
              acc$23.setField(0, acc_internal$22);;
          
            }
          
            return acc$23;
          
          }
          
          @Override
          
          public org.apache.flink.table.dataformat.BaseRow getValue() throws Exception {
          
            final org.apache.flink.table.dataformat.GenericRow aggValue$38 = new org.apache.flink.table.dataformat.GenericRow(1);
          
            java.lang.String[] value_external$34 = (java.lang.String[])
          
            org.apache.flink.table.dataformat.BaseArray value_internal$35 =
          
              (org.apache.flink.table.dataformat.BaseArray) converter$37.toInternal((java.lang.String[]) value_external$34);
          
            boolean valueIsNull$36 = value_internal$35 == null;
          
            if (valueIsNull$36) {
          
              aggValue$38.setNullAt(0);
          
            } else {
          
              aggValue$38.setField(0, value_internal$35);;
          
            }
          
            return aggValue$38;
          
          }
          

          
          @Override
          
          public void cleanup() throws Exception {
          
          }
          

          
          @Override
          
          public void close() throws Exception {
          
          function_com$test$dream$flink$udf$aggfunctions$FirstNonNull$53ab4e4c4415303976432217433a2633.close();               
          
          }
          
        }
      

在代码中可以看到,内部调用了FirstNonNull函数的实现方法。

紧接着来看org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction#processElement方法:


          
    @Override
          
    public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) throws Exception {
          
        long currentTime = ctx.timerService().currentProcessingTime();
          
        // register state-cleanup timer
          
        registerProcessingCleanupTimer(ctx, currentTime);
          

          
        BaseRow currentKey = ctx.getCurrentKey();
          

          
        boolean firstRow;
          
        BaseRow accumulators = accState.value();
          
        if (null == accumulators) {
          
            firstRow = true;
          
            // 这个function就是上面的GroupAggsHandler$39类型的对象,在GroupAggsHandler$39对象内部的createAccumulators方法中会回调我们自定义的udf的createAccumulator()方法
          
            accumulators = function.createAccumulators();
          
        } else {
          
            firstRow = false;
          
        }
          

          
        // set accumulators to handler first
          
        function.setAccumulators(accumulators);
          
        // get previous aggregate result
          
        BaseRow prevAggValue = function.getValue();
          

          
        // update aggregate result and set to the newRow
          
        if (isAccumulateMsg(input)) {
          
            // accumulate input
          
            //在GroupAggsHandler$39对象内部的accumulate方法中会回调我们自定义的udf的accumulate()方法
          
            function.accumulate(input);
          
        } else {
          
            // retract input
          
            function.retract(input);
          
        }
          
        // get current aggregate result
          
        // 在GroupAggsHandler$39对象内部的getValue方法中会回调我们自定义的udf的getValue()方法
          
        BaseRow newAggValue = function.getValue();
          

          
        // get accumulator
          
         // 在GroupAggsHandler$39对象内部的getAccumulators方法中会回调我们自定义的udf的getAccumulators()方法
          
        accumulators = function.getAccumulators();
          

          
        -------------省略回撤处理和状态清理部分代码----------------
          
    }
      

这里主要需要注意以下几点:

•上面代码中的function就是上面的GroupAggsHandler39类型的对象,GroupAggsHandler39类型的对象,在GroupAggsHandler39对象内部的createAccumulators方法中会回调我们自定义的udf的createAccumulator()方法; •在GroupAggsHandler39对象内部的accumulate方法中会回调我们自定义的udfaccumulate()方法;•在GroupAggsHandler39对象内部的accumulate方法中会回调我们自定义的udf的accumulate()方法; •在GroupAggsHandler39对象内部的getValue方法中会回调我们自定义的udf的getValue()方法; •在GroupAggsHandler$39对象内部的getAccumulators方法中会回调我们自定义的udf的getAccumulators()方法。

这步执行完成后,去进入整个graph的下一个算子中,调用下一个算子的processElement方法,直到sink算子,完成sink操作。

0
0
0
0
关于作者
关于作者

文章

0

获赞

0

收藏

0

相关资源
高性能存储虚拟化方案 NVMe over Fabric 在火山引擎的演进
在云计算中,虚拟化存储扮演着重要角色,其中 iSCSI 协议在业界开放、流行多年。近年来,拥有更优性能的 NVMe over Fabrics 协议也得到了发展。本次分享介绍了 NVMe over Fabrics 在云原生和虚拟化方向的演进工作和成果。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论