您的位置:首页 > 其它

kafka源码之日志管理-LogManager

2016-07-27 10:20 309 查看

LogManager

说明,LogManager实例的生成依赖于KafkaScheduler的后台调度管理组件,这个组件用于管理各partition的消息记录下index的信息,包含每个Partition的Log,segment等的管理。

实例创建与启动

在KafkaServer的startup函数中通过调用createLogManager函数来完成实例的函数,

private def createLogManager(zkClient: ZkClient, brokerState: BrokerState)
: LogManager = {
  val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
  val defaultLogConfig = LogConfig(defaultProps)

从zk中/config/topics找对应的topic的特殊配置的配置信息,如果没有,直接使用默认的配置.
  val configs = AdminUtils.fetchAllTopicConfigs(zkUtils).mapValues(
         LogConfig.fromProps(defaultProps, _))
 
这里需要的配置项:
配置项log.cleaner.threads,默认值1.用于配置清理过期日志的线程个数(用于日志合并).
配置项log.cleaner.dedupe.buffer.size,默认值128MB,用于配置清理过期数据的内存缓冲区,这个用于数据清理时,选择的压缩方式时,用于对重复数据的清理排序内存,用于日志合并.
配置项log.cleaner.io.buffer.load.factor,默认值0.9,用于配置清理内存缓冲区的数据装载因子,主要是用于hash,这个因子越小,对桶的重复可能越小,但内存占用越大,用于日志合并.
配置项log.cleaner.io.buffer.size,默认值512KB,用于清理过期数据的IO缓冲区大小,用于日志合并.
配置项message.max.bytes,默认值1000012字节,用于设置单条数据的最大大小.
配置项log.cleaner.io.max.bytes.per.second,用于控制过期数据清理时的IO速度限制,默认不限制速度,用于日志合并.
配置项log.cleaner.backoff.ms,用于定时检查日志是否需要清理的时间间隔(这个主要是在日志合并时使用),默认是15秒.
配置项log.cleaner.enable,是否启用日志的定时清理,默认是启用.
配置项num.recovery.threads.per.data.dir,用于在启动时,用于日志恢复的线程个数,默认是1.
配置项log.flush.scheduler.interval.ms,用于检查日志是否被flush到磁盘,默认不检查.
配置项log.flush.offset.checkpoint.interval.ms,用于定时对partition的offset进行保存的时间间隔,默认值60000ms.
配置项log.retention.check.interval.ms,定期检查保留日志的时间间隔,默认值5分钟.
  // read the log configurations from zookeeper
  val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
                   dedupeBufferSize = config.logCleanerDedupeBufferSize,
                   dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
                   ioBufferSize = config.logCleanerIoBufferSize,
                   maxMessageSize = config.messageMaxBytes,
                   maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
                   backOffMs = config.logCleanerBackoffMs,
                   enableCleaner = config.logCleanerEnable)
  new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
                 topicConfigs = configs,
                 defaultConfig = defaultLogConfig,
                 cleanerConfig = cleanerConfig,
                 ioThreads = config.numRecoveryThreadsPerDataDir,
                 flushCheckMs = config.logFlushSchedulerIntervalMs,
                 flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                 retentionCheckMs = config.logCleanupIntervalMs,
                 scheduler = kafkaScheduler,
                 brokerState = brokerState,
                 time = time)
}
 

实例初始化时的默认运行流程:

val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
val 
LockFile = ".lock"
val 
InitialTaskDelayMs = 30*1000
private val logCreationOrDeletionLock = new Object
private val logs = new Pool[TopicAndPartition, Log]()
检查日志目录是否被创建,如果没有创建目录,同时检查目录是否有读写的权限.
createAndValidateLogDirs(logDirs)
生成每个目录的.lock文件,并通过这个文件锁定这个目录.
private val dirLocks = lockLogDirs(logDirs)
根据每个目录下的recovery-point-offset-checkpoint文件,生成出checkpoints的集合.这个用于定期更新每个partition的offset记录.
private val recoveryPointCheckpoints = logDirs.map(
  dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))
).toMap
 
