
IDEA + Spark: Debugging Framework Logs with a Local Pseudo-Distributed Cluster

2017-05-30 22:15
Goal: develop a Spark program in pseudo-distributed (local-cluster) mode from a local IDEA environment on Windows, and watch the logs produced by the Spark framework itself.
Application code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object SparkShell {
  def main(args: Array[String]): Unit = {
    // Surface all framework logging in the IDEA console
    Logger.getLogger("org").setLevel(Level.ALL)
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("Wow,My First Spark App!") // application name, visible on the monitoring UI
    conf.setMaster("local-cluster[1, 1, 1024]") // pseudo-distributed: 1 worker, 1 core, 1024 MB per worker
    conf.setSparkHome(System.getenv("SPARK_HOME"))
    // create the SparkContext; the SparkConf instance carries Spark's runtime parameters and configuration
    val sc = new SparkContext(conf)
    sc.parallelize(Seq("100")).count() // a trivial job, just to exercise the cluster
    sc.stop()
  }
}
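The Logger.getLogger("org").setLevel(Level.ALL) line is what makes the framework's own output visible in the IDEA console. If ALL is too noisy, a possible variation (plain log4j 1.x API plus Spark package names, nothing Spark-specific) is to raise only the packages of interest:

import org.apache.log4j.{Level, Logger}

// Keep most of Spark quiet, but show what the embedded Master/Worker/ExecutorRunner are doing.
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("org.apache.spark.deploy").setLevel(Level.DEBUG)
Logger.getLogger("org.apache.spark.network").setLevel(Level.TRACE)

With that, the DEBUG/TRACE lines from Master, Worker and the transport layer shown later in this post still come through without drowning in task-level output.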

The pseudo-distributed mode is backed by LocalSparkCluster, which starts a standalone Master and its Workers inside the driver JVM:

class LocalSparkCluster(
    numWorkers: Int,
    coresPerWorker: Int,
    memoryPerWorker: Int,
    conf: SparkConf)
  extends Logging {
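The three numbers in the master URL map straight onto this constructor: local-cluster[1, 1, 1024] means 1 worker, 1 core per worker and 1024 MB per worker. A rough illustration of that parsing (the regex below is my own sketch, not Spark's exact source):

// How "local-cluster[numWorkers, coresPerWorker, memoryPerWorkerMB]" breaks down
// into the three LocalSparkCluster parameters.
val LocalCluster = """local-cluster\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]""".r

"local-cluster[1, 1, 1024]" match {
  case LocalCluster(workers, cores, memMB) =>
    println(s"numWorkers=$workers coresPerWorker=$cores memoryPerWorkerMB=$memMB")
  case other =>
    println(s"not a local-cluster master URL: $other")
}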

Running it fails with:

17/05/30 22:00:47 ERROR ExecutorRunner: Error running executor
java.lang.IllegalStateException: Cannot find any build directories.
at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
at org.apache.spark.launcher.AbstractCommandBuilder.getScalaVersion(AbstractCommandBuilder.java:241)
at org.apache.spark.launcher.AbstractCommandBuilder.buildClassPath(AbstractCommandBuilder.java:195)
at org.apache.spark.launcher.AbstractCommandBuilder.buildJavaCommand(AbstractCommandBuilder.java:118)
at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:39)
at org.apache.spark.launcher.WorkerCommandBuilder.buildCommand(WorkerCommandBuilder.scala:47)
at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:63)
at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:51)
at org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:145)
at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)
Looking at the launcher source (AbstractCommandBuilder.getScalaVersion, where the stack trace points):

String getScalaVersion() {
  String scala = getenv("SPARK_SCALA_VERSION");
  if (scala != null) {
    return scala;
  }
  String sparkHome = getSparkHome();
  File scala210 = new File(sparkHome, "launcher/target/scala-2.10");
  File scala211 = new File(sparkHome, "launcher/target/scala-2.11");
  checkState(!scala210.isDirectory() || !scala211.isDirectory(),
    "Presence of build for both scala versions (2.10 and 2.11) detected.\n" +
    "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
  if (scala210.isDirectory()) {
    return "2.10";
  } else {
    checkState(scala211.isDirectory(), "Cannot find any build directories.");
    return "2.11";
  }
}
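Note that the method checks SPARK_SCALA_VERSION first, so setting that environment variable is one way to skip the directory lookup entirely. To verify a candidate SPARK_HOME layout before launching from IDEA, the same directory test can be reproduced in a few lines (a standalone sketch; it reads whatever SPARK_HOME points at):

import java.io.File

// Mirror the launcher's check: exactly one of the two scala build directories
// should exist under SPARK_HOME/launcher/target.
val sparkHome = sys.env.getOrElse("SPARK_HOME", sys.error("SPARK_HOME is not set"))
val scala210 = new File(sparkHome, "launcher/target/scala-2.10")
val scala211 = new File(sparkHome, "launcher/target/scala-2.11")
println(s"launcher/target/scala-2.10 exists: ${scala210.isDirectory}")
println(s"launcher/target/scala-2.11 exists: ${scala211.isDirectory}")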

Fix:
1. Add a SPARK_HOME entry to the Windows system environment variables.
2. Create the Spark home directory and, under it, create
   launcher/target/scala-2.10
   assembly\target\scala-2.10\jars
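If creating that layout by hand is tedious, a few lines of Scala can set it up (a convenience sketch with java.nio.file; it only creates the directories listed above, the Spark jars still have to be copied in afterwards):

import java.nio.file.{Files, Paths}

// Create the directory skeleton the launcher expects under SPARK_HOME.
val sparkHome = Paths.get(sys.env("SPARK_HOME"))
Seq("launcher/target/scala-2.10", "assembly/target/scala-2.10/jars").foreach { rel =>
  Files.createDirectories(sparkHome.resolve(rel))
}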

That resolves the error, and the framework logs of the pseudo-distributed cluster now show up in IDEA's local console:

17/05/30 22:12:24 INFO Master: I have been elected leader! New state: ALIVE
17/05/30 22:12:24 DEBUG TransportServer: Shuffle server started on port: 55783
17/05/30 22:12:24 INFO Utils: Successfully started service 'sparkWorker1' on port 55783.
17/05/30 22:12:24 INFO Worker: Starting Spark worker 192.168.93.1:55783 with 1 cores, 1024.0 MB RAM
17/05/30 22:12:24 INFO Worker: Running Spark version 2.1.0
17/05/30 22:12:24 INFO Worker: Spark home: G:\IMFBigDataSpark2017\SparkApps\data\wordcount
17/05/30 22:12:24 DEBUG SecurityManager: Created SSL options for standalone: SSLOptions{enabled=false, keyStore=None, keyStorePassword=None, trustStore=None, trustStorePassword=None, protocol=None, enabledAlgorithms=Set()}
17/05/30 22:12:24 INFO Utils: Successfully started service 'WorkerUI' on port 55784.
17/05/30 22:12:24 INFO WorkerWebUI: Bound WorkerWebUI to 0.0.0.0, and started at http://192.168.93.1:55784
17/05/30 22:12:24 INFO Worker: Connecting to master 192.168.93.1:55761...
17/05/30 22:12:24 TRACE TransportClientFactory: DNS resolution for /192.168.93.1:55761 took 0 ms
17/05/30 22:12:24 DEBUG TransportClientFactory: Creating new connection to /192.168.93.1:55761
17/05/30 22:12:24 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://192.168.93.1:55761...

A second problem then shows up: the executor process exits immediately.

17/05/31 05:46:08 INFO Worker: Executor app-20170531054607-0000/0 finished with state EXITED message Command exited with code 1 exitStatus 1

The launch and the exit handling happen in ExecutorRunner.fetchAndRunExecutor:
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
The ExecutorRunner constructor, for reference:

private[deploy] class ExecutorRunner(
    val appId: String,
    val execId: Int,
    val appDesc: ApplicationDescription,
    val cores: Int,
    val memory: Int,
    val worker: RpcEndpointRef,
    val workerId: String,
    val host: String,
    val webUiPort: Int,
    val publicAddress: String,
    val sparkHome: File,
    val executorDir: File,
    val workerUrl: String,
    conf: SparkConf,
    val appLocalDirs: Seq[String],
    @volatile var state: ExecutorState.Value)
  extends Logging {

The worker's work directory (where the executor directories are created) can be overridden through SPARK_WORKER_DIR:

if (System.getenv("SPARK_WORKER_DIR") != null) {
  workDir = System.getenv("SPARK_WORKER_DIR")
}

The underlying failure behind exit code 1 is:

Error: Could not find or load main class org.apache.spark.executor.CoarseGrainedExecutorBackend

which suggests the executor's classpath is missing the Spark jars: the launcher builds that classpath from the jars directory created in step 2 above (assembly\target\scala-2.10\jars), so the Spark jars have to actually be copied in there, not just the empty directory.
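Since fetchAndRunExecutor redirects the executor's stdout and stderr into files under its working directory, those files are the first place to look whenever an executor dies with exit code 1, as it did here. A small sketch for dumping them (the work/<appId>/<execId> layout is an assumption based on how the standalone worker usually arranges its work directory):

import java.io.File
import scala.io.Source

// Print every stdout/stderr file found under the worker's work directory.
// Assumed layout: <workDir>/<appId>/<execId>/{stdout,stderr}
def dumpExecutorLogs(workDir: File): Unit =
  for {
    appDir  <- Option(workDir.listFiles()).getOrElse(Array.empty[File]) if appDir.isDirectory
    execDir <- appDir.listFiles() if execDir.isDirectory
    name    <- Seq("stdout", "stderr")
    logFile  = new File(execDir, name)
    if logFile.isFile
  } {
    println(s"===== ${logFile.getAbsolutePath} =====")
    Source.fromFile(logFile).getLines().foreach(println)
  }

dumpExecutorLogs(new File(sys.env.getOrElse("SPARK_WORKER_DIR", "work")))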