
Spark testing on a CDH 5 cluster


1. CDH cluster environment
CDH version 5.13.2
JDK 1.8
Scala 2.10.6
ZooKeeper 3.4.5
Hadoop 2.6.0
YARN 2.6.0
Spark 1.6.0 / 2.1.0
Kafka 2.1.0
Note: when developing Spark applications against CDH, the higher-versioned upstream Apache artifacts are sufficient; the CDH Spark development packages are not needed, and the same goes for the other ecosystem projects. Use the Apache artifacts while developing in IDEA; when the job is deployed to the production cluster it picks up the CDH-provided packages by default.

2. pom.xml configuration

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxxx.xxx</groupId>
    <artifactId>spark</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.compat.version>2.10</scala.compat.version>

        <!-- Encoding used when compiling -->
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <!-- Encoding used when copying resources -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.0</spark.version>
    </properties>

    <!-- Repository hosting the Scala Maven plugin -->
    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- provided: excluded from the packaged jar, since the cluster already ships it -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- Locations of the main and test source trees -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- Scala plugin so that Maven can compile, test and run Scala code -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>

                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.5</arg>
                    </args>
                </configuration>
            </plugin>

            <!-- Eclipse plugin: lets the Maven project be imported into Eclipse -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
            <!-- Assembly plugin: bundles all dependencies into the packaged jar -->
            <!--<plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>-->
        </plugins>
    </build>

    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
</project>
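
Because scala-library and spark-core are marked as provided, the jar built from this pom contains only the application classes and relies on the Scala/Spark shipped with the CDH cluster at run time. A minimal build step (the jar name follows from the standard Maven convention and the artifactId/version above):

# Package the application; with provided scopes only our own classes end up in the jar
mvn clean package
# Result: target/spark-1.0-SNAPSHOT.jar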
3. Execution modes

3.1 Local test
The permissions on the HDFS input and output paths need to be opened up first: hadoop fs -chmod -R 777 /xxx
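
A sketch of preparing the HDFS test layout referenced below (host, port and paths are the ones used in this article; the output directory is deliberately not created, because saveAsTextFile requires it to be absent):

# Create the input and checkpoint directories and open up the permissions
hadoop fs -mkdir -p /test/input /test/checkPoint
# Upload a sample input file (any local text file will do)
hadoop fs -put wordcountInput.txt /test/input/
hadoop fs -chmod -R 777 /test

The local-mode WordCount used for the test: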
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object WordCount {

  def main(args: Array[String]): Unit = {
    // local[1]: run inside the IDE with a single local worker thread
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[1]")
    val sc: SparkContext = new SparkContext(conf)
    sc.setCheckpointDir("hdfs://192.168.226.88:8020/test/checkPoint")
    // Arguments: input path, output path, e.g.
    //   F:\\ideaProjects\\spark_kafka\\src\\main\\resources\\words
    //   hdfs://192.168.226.88:8020/test/input/wordcountInput.txt
    val inputFile: String = args(0)
    val outputFile: String = args(1)
    val lineRDD: RDD[String] = sc.textFile(inputFile)
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
    val tupsRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
    // Word count: sum the 1s per word
    val wordCountRDD: RDD[(String, Int)] = tupsRDD.reduceByKey((tupValue1, tupValue2) => tupValue1 + tupValue2)
    if (!wordCountRDD.isCheckpointed) {
      wordCountRDD.checkpoint()
    }
    //println(wordCountRDD.collect().toBuffer)
    // Expand each (word, count) pair into per-character (char, count) pairs
    val charRDD: RDD[ArrayBuffer[(Char, Int)]] = wordCountRDD.map(wordCountTup => {
      val arrayBuffer = new ArrayBuffer[(Char, Int)]()
      val charArr: Array[Char] = wordCountTup._1.toCharArray
      for (char <- charArr) {
        val tuple: (Char, Int) = (char, wordCountTup._2)
        arrayBuffer += tuple
      }
      arrayBuffer
    })
    // Character count: flatten the buffers and sum per character
    val charCountRDD: RDD[(Char, Int)] = charRDD.flatMap(arrBuf => arrBuf).reduceByKey(_ + _)
    charCountRDD.saveAsTextFile(outputFile)
    //println(charCountRDD.collect().toBuffer)
  }

}
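
As a quick sanity check: for a single input line such as "spark spark hadoop", wordCountRDD contains (spark,2) and (hadoop,1), and charCountRDD sums the counts per character, e.g. (a,3), (p,3), (s,2), (o,2), (r,2), (k,2), (h,1), (d,1). Note that saveAsTextFile refuses to write into an existing directory, so clear the output path between runs:

# Remove the previous result before re-running (path from the example above)
hadoop fs -rm -r hdfs://192.168.226.88:8020/test/output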
3.2 Submit and run on the cluster (Spark on YARN)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object WordCount {

  def main(args: Array[String]): Unit = {
    // No setMaster here: the master (yarn) and deploy mode are supplied by spark-submit
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName)
    val sc: SparkContext = new SparkContext(conf)
    sc.setCheckpointDir("hdfs://192.168.226.88:8020/test/checkPoint")
    // Arguments: input path, output path, e.g.
    //   F:\\ideaProjects\\spark_kafka\\src\\main\\resources\\words
    //   hdfs://192.168.226.88:8020/test/input/wordcountInput.txt
    val inputFile: String = args(0)
    val outputFile: String = args(1)
    val lineRDD: RDD[String] = sc.textFile(inputFile)
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
    val tupsRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
    // Word count: sum the 1s per word
    val wordCountRDD: RDD[(String, Int)] = tupsRDD.reduceByKey((tupValue1, tupValue2) => tupValue1 + tupValue2)
    if (!wordCountRDD.isCheckpointed) {
      wordCountRDD.checkpoint()
    }
    //println(wordCountRDD.collect().toBuffer)
    // Expand each (word, count) pair into per-character (char, count) pairs
    val charRDD: RDD[ArrayBuffer[(Char, Int)]] = wordCountRDD.map(wordCountTup => {
      val arrayBuffer = new ArrayBuffer[(Char, Int)]()
      val charArr: Array[Char] = wordCountTup._1.toCharArray
      for (char <- charArr) {
        val tuple: (Char, Int) = (char, wordCountTup._2)
        arrayBuffer += tuple
      }
      arrayBuffer
    })
    // Character count: flatten the buffers and sum per character
    val charCountRDD: RDD[(Char, Int)] = charRDD.flatMap(arrBuf => arrBuf).reduceByKey(_ + _)
    charCountRDD.saveAsTextFile(outputFile)
    //println(charCountRDD.collect().toBuffer)
  }
}

The jar is then submitted to YARN with the following spark-submit script (yarn-client mode):
#!/bin/sh
# Resolve the directory this script lives in and the log directory next to it
BIN_DIR=$(cd `dirname $0`; pwd)
#BIN_DIR="$(cd $(dirname $BASH_SOURCE) && pwd)"
LOG_DIR=${BIN_DIR}/../logs
LOG_TIME=`date +%Y-%m-%d`
# Make sure the log directory exists before redirecting output into it
mkdir -p ${LOG_DIR}
# Arguments passed to main(): input path and output path
inputFile='hdfs://192.168.226.88:8020/test/input/wordcountInput.txt'
outputFile='hdfs://192.168.226.88:8020/test/output'
spark-submit --class WordCount \
--master yarn \
--deploy-mode client \
--queue default \
--executor-memory 512m \
--num-executors 1 \
--jars /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo.jar \
/home/spark/jars/spark_kafka-1.0-SNAPSHOT.jar $inputFile $outputFile > ${LOG_DIR}/wordcount_${LOG_TIME}.log 2>&1
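
In yarn-client mode the driver runs inside the spark-submit process, so driver output lands in the wordcount log written above, while executor logs stay on the cluster. The running or finished application can be inspected with the standard YARN CLI (the application id below is a placeholder):

# List applications known to the ResourceManager
yarn application -list
# Fetch aggregated container logs (requires YARN log aggregation to be enabled)
yarn logs -applicationId application_1234567890123_0001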

6. Issues encountered

6.1 Missing libgplcompression.so
Copy the native library into the JRE's library directory:
cp /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native/libgplcompression.so /usr/local/jdk1.8.0_152/jre/lib/amd64
Reference: https://blog.csdn.net/u011750989/article/details/49306439
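
Copying the library into the JRE works, but as an alternative sketch (not from the original article) Spark can be pointed at the Cloudera native directory directly, which avoids modifying the JDK installation:

# Alternative: pass the native library directory to the driver and the executors
spark-submit \
  --driver-library-path /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native \
  --conf spark.executor.extraLibraryPath=/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/native \
  ...   # remaining options and the application jar as in the submit script above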

6.2 Adding hadoop-lzo.jar
Add --jars /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo.jar to the spark-submit command.
Reference: https://www.cnblogs.com/francisYoung/p/6073842.html
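
Since the jar already sits at the same parcel path on every node (assuming the HADOOP_LZO parcel is distributed cluster-wide), an alternative sketch (not from the original article) is to reference it in place via classpath settings instead of shipping it with --jars:

# Alternative: put the already-installed parcel jar on the driver and executor classpaths
spark-submit \
  --driver-class-path /opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo.jar \
  --conf spark.executor.extraClassPath=/opt/cloudera/parcels/HADOOP_LZO/lib/hadoop/lib/hadoop-lzo.jar \
  ...   # remaining options and the application jar as in the submit script above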

6.3 Memory configuration
Adjust the node memory and the per-container memory. hadoop02 and hadoop03 each have 3 GB of physical memory, of which 2 GB can be allocated to the NodeManager.
yarn.scheduler.minimum-allocation-mb  1g   minimum memory a single container on a node can request
yarn.scheduler.maximum-allocation-mb  2g   maximum memory a single container on a node can request
yarn.nodemanager.resource.memory-mb   2g   maximum memory available to YARN on each node
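As a rough cross-check of these numbers: with --executor-memory 512m, Spark 1.6 adds an off-heap overhead of max(384 MB, 10% of the executor memory) = 384 MB, so each executor requests about 896 MB from YARN, which is rounded up to the 1 GB minimum allocation; together with the roughly 1 GB application-master container in client mode, the job fits within the 2 GB available per NodeManager.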
References:
https://zhuanlan.zhihu.com/p/69703968
http://blog.chinaunix.net/uid-28311809-id-4383551.html
Further reading:
https://docs.cloudera.com/documentation/enterprise/5-13-x/topics/spark.html