根据每一个目录,生成一个线程池,线程池的大小是num.recovery.threads.per.data.dir配置的值,
加载每个目录下的offset-checkpoint的文件内容.
读取每个目录下的topic-partitionid的目录,并根据zk中针对此topic的配置文件(或者默认的配置文件),通过offset-checkpoint中记录的此partition对应的offset,生成Log实例.并通过线程池来执行Log实例的加载,也就是日志的恢复.
loadLogs()

如果启用了日志定时清理,生成LogCleaner实例,并根据配置的清理线程个数,生成对应个数的清理线程.
// public, so we can access this from kafka.admin.DeleteTopicTest
val cleaner: LogCleaner =
  if(cleanerConfig.enableCleaner)
    new LogCleaner(cleanerConfig, logDirs, logs, time = time)
  else
    null

 

加载partition的日志的segment的处理流程:

在LogManager实例生成时,会读取每个目录下的topic-partition的目录,并生成Log实例,Log实例初始化时,会读取segment的信息.
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] =
new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
这里开始去加载对应的Partition的segments的信息.
loadSegments()

/* Calculate the offset of the next message */
@volatile var nextOffsetMetadata = 
new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, 
    activeSegment.size.toInt)

val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)

info("Completed load of log %s with log end offset %d".format(name, logEndOffset))

val tags = Map("topic" -> topicAndPartition.topic, 
              "partition" -> topicAndPartition.partition.toString)
 
接下来看看Log.loadSegments的函数如何加载segment的流程:

