flink-udf函数实践

udf介绍

自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。

自定义函数可以用JVM语言(例如JavaScala)或Python实现,实现者可以在UDF中使用任意第三方库,本文聚焦于使用JVM语言开发自定义函数。

当前 Flink 有如下几种函数:

  • 标量函数 将标量值转换成一个新标量值;
  • 表值函数 将标量值转换成新的行数据;
  • 聚合函数 将多行数据里的标量值转换成一个新标量值;
  • 表值聚合函数 将多行数据里的标量值转换成新的行数据;
  • 异步表值函数 是异步查询外部数据系统的特殊函数。

注意 标量和表值函数已经使用了新的基于数据类型的类型系统,聚合函数仍然使用基于TypeInformation的旧类型系统。

以下示例展示了如何创建一个基本的标量函数,以及如何在SQL里调用这个函数。

标量函数【多进一出】

自定义标量函数可以把0到多个标量值映射成1个标量值,即多进1出。它的场景就是你会将数据库表多个字段作为入参传递到函数中,然后返回值就一个,那这种情况直接选择标量函数。

udf实践

想要实现自定义标量函数,你需要扩展org.apache.flink.table.functions里面的ScalarFunction并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。求值方法必须是public的,而且名字必须是eval

下面实现一个将json串中所有content的内容通过逗号连接起来并且返回:

函数实现:

  
//@FunctionHint(output = @DataTypeHint("ROW<pool\_id int, pool\_type INT>"))  
public class WorkExperienceUDF extends ScalarFunction { // 扩展ScalarFunction  
  
    public String eval(String workExperienceJson) { // 必须是public  
        RegularUtil regularUtil = new RegularUtil(); // 自定义工具类 调用工具方法  
        String workExperienceJsonMatch = regularUtil.workExperienceJsonMatch(workExperienceJson);  
        return workExperienceJsonMatch;  
    }  
}  
  
// 自定义类RegularUtil.java中实现自定义拼接content方法  
public String workExperienceJsonMatch(String workExperienceJson) {  
        JSONArray jsonArray = new JSONArray(workExperienceJson.replaceAll("\n", ""));  
        StringBuilder concatenatedContent = new StringBuilder();  
        for (int i = 0; i < jsonArray.size(); i++) {  
            JSONObject obj = jsonArray.getJSONObject(i);  
            if (obj.getInt("type") == 3 || obj.getInt("type") == 10 || obj.getInt("type") == 14) {  
                concatenatedContent.append(obj.getStr("content")).append(",");  
            }  
        }  
  
        // 移除最后一个逗号  
        if (concatenatedContent.length() > 0) {  
            concatenatedContent.setLength(concatenatedContent.length() - 1);  
        }  
  
        return concatenatedContent.toString();  
    }  

管理自定义UDF【阿里云连接】

本地开发好之后,就可以按照如下步骤打包上传udf文件了

  • 1、先打包源文件成jar包【 mvn clean package
  • 2、上传jar包到阿里云flink实时计算页面的自定义注册函数中
  • 3、直接使用函数到sql中

具体参考以下连接维护管理自定义函数

https://help.aliyun.com/zh/flink/user-guide/manage-udfs?spm=5176.28426678.J_HeJR_wZokYt378dwP-lLl.11.9dc15181sJwhvS&scm=20140722.S_help@@%E6%96%87%E6%A1%A3@@176059.S_BB1@bl+RQW@ag0+BB2@ag0+os0.ID_176059-RL_flinkudf-LOC_search~UND~helpdoc~UND~item-OR_ser-PAR1_2150467c17280043714247336eb993-V_3-P0_0

flink sql实践

左侧圈出来的就是已经上传的udf自定义标量函数,右侧就是flink sql中用到了自定义函数picture.image

udf总结

总之udf函数是你flink sql无能为力的时候才使用的法宝,它有多种自定义函数实现,按自己的应用场景选择合适的自定义函数,并且实现起来也非常简单,同时维护和管理成本非常低,建议大家在平时实时流计算中多多尝试。

0
0
0
0
评论
未登录
暂无评论