您的位置:首页 > 移动开发

ScriptBasedMapping,CachedDNSToSwitchMapping,AbstractDNSToSwitchMapping,DNSToSwitchMapping类层次分析

2016-05-09 17:17 429 查看
DNSToSwitchMapping是接口。

定义了以下几个方法:

public interface DNSToSwitchMapping {
/**
* Resolves a list of DNS-names/IP-addresses and returns back a list of
* switch information (network paths). One-to-one correspondence must be
* maintained between the elements in the lists.
* Consider an element in the argument list - x.y.com. The switch information
* that is returned must be a network path of the form /foo/rack,
* where / is the root, and 'foo' is the switch where 'rack' is connected.
* Note the hostname/ip-address is not part of the returned path.
* The network topology of the cluster would determine the number of
* components in the network path.
* <p/>
*
* If a name cannot be resolved to a rack, the implementation
* should return {@link NetworkTopology#DEFAULT_RACK}. This
* is what the bundled implementations do, though it is not a formal requirement
*
* @param names the list of hosts to resolve (can be empty)
* @return list of resolved network paths.
* If <i>names</i> is empty, the returned list is also empty
*/
public List<String> resolve(List<String> names);

/**
* Reload all of the cached mappings.
*
* If there is a cache, this method will clear it, so that future accesses
* will get a chance to see the new data.
*/
public void reloadCachedMappings();

/**
* Reload cached mappings on specific nodes.
*
* If there is a cache on these nodes, this method will clear it, so that
* future accesses will see updated data.
*/
public void reloadCachedMappings(List<String> names);
}


AbstractDNSToSwitchMapping implements DNSToSwitchMapping