private def loadSegments() {
首先检查,如果partition的目录还没有创建,先创建这个目录.
  // create the log directory if it doesn't exist
  dir.mkdirs()
  var swapFiles = Set[File]()
  
  for(file <- dir.listFiles if file.isFile) {
    if(!file.canRead)
      throw new IOException("Could not read file " + file)
这里读取这个partition目录下的所有的文件,同时这个文件是具有可读权限的.
    val filename = file.getName
如果文件是.deleted或者是.cleaned结尾的文件时,直接删除这个文件.
    if(filename.endsWith(DeletedFileSuffix) ||  
           filename.endsWith(CleanedFileSuffix)) {
      // if the file ends in .deleted or .cleaned, delete it
      file.delete()
    } else if(filename.endsWith(SwapFileSuffix)) {
这里的部分检查文件是否是.swap文件,如果是,把后缀的.swap去掉,然后检查文件名是.index的文件还是.log的文件,如果是.index的文件,直接删除这个文件这是一个对log的索引文件,
如果文件是.log文件,把这个文件添加到swapFiles的文件集合列表中,同时删除这个log文件对应的索引文件.
      // we crashed in the middle of a swap operation, to recover:
      // if a log, delete the .index file, complete the swap operation later
      // if an index just delete it, it will be rebuilt
      val baseName = new File(CoreUtils.replaceSuffix(file.getPath,
            SwapFileSuffix, ""))
      if(baseName.getPath.endsWith(IndexFileSuffix)) {
        file.delete()
      } else if(baseName.getPath.endsWith(LogFileSuffix)){
        // delete the index
        val index = new File(CoreUtils.replaceSuffix(baseName.getPath, 
            LogFileSuffix, IndexFileSuffix))
        index.delete()
        swapFiles += file
      }
    }
  }

下面开始对.index与.log文件进行迭代处理.
在执行对log的恢复操作时,需要使用到配置项message.max.bytes,默认值1000012的字节,用于配置每条消息的最大大小.
  // now do a second pass and load all the .log and .index files
  for(file <- dir.listFiles if file.isFile) {
    val filename = file.getName
    if(filename.endsWith(IndexFileSuffix)) {
如果是一个.index的文件,检查这个index对应的log文件是否存在,如果log文件不存在,直接删除这个index文件.
      // if it is an index file, make sure it has a corresponding .log file
      val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, 
            LogFileSuffix))
      if(!logFile.exists) {
        warn("Found an orphaned index file, %s, with no corresponding log 
             file.".format(file.getAbsolutePath))
        file.delete()
      }
    } else if(filename.endsWith(LogFileSuffix)) {
如果这个文件是一个.log文件,找到这个文件对应的index文件,也就是这个log文件的文件,
      // if its a log file, load the corresponding log segment
      val start = filename.substring(0, 
           filename.length - LogFileSuffix.length).toLong
      val indexFile = Log.indexFilename(dir, start)
根据logFile中第一条offset的值生成LogSegment的实例.
生成LogSegment的实例时,需要用到两个配置项:
1,配置项log.index.interval.bytes,默认值4096,当向log文件中写入的消息的大小达到这个配置的字节大小后,会向index文件中写入一个offset对应log的位置的记录.
2,配置项log.index.size.max.bytes,默认值10MB,这个用于控制index文件的最大大小.这个值取配置值中8的倍数中最接近这个配置值的值,原则上这个值小于或等于配置的值.
      val segment = new LogSegment(dir = dir, 
                                   startOffset = start,
                                   indexIntervalBytes = config.indexInterval, 
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time,
                                   fileAlreadyExists = true)

      if(indexFile.exists()) {
如果这个log文件对应的index文件存在,执行这个segment的检查,如果检查失败,删除index文件,并执行segment的恢复操作.
        try {
在一个index file中,index file的大小必须是8的整数倍,这个文件中每次记录存储8个字节,前4个字节是log文件中对应的offset,后4个字节对应log文件的位置 .
如果这个index文件的entity的个数大于0,同时index的最后一个offset是一个小于log文件的开始位置的offset,这个文件的检查会出错.
            segment.index.sanityCheck()
        } catch {
检查index file文件失败,表示这个index文件有错误,删除这个index文件,执行对log文件的恢复操作.
          case e: java.lang.IllegalArgumentException =>
            warn("Found a corrupted index file, %s, deleting and rebuilding
                index...".format(indexFile.getAbsolutePath))

            indexFile.delete()
            segment.recover(config.maxMessageSize)
        }
      }else {
这种情况下,表示log文件没有找到对应的index文件,直接执行这个segment的恢复操作.
        error("Could not find index file corresponding to log file %s,
            rebuilding index...".format(segment.log.file.getAbsolutePath))

        segment.recover(config.maxMessageSize)
      }
      segments.put(start, segment)
    }
  }
  
这个部分,把上次正在执行恢复操作的swap文件(这部分只有log文件)进行恢复.
  // Finally, complete any interrupted swap operations. To be crash-safe,
  // log files that are replaced by the swap segment should be renamed to .deleted
  // before the swap file is restored as the new segment file.
  for (swapFile <- swapFiles) {
    val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath,
         SwapFileSuffix, ""))
    val fileName = logFile.getName
    val startOffset = fileName.substring(0, 
           fileName.length - LogFileSuffix.length).toLong
    val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, 
           LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
    val index =  new OffsetIndex(file = indexFile, baseOffset = startOffset, 
         maxIndexSize = config.maxIndexSize)
    val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
                                     index = index,
                                     baseOffset = startOffset,
                                     indexIntervalBytes = config.indexInterval,
                                     rollJitterMs = config.randomSegmentJitter,
                                     time = time)
    info("Found log file %s from interrupted swap operation, 
        repairing.".format(swapFile.getPath))
执行对logsegment的恢复操作,这个是对上次正在进行恢复的swap文件进行恢复,
    swapSegment.recover(config.maxMessageSize)
这里找到对应此恢复文件的offset的所有的老的segment的logsegment.
    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.nextOffset)
