
Reading data from Oracle with Hadoop

Two damn days of debugging, but it finally works.

MapperOracle.java

package com.uucun.hadooporacle;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MapperOracle extends Mapper<LongWritable, MyDbWriteble, Text, NullWritable> {

    final Logger logger = LoggerFactory.getLogger(MapperOracle.class);

    @Override
    protected void map(LongWritable key, MyDbWriteble value, Context context)
            throws IOException, InterruptedException {
        // The value already holds every row of this split (see
        // MyDbWriteble.readFields), so a single write emits them all.
        context.write(new Text(value.toString()), NullWritable.get());
    }
}

MyDbWriteble.java

package com.uucun.hadooporacle;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class MyDbWriteble implements DBWritable, WritableComparable {

    private List<DownloadAd> list = new ArrayList<DownloadAd>();

    class DownloadAd {
        private int id;
        private String session_id;
        private int resource_id;
        private Date visit_date;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getSession_id() { return session_id; }
        public void setSession_id(String session_id) { this.session_id = session_id; }
        public int getResource_id() { return resource_id; }
        public void setResource_id(int resource_id) { this.resource_id = resource_id; }
        public Date getVisit_date() { return visit_date; }
        public void setVisit_date(Date visit_date) { this.visit_date = visit_date; }
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        // Read-only job: nothing is written back to Oracle.
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        // The framework has already called resultSet.next() once, so the
        // cursor sits on the first record. Use do/while, not while: calling
        // next() before reading would silently drop one row per split.
        DownloadAd downloadAd;
        do {
            downloadAd = new DownloadAd();
            downloadAd.id = resultSet.getInt("id");
            downloadAd.session_id = resultSet.getString("session_id");
            downloadAd.resource_id = resultSet.getInt("resource_id");
            downloadAd.visit_date = resultSet.getDate("visit_date");
            list.add(downloadAd);
        } while (resultSet.next());
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        for (DownloadAd downloadAd : list) {
            builder.append(downloadAd.session_id);
            builder.append(",");
            builder.append(downloadAd.resource_id);
            builder.append("\n"); // one output line per row
        }
        if (builder.length() > 0) {
            builder.deleteCharAt(builder.length() - 1); // drop the trailing newline
        }
        return builder.toString();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Not needed: this type is only filled from JDBC, never shuffled.
    }

    @Override
    public void readFields(DataInput in) throws IOException {
    }

    @Override
    public int compareTo(Object o) {
        return 0;
    }

    public List<DownloadAd> getList() {
        return list;
    }

    public void setList(List<DownloadAd> list) {
        this.list = list;
    }
}
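The do/while in readFields above is the subtle part. Hadoop's DBRecordReader advances the cursor with next() before handing the ResultSet to readFields, so the conventional while-loop idiom loses the first row. A minimal standalone JDBC sketch of the two patterns for contrast (the class and method names here are illustrative, not part of the job):

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class CursorPitfallDemo {

    // WRONG in this context: assumes the cursor is still *before* the first
    // row, as with a freshly executed query. Under DBInputFormat the framework
    // has already called next() once, so this drops one record per split.
    static List<Integer> readSkippingFirst(ResultSet rs) throws SQLException {
        List<Integer> ids = new ArrayList<Integer>();
        while (rs.next()) {
            ids.add(rs.getInt("id"));
        }
        return ids;
    }

    // RIGHT in this context: read the row the cursor is already on, then advance.
    static List<Integer> readAll(ResultSet rs) throws SQLException {
        List<Integer> ids = new ArrayList<Integer>();
        do {
            ids.add(rs.getInt("id"));
        } while (rs.next());
        return ids;
    }
}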

RunTool.java

package com.uucun.hadooporacle;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.OracleDataDrivenDBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import com.uucun.util.hadooputil.PluginUtil;

class RunTool extends PluginUtil {

    public static void main(String[] args) throws Throwable {
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        final JobConf conf = new JobConf();
        System.out.println(args[0]);

        conf.set("fs.default.name", "hdfs://nn:9000");
        conf.set("mapred.job.tracker", "nn:9001");
        conf.set("dfs.permissions", "false");
        conf.setInt("mapred.map.tasks", 4); // also the number of input splits

        // Splits are ranged over the primary key.
        conf.set(DBConfiguration.INPUT_ORDER_BY_PROPERTY, "t.id");
        DBConfiguration.configureDB(conf, "oracle.jdbc.driver.OracleDriver",
                "jdbc:oracle:thin:@192.168.4.73:1521:XE", "oracle", "oracle");

        String jarName = runonhadoop().toString();
        conf.setJar(jarName);

        Job job = new Job(conf, "hadoopOracle");
        job.setMapperClass(MapperOracle.class);
        job.setInputFormatClass(OracleDataDrivenDBInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(RunTool.class);

        // The literal $CONDITIONS placeholder is mandatory: the input format
        // replaces it with a per-split range predicate at run time.
        String queryStr = "select t.id,t.session_id,t.resource_id,t.visit_date "
                + "from tbl_res_download_succeed_log t "
                + "where t.visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') "
                + "and t.visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss') "
                + "and $CONDITIONS";

        // The bounding query returns min/max of the split column for the same row set.
        String boundStr = "select min(id),max(id) from tbl_res_download_succeed_log "
                + "where visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss') "
                + "and visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss')";

        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        OracleDataDrivenDBInputFormat.setInput(job, MyDbWriteble.class, queryStr, boundStr);

        // Remove a stale output directory so the job can be rerun.
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]), true);
            fs.close();
        }

        job.waitForCompletion(true);
    }
}
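To actually run this, the Oracle JDBC driver (e.g. ojdbc6.jar) has to be on the classpath of the map tasks as well as the client, and the output path comes in as the second program argument (the first is only echoed). A hypothetical launch, with placeholder jar name and arguments:

hadoop jar hadooporacle.jar com.uucun.hadooporacle.RunTool run1 /user/hadoop/oracle_out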

Pay special attention to $CONDITIONS; that little gotcha alone cost me the two days. Also, in DBWritable's readFields the ResultSet argument is already positioned on the first record, so you cannot start with the usual next() call, or you lose one record per split.
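For the record, what the input format does with that placeholder: it runs the bounding query to get min(id) and max(id), divides that range into mapred.map.tasks pieces, and substitutes a range predicate for $CONDITIONS in each map task's copy of the query. Roughly, with illustrative bounds (assuming min(id)=1, max(id)=100000 and 4 splits), one task would execute something like:

select t.id,t.session_id,t.resource_id,t.visit_date
from tbl_res_download_succeed_log t
where t.visit_date>to_date('2013-05-23','yyyy-MM-dd HH24:mi:ss')
  and t.visit_date<=to_date('2013-05-25','yyyy-MM-dd HH24:mi:ss')
  and ( t.id >= 1 AND t.id < 25001 )

Leave the token out and the format has nowhere to inject its split bounds; hence the two lost days.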