From c3c5b47004292521f5659a935305788be7d1b5e8 Mon Sep 17 00:00:00 2001 From: ZYT Date: Sat, 11 Nov 2023 21:43:28 +0800 Subject: [PATCH] [optimization][docs] add udfdemo --- .../udf_develop/how_to/Java&Scala.md | 125 +++++++++++++----- 1 file changed, 89 insertions(+), 36 deletions(-) diff --git a/docs/docs/administrator_guide/udf_develop/how_to/Java&Scala.md b/docs/docs/administrator_guide/udf_develop/how_to/Java&Scala.md index ab8eb56b15..a28be3a55c 100644 --- a/docs/docs/administrator_guide/udf_develop/how_to/Java&Scala.md +++ b/docs/docs/administrator_guide/udf_develop/how_to/Java&Scala.md @@ -3,54 +3,107 @@ sidebar_position: 1 id: java&scala_udf title: Java&Scala UDF --- -## 作业创建 -1. 选择java作业,并输入对应参数 -```text -sub -截取函数 -com.test.SubFunction -``` -![create_java_udf_work.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/create_java_udf_work.png) -![create_java_udf_work2.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/create_java_udf_work2.png) -> 此时从模板构建了代码,剩下函数逻辑填补即可。 -![java_udf_code.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_code.png) +### 初始环境准备 -> 这里为了方便测试,返回一段字符串 +1. 创建对应的表。 + ```sql + CREATE TABLE `Order` + ( + id INT, + product_id INT, + quantity INT, + order_time TIMESTAMP, + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'datagen', + 'fields.id.kind' = 'sequence', + 'fields.id.start' = '1', + 'fields.id.end' = '100000', + 'fields.product_id.min' = '1', + 'fields.product_id.max' = '100', + 'rows-per-second' = '1' + ); + + CREATE TABLE `Product` + ( + id INT, + name VARCHAR, + price DOUBLE, + PRIMARY KEY (id) NOT ENFORCED + ) WITH ( + 'connector' = 'datagen', + 'fields.id.min' = '1', + 'fields.id.max' = '100', + 'rows-per-second' = '1' + ); + ``` -2. 接下来创建一个 `FlinkSql` 作业 -![java_udf_flink_sql.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_flink_sql.png) + 在数据开发界面点击保存,并且检查成功没有问题一会,运行就会在 **Catalog** 界面看到创建的 ```Order``` 和 ```Product``` 表。 + + ![catalog界面](http://www.aiwenmo.com/dinky/docs/test/yuanshuj.png) + +2. 执行查询任务。 + + ```sql + SELECT o.id, p.name, o.order_time + FROM `Order` o + INNER JOIN + `Product` p + ON o.product_id = p.id; + ``` + + 点击调试以后得到下面的效果图。 + + ![查询任务图](http://www.aiwenmo.com/dinky/docs/test/yuanshishuj.png) -创建函数时,复制类名,以下为测试代码 -```sql -create temporary function sb_j as 'com.test.SubFunction'; +### 定义UDF函数 -CREATE TABLE sourceTable ( - id int, - java_c string -) WITH ( - 'connector' = 'datagen' -); +创建一个UDF函数 -CREATE TABLE sinkTable -WITH ( - 'connector' = 'print' -) -LIKE sourceTable (EXCLUDING ALL); +对应函数的类名。 +```java +com.test.SubFunction +``` + +![udf函数创建](http://www.aiwenmo.com/dinky/docs/test/duiyinghangshu.png) -insert into sinkTable select id,sb_j(java_c) from sourceTable; +下面是udf函数的代码。 + +```java +package com.test; +import org.apache.flink.table.functions.ScalarFunction; + +public class SubFunction extends ScalarFunction { + public String eval(String s) { + if(null==s){ + return s; + } + return s.substring(0,3); + } +} ``` -![java_udf_exec.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_exec.png) -> 选择执行模式,我这里采用 `pre-job` 进行演示 +如下图所示。 + +![udf函数代码](http://www.aiwenmo.com/dinky/docs/test/hanshubaocun.png) + +### 函数的使用 -3. 执行,结果查看 +定义一个flinksql任务,执行下面代码。 +```sql +create temporary function sb_j as 'com.test.SubFunction'; + +SELECT o.id, sb_j(p.name), o.order_time +FROM `Order` o +INNER JOIN +`Product` p +ON o.product_id = p.id; +``` -![java_udf_flink_sout.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_flink_sout.png) -查看 `Taskmanager` 输出,正常输出,验证成功 +看到如下那么表示执行成功了。 -## 动图演示 -![java_udf_show.gif](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_show.gif) \ No newline at end of file +![使用成功](http://www.aiwenmo.com/dinky/docs/test/hangshushiyongchenggonlg.png) \ No newline at end of file