@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class AbstractDNSToSwitchMapping
implements DNSToSwitchMapping, Configurable {

private Configuration conf;

/**
* Create an unconfigured instance
*/
protected AbstractDNSToSwitchMapping() {
}

/**
* Create an instance, caching the configuration file.
* This constructor does not call {@link #setConf(Configuration)}; if
* a subclass extracts information in that method, it must call it explicitly.
* @param conf the configuration
*/
protected AbstractDNSToSwitchMapping(Configuration conf) {
this.conf = conf;
}

@Override
public Configuration getConf() {
return conf;
}

@Override
public void setConf(Configuration conf) {
this.conf = conf;
}

/**
* Predicate that indicates that the switch mapping is known to be
* single-switch. The base class returns false: it assumes all mappings are
* multi-rack. Subclasses may override this with methods that are more aware
* of their topologies.
*
* <p/>
*
* This method is used when parts of Hadoop need know whether to apply
* single rack vs multi-rack policies, such as during block placement.
* Such algorithms behave differently if they are on multi-switch systems.
* </p>
*
* @return true if the mapping thinks that it is on a single switch
*/
public boolean isSingleSwitch() {
return false;
}

/**
* Get a copy of the map (for diagnostics)
* @return a clone of the map or null for none known
*/
public Map<String, String> getSwitchMap() {
return null;
}

/**
* Generate a string listing the switch mapping implementation,
* the mapping for every known node and the number of nodes and
* unique switches known about -each entry to a separate line.
* @return a string that can be presented to the ops team or used in
* debug messages.
*/
public String dumpTopology() {
Map<String, String> rack = getSwitchMap();
StringBuilder builder = new StringBuilder();
builder.append("Mapping: ").append(toString()).append("\n");
if (rack != null) {
builder.append("Map:\n");
Set<String> switches = new HashSet<String>();
for (Map.Entry<String, String> entry : rack.entrySet()) {
builder.append("  ")
.append(entry.getKey())
.append(" -> ")
.append(entry.getValue())
.append("\n");
switches.add(entry.getValue());
}
builder.append("Nodes: ").append(rack.size()).append("\n");
builder.append("Switches: ").append(switches.size()).append("\n");
} else {
builder.append("No topology information");
}
return builder.toString();
}

protected boolean isSingleSwitchByScriptPolicy() {
return conf != null
&& conf.get(CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null;
}

/**
* Query for a {@link DNSToSwitchMapping} instance being on a single
* switch.
* <p/>
* This predicate simply assumes that all mappings not derived from
* this class are multi-switch.
* @param mapping the mapping to query
* @return true if the base class says it is single switch, or the mapping
* is not derived from this class.
*/
public static boolean isMappingSingleSwitch(DNSToSwitchMapping mapping) {
return mapping != null && mapping instanceof AbstractDNSToSwitchMapping
&& ((AbstractDNSToSwitchMapping) mapping).isSingleSwitch();
}

}


CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping, and overwrited reloadCachedMappings and reloadCachedMappings methods.

/**
* A cached implementation of DNSToSwitchMapping that takes an
* raw DNSToSwitchMapping and stores the resolved network location in
* a cache. The following calls to a resolved network location
* will get its location from the cache.
*
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CachedDNSToSwitchMapping extends AbstractDNSToSwitchMapping {
private Map<String, String> cache = new ConcurrentHashMap<String, String>();

/**
* The uncached mapping
*/
protected final DNSToSwitchMapping rawMapping;

/**
* cache a raw DNS mapping
* @param rawMapping the raw mapping to cache
*/
public CachedDNSToSwitchMapping(DNSToSwitchMapping rawMapping) {
this.rawMapping = rawMapping;
}

/**
* @param names a list of hostnames to probe for being cached
* @return the hosts from 'names' that have not been cached previously
*/
private List<String> getUncachedHosts(List<String> names) {
// find out all names without cached resolved location
List<String> unCachedHosts = new ArrayList<String>(names.size());
for (String name : names) {
if (cache.get(name) == null) {
unCachedHosts.add(name);
}
}
return unCachedHosts;
}

/**
* Caches the resolved host:rack mappings. The two list
* parameters must be of equal size.
*
* @param uncachedHosts a list of hosts that were uncached
* @param resolvedHosts a list of resolved host entries where the element
* at index(i) is the resolved value for the entry in uncachedHosts[i]
*/
private void cacheResolvedHosts(List<String> uncachedHosts,
List<String> resolvedHosts) {
// Cache the result
if (resolvedHosts != null) {
for (int i=0; i<uncachedHosts.size(); i++) {
cache.put(uncachedHosts.get(i), resolvedHosts.get(i));
}
}
}

/**
* @param names a list of hostnames to look up (can be be empty)
* @return the cached resolution of the list of hostnames/addresses.
*  or null if any of the names are not currently in the cache
*/
private List<String> getCachedHosts(List<String> names) {
List<String> result = new ArrayList<String>(names.size());
// Construct the result
for (String name : names) {
String networkLocation = cache.get(name);
if (networkLocation != null) {
result.add(networkLocation);
} else {
return null;
}
}
return result;
}

@Override
public List<String> resolve(List<String> names) {
// normalize all input names to be in the form of IP addresses
names = NetUtils.normalizeHostNames(names);

List <String> result = new ArrayList<String>(names.size());
if (names.isEmpty()) {
return result;
}

List<String> uncachedHosts = getUncachedHosts(names);

// Resolve the uncached hosts
List<String> resolvedHosts = rawMapping.resolve(uncachedHosts);
//cache them
cacheResolvedHosts(uncachedHosts, resolvedHosts);
//now look up the entire list in the cache
return getCachedHosts(names);

}

/**
* Get the (host x switch) map.
* @return a copy of the cached map of hosts to rack
*/
@Override
public Map<String, String> getSwitchMap() {
Map<String, String > switchMap = new HashMap<String, String>(cache);
return switchMap;
}

@Override
public String toString() {
return "cached switch mapping relaying to " + rawMapping;
}

/**
* Delegate the switch topology query to the raw mapping, via
* {@link AbstractDNSToSwitchMapping#isMappingSingleSwitch(DNSToSwitchMapping)}
* @return true iff the raw mapper is considered single-switch.
*/
@Override
public boolean isSingleSwitch() {
return isMappingSingleSwitch(rawMapping);
}

@Override
public void reloadCachedMappings() {
cache.clear();
}

@Override
public void reloadCachedMappings(List<String> names) {
for (String name : names) {
cache.remove(name);
}
}
}


ScriptBasedMapping extends CachedDNSToSwitchMapping ,it has innerclass RawScriptBasedMapping

