
An Introduction to Kafka Broker Configuration

2013-07-10 17:40
Reposted from: /article/3468125.html

This material is very helpful both for understanding the system and for tuning performance. The official Kafka site provides a fairly detailed configuration reference, but let's go straight to the code and see which broker settings we actually need to understand. Every setting carries an English comment, so the purpose of each item is not restated here; the comments are self-explanatory:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.util.Properties
import kafka.utils.{Utils, ZKConfig}
import kafka.message.Message

/**
 * Configuration settings for the kafka server
 */
class KafkaConfig(props: Properties) extends ZKConfig(props) {
  /* the port to listen and accept connections on */
  val port: Int = Utils.getInt(props, "port", 6667)

  /* hostname of broker. If not set, will pick up from the value returned from getLocalHost. If there are multiple interfaces getLocalHost may not be what you want. */
  val hostName: String = Utils.getString(props, "hostname", null)

  /* the broker id for this server */
  val brokerId: Int = Utils.getInt(props, "brokerid")

  /* the SO_SNDBUFF buffer of the socket sever sockets */
  val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)

  /* the SO_RCVBUFF buffer of the socket sever sockets */
  val socketReceiveBuffer: Int = Utils.getInt(props, "socket.receive.buffer", 100*1024)

  /* the maximum number of bytes in a socket request */
  val maxSocketRequestSize: Int = Utils.getIntInRange(props, "max.socket.request.bytes", 100*1024*1024, (1, Int.MaxValue))

  /* the maximum size of message that the server can receive */
  val maxMessageSize = Utils.getIntInRange(props, "max.message.size", 1000000, (0, Int.MaxValue))

  /* the number of worker threads that the server uses for handling all client requests */
  val numThreads = Utils.getIntInRange(props, "num.threads", Runtime.getRuntime().availableProcessors, (1, Int.MaxValue))

  /* the interval in which to measure performance statistics */
  val monitoringPeriodSecs = Utils.getIntInRange(props, "monitoring.period.secs", 600, (1, Int.MaxValue))

  /* the default number of log partitions per topic */
  val numPartitions = Utils.getIntInRange(props, "num.partitions", 1, (1, Int.MaxValue))

  /* the directory in which the log data is kept */
  val logDir = Utils.getString(props, "log.dir")

  /* the maximum size of a single log file */
  val logFileSize = Utils.getIntInRange(props, "log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))

  /* the maximum size of a single log file for some specific topic */
  val logFileSizeMap = Utils.getTopicFileSize(Utils.getString(props, "topic.log.file.size", ""))

  /* the maximum time before a new log segment is rolled out */
  val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours before rolling out a new log segment for some specific topic */
  val logRollHoursMap = Utils.getTopicRollHours(Utils.getString(props, "topic.log.roll.hours", ""))

  /* the number of hours to keep a log file before deleting it */
  val logRetentionHours = Utils.getIntInRange(props, "log.retention.hours", 24*7, (1, Int.MaxValue))

  /* the number of hours to keep a log file before deleting it for some specific topic */
  val logRetentionHoursMap = Utils.getTopicRetentionHours(Utils.getString(props, "topic.log.retention.hours", ""))

  /* the maximum size of the log before deleting it */
  val logRetentionSize = Utils.getLong(props, "log.retention.size", -1)

  /* the maximum size of the log for some specific topic before deleting it */
  val logRetentionSizeMap = Utils.getTopicRetentionSize(Utils.getString(props, "topic.log.retention.size", ""))

  /* the frequency in minutes that the log cleaner checks whether any log is eligible for deletion */
  val logCleanupIntervalMinutes = Utils.getIntInRange(props, "log.cleanup.interval.mins", 10, (1, Int.MaxValue))

  /* enable zookeeper registration in the server */
  val enableZookeeper = Utils.getBoolean(props, "enable.zookeeper", true)

  /* the number of messages accumulated on a log partition before messages are flushed to disk */
  val flushInterval = Utils.getIntInRange(props, "log.flush.interval", 500, (1, Int.MaxValue))

  /* the maximum time in ms that a message in selected topics is kept in memory before flushed to disk, e.g., topic1:3000,topic2:6000 */
  val flushIntervalMap = Utils.getTopicFlushIntervals(Utils.getString(props, "topic.flush.intervals.ms", ""))

  /* the frequency in ms that the log flusher checks whether any log needs to be flushed to disk */
  val flushSchedulerThreadRate = Utils.getInt(props, "log.default.flush.scheduler.interval.ms", 3000)

  /* the maximum time in ms that a message in any topic is kept in memory before flushed to disk */
  val defaultFlushIntervalMs = Utils.getInt(props, "log.default.flush.interval.ms", flushSchedulerThreadRate)

  /* the number of partitions for selected topics, e.g., topic1:8,topic2:16 */
  val topicPartitionsMap = Utils.getTopicPartitions(Utils.getString(props, "topic.partition.count.map", ""))

  /* the maximum length of topic name */
  val maxTopicNameLength = Utils.getIntInRange(props, "max.topic.name.length", 255, (1, Int.MaxValue))
}
The code above is the KafkaConfig class from the kafka.server package. As mentioned before, the broker is Kafka's server, so it is no surprise that the configuration lives in this package. Let's read through the code and pick up some Scala syntax along the way. As in Java, the relevant packages must be imported; note how Kafka imports two classes from the same package inside braces:

import kafka.utils.{Utils, ZKConfig}

Next, look at how the class is declared:

class KafkaConfig(props: Properties) extends ZKConfig(props)

When KafkaConfig is loaded it takes a Properties object, and through ZKConfig it also picks up the ZooKeeper-related properties. Recall the commands we used earlier to start a Kafka broker:
1. Start the ZooKeeper server: bin/zookeeper-server-start.sh ../config/zookeeper.properties & (the & lets you get the command line back)

2. Start the Kafka server: bin/kafka-server-start.sh ../config/server.properties &

So you can see that when a Kafka broker is initialized, the program loads the properties file under the config directory, exactly as it would in Java. The properties can also be supplied programmatically, which we will come back to later; for now let's continue with the code. Having located the corresponding properties file, we can read it side by side with the class.
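The loading step can be sketched in Java. The keys and values below are illustrative only; in a real deployment kafka-server-start.sh reads the config/server.properties file you pass it, while this sketch loads an equivalent snippet from a string so it runs stand-alone:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class LoadBrokerConfig {
    public static void main(String[] args) throws IOException {
        // Stand-in for config/server.properties; values are illustrative.
        String serverProperties =
                "brokerid=0\n" +
                "log.dir=/tmp/kafka-logs\n";

        Properties props = new Properties();
        props.load(new StringReader(serverProperties));

        // Keys that are absent fall back to the defaults hard-coded in
        // KafkaConfig, e.g. "port" defaults to 6667.
        int port = Integer.parseInt(props.getProperty("port", "6667"));
        System.out.println("brokerid=" + props.getProperty("brokerid"));
        System.out.println("port=" + port);
    }
}
```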

The Kafka broker's properties file groups the settings into six sections:

- Server Basics: brokerid, hostname, and so on.

- Socket Server Settings: transport settings such as the port and the socket buffer sizes.

- Log Basics: the log location and the number of partitions.

- Log Flush Policy: the most important part of the Kafka configuration; it determines the policy for flushing data to disk.

- Log Retention Policy: the policy for cleaning up old log segments.

- Zookeeper: ZooKeeper connection settings.
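The six sections above can be sketched as a minimal server.properties fragment. All values here are illustrative, and zk.connect is assumed to come from the ZKConfig parent class rather than from the KafkaConfig code shown above:

```properties
############################# Server Basics #############################
brokerid=0
hostname=192.168.1.10

############################# Socket Server Settings ####################
port=9092
socket.send.buffer=1048576
socket.receive.buffer=1048576
max.socket.request.bytes=104857600

############################# Log Basics ################################
log.dir=/tmp/kafka-logs
num.partitions=1

############################# Log Flush Policy ##########################
log.flush.interval=500
log.default.flush.interval.ms=1000

############################# Log Retention Policy ######################
log.retention.hours=168
log.cleanup.interval.mins=10

############################# Zookeeper #################################
enable.zookeeper=true
zk.connect=localhost:2181
```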

Every setting in the properties file appears in the KafkaConfig class. Look at this code from KafkaConfig again:

/* the broker id for this server */
val brokerId: Int = Utils.getInt(props, "brokerid")

/* the SO_SNDBUFF buffer of the socket sever sockets */
val socketSendBuffer: Int = Utils.getInt(props, "socket.send.buffer", 100*1024)
Wherever the getter takes three arguments, the last one is a default; when it takes only two, the property is mandatory, and leaving it unset raises an error. Many of these values are empirical, and I have no particular recommendations for them; sensible settings only emerge through repeated testing and tuning.
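The required-versus-defaulted distinction can be sketched as follows. This is not Kafka's actual Utils code; it is a hypothetical re-implementation that merely mirrors the behavior described above, where the two-argument form treats the key as mandatory and the three-argument form supplies a default:

```java
import java.util.Properties;

public class RequiredVsDefault {
    // Two-argument form: the property is mandatory.
    static int getInt(Properties props, String name) {
        String v = props.getProperty(name);
        if (v == null)
            throw new IllegalArgumentException("Missing required property '" + name + "'");
        return Integer.parseInt(v);
    }

    // Three-argument form: the last argument is the default.
    static int getInt(Properties props, String name, int defaultValue) {
        String v = props.getProperty(name);
        return v == null ? defaultValue : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("brokerid", "1");

        System.out.println(getInt(props, "brokerid"));                       // required and present
        System.out.println(getInt(props, "socket.send.buffer", 100 * 1024)); // absent, falls back to default
        try {
            getInt(props, "port"); // required but absent: an error is raised
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```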

You can also put the configuration in your program and start the broker from code; the Kafka configuration then looks like this:

Properties props = new Properties();
props.setProperty("port", "9093");
props.setProperty("log.dir", "/home/kafka/data1");
Personally, I think it is better to keep the configuration in the config file: changes then do not disturb a running service, whereas configuration baked into the program is always somewhat inconvenient to modify. So I recommend sticking with the configuration files, and the same applies to the producer and consumer covered later.

Two parameters deserve special mention here. The first is brokerid: every broker's id must be distinct. The second is hostname: this is how producers and consumers reach the broker, so be sure to change it to your actual address, otherwise everything will forever connect to localhost.
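For example, two brokers in the same cluster might use two separate properties files along these lines; the addresses are illustrative:

```properties
# server1.properties -- first broker
brokerid=1
hostname=192.168.1.11
port=9092

# server2.properties -- second broker: brokerid must differ, and hostname
# must be the externally reachable address, never localhost
brokerid=2
hostname=192.168.1.12
port=9092
```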

--------------------------------------------------------

The next post will cover producer and consumer configuration. That part involves actual programming (while writing it I kept drifting back into the source code), so the next post will first show how to set up a development environment and then walk through two simple examples to get familiar with the configuration.