
Kafka Installation and Usage

2015-11-26 23:04
Part 1: Installing Kafka
1. Download on n5: wget http://apache.dataguru.cn/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
2. vi /usr/lib/kafka/config/server.properties and set:
   broker.id=5
   log.dirs=/usr/lib/kafka/kafka-logs
   log.flush.interval.messages=10000 (default; no need to change)
   log.flush.interval.ms=1000 (default; no need to change)
   zookeeper.connect=master:2181,n3:2181,n4:2181

Host    broker.id  port
master  1          9092
n3      2          9093
n4      3          9094
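Putting the table and the properties together, a minimal sketch of what one broker's config ends up as (using n3 from the table; the per-host ports are this cluster's convention, not Kafka defaults apart from 9092):

broker.id=2
port=9093
log.dirs=/usr/lib/kafka/kafka-logs
zookeeper.connect=master:2181,n3:2181,n4:2181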
Part 2: Usage

1. Start the service:
master-1: nohup /usr/lib/kafka/bin/kafka-server-start.sh /usr/lib/kafka/config/server.properties &
master-2: nohup /usr/lib/kafka/bin/kafka-server-start.sh /usr/lib/kafka/config/server1.properties &
2. Run jps to confirm that two Kafka processes are running.
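If both brokers started cleanly, jps output should look roughly like this (the PIDs are illustrative; Kafka's main class shows up simply as "Kafka"):

12345 Kafka
12346 Kafka
12350 Jps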
3. Create a topic:
sh /usr/lib/kafka/bin/kafka-topics.sh --create --zookeeper master:2181,n3:2181,n4:2181 --partitions 3 --topic test0107-1 --replication-factor 2
(--replication-factor may not exceed the number of live brokers.)
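The same step can be scripted against Kafka's admin API; a minimal sketch, assuming the kafka_2.9.2-0.8.1.1 and zkclient jars are on the classpath (AdminUtils is an internal, version-specific API, so treat this as illustrative):

import java.util.Properties
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateTopic {
  def main(args: Array[String]) {
    // ZKStringSerializer writes plain strings, the format the Kafka tools expect.
    val zkClient = new ZkClient("master:2181,n3:2181,n4:2181", 30000, 30000, ZKStringSerializer)
    // topic name, partitions, replication factor, extra per-topic config
    AdminUtils.createTopic(zkClient, "test0107-1", 3, 2, new Properties())
    zkClient.close()
  }
}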
4. Check the topic's details:
sh /usr/lib/kafka/bin/kafka-topics.sh --describe --topic test0107-1 --zookeeper master:2181,n3:2181,n4:2181
5. List topics:

sh kafka-topics.sh --list --zookeeper master:2181,n3:2181,n4:2181

6. Enter the ZooKeeper client:
/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/zookeeper/bin/zkCli.sh -server master:2181,n3:2181,n4:2181
 ls /brokers/topics/test0107-1/partitions
 get /brokers/topics/test0107-1/partitions/2
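The leader and in-sync replica set for a partition live one level further down, in its state znode; on 0.8.x the payload is JSON roughly like this (values illustrative):

 get /brokers/topics/test0107-1/partitions/2/state
 {"controller_epoch":1,"leader":2,"version":1,"leader_epoch":0,"isr":[2,3]}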
7. Create a producer and send messages:

sh kafka-console-producer.sh --broker-list localhost:9092 --topic test02

8. Create a consumer:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic test02 --from-beginning

Producer: bin/run-example org.apache.spark.examples.streaming.KafkaWordCountProducer localhost:9092 test02 3 5
(arguments: broker list, topic, messages per second, words per message)
Consumer: bin/run-example org.apache.spark.examples.streaming.KafkaWordCount localhost:2181 test-consumer-group test02 1
(arguments: ZooKeeper quorum, consumer group, topic, threads per topic)

emacs -nw  (edit a file in the terminal)
Enter ZooKeeper: ./zkCli.sh -server hadoop2:2181
ls /
ls /brokers
ls /brokers/ids
View consumers:
ls /consumers/
View consumed offsets:
get  //../.1
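With the 0.8 high-level consumer, committed offsets live under /consumers/<group>/offsets/<topic>/<partition>, so a concrete form of that last get would be, for example:

get /consumers/test-consumer-group/offsets/test02/0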

Deleting topics is not recommended: topic deletion is buggy/unsupported in this Kafka version.

Second attempt: installing Kafka

wget http://apache.dataguru.cn/kafka/0.8.1.1/kafka_2.9.2-0.8.1.1.tgz
1. On master:

vi /usr/lib/kafka/config/server.properties

broker.id=0
log.dirs=/usr/lib/kafka/kafka-logs
log.flush.interval.messages=10000 (default; no need to change)
log.flush.interval.ms=1000 (default; no need to change)
zookeeper.connect=master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181

Host    broker.id  port
master  0          9091
n4      1          9092
n5      2          9093

Kafka install directory: /usr/lib/kafka/

master: nohup /usr/lib/kafka/bin/kafka-server-start.sh /usr/lib/kafka/config/server.properties &
n4:     nohup /usr/lib/kafka/bin/kafka-server-start.sh /usr/lib/kafka/config/server.properties &
n5:     nohup /usr/lib/kafka/bin/kafka-server-start.sh /usr/lib/kafka/config/server.properties &

2. Run jps on each node to confirm the Kafka process is up.

3. Create a topic:

sh /usr/lib/kafka/bin/kafka-topics.sh --create --zookeeper master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181 --partitions 1 --topic newboom --replication-factor 2

To alter this topic (the partition count can only be increased, never decreased):

$ sh /usr/lib/kafka/bin/kafka-topics.sh --zookeeper master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181 --partitions 3 --topic newboom --alter

4. Check the topic's details:

sh /usr/lib/kafka/bin/kafka-topics.sh --describe --topic newboom --zookeeper master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181

5. List topics:

sh /usr/lib/kafka/bin/kafka-topics.sh --list --zookeeper master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181

6. Enter the ZooKeeper client:

/opt/cloudera/parcels/CDH-5.2.0-1.cdh5.2.0.p0.36/lib/zookeeper/bin/zkCli.sh -server master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181

 ls /brokers/topics/newboom/partitions
 get /brokers/topics/newboom/partitions/2

7. Create a producer and publish messages:

sh /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list master.dw.lewatek.com:9091,n4.dw.lewatek.com:9092,n5.dw.lewatek.com:9093 --topic newboom

8. Create a consumer and read messages:

sh /usr/lib/kafka/bin/kafka-console-consumer.sh --zookeeper master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181 --topic newboom --from-beginning
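The console consumer wraps Kafka's 0.8 high-level consumer API; a minimal sketch of doing the same from Scala (NewboomConsumer is a made-up name, and the group id and offset-reset setting are assumptions):

import java.util.Properties

import kafka.consumer.{Consumer, ConsumerConfig}

object NewboomConsumer {
  def main(args: Array[String]) {
    val props = new Properties()
    props.put("zookeeper.connect",
      "master.dw.lewatek.com:2181,n4.dw.lewatek.com:2181,n5.dw.lewatek.com:2181")
    props.put("group.id", "test-consumer-group")
    // Same effect as --from-beginning when the group has no committed offsets yet.
    props.put("auto.offset.reset", "smallest")

    val connector = Consumer.create(new ConsumerConfig(props))
    // One stream (thread) for the newboom topic.
    val streams = connector.createMessageStreams(Map("newboom" -> 1))

    // A KafkaStream is an Iterable of MessageAndMetadata; this loop blocks for new messages.
    for (msg <- streams("newboom")(0)) {
      println(new String(msg.message(), "UTF-8"))
    }
  }
}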

Processing the data with Spark Streaming:

package com.lewatek

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import org.apache.spark.SparkConf
import org.apache.spark.examples.streaming.StreamingExamples
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.mutable.ArrayBuffer
import scala.io.Source

object KafkaWordCount {

  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    // e.g. n4.dw.lewatek.com:2181 test-consumer-group boom 1
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount")

    // Seconds(2) is the batch window: every 2 seconds of input becomes one Spark job.
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Reuse the SparkContext created by the StreamingContext; constructing a second
    // SparkContext from the same conf would fail at runtime.
    val sqlContext = new SQLContext(ssc.sparkContext)
    ssc.checkpoint("checkpoint")

    // One entry per topic, each consumed with numThreads threads; lines is the
    // input DStream carrying the messages arriving from Kafka.
    val topicpMap = topics.split(" ").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)

    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        // Infer a schema from the JSON messages and expose them to Spark SQL.
        val table = sqlContext.jsonRDD(rdd).cache()
        table.printSchema()
        table.registerTempTable("jsonTable")
        sqlContext.cacheTable("jsonTable")

        // Flatten each record into one tab-separated line per element of the apps array.
        val jsonsql = sqlContext.sql(
          "SELECT clientId, dev.chl, dev.mod, apps, timestamp FROM jsonTable").flatMap { x: Row =>
          val id: String  = x(0).asInstanceOf[String]
          val ch1: String = x(1).asInstanceOf[String]
          val mod: String = x(2).asInstanceOf[String]
          val zz          = x(3).asInstanceOf[ArrayBuffer[Row]]
          val ts: String  = x(4).asInstanceOf[String]
          zz.map { app: Row =>
            id + "\t" + ch1 + "\t" + mod + "\t" + app(0) + "\t" + app(1) + "\t" + app(2) + "\t" + app(3) + "\t" + ts.substring(0, 10)
          }
        }
        jsonsql.foreach(println)
      }
    }

    // Start the streaming application.
    ssc.start()
    ssc.awaitTermination()
  }
}
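A hedged sketch of launching the consumer job, assuming both objects are packed into a hypothetical kafka-wordcount.jar together with the spark-streaming-kafka dependency:

bin/spark-submit --class com.lewatek.KafkaWordCount kafka-wordcount.jar n4.dw.lewatek.com:2181 test-consumer-group newboom 1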

// Reads lines from a file and publishes each one to the given Kafka topic.
object KafkaWordCountProducer {

  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage> <filename>")
      System.exit(1)
    }

    // e.g. master.dw.lewatek.com:9091 boom 3 5 input.json
    // messagesPerSec and wordsPerMessage are kept from the upstream Spark example
    // but unused here: the input file drives the messages instead.
    val Array(brokers, topic, messagesPerSec, wordsPerMessage, filename) = args

    // Producer connection properties (the old 0.8 producer API).
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val config = new ProducerConfig(props)
    val producer = new Producer[String, String](config)

    // Send one message per line of the input file, throttled to roughly 10 msg/s.
    for (line <- Source.fromFile(filename).getLines()) {
      val data = new KeyedMessage[String, String](topic, line)
      producer.send(data)
      Thread.sleep(100)
    }

    producer.close()
  }
}
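For the SQL in KafkaWordCount to resolve its columns, every produced line must be a JSON object with clientId, a dev struct holding chl and mod, an apps array of objects, and timestamp. A hypothetical input line (the field names inside apps are made up; the job reads apps elements only by position, and jsonRDD orders struct fields alphabetically):

{"apps":[{"act":"install","name":"com.foo","net":"wifi","ver":"1.0"}],"clientId":"c001","dev":{"chl":"appstore","mod":"mi4"},"timestamp":"2015-11-26 23:04:00"}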