protected static class RawScriptBasedMapping
extends AbstractDNSToSwitchMapping {
private String scriptName;
private int maxArgs; //max hostnames per call of the script
private static final Log LOG =
LogFactory.getLog(ScriptBasedMapping.class);

/**
* Set the configuration and extract the configuration parameters of interest
* @param conf the new configuration
*/
@Override
public void setConf (Configuration conf) {
super.setConf(conf);
if (conf != null) {
scriptName = conf.get(SCRIPT_FILENAME_KEY);
maxArgs = conf.getInt(SCRIPT_ARG_COUNT_KEY, DEFAULT_ARG_COUNT);
} else {
scriptName = null;
maxArgs = 0;
}
}

/**
* Constructor. The mapping is not ready to use until
* {@link #setConf(Configuration)} has been called
*/
public RawScriptBasedMapping() {}

@Override
public List<String> resolve(List<String> names) {
List<String> m = new ArrayList<String>(names.size());

if (names.isEmpty()) {
return m;
}

if (scriptName == null) {
for (String name : names) {
m.add(NetworkTopology.DEFAULT_RACK);
}
return m;
}

String output = runResolveCommand(names, scriptName);
if (output != null) {
StringTokenizer allSwitchInfo = new StringTokenizer(output);
while (allSwitchInfo.hasMoreTokens()) {
String switchInfo = allSwitchInfo.nextToken();
m.add(switchInfo);
}

if (m.size() != names.size()) {
// invalid number of entries returned by the script
LOG.error("Script " + scriptName + " returned "
+ Integer.toString(m.size()) + " values when "
+ Integer.toString(names.size()) + " were expected.");
return null;
}
} else {
// an error occurred. return null to signify this.
// (exn was already logged in runResolveCommand)
return null;
}

return m;
}

/**
* Build and execute the resolution command. The command is
* executed in the directory specified by the system property
* "user.dir" if set; otherwise the current working directory is used
* @param args a list of arguments
* @return null if the number of arguments is out of range,
* or the output of the command.
*/
protected String runResolveCommand(List<String> args,
String commandScriptName) {
int loopCount = 0;
if (args.size() == 0) {
return null;
}
StringBuilder allOutput = new StringBuilder();
int numProcessed = 0;
if (maxArgs < MIN_ALLOWABLE_ARGS) {
LOG.warn("Invalid value " + Integer.toString(maxArgs)
+ " for " + SCRIPT_ARG_COUNT_KEY + "; must be >= "
+ Integer.toString(MIN_ALLOWABLE_ARGS));
return null;
}

while (numProcessed != args.size()) {
int start = maxArgs * loopCount;
List<String> cmdList = new ArrayList<String>();
cmdList.add(commandScriptName);
for (numProcessed = start; numProcessed < (start + maxArgs) &&
numProcessed < args.size(); numProcessed++) {
cmdList.add(args.get(numProcessed));
}
File dir = null;
String userDir;
if ((userDir = System.getProperty("user.dir")) != null) {
dir = new File(userDir);
}
ShellCommandExecutor s = new ShellCommandExecutor(
cmdList.toArray(new String[cmdList.size()]), dir);
try {
s.execute();
allOutput.append(s.getOutput()).append(" ");
} catch (Exception e) {
LOG.warn("Exception running " + s, e);
return null;
}
loopCount++;
}
return allOutput.toString();
}

/**
* Declare that the mapper is single-switched if a script was not named
* in the configuration.
* @return true iff there is no script
*/
@Override
public boolean isSingleSwitch() {
return scriptName == null;
}

@Override
public String toString() {
return scriptName != null ? ("script " + scriptName) : NO_SCRIPT;
}

@Override
public void reloadCachedMappings() {
// Nothing to do here, since RawScriptBasedMapping has no cache, and
// does not inherit from CachedDNSToSwitchMapping
}

@Override
public void reloadCachedMappings(List<String> names) {
// Nothing to do here, since RawScriptBasedMapping has no cache, and
// does not inherit from CachedDNSToSwitchMapping
}
}


runResolveCommand方法是调用script file来实现解析。

/**
* Build and execute the resolution command. The command is
* executed in the directory specified by the system property
* "user.dir" if set; otherwise the current working directory is used
* @param args a list of arguments
* @return null if the number of arguments is out of range,
* or the output of the command.
*/
protected String runResolveCommand(List<String> args,
String commandScriptName) {
int loopCount = 0;
if (args.size() == 0) {
return null;
}
StringBuilder allOutput = new StringBuilder();
int numProcessed = 0;
if (maxArgs < MIN_ALLOWABLE_ARGS) {
LOG.warn("Invalid value " + Integer.toString(maxArgs)
+ " for " + SCRIPT_ARG_COUNT_KEY + "; must be >= "
+ Integer.toString(MIN_ALLOWABLE_ARGS));
return null;
}

while (numProcessed != args.size()) {
int start = maxArgs * loopCount;
List<String> cmdList = new ArrayList<String>();
cmdList.add(commandScriptName);
for (numProcessed = start; numProcessed < (start + maxArgs) &&
numProcessed < args.size(); numProcessed++) {
cmdList.add(args.get(numProcessed));
}
File dir = null;
String userDir;
if ((userDir = System.getProperty("user.dir")) != null) {
dir = new File(userDir);
}
ShellCommandExecutor s = new ShellCommandExecutor(
cmdList.toArray(new String[cmdList.size()]), dir);
try {
s.execute();
allOutput.append(s.getOutput()).append(" ");
} catch (Exception e) {
LOG.warn("Exception running " + s, e);
return null;
}
loopCount++;
}
return allOutput.toString();
}


