博客
关于我
flink 读取外部.properties的配置文件
阅读量:798 次
发布时间:2023-04-02

本文共 2611 字,大约阅读时间需要 8 分钟。

Flink JOB环境配置优化:使用ParameterTool读取外部配置文件

在Flink项目中,将环境配置信息从代码中抽离放入配置文件是常见的做法。这可以让配置管理更加灵活,便于维护和扩展。以下将详细介绍如何使用Flink自带的ParameterTool读取外部配置文件,并分享遇到的问题及解决方案。


使用ParameterTool读取外部配置文件

项目背景

随着项目复杂度的增加,环境配置信息逐渐增多。传统的方式是直接在代码中管理这些配置,导致代码冗余且难以维护。为了解决这一问题,我们选择使用Flink提供的ParameterTool工具来读取外部配置文件。


解决方案

方案一:从命令行参数获取配置信息

我们可以通过在命令行添加--config_path参数来指定配置文件路径。例如:

flink run Test.jar -config_path /home/hadoop/config.properties

在代码中,通过ParameterTool从命令行参数中获取配置信息,并将其加载到项目中。以下是示例代码:

package com.application;import com.ProjectConfig;import org.apache.flink.api.java.utils.ParameterTool;public class TestApplication extends App {    public static void main(String[] args) {        // 获取命令行参数        val parameters = ParameterTool.fromArgs(args)        // 获取配置文件路径        val path = parameters.get("config_path")                // 初始化配置信息        ParameterTool configTool = ParameterTool.fromPropertiesFile(path)        ProjectConfig.initConfig(configTool)    }}

方案二:注册全局变量

在Flink程序中,可以通过注册全局变量的方式将配置信息传递给子任务。以下是具体实现方式:

package com.util;import java.util.Properties;import org.apache.flink.api.java.utils.ParameterTool;object ProjectConfig {    var HBASE_ZOOKEEPER_QUORUM = ""    var HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "2181"    var KAFKA_QUORUM = ""        def initConfig(configname: ParameterTool): Unit = {        HBASE_ZOOKEEPER_QUORUM = configname.get("hbase.zookeeper_quorum")        HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = configname.get("hbase.zookeeper_property_clientport")        KAFKA_QUORUM = configname.get("kafka.zookeeper.connect")    }}

Yarn模式下的问题及解决方案

问题描述

在Yarn模式下运行Flink job时,可能会遇到以下问题:

  • 配置信息无法传递:从Kafka获取数据流的RichSourceFunction正常运行,但从HBase获取数据流的RichSourceFunction显示无法获取配置信息。

解决方案

为了确保配置信息能够传递到所有子任务中,我们可以使用ParameterTool的注册全局变量功能。以下是具体实现:

package com.util;import org.apache.flink.api.java.utils.ParameterTool;import scala.collection.immutable.Map;object ProjectConfig {    var HBASE_ZOOKEEPER_QUORUM = ""    var HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = "2181"    var KAFKA_QUORUM = ""        def initConfig(parameters: ParameterTool): Unit = {        HBASE_ZOOKEEPER_QUORUM = parameters.get("hbase.zookeeper_quorum")        HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT = parameters.get("hbase.zookeeper_property_clientport")        KAFKA_QUORUM = parameters.get("kafka.zookeeper.connect")    }}

Yarn运行命令示例

flink-1.9.0/bin/flink run -c com.application.TestApplication -m yarn-cluster -p 6 -yjm 1024m -ytm 2048m -ynm TestApplication Test.jar -config_path /home/hadoop/config.properties

总结

通过上述方法,我们可以将Flink JOB的环境配置信息从代码中抽离,放入外部配置文件中。这种方式不仅提高了代码的可维护性,还使得配置管理更加灵活。对于Yarn模式下的配置传递问题,使用ParameterTool的注册全局变量功能可以有效解决问题,确保所有子任务能够正常获取配置信息。

转载地址:http://iqefk.baihongyu.com/

你可能感兴趣的文章
Oracle安装、Navicat for Oracle、JDBCl连接、获取表结构
查看>>
ORACLE客户端连接
查看>>
oracle常用SQL——创建用户、表空间、授权(12C)
查看>>
Oracle数据库异常--- oracle_10g_登录em后,提示java.lang.Exception_Exception_in_sending_Request__null或Connection
查看>>
oracle数据库异常---SP2-1503: 无法初始化 Oracle 调用界面 SP2-1503: 无法初始化 Oracle 问题的解决办法
查看>>
oracle数据库笔记---oracleweb视图使用流程,及plsql安装
查看>>
oracle数据库笔记---pl/sql的基础使用方法
查看>>
Transformer 架构解释
查看>>
Oracle数据库表空间 数据文件 用户 以及表创建的SQL代码
查看>>
oracle数据库零碎---Oracle Merge 使用,表中存在数据就修改,没有数据自动添加
查看>>
Oracle数据库验证IMP导入元数据是否会覆盖历史表数据
查看>>
Oracle未开启审计情况下追踪表变更记录
查看>>
Oracle条件查询
查看>>
Oracle查看数据库会话连接
查看>>
Oracle查询前几条数据的方法
查看>>
oracle树形查询 start with connect by
查看>>
oracle毕业论文题目,历届毕业论文申报题目大全.doc
查看>>
oracle求助---win7下oracle配置相关疑问Starting Oracle Enterprise Manager 10g Database Control ...发生系统错误 5。
查看>>
Oracle流程控制语句
查看>>
oracle深度解析检查点
查看>>