您的位置:首页 > 其它

scala 下 kafka 实战简介

2018-01-22 14:13 253 查看
kafka应用实战:从一个topic接受数据,调用接口处理后发送到另一个topic
关于kafka,其实我也不知道怎么样才能介绍的具体明了,但是我就是想提供一个简单的使用示例,让我们对kafka有个一初步的了解。其实吧,我们知道,数据可以从某个角度分成两类,第一类是批量数据,也就是已经放好的数据,打个比方,就像池塘的水,不会流动,你想怎么计算就怎么计算,你可以统计一下水量,也可以将它们做别的用途,但水就那么多放在那里,你可以把它放在任何一个水库慢慢用。另一类就是流式数据,就是会实时变动的数据,也打个比方,就像河流的水,它不是静止不动的,它是一直在流动的,每时每刻水都在发生改变,也就是你无法像之前一样,把它放在任何水库慢慢用,你只能取出一部分用,当你用完,新的水又流进来了。

kafka就是其中的一条河流,当然不只kafka而已,就像世界上存在很多河流,kafka只是其中之一。kafka是根据topic 来存储数据的,每个topic里面根据不同的偏移量来定位数据。比如下面的代码就是scala语言编写的往topic中发送数据,我们称之为制造者。

package kafka
import java.util.Properties
import kafka.producer.ProducerConfig
import kafka.producer.Producer
import kafka.producer.KeyedMessage
object KafkaProducer {
def sendmessage(presend:String,topic:String,key:String): Unit = {
val brokers = "10.2.117.160:9092,10.2.117.162:9092"   //定义broker节点
val props = new Properties()
props.put("delete.topic.enable", "true")
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", classOf[HashPartitioner].getName)
props.put("queue.buffering.max.messages", "1000000")
props.put("queue.enqueue.timeout.ms", "20000000")
props.put("batch.num.messages", "1")
props.put("producer.type", "sync")
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config) // key 和 value 都是 String类型
val sleepFlag = true
if (sleepFlag) Thread.sleep(1000)
val message = new KeyedMessage[String, String](topic, key,presend.toString)
producer.send(message)  //发送到指定的topic
}
}
以上代码定义了一个 sendmessage 函数,用来像指定的topic中发送key-value类信息。

当我们成功往kafka topic中发送了一系列信息的时候,意味着我们定义了一个河流是怎么产生数据的,随时发送,随时更新topic中的数据,但是要怎么取出来使用呢?那就是kafka消费者的定义,接下来看一个消费者代码。这个是主要程序,里面只是包含怎么使用消费者。
注意:同一个组的消费者不会重复消费同一个topic里面的数据。

package kafka
import java.util.Properties
import kafka.consumer.ConsumerConfig
import kafka.consumer.Consumer
import kafka.message.MessageAndMetadata
import GetPost.getScore //这里是一个接口,我们从一个topic取数据,取完后调用这个接口得到分数后,发送到另一个topic
import GetContent.getContent //解析json消息的处理函数
import KafkaProducer.sendmessage //处理后需要发送,所以引入上面定义的制造者代码
import sun.awt.SunHints.Key
object kafka_consumer extends App{
var groupid = "oyp_test24" //kafka有一个注意点,就是同一个组的消费者不会再取topic中已经消费过的数据
var consumerid = "oyp_consumer"
var topic = "ase-info" //取数据源的topic
val props:Properties = new Properties()
props.put("zookeeper.connect", "10.2.117.133:2181,10.2.117.160:2181,10.2.117.162:2181")
props.put("group.id", groupid)
props.put("client.id", "test")
props.put("consumer.id", consumerid)
props.put("auto.offset.reset", "smallest")
props.put("auto.commit.enable", "true")
props.put("auto.commit.interval.ms", "100")
val consumerConfig:ConsumerConfig = new ConsumerConfig(props)
val consumer = Consumer.create(consumerConfig) //创建一个消费者
val topicCountMap = Map(topic -> 1)
val consumerMap = consumer.createMessageStreams(topicCountMap)
val streams = consumerMap.get(topic).get //从topic中获取源数据
for (stream <- streams) {
val it = stream.iterator()
while (it.hasNext()) {
val messageAndMetadata = it.next()
// val message = s"Topic:${messageAndMetadata.topic}, PartitionID:${messageAndMetadata.partition}, " +
//   s"Offset:${messageAndMetadata.offset},Message Payload: ${new String(messageAndMetadata.message())}"
val message:String =  s"${new String(messageAndMetadata.message())}"
val key:String = s"${messageAndMetadata.partition}".toString
var presend :String = ""
try {
val content_text: String = getContent(message) //提取message中的json内容
if (content_text != "Invalid Map") {
val result = getScore(content_text) //调用接口处理,获得分数
presend = message.substring(0,message.length - 1) + "," + result.substring(1)
}
}
catch {
case _ => presend = message
}
sendmessage(presend,"oyp-test-No1",key) //将处理后的消息presend 重新发送到另一个topic
}
}
}
上述代码就是从topic 为 ase-info中获取数据,是一个json串,再调用getContent方法解析里面的内容后,调用getScore方法获得分数,得到presend,再发送到另一个topic "oyp-test-No1" 。
关于getContent 和 getScore代码如下:

