您的位置:首页 > 其它

windows本地sparkstreaming开发环境搭建及简单实例

2017-05-28 23:00 661 查看
windows本地spark开发环境搭建及简单实例
 

1:开发环境IDEA选择:

IntelliJ IDEA Community Edition 2017.1.1  (相比eclipse更方便,不用装那么多插件,且提供免费版,官网直接下载安装就可以)

2:环境配置:(开发语言scala)

由于公司网络原因,下载不方便,没有用mavn,直接拖了本地的jar包

(1) spark core开发jar包:

 


(2) spark streaming开发jar包:

 


(3) spark 连接 hive jar包:

 


(4) jdk及sdk包:

 




 

(5) 由于我的开发场景最终数据要写入hive,所以要引入mysql连接驱动包:

 


(6) 最终项目结构如图:

 




 

(7) 由于要连接hive,我们要把集群的hive-site.xml拿出来,直接拖到项目里就可以了:

这一步还是要有的,不然默认hive元数据管理metastore会去连接derby,hive-site.xml我们把metastore配置为mysql,这样我们把hive-site.xml拖进来就会默认采用hive-site的配置了(上图中的derby.log可以忽略,文件是我测试时自动产生的)

Hive-site.xml部分配置截图:


这里的mysql我是连的hadoop环境的,也就是我虚拟机环境的mysql,当然如果本地windows环境装有mysql,直接连localhost就可以了。

3:测试

Case数据流图

Flume → kafka → spark streaming → hive

Flume具体配置请参考:

Kafka配置请参考:

Spark代码如下(只是测试,没有对时间窗数据做处理,只是写进写出):

/**
  * Created by Tracy.Gao01 on 5/8/2017.
  */
import org.apache.spark
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object Spkhive extends
Serializable {

  case class Person(name:String,col1:String)

  def main(args: Array[String]) {

    println("Start to run TestSpa")

    val conf = new
SparkConf().setMaster("local[3]")setAppName("Spkhive")

    val ssc = new
StreamingContext(conf, Seconds(30))

    val topicMap=Map("my_first_topic"
-> 1)

    //    zookeeper quorums server list
    val
zkQuorum = "192.168.244.134:2181";

    //   consumer group
    val
group = "group1"

    val
lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    val line = lines.count()

    lines.print()

    line.print()

    lines.foreachRDD(rdd => {

        val categoryItemRow =  rdd.map(reducedItem => {

        val l_date = reducedItem.split(",")(0)

        val t_product = reducedItem.split(",")(1)

        Row(l_date, t_product)

      })

      val structType =
StructType(Array(

        StructField("l_date", StringType,
true),

        StructField("t_product", StringType,
true)

      ))

      val hiveContext =
new
HiveContext(rdd.context)

      val categoryItemDF = hiveContext.createDataFrame(categoryItemRow,structType)

      categoryItemDF.registerTempTable("categoryItemTable1")

      hiveContext.sql("use default")

      hiveContext.sql( """CREATE  TABLE if not exists `table2`(
                      `l_date`  string,
                      `t_product`  string)"""
)

      val reseltDataFram = hiveContext.sql("SELECT l_date,t_product FROM categoryItemTable1")

      reseltDataFram.show()

      hiveContext.sql("insert into table2 select l_date,t_product from categoryItemTable1")

      hiveContext.sql("insert into table table2 select t.* from (select 1, 10) t")

      val reseltDataFram1 = hiveContext.sql("SELECT l_date,t_product FROM table2")

      val count = hiveContext.sql("SELECT count(*) FROM table2")

      reseltDataFram1.show(1000)

      count.show()

      hiveContext.clearCache()

    })

    ssc.start()   //Start the computation
    ssc.awaitTermination()   //Wait for the computation to termination
  }

}

4:测试结果

控制台输出如下:

 


 

 


这样的话简单的开发环境就搭建成功了。

5:遇到问题解决

You have an error in your SQL syntax; check the manual that corresponds to your MySQLserver version for the right syntax to use near 'OPTION SQL_SELECT_LIMIT=DEFAULT'
at line 1

如果遇到这样的问题,适当切换以一下mysql驱动连接包的版本,一般是版本过低导致。

Specified key was too long; max key length is 767 bytes

解决方案:在mysql机器的上命令行中运行:alter database hive character set latin1;
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: