本文共 2611 字,大约阅读时间需要 8 分钟。
在Flink项目中,将环境配置信息从代码中抽离放入配置文件是常见的做法。这可以让配置管理更加灵活,便于维护和扩展。以下将详细介绍如何使用Flink自带的ParameterTool读取外部配置文件,并分享遇到的问题及解决方案。
随着项目复杂度的增加,环境配置信息逐渐增多。传统的方式是直接在代码中管理这些配置,导致代码冗余且难以维护。为了解决这一问题,我们选择使用Flink提供的ParameterTool工具来读取外部配置文件。
我们可以通过在命令行添加--config_path参数来指定配置文件路径。例如:
flink run Test.jar -config_path /home/hadoop/config.properties
在代码中,通过ParameterTool从命令行参数中获取配置信息,并将其加载到项目中。以下是示例代码:
package com.application;import com.ProjectConfig;import org.apache.flink.api.java.utils.ParameterTool;public class TestApplication extends App { public static void main(String[] args) { // 获取命令行参数 val parameters = ParameterTool.fromArgs(args) // 获取配置文件路径 val path = parameters.get("config_path") // 初始化配置信息 ParameterTool configTool = ParameterTool.fromPropertiesFile(path) ProjectConfig.initConfig(configTool) }} 在Flink程序中,可以通过注册全局变量的方式将配置信息传递给子任务。以下是具体实现方式:
package com.util;import java.util.Properties;import org.apache.flink.api.java.utils.ParameterTool;object ProjectConfig { var HBASE_ZOOKEEPER_QUORUM = "" var HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "2181" var KAFKA_QUORUM = "" def initConfig(configname: ParameterTool): Unit = { HBASE_ZOOKEEPER_QUORUM = configname.get("hbase.zookeeper_quorum") HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = configname.get("hbase.zookeeper_property_clientport") KAFKA_QUORUM = configname.get("kafka.zookeeper.connect") }} 在Yarn模式下运行Flink job时,可能会遇到以下问题:
为了确保配置信息能够传递到所有子任务中,我们可以使用ParameterTool的注册全局变量功能。以下是具体实现:
package com.util;import org.apache.flink.api.java.utils.ParameterTool;import scala.collection.immutable.Map;object ProjectConfig { var HBASE_ZOOKEEPER_QUORUM = "" var HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "2181" var KAFKA_QUORUM = "" def initConfig(parameters: ParameterTool): Unit = { HBASE_ZOOKEEPER_QUORUM = parameters.get("hbase.zookeeper_quorum") HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = parameters.get("hbase.zookeeper_property_clientport") KAFKA_QUORUM = parameters.get("kafka.zookeeper.connect") }} flink-1.9.0/bin/flink run -c com.application.TestApplication -m yarn-cluster -p 6 -yjm 1024m -ytm 2048m -ynm TestApplication Test.jar -config_path /home/hadoop/config.properties
通过上述方法,我们可以将Flink JOB的环境配置信息从代码中抽离,放入外部配置文件中。这种方式不仅提高了代码的可维护性,还使得配置管理更加灵活。对于Yarn模式下的配置传递问题,使用ParameterTool的注册全局变量功能可以有效解决问题,确保所有子任务能够正常获取配置信息。
转载地址:http://iqefk.baihongyu.com/