package kafka
import scala.collection.mutable
import scala.util.parsing.json
import scala.util.parsing.json.JSONObject
object GetContent {
def getContent(json_text1:String):String = {
val json_text = getJson_clear(json_text1)
var result: String = ""
var obj = json.JSON.parseFull(json_text)
var text: String = ""
var title: String = ""
obj match {
case Some(map: Map[Any, Any]) => {
text = map.get("content").toString()
text = text.substring(5, text.length() - 1)
}
case _ => text = "Invalid Map"
}
return (text.trim.replaceAll("%","个百分点"))
}
def getJson_clear(message:String) :String= {
val start: Int = message.indexOf(""""content":""")
if (start >= 0) {
val end:
4000
Int = message.substring(start).indexOf(""""media":""") - 1 + start
if (end >= 0) {
return ("{" + message.substring(start, end) + "}")
}
else {return ("{"+message.substring(start))}
}
else
{return message}
}

}
这里的处理由于特殊的问题导致了一些特殊的处理,可以自己进行修改,反正功能是从json串获得 content的value值。

获得分数是调用一个python接口,getScore函数如下:

package kafka
import org.apache.http.client.methods.HttpPost
import org.apache.http.impl.client.DefaultHttpClient
import scala.util.parsing.json.JSONObject
import org.apache.http.entity.StringEntity
import java.nio.charset.Charset
object GetPost {
def getScore(text:String) :String= {
// create our object as a json string
val spock =  Map ("content" -> text.toString)
val spockAsJson = new JSONObject(spock).toString()
//println(text)
val url = "http://10.2.145.116:8788/snownlp"
val post = new HttpPost(url)
val client = new DefaultHttpClient
post.addHeader("Content-type","application/json; charset=utf-8")
post.setHeader("Accept", "application/json")
post.setEntity(new StringEntity(spockAsJson, Charset.forName("UTF-8")))
post.formatted(spockAsJson)
println(post.getURI())
val response = client.execute(post)
import org.apache.http.util.EntityUtils
val body = EntityUtils.toString(response.getEntity())
return (body.toString)
}
}
备注:在制造者中有一个 props.put("partitioner.class", classOf[HashPartitioner].getName),其中HashPartitioner代码如下

package kafka
import kafka.producer.Partitioner
import scala.math._
import kafka.utils.VerifiableProperties
class HashPartitioner extends Partitioner {
def this(verifiableProperties: VerifiableProperties) {
this
}
override def partition(key: Any, numPartitions: Int): Int = {
if (key.isInstanceOf[Int]) {
abs(key.toString().toInt) % numPartitions
}
key.hashCode() % numPartitions
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  scala kafka