从segments的集合中(对应此partition log)移出老的segments,并删除对应的文件.这里移出的是上面通过baseOffset与swapSegment中新的offset之间的原来的所有的segments.
    replaceSegments(swapSegment, oldSegments.toSeq, isRecoveredSwapFile = true)
  }

  if(logSegments.size == 0) {
如果当前的partition中没有segment时,创建一个新的LogSegment实例,这里需要注意的一个配置,
配置项log.preallocate,默认值false,用于配置是否在创建LogFile时提前预分配空间.
配置项log.segment.bytes,默认值1GB,用于配置LogSegment的大小.
最后一个参数传入的preallocate表示是否提前预分配磁盘空间.
    // no existing segments, create a new mutable segment beginning at offset 0
    segments.put(0L, new LogSegment(dir = dir,
                                   startOffset = 0,
                                   indexIntervalBytes = config.indexInterval, 
                                   maxIndexSize = config.maxIndexSize,
                                   rollJitterMs = config.randomSegmentJitter,
                                   time = time,
                                   fileAlreadyExists = false,
                                   initFileSize = this.initFileSize(),
                                   preallocate = config.preallocate))
  } else {
这里通过最后一个对partition的checkpoint的offset,读取所有比这个offset后的logSegment,并执行对这些segments的恢复操作.
    recoverLog()
    // reset the index size of the currently active log segment to allow more entries
    activeSegment.index.resize(config.maxIndexSize)
  }
}
 

LogSegment中对log的恢复处理流程:

对一个segment的恢复主要是index file错误或者index file不存在的情况下,或者说是上次正在执行恢复但进程被关闭的情况下,会执行恢复处理,通过LogSegment中的recover函数.
def recover(maxMessageSize: Int): Int = {
这里把index file文件进行截断,说白了就是清空,
  index.truncate()
重新生成index文件的大小,
  index.resize(index.maxIndexSize)
  var validBytes = 0
  var lastIndexEntry = 0
这里读取log文件的每一条记录,根据文件生成一个iterator,这个iterator每next一下,读取一条消息.
  val iter = log.iterator(maxMessageSize)
  try {
    while(iter.hasNext) {
      val entry = iter.next
      entry.message.ensureValid()
这里检查读取log中消息的大小是否达到了记录index的大小,如果已经达到,把当前读取到的消息的offset与对应log文件中的位置记录到index文件中.
      if(validBytes - lastIndexEntry > indexIntervalBytes) {
        val startOffset =
          entry.message.compressionCodec match {
            case NoCompressionCodec =>
              entry.offset
            case _ =>
              ByteBufferMessageSet.deepIterator(entry.message).next().offset
        }
        index.append(startOffset, validBytes)
        lastIndexEntry = validBytes
      }
      validBytes += MessageSet.entrySize(entry.message)
    }
  } catch {
    case e: InvalidMessageException => 
      logger.warn("Found invalid messages in log segment %s at byte offset %d: 
         %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }
这里通过原来log文件的大小,减去log中消息被认证完成后的大小,得到这个log文件中需要被切断的大小,也就是说log文件中,后面的部分消息格式可能不对,对文件进行截断.
  val truncated = log.sizeInBytes - validBytes
  log.truncateTo(validBytes)
根据新的LOG文件的大小,对index file中超过log的offset的记录部分的记录进行清理.
  index.trimToValidSize()
  truncated
}
 

通过执行LogManager实例中的startup函数来进行启动:

这里根据后台调度器,启动对日志回滚,清理,checkpoint的后台处理线程.

待分析...

def startup() {
  /* Schedule the cleanup task to delete old logs */
  if(scheduler != null) {
通过log.retention.check.interval.ms配置的执行间隔,读取定时删除过期日志的线程,执行函数为cleanupLogs,
    info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
    scheduler.schedule("kafka-log-retention"
                       cleanupLogs, 
                       delay = InitialTaskDelayMs, 
                       period = retentionCheckMs, 
                       TimeUnit.MILLISECONDS)
通过log.flush.scheduler.interval.ms配置的执行间隔,定期对partition的segment的磁盘缓冲区进行flush操作,这个默认不启用,执行函数为flushDirtyLogs.
    info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
    scheduler.schedule("kafka-log-flusher"
                       flushDirtyLogs, 
                       delay = InitialTaskDelayMs, 
                       period = flushCheckMs, 
                       TimeUnit.MILLISECONDS)
通过log.flush.offset.checkpoint.interval.ms配置的时间间隔,定期对各partition中当前的offset进行checkpoint的操作,通过执行函数checkpointRecoveryPointOffsets.
    scheduler.schedule("kafka-recovery-point-checkpoint",
                       checkpointRecoveryPointOffsets,
                       delay = InitialTaskDelayMs,
                       period = flushCheckpointMs,
                       TimeUnit.MILLISECONDS)
  }
 
这里主要启动LogCleaner中的CleanerThread的线程,
  if(cleanerConfig.enableCleaner)
    cleaner.startup()
}
 

定期删除过期的日志:

这个通过后台的调度组件定期去执行LogManager中的cleanupLogs的函数,

def cleanupLogs() {
  debug("Beginning log cleanup...")
  var total = 0
  val startMs = time.milliseconds
这里找到所有的partition的log,同时log的配置的清理模式不是compact的模式时,
  for(log <- allLogs; if !log.config.compact) {
    debug("Garbage collecting '" + log.name + "'")
在cleanupExpiredSegments函数中,计算出所有的segment中(不包含最后一个)最后一次修改的时间加上过期的删除时间当于当前时间的segments的集合,并把对应的segment的log文件与index文件添加后缀为.deleted的文件,同时通过调度线程,在log.segment.delete.delay.ms配置的间隔后,删除这些对应的文件.默认是60秒.这个函数根据segment的保留时间进行删除,
     保留时间通过log.retention.{hour,minutes,ms}进行配置.
在cleanupSegmentsToMaintainSize函数中,通过计算log中所有的segment中,总共保留的数据的总大小进行删除,保留总大小通过log.retention.bytes配置.这里计算所有的segment中,从log的第一个segment开始,对每一个segment的大小进行相减,直到减至保留大小时的segment停止,删除超过保留大小的所有的segment.这个操作不包含对当前活动的segment进行删除.
    total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log)
  }
  debug("Log cleanup completed. " + total + " files deleted in " +
                (time.milliseconds - startMs) / 1000 + " seconds")
}
 

