Setting Hadoop properties in Spark programs
2016-04-08 09:18
When writing a Spark program that is submitted in yarn-client mode, Hadoop properties can be set by giving them the spark.hadoop. prefix on the SparkConf:

val conf = new SparkConf()
conf.set("spark.hadoop.yarn.resourcemanager.address", "test01:8050")
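The same property can also be passed at submit time through spark-submit's --conf flag (a standard Spark mechanism; the jar name below is illustrative, the ResourceManager address is the article's example value):

```shell
# Any --conf key carrying the "spark.hadoop." prefix is copied into the
# Hadoop Configuration with the prefix stripped, exactly as when set in code.
spark-submit \
  --master yarn-client \
  --conf spark.hadoop.yarn.resourcemanager.address=test01:8050 \
  my_app.jar
```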
Source code analysis:
When a Spark job is submitted in yarn mode, a YarnClientSchedulerBackend is started to handle scheduling:
/**
 * Create a Yarn client to submit an application to the ResourceManager.
 * This waits until the application is running.
 */
override def start() {
  val driverHost = conf.get("spark.driver.host")
  val driverPort = conf.get("spark.driver.port")
  val hostport = driverHost + ":" + driverPort
  sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIAddress) }

  val argsArrayBuf = new ArrayBuffer[String]()
  argsArrayBuf += ("--arg", hostport)
  argsArrayBuf ++= getExtraClientArguments

  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
  val args = new ClientArguments(argsArrayBuf.toArray, conf)
  totalExpectedExecutors = args.numExecutors
  client = new Client(args, conf)        // <-- the Client is constructed with the SparkConf
  appId = client.submitApplication()
  ...
}
The conf used when the Client is created:
private[spark] class Client(
    val args: ClientArguments,
    val hadoopConf: Configuration,
    val sparkConf: SparkConf)
  extends Logging {

  import Client._

  def this(clientArgs: ClientArguments, spConf: SparkConf) =
    this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)  // <-- builds the Hadoop conf from the SparkConf

SparkHadoopUtil.newConfiguration:
def newConfiguration(conf: SparkConf): Configuration = {
  val hadoopConf = new Configuration()
  // Note: this null check is around more than just access to the "conf" object to maintain
  // the behavior of the old implementation of this code, for backwards compatibility.
  if (conf != null) {
    // Explicitly check for S3 environment variables
    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
      val keyId = System.getenv("AWS_ACCESS_KEY_ID")
      val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")
      hadoopConf.set("fs.s3.awsAccessKeyId", keyId)
      hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)
      hadoopConf.set("fs.s3a.access.key", keyId)
      hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)
      hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)
      hadoopConf.set("fs.s3a.secret.key", accessKey)
    }
    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
    conf.getAll.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        hadoopConf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    val bufferSize = conf.get("spark.buffer.size", "65536")
    hadoopConf.set("io.file.buffer.size", bufferSize)
  }
  hadoopConf
}
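The prefix-stripping step is the core of the mechanism. A minimal, self-contained sketch of that step (this is an illustration using a plain Map, not Spark's actual code, which writes into a Hadoop Configuration):

```scala
// Mirrors the newConfiguration logic above: every key starting with
// "spark.hadoop." is copied out with the prefix removed; other keys are ignored.
object HadoopPropDemo {
  def copyHadoopProps(sparkProps: Map[String, String]): Map[String, String] =
    sparkProps.collect {
      case (key, value) if key.startsWith("spark.hadoop.") =>
        key.substring("spark.hadoop.".length) -> value
    }

  def main(args: Array[String]): Unit = {
    val props = Map(
      "spark.hadoop.yarn.resourcemanager.address" -> "test01:8050",
      "spark.driver.host" -> "localhost")
    // Only the prefixed key survives, renamed to "yarn.resourcemanager.address".
    println(copyHadoopProps(props))
  }
}
```

This is why the SparkConf key in the example at the top must carry the full spark.hadoop. prefix: without it, the key never reaches the Hadoop Configuration.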