
Hadoop Volume Choosing Policies

2018-03-30 17:08 · 369 views
Source code version: 2.6.1

What is a volume? A volume is one of the storage directories on a DataNode. Every block replica a DataNode stores is written to some volume, so every write passes through a volume choosing policy. The policy class is selected via the configuration parameter dfs.datanode.fsdataset.volume.choosing.policy; when it is unset, the DataNode falls back to org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy.
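As a concrete example, opting into the available-space policy is a single property in hdfs-site.xml (the parameter name and the class name are the ones shipped with Hadoop 2.6.1, both quoted above):

```xml
<!-- hdfs-site.xml: select the available-space volume choosing policy -->
<property>
  <name>dfs.datanode.fsdataset.volume.choosing.policy</name>
  <value>org.apache.hadoop.hdfs.server.datanode.fsdataset.AvailableSpaceVolumeChoosingPolicy</value>
</property>
```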
Two policies ship with Hadoop: round-robin and available space.
Both implement the interface org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy, whose chooseVolume method is where the selection logic lives:
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;

/**
 * This interface specifies the policy for choosing volumes to store replicas.
 */
@InterfaceAudience.Private
public interface VolumeChoosingPolicy<V extends FsVolumeSpi> {

  /**
   * Choose a volume to place a replica,
   * given a list of volumes and the replica size sought for storage.
   *
   * The implementations of this interface must be thread-safe.
   *
   * @param volumes - a list of available volumes.
   * @param replicaSize - the size of the replica for which a volume is sought.
   * @return the chosen volume.
   * @throws IOException when disks are unavailable or are full.
   */
  public V chooseVolume(List<V> volumes, long replicaSize) throws IOException;
}
Policy 1: round-robin. Source:
package org.apache.hadoop.hdfs.server.datanode.fsdataset;

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;

/**
 * Choose volumes in round-robin order.
 */
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
    implements VolumeChoosingPolicy<V> {
  public static final Log LOG =
      LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);

  private int curVolume = 0;

  @Override
  public synchronized V chooseVolume(final List<V> volumes, long blockSize)
      throws IOException {

    if (volumes.size() < 1) {
      throw new DiskOutOfSpaceException("No more available volumes");
    }

    // since volumes could've been removed because of the failure
    // make sure we are not out of bounds
    if (curVolume >= volumes.size()) {
      curVolume = 0;
    }

    int startVolume = curVolume;
    long maxAvailable = 0;

    while (true) {
      final V volume = volumes.get(curVolume);
      curVolume = (curVolume + 1) % volumes.size();
      long availableVolumeSize = volume.getAvailable();
      if (availableVolumeSize > blockSize) {
        return volume;
      }

      if (availableVolumeSize > maxAvailable) {
        maxAvailable = availableVolumeSize;
      }

      if (curVolume == startVolume) {
        throw new DiskOutOfSpaceException("Out of space: "
            + "The volume with the most available space (=" + maxAvailable
            + " B) is less than the block size (=" + blockSize + " B).");
      }
    }
  }
}
Here volumes is the list of directories configured under dfs.datanode.data.dir in hdfs-site.xml, and blockSize is the size of the replica being written (how that size is determined is a topic for another post).
As the code shows, the policy cycles through the volumes in order, remembering the largest available space it has seen so far. The first volume whose free space exceeds blockSize is returned immediately; if a full cycle completes without finding one, the policy throws an out-of-space exception whose message includes both the largest available space and the requested block size, which makes the failure easy to diagnose.
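The loop above can be simulated outside Hadoop with plain numbers standing in for volumes. The class and method below are hypothetical stand-ins (not Hadoop API): each volume is represented only by its available space in bytes, playing the role of FsVolumeSpi.getAvailable(), and -1 replaces the DiskOutOfSpaceException:

```java
import java.util.List;

// Minimal sketch of the round-robin selection loop. Each volume is
// represented only by its available space in bytes.
public class RoundRobinSketch {
  private int curVolume = 0;

  // Returns the index of the chosen volume, or -1 when no volume can hold
  // blockSize (where the real policy throws DiskOutOfSpaceException).
  public synchronized int choose(List<Long> available, long blockSize) {
    if (available.isEmpty()) {
      return -1;
    }
    if (curVolume >= available.size()) {
      curVolume = 0; // volumes may have been removed after a disk failure
    }
    int startVolume = curVolume;
    while (true) {
      int candidate = curVolume;
      curVolume = (curVolume + 1) % available.size();
      if (available.get(candidate) > blockSize) {
        return candidate; // first volume with enough space wins
      }
      if (curVolume == startVolume) {
        return -1; // completed a full cycle; nothing fits
      }
    }
  }

  public static void main(String[] args) {
    RoundRobinSketch policy = new RoundRobinSketch();
    List<Long> vols = List.of(100L, 500L, 500L);
    // Volume 0 has only 100 B free, so the first pick skips it: volume 1.
    System.out.println(policy.choose(vols, 200));
    // The cursor advanced past 1, so the next pick is volume 2.
    System.out.println(policy.choose(vols, 200));
  }
}
```

Note how the cursor advances even on a successful pick, which is what spreads consecutive writes across disks.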
Policy 2: available space. The heart of the policy is its chooseVolume method:

  @Override
  public synchronized V chooseVolume(List<V> volumes,
      long replicaSize) throws IOException {
    if (volumes.size() < 1) {
      throw new DiskOutOfSpaceException("No more available volumes");
    }

    AvailableSpaceVolumeList volumesWithSpaces =
        new AvailableSpaceVolumeList(volumes);

    if (volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold()) {
      // If they're actually not too far out of whack, fall back on pure round
      // robin.
      V volume = roundRobinPolicyBalanced.chooseVolume(volumes, replicaSize);
      if (LOG.isDebugEnabled()) {
        LOG.debug("All volumes are within the configured free space balance " +
            "threshold. Selecting " + volume + " for write of block size " +
            replicaSize);
      }
      return volume;
    } else {
      V volume = null;
      // If none of the volumes with low free space have enough space for the
      // replica, always try to choose a volume with a lot of free space.
      long mostAvailableAmongLowVolumes = volumesWithSpaces
          .getMostAvailableSpaceAmongVolumesWithLowAvailableSpace();

      List<V> highAvailableVolumes = extractVolumesFromPairs(
          volumesWithSpaces.getVolumesWithHighAvailableSpace());
      List<V> lowAvailableVolumes = extractVolumesFromPairs(
          volumesWithSpaces.getVolumesWithLowAvailableSpace());

      float preferencePercentScaler =
          (highAvailableVolumes.size() * balancedPreferencePercent) +
          (lowAvailableVolumes.size() * (1 - balancedPreferencePercent));
      float scaledPreferencePercent =
          (highAvailableVolumes.size() * balancedPreferencePercent) /
          preferencePercentScaler;
      if (mostAvailableAmongLowVolumes < replicaSize ||
          random.nextFloat() < scaledPreferencePercent) {
        volume = roundRobinPolicyHighAvailable.chooseVolume(
            highAvailableVolumes, replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Volumes are imbalanced. Selecting " + volume +
              " from high available space volumes for write of block size "
              + replicaSize);
        }
      } else {
        volume = roundRobinPolicyLowAvailable.chooseVolume(
            lowAvailableVolumes, replicaSize);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Volumes are imbalanced. Selecting " + volume +
              " from low available space volumes for write of block size "
              + replicaSize);
        }
      }
      return volume;
    }
  }

volumesWithSpaces.areAllVolumesWithinFreeSpaceThreshold() computes the difference between the largest and the smallest available space across all volumes. If the difference is below balancedSpaceThreshold, the volumes are considered effectively balanced and the policy simply delegates to round-robin. balancedSpaceThreshold comes from the configuration parameter dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold and defaults to 10 GB.
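Both tuning knobs for this policy sit next to each other in hdfs-site.xml; a sketch with the default values written out (10 GB expressed in bytes):

```xml
<!-- Volumes count as balanced when max and min free space differ by
     less than this many bytes (default: 10 GB). -->
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-threshold</name>
  <value>10737418240</value>
</property>
<!-- Fraction of writes steered toward the volumes with more free space. -->
<property>
  <name>dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction</name>
  <value>0.75</value>
</property>
```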

Otherwise the selection proceeds in two steps:
1. The high-available group (highAvailableVolumes): volumes whose free space exceeds the smallest volume's free space plus balancedSpaceThreshold.
(1) If the replica is larger than the biggest free space among the low-available volumes, the policy has no choice but to round-robin over highAvailableVolumes.
(2) Otherwise it still prefers highAvailableVolumes with probability balancedPreferencePercent, 0.75 by default and configurable via dfs.datanode.available-space-volume-choosing-policy.balanced-space-preference-fraction. This probability is best read as a weight: as the source above shows, the code scales it by the number of volumes in each group before comparing against random.nextFloat().
2. The low-available group (lowAvailableVolumes): volumes whose free space is at most the smallest volume's free space plus balancedSpaceThreshold. When neither condition in step 1 fires, the remaining probability 1 - balancedPreferencePercent sends the write round-robin over lowAvailableVolumes.
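The scaling arithmetic is easy to miss, so here it is isolated in a hypothetical helper (the method name and class are illustrative, not Hadoop API) that mirrors the preferencePercentScaler / scaledPreferencePercent computation from the source above:

```java
// Sketch of how the preference fraction is scaled by group sizes,
// following the arithmetic in chooseVolume above.
public class PreferenceScaler {
  // Per-write probability of picking from the high-available-space group,
  // given the sizes of the two groups and the configured fraction p.
  public static float scaledPreference(int numHigh, int numLow, float p) {
    float scaler = numHigh * p + numLow * (1 - p);
    return (numHigh * p) / scaler;
  }

  public static void main(String[] args) {
    // With one volume in each group, the chance of writing to the roomier
    // volume is exactly the configured fraction: 0.75.
    System.out.println(scaledPreference(1, 1, 0.75f));
    // With 1 high volume and 3 low volumes the probability drops to
    // 0.75 / (0.75 + 3 * 0.25) = 0.5, so the single roomy disk is not
    // hammered with 75% of all writes.
    System.out.println(scaledPreference(1, 3, 0.75f));
  }
}
```

In other words, the configured 0.75 is a per-group weight, and the effective per-write probability depends on how many volumes fall on each side of the threshold.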
Tags: hadoop, source code walkthrough