
HDFS Namenode Audit: Design and Implementation

The Hadoop Namenode can audit operations such as "rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs".

However, Hadoop's built-in audit logging records every operation on every file and directory, which can produce a large amount of useless information and degrade Namenode performance.

The plugin described here audits only specified directories and commands: an operation is logged only if it targets one of the configured directories (including their subdirectories) and its command is in the configured command list.

For example, to audit who reads a Hive table, set the table's base directory as an audit directory; this strategy greatly reduces the size of the audit log.
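A minimal audit-directory entry for this case, assuming a hypothetical table stored under /user/hive/warehouse/sales.db/orders (the path and command list are illustrative; the full auditfile.xml format is shown in the Configuration section):

<audit-directory>
  <path>hdfs://localhost:8020/user/hive/warehouse/sales.db/orders</path>
  <commands>open,getfileinfo,listStatus</commands>
</audit-directory>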

Terminology:
*   Audit directory: a directory to be audited; operations on the directory and its subdirectories are logged.
*   Audit commands: the commands to audit; each must be one of "rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs", separated by ','.


## Configuration

* ${HADOOP_HOME}/log4j.properties

Modify the following content.

hdfs.audit.logger=INFO,NullAppender
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20


to

hdfs.audit.logger=INFO,console
hdfs.audit.log.maxfilesize=256MB
hdfs.audit.log.maxbackupindex=20
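
These values assume the stock RFAAUDIT rolling-file appender, which is selected by the -Dhdfs.audit.logger option set in hadoop-env.sh below. In the default Hadoop log4j.properties its definition looks like the following; if your file lacks it, add it:

hdfs.audit.log.file=hdfs-audit.log
log4j.appender.RFAAUDIT=org.apache.log4j.RollingFileAppender
log4j.appender.RFAAUDIT.File=${hadoop.log.dir}/${hdfs.audit.log.file}
log4j.appender.RFAAUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.RFAAUDIT.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.RFAAUDIT.MaxFileSize=${hdfs.audit.log.maxfilesize}
log4j.appender.RFAAUDIT.MaxBackupIndex=${hdfs.audit.log.maxbackupindex}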


* ${HADOOP_HOME}/hadoop-env.sh

Modify the following content.

export HADOOP_NAMENODE_OPTS="${JVM_OPTS} -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-WARN,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} -Xloggc:$HADOOP_LOG_DIR/nn_gc.log $HADOOP_NAMENODE_OPTS"


to

export HADOOP_NAMENODE_OPTS="${JVM_OPTS} -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-WARN,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,RFAAUDIT} -Xloggc:$HADOOP_LOG_DIR/nn_gc.log $HADOOP_NAMENODE_OPTS"


* hdfs-site.xml

Add the following configuration options.

<property>
  <name>dfs.namenode.audit.loggers</name>
  <value>org.apache.hadoop.hdfs.namenode.NameNodeAuditLogger</value>
</property>
<property>
  <name>dfs.namenode.audit.file</name>
  <value>/usr/local/hadoop/etc/hadoop/auditfile.xml</value>
  <description>The file in which the user sets which directories and commands to audit.</description>
</property>
<property>
  <name>dfs.namenode.audit.file.check.interval</name>
  <value>10000</value>
  <description>The interval, in milliseconds, at which to check whether the audit file has been
  modified. The file's modification time must change for a reload to be triggered.</description>
</property>
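
Note that the reload task (ReloadAuditFileTask in the source below) re-reads auditfile.xml only when the file's modification time advances, so after editing it make sure the timestamp actually changes, for example:

touch /usr/local/hadoop/etc/hadoop/auditfile.xml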


Copy baidu-hadoop-xbu-2.7.2.jar to the ${HADOOP_HOME}/share/hadoop/hdfs/ directory, so that the NameNodeAuditLogger class is on the Namenode's classpath.

Create auditfile.xml

<?xml version="1.0"?>
<audit-config>
  <audit-directory>
    <path>hdfs://localhost:8020/user/houzhizhen</path>
    <commands>rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs</commands>
  </audit-directory>
  <audit-directory>
    <path>hdfs://localhost:8020/user/hefuhua</path>
    <commands>rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs</commands>
  </audit-directory>
  <audit-directory>
    <path>hdfs://yq-ns2/user/hefuhua</path>
    <commands>rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs</commands>
  </audit-directory>
</audit-config>


You can configure any number of audit-directory entries. Only paths that start with ${fs.defaultFS} are processed by this namenode.
You can change the file at any time; the changes take effect within ${dfs.namenode.audit.file.check.interval} milliseconds.


Perform a rolling restart of the namenodes, then check on the active namenode that hdfs-audit.log is being generated and that auditing works as expected.
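
To verify by hand (paths here are illustrative), run an audited command against a configured directory from a client and inspect the log on the active namenode:

hdfs dfs -ls hdfs://localhost:8020/user/houzhizhen
tail ${HADOOP_LOG_DIR}/hdfs-audit.log

Given the record format built by logAuditEvent in the source below (fields are tab-separated), the resulting entry should look roughly like:

2017-04-01 15:48:00,123 INFO FSNamesystem.audit: allowed=true ugi=houzhizhen ip=/127.0.0.1 cmd=listStatus src=hdfs://localhost:8020/user/houzhizhen dst=null perm=null proto=rpc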

## Source Code

package org.apache.hadoop.hdfs.namenode;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.HdfsAuditLogger;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringInterner;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;

class RootDirectory {
    private Directory root = new Directory(File.separator, false, null);
    private static final int SEPERATOR_LENGTH = File.separator.length();

    public synchronized void add(String subPath, String[] commands) {
        if (subPath == null) {
            return;
        }
        if (subPath.startsWith(File.separator)) {
            subPath = subPath.substring(SEPERATOR_LENGTH);
        }
        if (subPath.endsWith(File.separator)) {
            subPath = subPath.substring(0, subPath.length() - SEPERATOR_LENGTH);
        }
        if ("".equals(subPath)) {
            return;
        }
        root.add(subPath, commands);
    }

    public boolean isAuth(String subPath, String command) {
        if (subPath == null) {
            return false;
        }
        if (subPath.startsWith(File.separator)) {
            subPath = subPath.substring(SEPERATOR_LENGTH);
        }
        if (subPath.endsWith(File.separator)) {
            subPath = subPath.substring(0, subPath.length() - SEPERATOR_LENGTH);
        }
        if ("".equals(subPath)) {
            return false;
        }
        if (command == null || "".equals(command)) {
            return false;
        }
        return root.isAuth(subPath, command);
    }
}

/**
 * The path must be split on '/' rather than into individual characters. Consider two audited
 * directories /a and /ab: with a per-character trie, '/','a' would form a leaf and matching
 * would stop there, so an unrelated directory such as /abc would be treated as if it were
 * under /a. Splitting on '/' keeps every path component atomic and avoids such false matches.
 *
 * @author houzhizhen
 */
class Directory {
    private final String name;
    private Map<String, Directory> subDirectories = new HashMap<String, Directory>();
    private final boolean leaf;
    private String[] commands; // only the registered commands are audited

    Directory(String name, boolean leaf, String[] commands) {
        this.name = name;
        this.leaf = leaf;
        if (leaf) {
            if (commands == null) {
                throw new RuntimeException("audit commands must not be null");
            }
            this.commands = commands;
        }
    }

    /**
     * @param subPath the path relative to this directory, with no leading separator; must not be null
     * @param command the command being checked
     * @return true if the operation should be audited
     */
    public boolean isAuth(String subPath, String command) {
        int index = subPath.indexOf(File.separator);
        String subName;

        if (index == -1) {
            subName = subPath;
            subPath = null;
        } else {
            subName = subPath.substring(0, index);
            // substring(index + 1) skips the separator; index + 1 is at most subPath.length(), so it cannot overflow
            subPath = subPath.substring(index + 1);
        }
        Directory subDic = subDirectories.get(subName);
        if (subDic != null) {
            if (subDic.leaf) {
                // a leaf audits its whole subtree, but only for the registered commands
                for (int i = 0; i < subDic.commands.length; i++) {
                    if (command.equals(subDic.commands[i])) {
                        return true;
                    }
                }
                return false;
            } else {
                if (subPath == null) {
                    return false;
                }
                return subDic.isAuth(subPath, command);
            }
        }
        return false;
    }

    // subPath may contain '/' separators; each component becomes one level of the trie
    public synchronized void add(String subPath, String[] commands) {
        if (leaf) {
            throw new RuntimeException("audit directories must not overlap with each other: " + subPath);
        }
        int index = subPath.indexOf(File.separator);
        String subName;

        if (index == -1) {
            subName = subPath;
            subPath = null;
        } else {
            subName = subPath.substring(0, index);
            subPath = subPath.substring(index + 1);
        }
        Directory subDic = subDirectories.get(subName);
        if (subDic == null) {
            // only a leaf receives the commands; intermediate components do not use them
            subDic = new Directory(subName, subPath == null, subPath == null ? commands : null);
            subDirectories.put(subName, subDic);
        }
        if (subPath != null) {
            subDic.add(subPath, commands);
        }
    }
}

public class NameNodeAuditLogger extends HdfsAuditLogger {
    public static final Log auditLog = LogFactory.getLog(FSNamesystem.class.getName() + ".audit");

    String auditFile;
    private long auditFileCheckInterval;
    private transient volatile RootDirectory root = new RootDirectory();
    private Timer timer = new Timer(true);
    private String defaultFS;
    private static final ThreadLocal<StringBuilder> auditBuffer = new ThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
            return new StringBuilder();
        }
    };

    @Override
    public void initialize(Configuration conf) {
        // Reinitialize conf; otherwise defaultFS would be set to a value like
        // "hdfs://yq01-sw-hds02.yq01.baidu.com:8020" instead of the configured fs.defaultFS.
        conf = new Configuration();
        logAuditMessage("NameNodeAuditLogger is initializing");
        auditFile = conf.get("dfs.namenode.audit.file", "/usr/local/hadoop/etc/hadoop/auditfile.xml");
        auditFileCheckInterval = conf.getLong("dfs.namenode.audit.file.check.interval", 10000L);
        defaultFS = conf.get("fs.defaultFS");
        if (defaultFS.endsWith("/")) {
            defaultFS = defaultFS.substring(0, defaultFS.length() - 1);
        }
        timer.scheduleAtFixedRate(new ReloadAuditFileTask(auditFile), 10, auditFileCheckInterval);
    }

    @Override
    public void logAuditEvent(boolean succeeded, String userName, InetAddress addr, String cmd, String src,
            String dst, FileStatus status, UserGroupInformation ugi, DelegationTokenSecretManager dtSecretManager) {
        if (auditLog.isInfoEnabled() && src != null && cmd != null) {
            RootDirectory tempRoot = root;
            // For rename both src and dst are checked; for every other command only src is checked.
            if ("rename".equals(cmd)) {
                if (!tempRoot.isAuth(dst, cmd) && !tempRoot.isAuth(src, cmd)) {
                    return;
                }
            } else {
                if (!tempRoot.isAuth(src, cmd)) {
                    return;
                }
            }

            final StringBuilder sb = auditBuffer.get();
            sb.setLength(0);
            sb.append("allowed=").append(succeeded).append("\t");
            sb.append("ugi=").append(userName).append("\t");
            sb.append("ip=").append(addr).append("\t");
            sb.append("cmd=").append(cmd).append("\t");
            sb.append("src=").append(defaultFS).append(src).append("\t");
            // Prefix defaultFS only when dst is present; a null dst prints as the literal "null".
            sb.append("dst=").append(dst == null ? "" : defaultFS).append(dst).append("\t");
            if (null == status) {
                sb.append("perm=null");
            } else {
                sb.append("perm=");
                sb.append(status.getOwner()).append(":");
                sb.append(status.getGroup()).append(":");
                sb.append(status.getPermission());
            }
            sb.append("\t").append("proto=");
            sb.append(NamenodeWebHdfsMethods.isWebHdfsInvocation() ? "webhdfs" : "rpc");
            logAuditMessage(sb.toString());
        }
    }

    public void logAuditMessage(String message) {
        auditLog.info(message);
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        NameNodeAuditLogger logger = new NameNodeAuditLogger();
        logger.initialize(conf);
        RootDirectory root = new RootDirectory();
        root.add("/user/hive/warehouse/a.db/a",
                "rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs".split(","));
        root.add("/user/hive/warehouse/a.db/b",
                "rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs".split(","));
        root.add("/user/hive/warehouse/b.db/b/",
                "rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs".split(","));
        root.add("/data/scloud/a", "rename,open,delete,listStatus,create,setPermission,getfileinfo,mkdirs".split(","));

        // Run with -ea to enable these assertions.
        assert (root.isAuth("/user/hive/warehouse/a.db/a/dt=20160514/hour=12/a.txt", "open") == true);
        assert (root.isAuth("/user/hive/warehouse/c.db/a.txt", "open") == false);
        assert (root.isAuth("/user/houzhizhen/a.txt", "open") == false);
        assert (root.isAuth("/data/scloud/a/a", "open") == true);
        assert (root.isAuth("/data/scloud/a/b/b", "open") == true);
        assert (root.isAuth("/data/scloud/a/b/b", "aaa") == false);
        try {
            // Give the reload task time to pick up the audit file before logging a test event.
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.logAuditEvent(true, "houzhizhen", null, "open", "/user/hefuhua", null, null);
    }

    class ReloadAuditFileTask extends TimerTask {
        private long fileModified = 0;
        private String auditFile;

        ReloadAuditFileTask(String auditFile) {
            this.auditFile = auditFile;
        }

        @Override
        public void run() {
            RootDirectory tempRoot = new RootDirectory();
            File file = new File(auditFile);
            long lastModified = file.lastModified();
            // Reload only when the file's modification time has advanced.
            if (fileModified >= lastModified) {
                return;
            }
            fileModified = lastModified;
            try {
                List<PathAndCommands> directoryList = parse(file);

                String path;
                String commands;
                for (PathAndCommands pathAndCommands : directoryList) {
                    path = pathAndCommands.path;
                    commands = pathAndCommands.commands;
                    // Only paths under this namenode's fs.defaultFS are registered.
                    if (!path.startsWith(defaultFS)) {
                        continue;
                    }
                    path = path.substring(defaultFS.length());
                    String[] commandArray = commands.split(",");
                    for (int i = 0; i < commandArray.length; i++) {
                        commandArray[i] = commandArray[i].trim();
                    }
                    tempRoot.add(path, commandArray);
                }
            } catch (IOException | SAXException | ParserConfigurationException e) {
                e.printStackTrace();
            }
            // Swap in the freshly built trie; root is volatile, so readers see the update atomically.
            root = tempRoot;
        }
        private List<PathAndCommands> parse(File file) throws IOException, SAXException, ParserConfigurationException {
            List<PathAndCommands> results = new LinkedList<PathAndCommands>();
            DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
            docBuilderFactory.setIgnoringComments(true);
            DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
            Document doc = builder.parse(new BufferedInputStream(new FileInputStream(file)));
            Element root = doc.getDocumentElement();
            if (!"audit-config".equals(root.getTagName())) {
                return results;
            }
            NodeList nodeList = root.getChildNodes();
            for (int i = 0; i < nodeList.getLength(); i++) {
                Node propNode = nodeList.item(i);
                if (!(propNode instanceof Element)) {
                    continue;
                }
                Element auditDirectory = (Element) propNode;
                if (!"audit-directory".equals(auditDirectory.getTagName())) {
                    continue;
                }
                PathAndCommands pathAndCommands = parsePathAndCommands(auditDirectory);
                if (pathAndCommands != null) {
                    results.add(pathAndCommands);
                }
            }
            return results;
        }

        private PathAndCommands parsePathAndCommands(Node directory) {
            NodeList fields = directory.getChildNodes();
            String path = null;
            String commands = null;

            for (int j = 0; j < fields.getLength(); j++) {
                Node fieldNode = fields.item(j);
                if (!(fieldNode instanceof Element)) {
                    continue;
                }
                Element field = (Element) fieldNode;
                if ("path".equals(field.getTagName()) && field.hasChildNodes()) {
                    path = StringInterner.weakIntern(((Text) field.getFirstChild()).getData().trim());
                }
                if ("commands".equals(field.getTagName()) && field.hasChildNodes()) {
                    commands = StringInterner.weakIntern(((Text) field.getFirstChild()).getData());
                }
            }
            if (path != null && commands != null) {
                return new PathAndCommands(path, commands);
            } else {
                return null;
            }
        }
    }

    private class PathAndCommands {
        final String path;
        final String commands;

        PathAndCommands(String path, String commands) {
            this.path = path;
            this.commands = commands;
        }
    }
}