
Hadoop study notes: counting the most frequently accessed IPs from HBase log data with MapReduce

2017-03-07 10:26

Preface

This post counts the most frequently accessed client IPs from log data stored in HBase and prints the result, using a MapReduce job that reads directly from the table.
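All of the code below assumes the access log lives in an HBase table with a column family log and a qualifier remote_addr holding the client IP of each request. The table name access_log and the sample row are assumptions for illustration only; a minimal sketch of how one such log row could be written with the HBase 1.2 client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: insert one sample log row into the assumed layout
// (table "access_log", column family "log", qualifier "remote_addr").
public class SampleLogPut {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();  // reads hbase-site.xml from the classpath
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("access_log"))) {
            Put put = new Put(Bytes.toBytes("row-20170307-0001"));        // row key, e.g. time-based
            put.addColumn(Bytes.toBytes("log"), Bytes.toBytes("remote_addr"),
                    Bytes.toBytes("192.168.1.100"));                      // client IP of one request
            table.put(put);
        }
    }
}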

JAR dependencies

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.7.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.4</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-configuration</artifactId>
    <version>1.6</version>
</dependency>


The hadoop-* and hbase-server versions above should match the Hadoop and HBase versions on your cluster.

Code

1. Mapper

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class CountMapper extends TableMapper<Text, IntWritable> {
    private Text ip = new Text();
    private IntWritable count = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // The scan selects only the log:remote_addr column, so each Result holds
        // at most that one cell; emit (ip, 1) for every request row.
        for (Cell cell : value.listCells()) {
            ip.set(new String(CellUtil.cloneValue(cell)));
            context.write(ip, count);
        }
    }
}


2. Reducer

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.ReduceContext;

public class MaxCountReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

    /**
     * reduce() is normally overridden when each key can be processed independently.
     * Finding the global maximum needs state across all keys, so run() is
     * overridden instead and reduce() is left empty.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
    }

    @Override
    public void run(Context context) throws IOException, InterruptedException {
        this.setup(context);
        try {
            HashMap<String, Integer> countMap = new HashMap<String, Integer>();
            while (context.nextKey()) {
                // Text.getBytes() returns the backing array, which can be longer than
                // the actual content, so convert the key via toString() instead.
                countMap.put(context.getCurrentKey().toString(), count(context.getValues()));
                // count() has already consumed the values; reset the backup store so the
                // framework can advance cleanly to the next key.
                Iterator<IntWritable> iter = context.getValues().iterator();
                if (iter instanceof ReduceContext.ValueIterator) {
                    ((ReduceContext.ValueIterator<IntWritable>) iter).resetBackupStore();
                }
            }
            HashSet<String> keys = new HashSet<String>();
            int maxCount = maxCount(countMap, keys);
            System.out.println("maxcount:" + maxCount);
            System.out.println("ips:" + keys.toString());
        } finally {
            this.cleanup(context);
        }
    }

    // Sum the counts for one key.
    private int count(Iterable<IntWritable> values) {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        return count;
    }

    // Find the maximum count and collect every key (IP) that reaches it.
    private int maxCount(HashMap<String, Integer> map, HashSet<String> keys) {
        int maxCount = 0;
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            int count = entry.getValue();
            if (count > maxCount) {
                keys.clear();
                maxCount = count;
                keys.add(entry.getKey());
            } else if (count == maxCount) {
                keys.add(entry.getKey());
            }
        }
        return maxCount;
    }
}
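Note that run() only prints the result to stdout; nothing is ever written to the "maxCount" output table that CountRun binds below via initTableReducerJob. If you would rather persist the result, a hedged sketch of a helper that could be called at the end of run() looks like this (it assumes a column family "f" exists in the output table, and it needs org.apache.hadoop.hbase.client.Put and org.apache.hadoop.hbase.util.Bytes imported):

// Hypothetical helper for MaxCountReducer: write each top IP and its count
// into the output table configured in CountRun ("maxCount").
private void writeResult(Context context, HashSet<String> keys, int maxCount)
        throws IOException, InterruptedException {
    for (String ip : keys) {
        Put put = new Put(Bytes.toBytes(ip));                       // row key = the IP itself
        put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("count"),   // assumed family/qualifier
                Bytes.toBytes(maxCount));
        context.write(new ImmutableBytesWritable(Bytes.toBytes(ip)), put);
    }
}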


3. CountRun

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class CountRun {
    // hostName is currently unused; kept to preserve the original signature.
    public static void run(String table, String family, String qualifier, String hostName)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Load the HBase configuration (hbase-site.xml etc. from the classpath).
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "maxCount");

        // Read only the family:qualifier column (here log:remote_addr) from the source table.
        Scan scan = new Scan();
        scan.addColumn(family.getBytes(), qualifier.getBytes());

        // Input: scan the source table with CountMapper, emitting <ip, 1> pairs.
        TableMapReduceUtil.initTableMapperJob(table.getBytes(), scan, CountMapper.class,
                Text.class, IntWritable.class, job);
        // Output: bind MaxCountReducer to the table "maxCount" (it must already exist;
        // the reducer above only prints its result).
        TableMapReduceUtil.initTableReducerJob("maxCount", MaxCountReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
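For completeness, a minimal driver sketch that submits the job; the class name, the table/family/qualifier values, and the last argument are assumptions (run() does not actually use hostName):

// Hypothetical entry point; adjust the table name and host to your environment.
public class Main {
    public static void main(String[] args) throws Exception {
        // family "log" and qualifier "remote_addr" match the column scanned by CountMapper;
        // the table name "access_log" is an assumption.
        CountRun.run("access_log", "log", "remote_addr", "master");
    }
}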


IDEA remote debugging configuration

1. Download the hadoop-common-x.x.x-bin package (the Windows native binaries), extract it, and configure the environment variables

(Screenshot: files in the extracted directory)

Configure the environment variables:

HADOOP_HOME = ........ (the directory you extracted to)

Add ${HADOOP_HOME}\bin to PATH

2. Add the Hadoop and HBase configuration files to the resources directory:

core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml, log4j.properties, hbase-site.xml
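To verify that these files are actually picked up at runtime, a small hedged check can help (the property names are standard Hadoop/HBase keys; the class name is arbitrary):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Quick sanity check: HBaseConfiguration.create() loads core-site.xml, hbase-site.xml, etc.
// from the classpath, so these should print your cluster addresses rather than the
// built-in defaults (file:/// and localhost).
public class ConfCheck {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        System.out.println("fs.defaultFS           = " + conf.get("fs.defaultFS"));
        System.out.println("hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
    }
}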


3. Set up packaging

File -> Project Structure -> Artifacts

4. Run or debug the job from IDEA.

Problem:
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
    at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
    at org.apache.hadoop.util.DiskChecker.checkAccessByFileMethods(DiskChecker.java:187)
    at org.apache.hadoop.util.DiskChecker.checkDirAccess(DiskChecker.java:174)
    at org.apache.hadoop.util.DiskChecker.checkDir(DiskChecker.java:108)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:285)
    at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:150)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:131)
    at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:115)
    at org.apache.hadoop.mapred.LocalDistributedCacheManager.setup(LocalDistributedCacheManager.java:125)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.<init>(LocalJobRunner.java:163)
    at org.apache.hadoop.mapred.LocalJobRunner.submitJob(LocalJobRunner.java:731)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:240)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1287)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1287)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1308)
    at com.mine.mapreduce.hbase.CountRun.run(CountRun.java:29)
    at com.mine.mapreduce.Main.main(Main.java:17)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)


Solution: copy hadoop.dll from the HADOOP_HOME\bin directory into C:\Windows\System32.

References:

http://nemotan.github.io/2015/09/intellij%E5%92%8Ceclipse%E8%BF%9C%E7%A8%8B%E6%8F%90%E4%BA%A4%E4%BD%9C%E4%B8%9A%E5%88%B0%E9%9B%86%E7%BE%A4%E5%92%8C%E6%9C%AC%E6%8F%90%E4%BA%A4-5/