CDH5.8手动安装spark2.1的运行错误整合 spark访问Hbase 数据导入mysql
2018-01-19 17:06
597 查看
在CDH5.8上面安装的时候spark1.6 苦于编程打包时的问题(spark1.6使用的是scala2.10 idea上使用2.10进行编译的时候 有时会报错 但是用2.11能编译通过 不过2.11编译的在spark上面运行时就会出现找不到包的问题 有人知道怎么回事请通知我 ) 就给CDH手动升级到了2.1版本CDH手动升级Spark2.1 这个博客里虽然是CDH5.11 但是CDH官方给的升级包是可以在5.7之上的升级的 实测可行
安装完之后我是保留了 1.6的 2.1的版本使用spark命令时都改成spark2 spark2-submit之类的
使用spark来读取hbase数据 进行操作
case class newsInfo(id:String,subject:String,descripe:String,source:String,sendTime:String)extends Serializable{
override def toString: String="%s\t%s\t%s\t%s".format(id,subject,descripe,source,sendTime)
//初始化sparksession
val ss = SparkSession.builder.
appName("RHBTSQL")
.getOrCreate()
val tablename = "tablename"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "master,node1,node2")
conf.set("hbase.master", "master")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val scan = new Scan()
scan.setCacheBlocks(false)
scan.addFamily(Bytes.toBytes("news"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("subject"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("contextSplit"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("descripe"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("source"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("sendTime"))
//设置值过滤
val filter = new SingleColumnValueFilter(Bytes.toBytes("news"), Bytes.toBytes("sendTime"), CompareOp.EQUAL, Bytes.toBytes("2017-12-13"))
scan.setFilter(filter)
conf.set(TableInputFormat.INPUT_TABLE, tablename)
//将scan类转化成string类型
val scan_str= ProtobufUtil.toScan(scan)
val scan_S=Base64.encodeBytes(scan_str.toByteArray())
conf.set(TableInputFormat.SCAN,scan_S)
//使用new hadoop api,读取数据,并转成rdd
val rdd = ss.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
运行读取hbase数据时会出现缺包 以及其他问题 这里是因为 spark2安装上之后并不是CDH整合的 而是区别于原来版本的spark的 所以 这里要
在/etc/spark/conf/目录下把老的spark的classpath.txt spark-env.sh 复制到/data/cloudera/parcels/SPARK2-2.0.0.cloudera2-1.cdh5.7.0.p0.118100/etc/spark2/conf.dist/下,/etc/spark2/conf是该目录的链接,然后修改spark-env.sh中SPARK_HOME=/data/cloudera/parcels/SPARK2-2.0.0.cloudera2-1.cdh5.7.0.p0.118100/lib/spark2
读取到的hbase数据RDD再进行数据整理 插入到mysql中 一开始试用的是
def myFun(iterator: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = "insert into blog(name, count) values (?, ?)"
try {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark",
"root", "123456")
iterator.foreach(data => {
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)
ps.setInt(2, data._2)
ps.executeUpdate()
}
)
} catch {
case e: Exception => println("Mysql Exception")
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
data.foreachPartition(myFun)
}
}这种方式 应该是可以的 但是我没有插入成功 因为操作被try catch了 我没注意日志 后来换了方法才知道数据中包含emoji表情 这种表情插入mysql的时候 会包一个固定的错误 原因是mysql的u8编码是3个字节 表情是4个字节 可以吧mysql的编码改成4个字节的utf8 也可以吧数据中的表情剔除掉 或者其他方法
val news =rdd.map(x=>{
val id = Bytes.toString(x._2.getRow)
var subject = Bytes.toString(x._2.getValue("news".getBytes,"subject".getBytes))
if(EmojiFilter.containsEmoji(subject)){
subject=EmojiFilter.filterEmoji(subject)
}
var descripe = Bytes.toString(x._2.getValue("news".getBytes,"descripe".getBytes))
if(EmojiFilter.containsEmoji(descripe)){
descripe=EmojiFilter.filterEmoji(descripe)
}
var source = Bytes.toString(x._2.getValue("news".getBytes,"source".getBytes))
var sendTime = Bytes.toString(x._2.getValue("news".getBytes,"sendTime".getBytes))
newsInfo(id,subject,descripe,source,sendTime)
})
//方法二、利用createDataFrame方法,内部利用反射获取字段及其类型
val dftemp = ss.createDataFrame(news)
val df = dftemp.createOrReplaceTempView("newsInfo")
val sqlcommand="select * from newsInfo"
val sel = ss.sql(sqlcommand)
val prop = new java.util.Properties
prop.setProperty("user","root")
prop.setProperty("password","123456")
// 调用DataFrameWriter将数据写入mysql
val dataResult = ss.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.10.167:3306/news","newsinfo2",prop) // 表可以不存在
ss.stop()注意 去除表情的方法没有贴出来 网上很多的 自己可以查看 rdd转换DF的方法我是使用的反射来做的 这里有一个链接 可以查看Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
还有 最好是使用自己提前建好的表进行插入 在没有表的情况下让他自己进行建的话 但是字段会使用text 所以还是自己建吧
可能还会有mysql driver的问题 设置一下env.sh 或者config 吧外部包中加上mysql的那个jar包就行 网上也很多
安装完之后我是保留了 1.6的 2.1的版本使用spark命令时都改成spark2 spark2-submit之类的
使用spark来读取hbase数据 进行操作
case class newsInfo(id:String,subject:String,descripe:String,source:String,sendTime:String)extends Serializable{
override def toString: String="%s\t%s\t%s\t%s".format(id,subject,descripe,source,sendTime)
//初始化sparksession
val ss = SparkSession.builder.
appName("RHBTSQL")
.getOrCreate()
val tablename = "tablename"
val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "master,node1,node2")
conf.set("hbase.master", "master")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val scan = new Scan()
scan.setCacheBlocks(false)
scan.addFamily(Bytes.toBytes("news"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("subject"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("contextSplit"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("descripe"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("source"))
scan.addColumn(Bytes.toBytes("news"), Bytes.toBytes("sendTime"))
//设置值过滤
val filter = new SingleColumnValueFilter(Bytes.toBytes("news"), Bytes.toBytes("sendTime"), CompareOp.EQUAL, Bytes.toBytes("2017-12-13"))
scan.setFilter(filter)
conf.set(TableInputFormat.INPUT_TABLE, tablename)
//将scan类转化成string类型
val scan_str= ProtobufUtil.toScan(scan)
val scan_S=Base64.encodeBytes(scan_str.toByteArray())
conf.set(TableInputFormat.SCAN,scan_S)
//使用new hadoop api,读取数据,并转成rdd
val rdd = ss.sparkContext.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
运行读取hbase数据时会出现缺包 以及其他问题 这里是因为 spark2安装上之后并不是CDH整合的 而是区别于原来版本的spark的 所以 这里要
在/etc/spark/conf/目录下把老的spark的classpath.txt spark-env.sh 复制到/data/cloudera/parcels/SPARK2-2.0.0.cloudera2-1.cdh5.7.0.p0.118100/etc/spark2/conf.dist/下,/etc/spark2/conf是该目录的链接,然后修改spark-env.sh中SPARK_HOME=/data/cloudera/parcels/SPARK2-2.0.0.cloudera2-1.cdh5.7.0.p0.118100/lib/spark2
读取到的hbase数据RDD再进行数据整理 插入到mysql中 一开始试用的是
def myFun(iterator: Iterator[(String, Int)]): Unit = {
var conn: Connection = null
var ps: PreparedStatement = null
val sql = "insert into blog(name, count) values (?, ?)"
try {
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark",
"root", "123456")
iterator.foreach(data => {
ps = conn.prepareStatement(sql)
ps.setString(1, data._1)
ps.setInt(2, data._2)
ps.executeUpdate()
}
)
} catch {
case e: Exception => println("Mysql Exception")
} finally {
if (ps != null) {
ps.close()
}
if (conn != null) {
conn.close()
}
}
}
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("RDDToMysql").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(("www", 10), ("iteblog", 20), ("com", 30)))
data.foreachPartition(myFun)
}
}这种方式 应该是可以的 但是我没有插入成功 因为操作被try catch了 我没注意日志 后来换了方法才知道数据中包含emoji表情 这种表情插入mysql的时候 会包一个固定的错误 原因是mysql的u8编码是3个字节 表情是4个字节 可以吧mysql的编码改成4个字节的utf8 也可以吧数据中的表情剔除掉 或者其他方法
val news =rdd.map(x=>{
val id = Bytes.toString(x._2.getRow)
var subject = Bytes.toString(x._2.getValue("news".getBytes,"subject".getBytes))
if(EmojiFilter.containsEmoji(subject)){
subject=EmojiFilter.filterEmoji(subject)
}
var descripe = Bytes.toString(x._2.getValue("news".getBytes,"descripe".getBytes))
if(EmojiFilter.containsEmoji(descripe)){
descripe=EmojiFilter.filterEmoji(descripe)
}
var source = Bytes.toString(x._2.getValue("news".getBytes,"source".getBytes))
var sendTime = Bytes.toString(x._2.getValue("news".getBytes,"sendTime".getBytes))
newsInfo(id,subject,descripe,source,sendTime)
})
//方法二、利用createDataFrame方法,内部利用反射获取字段及其类型
val dftemp = ss.createDataFrame(news)
val df = dftemp.createOrReplaceTempView("newsInfo")
val sqlcommand="select * from newsInfo"
val sel = ss.sql(sqlcommand)
val prop = new java.util.Properties
prop.setProperty("user","root")
prop.setProperty("password","123456")
// 调用DataFrameWriter将数据写入mysql
val dataResult = ss.sql(sqlcommand).write.mode(SaveMode.Append).jdbc("jdbc:mysql://10.10.10.167:3306/news","newsinfo2",prop) // 表可以不存在
ss.stop()注意 去除表情的方法没有贴出来 网上很多的 自己可以查看 rdd转换DF的方法我是使用的反射来做的 这里有一个链接 可以查看Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)
还有 最好是使用自己提前建好的表进行插入 在没有表的情况下让他自己进行建的话 但是字段会使用text 所以还是自己建吧
可能还会有mysql driver的问题 设置一下env.sh 或者config 吧外部包中加上mysql的那个jar包就行 网上也很多
相关文章推荐
- Sqoop安装配置与从mysql中导入数据到hbase
- Sqoop安装及MySql数据导入HBase
- Sqoop安装及MySql数据导入HBase
- Sqoop安装配置与从mysql中导入数据到hbase
- 大数据基础(二)hadoop, mave, hbase, hive, sqoop在ubuntu 14.04.04下的安装和sqoop与hdfs,hive,mysql导入导出
- 关于在mysql手动导入数据时遇到的错误
- 安装 XXX 时出现错误 无法访问windows安装服务。发生这种情况的可能是您在安全模式下运行windows
- solr4.0安装和简单导入mysql数据
- 在win2003安装mdac2.6版本以上后,但还是出现数据访问组件版本的错误
- 关于mysql版本升级后,原有的数据无法访问的解决方法,mysql错误代码:1558
- 整合SSH运行添加数据出现错误attempt to create saveOrUpdate event with null entity
- linux下Mysql 的安装、配置、数据导入导出
- 导入导出数据库数据,报错,链接服务器"(null)"的 OLE DB 访问接口 "Microsoft.Jet.OLEDB.4.0" 返回了消息 "未指定的错误"。
- linux下Mysql 的安装、配置、数据导入导出
- MySQL导入数据提示max_allowed_packet错误的解决方法
- 向SQL Server2005中导入数据出现“SQL 错误描述为: 链接服务器 '(null)' 的 OLE DB 访问接口'STREAM' 返回了对列 '[!BulkInsert].field' 无效的数据”解决方案
- 向SQL Server2005里导入数据是出现错误“SQL 错误描述为: 链接服务器 '(null)' 的 OLE DB 访问接口'STREAM' 返回了对列 '[!BulkInsert].field' 无效的数据”
- linux 下 gbk字符集mysql 安装 数据的导入
- mysql导入数据时提示 USING BTREE 相关错误解决办法
- mysql导入数据时 USING BTREE 错误解决办法