定期对log的磁盘缓冲区进行flush:

这个通过后台的调度组件定期去执行LogManager中的flushDirtyLogs的函数,

这个函数中迭代所有的partition的log,并执行flush的操作,这个操作中通过当前最后一个offset找到上一次进行checkpoint的offset与当前的offset中间的segment,并执行segment中log与index的flush操作.对应log文件执行文件管道的force函数,对于index文件,执行文件管道map的force函数.

private def flushDirtyLogs() = {
  debug("Checking for dirty logs to flush...")

  for ((topicAndPartition, log) <- logs) {
    try {
      val timeSinceLastFlush = time.milliseconds - log.lastFlushTime
      debug("Checking if flush is needed on " + topicAndPartition.topic 
           + " flush interval  " + log.config.flushMs +
            " last flushed " + log.lastFlushTime + " time since last flush: " 
           + timeSinceLastFlush)
      if(timeSinceLastFlush >= log.config.flushMs)
        log.flush
    } catch {
      case e: Throwable =>
        error("Error flushing topic " + topicAndPartition.topic, e)
    }
  }
}
 

定期对partition的offset进行checkpoint操作:

这个通过后台的调度组件定期去
   执行LogManager中的checkpointRecoveryPointOffsets的函数,
def checkpointRecoveryPointOffsets() {
  this.logDirs.foreach(checkpointLogsInDir)
}
这里对每个dir中存储的partition的最后一个offset进行checkpoint的操作.
在这个函数中,迭代每个dir中对应的partition的offset记录到对应目录下的checkpoint文件中.
第一行写入一个0,表示是checkpoint文件的版本.
第二行写入的是partition的个数,当前checkpoint时,这个dir已经存在数据的partition的个数.
后面对应第二个的值个数的条数的数据,每条数据写入topic partition offset的值.
private def checkpointLogsInDir(dir: File): Unit = {
  val recoveryPoints = this.logsByDir.get(dir.toString)
  if (recoveryPoints.isDefined) {
    this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(
        _.recoveryPoint))
  }
}
 

