目录:
1.使用git工具把项目clone到本地
git clone https://github.com/DTStack/flinkx.git
cd flinkx
2.直接下载源码
wget https://github.com/DTStack/flinkx/archive/1.10_release.zip
unzip 1.10_release.zip
cd flinkx-1.10_release
mvn clean package -DskipTests
对于不需要的插件,可以修改$FLINKX_HOME目录下的pom文件,可以将不需要的模块和flinkx-test
模块注释掉,在编译时将不会编译该插件,这样可以缩短编译时间.
注:部分模块有依赖关系,请注意。若遇到这种情况,请根据maven报错提示,将对应依赖的模块取消注释。
<modules>
<module>flinkx-core</module>
<module>flinkx-launcher</module>
<module>flinkx-test</module>
<module>flinkx-stream</module>
<!--******离线******-->
<module>flinkx-rdb</module>
<module>flinkx-mysql</module>
<module>flinkx-polardb</module>
<module>flinkx-oracle</module>
<module>flinkx-sqlserver</module>
<module>flinkx-postgresql</module>
<module>flinkx-db2</module>
<module>flinkx-dm</module>
<module>flinkx-gbase</module>
<module>flinkx-clickhouse</module>
<module>flinkx-saphana</module>
<module>flinkx-teradata</module>
<module>flinkx-greenplum</module>
<module>flinkx-kingbase</module>
<module>flinkx-hdfs</module>
<module>flinkx-hive</module>
<module>flinkx-es</module>
<module>flinkx-ftp</module>
<module>flinkx-odps</module>
<module>flinkx-hbase</module>
<module>flinkx-phoenix5</module>
<module>flinkx-carbondata</module>
<module>flinkx-kudu</module>
<module>flinkx-cassandra</module>
<module>flinkx-redis</module>
<module>flinkx-mongodb</module>
<!--******实时******-->
<module>flinkx-binlog</module>
<module>flinkx-kb</module>
<module>flinkx-kafka09</module>
<module>flinkx-kafka10</module>
<module>flinkx-kafka11</module>
<module>flinkx-kafka</module>
<module>flinkx-emqx</module>
<module>flinkx-pulsar</module>
<module>flinkx-pgwal</module>
<module>flinkx-restapi</module>
<module>flinkx-oraclelogminer</module>
</modules>
解决办法:在$FLINKX_HOME/jars目录下有这些驱动包,可以手动安装,也可以使用$FLINKX_HOME/bin目录下的脚本安装:
## windows平台
./install_jars.bat
## unix平台
./install_jars.sh
解决办法:在$FLINKX_HOME/jars目录下有maven的setting文件,内容如下,修改仓库路径后替换本地maven的setting文件,重新安装步骤一中的驱动包,然后再编译插件
<?xml version="1.0" encoding="UTF-8"?>
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
<!--maven仓库jar包存放路径,改成你自己的路径-->
<localRepository>/home/apache-maven-3.6.1/repository</localRepository>
<mirrors>
<mirror>
<id>alimaven</id>
<name>aliyun maven</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
<mirrorOf>central</mirrorOf>
</mirror>
</mirrors>
</settings>
首先准备要运行的任务json,这里以stream插件为例:
{
"job" : {
"content" : [ {
"reader" : {
"parameter" : {
"column" : [ {
"name": "id",
"type" : "id"
}, {
"name": "string",
"type" : "string"
} ],
"sliceRecordCount" : [ "10"]
},
"name" : "streamreader"
},
"writer" : {
"parameter" : {
"print" : true
},
"name" : "streamwriter"
}
} ],
"setting" : {
"speed" : {
"channel" : 1
}
}
}
}
命令模板:
bin/flinkx \
-mode local \
-job docs/example/stream_stream.json \
-pluginRoot syncplugins \
-flinkconf flinkconf
修改flink配置文件,指定web UI端口
vi flinkconf/flink-conf.yaml
## web服务端口,不指定的话会随机生成一个
rest.bind-port: 8888
使用下面的命令运行任务:
bin/flinkx \
-mode local \
-job docs/example/stream_stream.json \
-pluginRoot syncplugins \
-flinkconf flinkconf
任务运行后可以通过8888端口访问flink界面查看任务运行情况:
命令模板:
bin/flinkx \
-mode standalone \
-job docs/example/stream_stream.json \
-pluginRoot syncplugins \
-flinkconf $FLINK_HOME/conf \
-confProp "{\"flink.checkpoint.interval\":60000}"
首先启动flink集群:
# flink集群默认端口是8081
$FLINK_HOME/bin/start-cluster.sh
通过8081端口检查集群是否启动成功
把任务提交到集群上运行:
./bin/flinkx \
-mode standalone \
-job docs/example/stream_stream.json \
-flinkconf $FLINK_HOME/conf
在集群上查看任务运行情况
命令示例:
bin/flinkx \
-mode yarn \
-job docs/example/stream_stream.json \
-pluginRoot syncplugins \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-confProp "{\"flink.checkpoint.interval\":60000}"
下载对应Hadoop版本的flink shade包,放入$FLINK_HOME/lib目录下(从flink1.11开始官方不再提供打包好的flink shade包,需要自行下载打包)
下载对应版本的flink prometheus包,放入$FLINK_HOME/lib目录下
修改flink配置文件,指定flink类加载方式
vi ../conf/flink-conf.yaml
## flink类加载方式,指定为父类优先
classloader.resolve-order: parent-first
确保yarn集群是可用的,然后手动启动一个yarn session:
注:-ship: 启动flink session时上传FlinkX插件包,这样只需要在提交FlinkX任务的节点部署FlinkX插件包,其他服务器节点不需要部署,同时更换FlinkX插件包后需要重启yarn session,需要配合修改flink的类加载方式。
nohup $FLINK_HOME/bin/yarn-session.sh -qu a -ship $FLINKX_HOME/syncplugins/ &
把任务提交到这个yarn session上:
bin/flinkx \
-mode yarn \
-job docs/example/stream_stream.json \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-queue a
然后在flink界面查看任务运行情况:
命令示例:
bin/flinkx \
-mode yarnPer \
-job docs/example/stream_stream.json \
-pluginRoot syncplugins \
-flinkconf $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-flinkLibJar $FLINK_HOME/lib \
-confProp "{\"flink.checkpoint.interval\":60000}" \
-queue default
首先确保yarn集群是可用的,启动一个Yarn Application运行任务:
bin/flinkx \
-mode yarnPer \
-job docs/example/stream_stream.json \
-pluginRoot $FLINK_HOME/conf \
-yarnconf $HADOOP_HOME/etc/hadoop \
-flinkLibJar $FLINK_HOME/lib \
-queue a
然后在集群上查看任务运行情况
名称 | 说明 | 可选值 | 是否必填 | 默认值 |
---|---|---|---|---|
mode | 执行模式,也就是flink集群的工作模式 | 1.local: 本地模式 2.standalone: 独立部署模式的flink集群 3.yarn: yarn模式的flink集群,需要提前在yarn上启动一个flink session,使用默认名称"Flink session cluster" 4.yarnPer: yarn模式的flink集群,单独为当前任务启动一个flink session,使用默认名称"Flink per-job cluster" |
否 | local |
job | 数据同步任务描述文件的存放路径;该描述文件中使用json字符串存放任务信息 | 无 | 是 | 无 |
jobid | 指定flink任务名称 | 无 | 否 | Flink Job |
pluginRoot | 插件根目录地址,也就是打包后产生的pluginRoot目录。 | 无 | 否 | $FLINKX_HOME/syncplugins |
flinkconf | flink配置文件所在的目录 | $FLINK_HOME/conf | 否 | $FLINK_HOME/conf |
flinkLibJar | flink lib所在的目录(单机模式下不需要),如/opt/dtstack/flink-1.10.1/lib | $FLINK_HOME/lib | 否 | $FLINK_HOME/lib |
yarnconf | Hadoop配置文件(包括hdfs和yarn)所在的目录 | $HADOOP_HOME/etc/hadoop | 否 | $HADOOP_HOME/etc/hadoop |
queue | yarn队列,如default | 无 | 否 | default |
pluginLoadMode | yarn session模式插件加载方式 | 1.classpath:提交任务时不上传插件包,需要在yarn-node节点pluginRoot目录下部署插件包,但任务启动速度较快 2.shipfile:提交任务时上传pluginRoot目录下部署插件包的插件包,yarn-node节点不需要部署插件包,任务启动速度取决于插件包的大小及网络环境 |
否 | shipfile |
confProp | flink额外配置,如checkpoint、内存 | flink.checkpoint.interval:快照生产频率(毫秒) flink.checkpoint.timeout:快照超时时间(毫秒) jobmanager.memory.mb:perJob模式下jobmanager内存设置 taskmanager.memory.mb:perJob模式下taskmanager内存设置 taskmanager.slots:perJob模式下jobmanager slots个数设置 |
否 | 无 |
s | checkpoint快照路径,设置后从该快照恢复任务 | 否 | 无 | |
p | 自定义入参,用于替换脚本中的占位符,如脚本中存在占位符${pt1},${pt2},则该参数可配置为pt1=20200101,pt2=20200102 | 否 | 无 | |
appId | yarn session模式下,提交到指定的的flink session的application Id | 否 | 无 | |
krb5conf | 提交到开启kerberos的Hadoop集群的krb5文件路径 | 否 | 无 | |
keytab | 提交到开启kerberos的Hadoop集群的keytab文件路径 | 否 | 无 | |
principal | kerberos认证的principal | 否 | 无 |