Flink提交任务命令整理

发布时间 2023-06-16 17:34:25作者: 超级无敌小剑

 环境: Flink 1.13.6和Flink 1.14.4

yarn-session模式:


--启动yarn seeion
bin/yarn-session.sh \
-s 8 \
-jm 4g \
-tm 16g \
-nm yarn-session-flink \
-d 


yarn-session.sh  -jm 1g -tm 8g -s 4 -d
参数解释:
 -jm 1024 表示jobmanager 1024M内存
 -tm 1024表示taskmanager 1024M内存
 -s  每一个TaskManager上的slots数量。
 -d 任务后台运行
 -nm,--name YARN上为一个自定义的应用设置一个名字
 -D<property=value> 动态属性  类似于这种-Dparallelism.default=3
 -q,--query显示可用的YARN资源(内存,内核)
 -qu,--queue 指定YARN队列。
 -t,--ship 传输指定目录下的文件(t用于传输)
 -nl,--nodeLabel 为YARN应用程序指定YARN节点标签
 -z,--Zookeeper Namespace 命名空间,用于创建高可用模式下的Zookeeper子路径
 -j,--jar <arg>        Flink jar文件的路径

 
--提交到创建的这个yarn-session上执行
flink run    -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
flink run  -t yarn-session -Dyarn.application.id=application_1650018331890_0018   -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar
flink run   -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar


yarn-per-job模式:

提交任务的时候如果需要自己指定tm和jm的内存大小,可以注释掉配置文件里面相关的内存配置。

--旧版本的flink提交per-job任务是通过 -m yarn-cluster来提交的
flink run -m yarn-cluster  -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar

参数列表

./flink run \
-m yarn-cluster \
-yjm 1024  \
-ytm 1024  \
-ynm ?????? \
-c org.apache.flink.examples.java.wordcount.WordCount \
-yj /u01/isi/application/component/flink-1.13.6/examples/batch/WordCount.jar

-m	    执行模式为yarn-cluster
        也可以指定要连接的JobManager的地址。使用这个标志可以连接到配置中指定的不同的JobManager。注意:只有高可用性配置为NONE时才会考虑此选项。
 
-yjm	指定JobManager所在的Container内存。单位:MB
-ytm	每一个TaskManager Container的内存,单位MB。
-ys	    每一个TaskManager中slots的数量。
-ynm	YARN中application的名称。
-c	    指定Job对应的jar包中主函数所在类名。
-yj,--yarnjar <arg>              jar包位置
-yt,--yarnship 传输指定目录下的文件(t用于传输)
-yqu,--yarnqueue <arg>      指定yarn队列
-yD <property=value>        自定义参数 
-yid,--yarnapplicationId <arg>    指定yarnid执行
-yq,--yarnquery       显示可用的YARN资源(内存,核心)                  
-d,--detached         后台执行
     




--新版本的Flink可以通过-t参数提交统一格式的任务
flink run -t yarn-per-job -Dyarn.application.name=nmmd -c org.apache.flink.examples.java.wordcount.WordCount examples/batch/WordCount.jar



参数列表

./bin/flink run \
# 指定yarn的Per-job模式,-t等价于-Dexecution.target
-t yarn-per-job \
# yarn应用的自定义name
-Dyarn.application.name=consumerDemo \
# 未指定并行度时的默认并行度值, 该值默认为1
-Dparallelism.default=3 \
# JobManager进程的内存
-Djobmanager.memory.process.size=2048mb \
# TaskManager进程的内存
-Dtaskmanager.memory.process.size=2048mb \
# 每个TaskManager的slot数目, 最佳配比是和vCores保持一致
-Dtaskmanager.numberOfTaskSlots=2 \
# 防止日志中文乱码
-Denv.java.opts="-Dfile.encoding=UTF-8" \
# 支持火焰图, Flink1.13新特性, 默认为false, 开发和测试环境可以开启, 生产环境建议关闭
-Drest.flamegraph.enabled=true \
# 入口类
-c xxxx.MainClass \
# 提交Job的jar包
xxxx.jar

-t
给定应用程序的部署目标,相当于“run”。-Dexecution.target。
对于“run”操作,当前可用的目标是:“remote”,“local”,“kubernetes-session”,“yarn-per-job”,“yarn-session”。
对于“run-application”操作,当前可用的目标是:“kubernetes-application”,“yarn-application”。



--list 下面需要的FlinkJobID可以通过这个命令查看
参数:
-a,--all显示所有程序及其jobid
-r,--running只显示正在运行的程序及其jobid
-s,--scheduled只显示已调度的程序及其jobid

flink list \
-a \
-t yarn-session \
-Dyarn.application.id=application_1650018331890_0010

--cancel 此时无法指定savepoint

flink cancel \
-t yarn-session \
# Yarn的ApplicationID值, 可以通过Yarn的webUI直接查看
-Dyarn.application.id=${YarnApplicationID} \
# Flink的JobID, 可以通过Yarn找到Flink的WebUI进行查看
${FlinkJobID}

--stop 可以指定savepoint,执行完成可以看到savepoint的具体地址,用于Job启动时的savepoint地址
参数:
-d,--drain                           在获取保存点和停止连接之前发送MAX_WATERMARK
-p,--savepointPath <savepointPath>   savepoint保存地址,等价于-Dstate.savepoints.dir=xxx


flink stop \
-t yarn-session \
-Dyarn.application.id=${YarnApplicationID} \
# 指定savepoint存放位置
--savepointPath hdfs://xx:8020/test/checkpoint_test \
${FlinkJobID}


--从savepoint启动

./bin/flink run \
-t yarn-session \
-Djobmanager.memory.process.size=2048mb \
-Dtaskmanager.memory.process.size=2048mb \
-Dtaskmanager.numberOfTaskSlots=2 \
-Denv.java.opts="-Dfile.encoding=UTF-8" \
-Drest.flamegraph.enabled=true \
# 指定savepoint地址
--fromSavepoint hdfs://xx:8020/test/checkpoint_test/savepoint-e28bdd-ef7febad087e \
-c xxx \
xxx.jar



yarn-session.sh \
-s 8 \
-jm 4g \
-tm 16g \
-nm yarn-session-flink \
-d