LogCleaner实例中,定期执行的日志压缩:

这个实例中,通过CleanerThread的线程进行处理:

配置项log.cleaner.io.max.bytes.per.second,用于控制这个线程操作的IO速度,默认不控制速度
配置项log.cleaner.dedupe.buffer.size,默认值128MB,用于配置清理过期数据的内存缓冲区,这个用于数据清理时,选择的压缩方式时,用于对重复数据的清理排序内存.
配置项log.cleaner.threads,默认值1.用于配置清理过期日志的线程个数.
配置项log.cleaner.backoff.ms,用于定时检查日志是否需要清理的时间间隔,默认是15秒.
 
 
private def cleanOrSleep() {
通过cleanerManager中找到log的清理配置是compact的所有的partition,并读取每个dir下partition对应的cleaner-offset-checkpoint文件中上次进行清理的offset.如果partition没有在文件中,offset取当前log中第一个segment的开始offset.得到满足条件的所有的partition的Log实例,并排序正在进行合并操作的Log.并取出未进行清理的bytes最大的一个Log(partition),同时这个未进行清理的bytes占总Log大小的占比大于log.cleaner.min.cleanable.ratio配置的值(默认值是0.5).如果这个函数返回的是一个非None的值时,表示找到一个要清理的log.
把这个Log同时添加到inProgress集合中,表示这个Log正在执行清理操作,下次线程再次执行时,这个Log会被排除掉.
  cleanerManager.grabFilthiestLog() match {
    case None =>
如果没有找到需要进行清理的log,通过log.cleaner.backoff.ms配置的时间,等待下一次的触发.
      // there are no cleanable logs, sleep a while
      backOffWaitLatch.await(config.backOffMs, TimeUnit.MILLISECONDS)
    case Some(cleanable) =>
首先先取出上一次进行清理的offset(如果是第一次,找到最小的segment的开始的offset)
      // there's a log, clean it
      var endOffset = cleanable.firstDirtyOffset
      try {
通过Cleaner实例执行clean操作.这个函数返回的offset是map中可存储的最大到log中的那一个segment的最后的一个offset的值.也就是清理到的结束的这个segment的最大的offset的值.
        endOffset = cleaner.clean(cleanable)
        recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, 
             endOffset, cleaner.stats)
      } catch {
        case pe: LogCleaningAbortedException => // task can be aborted, let it go.
      } finally {
从inProgress集合中移出这个被Clear后的Log处理,同时更新这个cleaner-offset-checkpoint中对应此partition的offset.
        cleanerManager.doneCleaning(cleanable.topicPartition, 
             cleanable.log.dir.getParentFile, endOffset)
      }
  }
}
 
Cleaner中的clean函数,处理对过期日志的合并操作:
Cleaner实例生成时,需要如下几个配置项:
配置项log.cleaner.dedupe.buffer.size,默认值128MB,用于配置清理过期数据的内存缓冲区,这个用于数据清理时,选择的压缩方式时,用于对重复数据的清理排序内存.每一个Cleaner实例的buffer大小是这个配置大小/总的cleaner的线程个数.
配置项log.cleaner.io.buffer.load.factor,默认值0.9,用于配置清理内存缓冲区的数据装载因子,主要是用于hash(默认使用MD5hash算法),这个因子越小,对桶的重复可能越小,但内存占用越大,用于日志合并.
配置项log.cleaner.io.buffer.size,默认值512KB,用于清理过期数据的IO缓冲区大小,用于日志合并.每一个Cleaner实例的iobuffer大小是这个配置大小/总的cleaner的线程个数/2(输入与输出).
 
