Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[optimization][docs] add udfdemo #2519

Merged
merged 1 commit into from
Nov 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 89 additions & 36 deletions docs/docs/administrator_guide/udf_develop/how_to/Java&Scala.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,107 @@ sidebar_position: 1
id: java&scala_udf
title: Java&Scala UDF
---
## 作业创建
1. 选择java作业,并输入对应参数
```text
sub
截取函数
com.test.SubFunction
```
![create_java_udf_work.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/create_java_udf_work.png)
![create_java_udf_work2.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/create_java_udf_work2.png)
> 此时从模板构建了代码,剩下函数逻辑填补即可。

![java_udf_code.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_code.png)
### 初始环境准备

> 这里为了方便测试,返回一段字符串
1. 创建对应的表。
```sql
CREATE TABLE `Order`
(
id INT,
product_id INT,
quantity INT,
order_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '100000',
'fields.product_id.min' = '1',
'fields.product_id.max' = '100',
'rows-per-second' = '1'
);

CREATE TABLE `Product`
(
id INT,
name VARCHAR,
price DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'fields.id.min' = '1',
'fields.id.max' = '100',
'rows-per-second' = '1'
);
```

2. 接下来创建一个 `FlinkSql` 作业
![java_udf_flink_sql.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_flink_sql.png)
在数据开发界面点击保存,并且检查成功没有问题一会,运行就会在 **Catalog** 界面看到创建的 ```Order``` 和 ```Product``` 表。

![catalog界面](http://www.aiwenmo.com/dinky/docs/test/yuanshuj.png)

2. 执行查询任务。

```sql
SELECT o.id, p.name, o.order_time
FROM `Order` o
INNER JOIN
`Product` p
ON o.product_id = p.id;
```

点击调试以后得到下面的效果图。

![查询任务图](http://www.aiwenmo.com/dinky/docs/test/yuanshishuj.png)

创建函数时,复制类名,以下为测试代码
```sql
create temporary function sb_j as 'com.test.SubFunction';

### 定义UDF函数

CREATE TABLE sourceTable (
id int,
java_c string
) WITH (
'connector' = 'datagen'
);
创建一个UDF函数

CREATE TABLE sinkTable
WITH (
'connector' = 'print'
)
LIKE sourceTable (EXCLUDING ALL);
对应函数的类名。

```java
com.test.SubFunction
```

![udf函数创建](http://www.aiwenmo.com/dinky/docs/test/duiyinghangshu.png)

insert into sinkTable select id,sb_j(java_c) from sourceTable;
下面是udf函数的代码。

```java
package com.test;

import org.apache.flink.table.functions.ScalarFunction;

public class SubFunction extends ScalarFunction {
public String eval(String s) {
if(null==s){
return s;
}
return s.substring(0,3);
}
}
```

![java_udf_exec.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_exec.png)
> 选择执行模式,我这里采用 `pre-job` 进行演示
如下图所示。

![udf函数代码](http://www.aiwenmo.com/dinky/docs/test/hanshubaocun.png)

### 函数的使用

3. 执行,结果查看
定义一个flinksql任务,执行下面代码。
```sql
create temporary function sb_j as 'com.test.SubFunction';

SELECT o.id, sb_j(p.name), o.order_time
FROM `Order` o
INNER JOIN
`Product` p
ON o.product_id = p.id;
```

![java_udf_flink_sout.png](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_flink_sout.png)
查看 `Taskmanager` 输出,正常输出,验证成功
看到如下那么表示执行成功了。

## 动图演示
![java_udf_show.gif](http://www.aiwenmo.com/dinky/docs/zh-CN/udf_develop/how_to/java_udf_show.gif)
![使用成功](http://www.aiwenmo.com/dinky/docs/test/hangshushiyongchenggonlg.png)
Loading