
How to load a properties configuration file in Spark

2017-06-14 17:47
First, let me show the incorrect approach; this way of loading the configuration file does not work:

val props = new Properties()
val loader = getClass.getClassLoader
props.load(new FileInputStream(loader.getResource("config.properties").getFile()))

This puts the configuration file directly under the resources directory and reads it through the classloader. In a Spark program this fails with a "configuration file not found" error, typically because once the application is packaged into a jar, getResource points inside the jar and getFile() no longer yields a path that FileInputStream can open.
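As an aside, if you do want to keep the file inside the application jar, loading it as a classpath stream instead of a file path avoids that particular error. This is a minimal sketch, not from the original post, and it only helps where the jar is actually on the classpath (for example on the driver):

import java.util.Properties

// Sketch: read the packaged config.properties as a classpath stream.
// Properties.load accepts an InputStream, so no filesystem path is needed.
val props = new Properties()
val in = getClass.getClassLoader.getResourceAsStream("config.properties")
require(in != null, "config.properties not found on the classpath")
try props.load(in) finally in.close()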
The correct way:

import java.io.FileInputStream
import java.util.Properties

// Read config.properties from the working directory of the running process.
val props = new Properties()
props.load(new FileInputStream("config.properties"))
val hdfspath = props.getProperty("hdfspath")
val mysqlpath = props.getProperty("mysql")
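For the relative path to resolve, config.properties must be present in the working directory of whichever process executes the load, on the driver and on every executor. The usual way to get it there is to ship it with the job, e.g. spark-submit --files config.properties ..., which on YARN copies the file into each container's working directory. Alternatively, SparkFiles.get returns the local path of a file distributed that way; here is a minimal sketch, assuming the file was submitted with --files (this step is not shown in the original post):

import java.io.FileInputStream
import java.util.Properties

import org.apache.spark.SparkFiles

// Sketch: resolve the locally materialized copy of a file shipped via --files
// (or SparkContext.addFile) and load it as properties.
val props = new Properties()
val in = new FileInputStream(SparkFiles.get("config.properties"))
try props.load(in) finally in.close()
val hdfspath = props.getProperty("hdfspath")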

You can load the configuration file in any of these places:
1. Inside foreachPartition, once per partition on the executor:

kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>

    val filePath = "config.properties"
    LogUtil.info(filePath)
    val props = new Properties()
    props.load(new FileInputStream(filePath))

    // Dump every key/value pair to confirm the file was read on the executor.
    LogUtil.info("1")
    props.keySet().toArray().foreach { x =>
      LogUtil.info(x + "\t1 " + props.getProperty(x.toString()))
    }

 
2. Inside partition.foreach, once per record (still within the foreachPartition block above):

    partition.foreach { x =>
      LogUtil.info(x)

      val filePath1 = "config.properties"
      LogUtil.info(filePath1)
      val props1 = new Properties()
      props1.load(new FileInputStream(filePath1))

      // Same dump as above, now executed for every record.
      LogUtil.info("2")
      props1.keySet().toArray().foreach { x =>
        LogUtil.info(x + "\t2 " + props1.getProperty(x.toString()))
      }
    }
  }
}
3. In the main method, on the driver:

def main(args: Array[String]): Unit = {

  var kafkaZkQuorum = ""
  var group = "EventETL_test_group"
  var topics = ""
  var numThreads = 1
  var timeDuration = 3

  var checkpointDir = "/Users/test/sparktemp"

  println("Usage: configuration file")

  // Load the job parameters from config.properties before building the streaming context.
  val filePath = "config.properties"
  LogUtil.info(filePath)
  val props = new Properties()
  props.load(new FileInputStream(filePath))
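Note that variants 1 and 2 above re-read config.properties for every partition or even every record. A common refinement, sketched below with a hypothetical ConfigHolder helper (not part of the original post), is to wrap the load in a lazily initialized object so each executor JVM reads the file only once:

import java.io.FileInputStream
import java.util.Properties

// Hypothetical helper: a lazy val inside an object is initialized at most once
// per JVM, so each executor reads config.properties a single time.
object ConfigHolder {
  lazy val props: Properties = {
    val p = new Properties()
    val in = new FileInputStream("config.properties")
    try p.load(in) finally in.close()
    p
  }
}

// Usage inside the streaming job: no repeated file reads per record.
kafkaStream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val hdfspath = ConfigHolder.props.getProperty("hdfspath")
    partition.foreach { x =>
      LogUtil.info(x + "\t" + hdfspath)
    }
  }
}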