这个函数返回的offset就是处理到的segment的最后的一个offset.
private[log] def clean(cleanable: LogToClean): Long = {
  stats.clear()
  info("Beginning cleaning of log %s.".format(cleanable.log.name))
  val log = cleanable.log

  // build the offset map
  info("Building offset map for %s...".format(cleanable.log.name))
先得到当前最后一个segment的offset的开始位置,这个segment是当前活动的segment
  val upperBoundOffset = log.activeSegment.baseOffset
 
这里使用到的offsetMap是一个存储key对应的offset的一个Map实例,由SkimpyOffsetMap实现.
Hash算法是MD5算法,内存大小就是上面说到的dedupe的大小.
在这个函数中,根据startoffset与endoffset找到对应的segments,根据map的大小,计算出map可存储的消息条数,这个Offsetmap的key是消息的bytebuffer的hash码,value是对应此消息的key的offset.把segment中,可以通过map存储的segment加载到内存中,直接map不能存储更多的消息时,停止这个offsetMap的build操作,这时返回的endOffset就是能够加载到map里面的最后一个segment的最后一个offset的值.注意:这个map中并不存储key值,而是存储的key值的hash码.
  val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, 
       offsetMap) + 1
  stats.indexDone()
  
  这里通过0到上次计算清理的firstDirtyOffset得到的segments其实就是得到已经进行清理后的所有的segments,下面的这段语句就是得到最近一次执行清理的时间,减少保留的时间,得到这个segment应该执行清理操作的时间.这个值用于检查老的segment中,如果最后的修改时间小于或等于这个值的segment应该被排除掉,不会在进行压缩.
  val deleteHorizonMs = 
    log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
      case None => 0L
      case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
  }
      
  // group the segments and clean the groups
  info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name,
        new Date(deleteHorizonMs)))
这里读取到上面计算出来的endOffset部分的所有的segment,根据segmentsize,indexsize进行分组,这个分组是每一组的segmentsize不能超过segmentSize的配置大小,indexfile不能超过配置的最大indexsize的大小,同时条数不能超过int.maxvalue.
迭代计算出来需要进行清理的所有的segments通过调用cleanSegments函数进行清理操作.
  for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), 
            log.config.segmentSize, log.config.maxIndexSize))
    cleanSegments(log, group, offsetMap, deleteHorizonMs)
    
  // record buffer utilization
  stats.bufferUtilization = offsetMap.utilization
  
  stats.allDone()

  endOffset
}
 
执行对segment的清理操作:
private[log] def cleanSegments(log: Log,
                               segments: Seq[LogSegment], 
                               map: OffsetMap, 
                               deleteHorizonMs: Long) {
根据要执行合并操作的所有的segments中取出第一个segment通过这个文件或名称,生成一个log与index文件的.cleaned的文件,用于把所有的segments添加到这一个segment中.
  val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
  logFile.delete()
  val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
  indexFile.delete()
  val messages = new FileMessageSet(logFile, fileAlreadyExists = false,
        initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
  val index = new OffsetIndex(indexFile, segments.head.baseOffset, 
        segments.head.index.maxIndexSize)
  val cleaned = new LogSegment(messages, index, segments.head.baseOffset, 
        segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)

  try {
    // clean segments into the new destination segment
    for (old <- segments) {
      val retainDeletes = old.lastModified > deleteHorizonMs
      info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
          
.format(old.baseOffset, log.name, new Date(old.lastModified), 
          cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
这里迭代所有的segment进行合并操作,合并流程:
1,首先迭代这个segment中的所有的message,如果message的key在OffsetMap中能找到对应的offset,同时当前的message的offset不小于offsetMap中存储的offset.
2,检查这个segment的最后修改时间是否大于最大的保留时间,如果大于,同时这个消息的value是一个是一个有效的value
      cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes)
    }

    // trim excess index
    index.trimToValidSize()

    // flush new segment to disk before swap
    cleaned.flush()

    val modified = segments.last.lastModified
    cleaned.lastModified = modified

    // swap in new segment
    info("Swapping in cleaned segment %d for segment(s) %s in log %s."
        .format(cleaned.baseOffset, segments.map(_.baseOffset)
        .mkString(","), log.name))
最后,删除老的segments并删除对应的文件,把.cleared的segment更新为正常的segment,并添加到log的segments中.
    log.replaceSegments(cleaned, segments)
  } catch {
    case e: LogCleaningAbortedException =>
      cleaned.delete()
      throw e
  }
}
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息