[optimization][docs] add udfdemo
zhangyongtian committed Nov 11, 2023
1 parent 1c816ce commit c3c5b47
Showing 1 changed file with 89 additions and 36 deletions.
125 changes: 89 additions & 36 deletions docs/docs/administrator_guide/udf_develop/how_to/Java&Scala.md
Expand Up @@ -3,54 +3,107 @@ sidebar_position: 1
id: java&scala_udf
title: Java&Scala UDF
---
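## Creating the job
1. Select a Java job and enter the corresponding parameters.
```text
sub
Substring function
com.test.SubFunction
```
![create_java_udf_work.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/create_java_udf_work.png)
![create_java_udf_work2.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/create_java_udf_work2.png)
> The code is now generated from the template; only the function logic remains to be filled in.

![java_udf_code.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_code.png)

> To make testing easier, the function here simply returns a string.

### Preparing the initial environment

1. Create the corresponding tables.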
```sql
CREATE TABLE `Order`
(
    id INT,
    product_id INT,
    quantity INT,
    order_time TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '100000',
    'fields.product_id.min' = '1',
    'fields.product_id.max' = '100',
    'rows-per-second' = '1'
);

CREATE TABLE `Product`
(
    id INT,
    name VARCHAR,
    price DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'fields.id.min' = '1',
    'fields.id.max' = '100',
    'rows-per-second' = '1'
);
```

2. Next, create a `FlinkSql` job.
![java_udf_flink_sql.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_flink_sql.png)
On the Data Development page, click Save and confirm that the check passes. After the job runs, the created `Order` and `Product` tables appear on the **Catalog** page.

![Catalog page](http://www.aiwenmo.com/dinky/docs/test/yuanshuj.png)

2. Run the query job.

```sql
SELECT o.id, p.name, o.order_time
FROM `Order` o
INNER JOIN
`Product` p
ON o.product_id = p.id;
```

After clicking Debug, you get the result shown below.

![Query job result](http://www.aiwenmo.com/dinky/docs/test/yuanshishuj.png)
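
To make the join semantics concrete, here is a minimal plain-Java sketch of what the `INNER JOIN` above computes. It is an in-memory approximation under stated assumptions, not how Flink executes a streaming join: `InnerJoinSketch`, its `join` helper and the sample rows are hypothetical, `order_time` is left out for brevity, and products are keyed by a unique `id`, which the randomly generated `Product` rows do not guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InnerJoinSketch {
    // Hypothetical stand-ins: each order is {id, product_id}; products map id -> name.
    static List<String> join(int[][] orders, Map<Integer, String> products) {
        List<String> out = new ArrayList<>();
        for (int[] order : orders) {
            String name = products.get(order[1]);
            // INNER JOIN: an order without a matching product yields no output row.
            if (name != null) {
                out.add(order[0] + "," + name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> products = new HashMap<>();
        products.put(7, "keyboard");
        products.put(9, "monitor");
        int[][] orders = { {1, 7}, {2, 42}, {3, 9} };
        // Order 2 references product 42, which does not exist, so it is dropped.
        System.out.println(join(orders, products)); // [1,keyboard, 3,monitor]
    }
}
```

The point of the sketch is only that unmatched orders disappear from the result, which is why the debug output can contain fewer rows than `Order` produces.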

When creating the function, copy the class name. The following is the test code:
```sql
create temporary function sb_j as 'com.test.SubFunction';

CREATE TABLE sourceTable (
    id int,
    java_c string
) WITH (
    'connector' = 'datagen'
);

CREATE TABLE sinkTable
WITH (
    'connector' = 'print'
)
LIKE sourceTable (EXCLUDING ALL);

insert into sinkTable select id,sb_j(java_c) from sourceTable;
```

### Defining the UDF function

Create a UDF function.

The class name of the function:

```java
com.test.SubFunction
```

![UDF function creation](http://www.aiwenmo.com/dinky/docs/test/duiyinghangshu.png)

The code of the UDF function is shown below.

```java
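package com.test;

import org.apache.flink.table.functions.ScalarFunction;

// Scalar UDF that returns the first three characters of its input.
public class SubFunction extends ScalarFunction {
    public String eval(String s) {
        // Pass null through unchanged.
        if (s == null) {
            return null;
        }
        // Cap the end index so inputs shorter than three characters do not throw.
        return s.substring(0, Math.min(3, s.length()));
    }
}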
```
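
The substring logic can be checked without a Flink runtime. The sketch below is a plain-Java stand-in for the body of `eval`, not the UDF class itself; `SubFunctionSketch` is a hypothetical name and does not extend `ScalarFunction`. It caps the end index at the input length so that inputs shorter than three characters do not throw `StringIndexOutOfBoundsException`.

```java
// Stand-in for the eval logic only; the real UDF extends Flink's ScalarFunction.
class SubFunctionSketch {
    // Hypothetical helper: returns null for null, otherwise at most the first three characters.
    static String eval(String s) {
        if (s == null) {
            return null;
        }
        // Math.min keeps short inputs from causing an out-of-range end index.
        return s.substring(0, Math.min(3, s.length()));
    }

    public static void main(String[] args) {
        System.out.println(eval("dinky")); // din
        System.out.println(eval("ab"));    // ab
        System.out.println(eval(null));    // null
    }
}
```

Checking the edge cases (null and short strings) locally is cheaper than discovering them from a failed job.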

![java_udf_exec.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_exec.png)
> Select the execution mode; `per-job` is used here for the demonstration.

As shown in the figure below.

![UDF function code](http://www.aiwenmo.com/dinky/docs/test/hanshubaocun.png)

### Using the function

3. Execute and view the results.
Define a FlinkSQL job and run the following code.
```sql
create temporary function sb_j as 'com.test.SubFunction';
SELECT o.id, sb_j(p.name), o.order_time
FROM `Order` o
INNER JOIN
`Product` p
ON o.product_id = p.id;
```

![java_udf_flink_sout.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_flink_sout.png)
Check the `Taskmanager` output; it prints normally, so the verification succeeds.

If you see the following, the execution succeeded.

![Function used successfully](http://www.aiwenmo.com/dinky/docs/test/hangshushiyongchenggonlg.png)

## Animated demo
![java_udf_show.gif](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_show.gif)