ShellCommandExecutor实现如下:

public ShellCommandExecutor(String[] execString, File dir) {
this(execString, dir, null);
}

public ShellCommandExecutor(String[] execString, File dir,
Map<String, String> env) {
this(execString, dir, env , 0L);
}

/**
* Create a new instance of the ShellCommandExecutor to execute a command.
*
* @param execString The command to execute with arguments
* @param dir If not-null, specifies the directory which should be set
*            as the current working directory for the command.
*            If null, the current working directory is not modified.
* @param env If not-null, environment of the command will include the
*            key-value pairs specified in the map. If null, the current
*            environment is not modified.
* @param timeout Specifies the time in milliseconds, after which the
*                command will be killed and the status marked as timedout.
*                If 0, the command will not be timed out.
*/
public ShellCommandExecutor(String[] execString, File dir,
Map<String, String> env, long timeout) {
command = execString.clone();
if (dir != null) {
setWorkingDirectory(dir);
}
if (env != null) {
setEnvironment(env);
}
timeOutInterval = timeout;
}


/** Execute the shell command. */
public void execute() throws IOException {
this.run();
}


/** check to see if a command needs to be executed and execute if needed */
protected void run() throws IOException {
if (lastTime + interval > Time.now())
return;
exitCode = 0; // reset for next run
runCommand();
}


runCommand的源代码如下:

/** Run a command */
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask = null;
timedOut = new AtomicBoolean(false);
completed = new AtomicBoolean(false);

if (environment != null) {
builder.environment().putAll(this.environment);
}
if (dir != null) {
builder.directory(this.dir);
}

builder.redirectErrorStream(redirectErrorStream);

if (Shell.WINDOWS) {
synchronized (WindowsProcessLaunchLock) {
// To workaround the race condition issue with child processes
// inheriting unintended handles during process launch that can
// lead to hangs on reading output and error streams, we
// serialize process creation. More info available at:
// http://support.microsoft.com/kb/315939 process = builder.start();
}
} else {
process = builder.start();
}

if (timeOutInterval > 0) {
timeOutTimer = new Timer("Shell command timeout");
timeoutTimerTask = new ShellTimeoutTimerTask(
this);
//One time scheduling.
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process
.getErrorStream()));
BufferedReader inReader =
new BufferedReader(new InputStreamReader(process
.getInputStream()));
final StringBuffer errMsg = new StringBuffer();

// read error and input streams as this would free up the buffers
// free the error stream buffer
Thread errThread = new Thread() {
@Override
public void run() {
try {
String line = errReader.readLine();
while((line != null) && !isInterrupted()) {
errMsg.append(line);
errMsg.append(System.getProperty("line.separator"));
line = errReader.readLine();
}
} catch(IOException ioe) {
LOG.warn("Error reading the error stream", ioe);
}
}
};
try {
errThread.start();
} catch (IllegalStateException ise) { }
try {
parseExecResult(inReader); // parse the output
// clear the input stream buffer
String line = inReader.readLine();
while(line != null) {
line = inReader.readLine();
}
// wait for the process to finish and check the exit code
exitCode  = process.waitFor();
// make sure that the error thread exits
joinThread(errThread);
completed.set(true);
//the timeout thread handling
//taken care in finally block
if (exitCode != 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
if (timeOutTimer != null) {
timeOutTimer.cancel();
}
// close the input stream
try {
// JDK 7 tries to automatically drain the input streams for us
// when the process exits, but since close is not synchronized,
// it creates a race if we close the stream first and the same
// fd is recycled.  the stream draining thread will attempt to
// drain that fd!!  it may block, OOM, or cause bizarre behavior
// see: https://bugs.openjdk.java.net/browse/JDK-8024521 //      issue is fixed in build 7u60
InputStream stdout = process.getInputStream();
synchronized (stdout) {
inReader.close();
}
} catch (IOException ioe) {
LOG.warn("Error while closing the input stream", ioe);
}
if (!completed.get()) {
errThread.interrupt();
joinThread(errThread);
}
try {
InputStream stderr = process.getErrorStream();
synchronized (stderr) {
errReader.close();
}
} catch (IOException ioe) {
LOG.warn("Error while closing the error stream", ioe);
}
process.destroy();
lastTime = Time.now();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: