您的位置:首页 > 运维架构 > Linux

hadoop2.3 伪分布式搭建(centOS 6.5)

2014-10-07 21:17 344 查看
root 账号下操作

copy  jdk-7u60-linux-x64.gz hadoop-2.3.0-x64.tar.gz  至 /root/hadoop/soft下

一:安装 centOS6.5

1. 安装后,配置中文系统

vim /etc/sysconfig/i18n

修改为:LANG=”zh_CN.UTF-8”

2. 安装输入法

         yum install “@Chinese Support”

 

二:修改主机名

     1. 修改network文件

        Vi /etc/sysconfig/network

        修改 HOSTNAME=hadoop

     2. 修改hosts 文件

         vi /etc/hosts  

         添加修改:

 192.168.116.155 hadoop

(注释掉原有配置)

 

3. 刷新

· 重启网络配置: service network restart 

 

三:关闭防火墙

      service  iptables  status        查看防火墙状态

  service  iptables  start         开启防火墙

  service  iptables  stop         关闭防火墙

  service  iptables  restart       重启防火墙

 

 

(1) 重启后永久性生效:

开启:chkconfig iptables on

关闭:chkconfig iptables off

(2) 即时生效,重启后失效:

开启:service iptables start

关闭:service iptables stop

(3) 查看防火墙状态

service iptables status

 

四:SSH免密码登录

     1. 执行  ssh-keygen -t rsa -P ''

  2.  执行 cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

[root@hadoop.ssh]$ chmod 600 authorized_keys  

[root@hadoop.ssh]$ chmod 700 ../.ssh/   #目录权限必须设置700 

 

      3.  验证:ssh hadoop 

 

 

五:JDK 安装

      1. 解压 

             tar -zxvf jdk-7u60-linux-x64.gz

      2. 迁移

             mv -rf jdk1.7.0_60 /root/hadoop/jdk1.7.0_60

  3. 配置环境变量

            Vi /etc/profile

  

export JAVA_HOME=/root/hadoop/jdk1.7.0_60

export JRE_HOME=/root/hadoop/jdk1.7.0_60/jre 

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH

export PATH=$JAVA_HOME/bin:$PATH

        4. 执行: source /etc/profile

        5. 验证:java –version

六:安装hadoop

1. 解压 

tar -zxvf hadoop-2.3.0-x64.tar.gz

2. 迁移

            mv -f hadoop-2.3.0 /root/hadoop/hadoop-2.3.0

3. 配置环境变量

            export HADOOP_HOME=/root/hadoop/hadoop-2.3.0 

export HADOOP_INSTALL=$HADOOP_HOME

export PATH=$PATH:$HADOOP_HOME/bin 

export PATH=$PATH:$HADOOP_HOME/sbin 

export HADOOP_MAPRED_HOME=$HADOOP_HOME 

export HADOOP_COMMON_HOME=$HADOOP_HOME 

export HADOOP_HDFS_HOME=$HADOOP_HOME 

export YARN_HOME=$HADOOP_HOME

4. 执行:source /etc/profile

        5. 创建文件目录

             mkdir ~/dfs/

mkdir ~/dfs/name

mkdir ~/dfs/data

mkdir ~/temp 

6. 修改etc/hadoop目录下配置7个文件

              $HADPP_HOME/etc/hadoop/core-site.xml

$HADPP_HOME/etc/hadoop/hadoop-env.sh

$HADPP_HOME/etc/hadoop/hdfs-site.xml

$HADPP_HOME/etc/hadoop/mapred-site.xml

$HADPP_HOME/etc/hadoop/slaves

$HADPP_HOME/etc/hadoop/yarn-env.sh

$HADPP_HOME/etc/hadoop/yarn-site.xml

 以上个别文件默认不存在的,可以复制相应的template文件获得。

 比如:sudo cp yarn-site.xml.template yarn-site.xml

(1) hadoop-env.sh

                  修改JAVA_HOME

                       export JAVA_HOME=/root/hadoop/jdk1.7.0_60

(2) yarn-env.sh

                   修改JAVA_HOME

                        export JAVA_HOME=/root/hadoop/jdk1.7.0_60

(3) slaves 

删除localhost, 添加hadoop

    (4)core-site.xml

                    在<configuration>节点中间添加:

                    <property>

<name>fs.default.name</name>

<value>hdfs://hadoop:9000</value>

</property>

<property>

<name>hadoop.tem.dir</name>

<value>file:/root/temp</value>

</property>

(5) hdfs-site.xml

                      在<configuration> 节点中间添加:

                      <property>

<name>dfs.namenode.name.dir</name>

<value>file:/root/dfs/name</value>

</property>

<property>

<name>dfs.datanode.data.dir</name>

<value>file:/root/dfs/data</value>

</property>

<property>

<name>dfs.replication</name>

<value>1</value>

</property>

(6) mapred-site.xml

                       在<configuration> 节点中间添加:

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

<property>

<name>mapred.job.tracker</name>

<value>hadoop:9001</value>

</property>

<property>

<name>mapred.remote.os</name>

<value>Linux</value>

<description>

Remote MapReduce framework's OS, can be either Linux or Windows

</description>

</property>

<property>

<name>mapreduce.application.classpath</name>

<value>$HADOOP_CONF_DIR,

$HADOOP_COMMON_HOME/share/hadoop/common/*,

$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,

$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,

$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,

$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,

$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,

$HADOOP_YARN_HOME/share/hadoop/yarn/*,

$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*

</value>

</property>

(7) yarn-site.xml

                          在<configuration>节点中间添加:

  <property>

<name>yarn.nodemanager.aux-services</name>

<value>mapreduce_shuffle</value>

</property>

<property>

<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

<value>org.apache.hadoop.mapred.ShuffleHandler</value>

</property>

<property>

<name>yarn.application.classpath</name>

<value>

$HADOOP_HOME/etc/hadoop,

$HADOOP_COMMON_HOME/share/hadoop/common/*,

$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,

$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,

$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,

$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*,

$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*,

$HADOOP_YARN_HOME/share/hadoop/yarn/*,

$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*

</value>

</property>

 

7. 格式化hadoop

                   (1) 切换目录:cd /root/hadoop/hadoop-2.3.0/bin

                   (2) 执行:./hdfs namenode -format

(3)之后每次对hadoop配置文件进行修改后必须再次格式化hdfs文件系统,否则会报错。 

再次格式化hdfs文件系统的步骤:备份hdfs文件系统,删除配置文件中hdfs-site.xml中name.dir与data.dir目录下的所有文件,格式化hdfs文件系统,回复hdfs数据。

 

8. 启动hadoop

(1) 完全启动:sbin/start-all.sh

                    (2) 分布启动:sbin/start-dfs.sh

                                  Sbin/start-yarn.sh

 

9. 验证:

  http://hadoop:50070/

(hadoop2.3 只有火狐才能打开?)

 

 

 

七:开发环境搭建(eclipse)

 

1. copy eclipse的安装包至 linux中。

      eclipse-standard-luna-R-linux-gtk-x86_64.tar.gz

2. 执行: tar –zxvf eclipse-standard-luna-R-linux-gtk-x86_64.tar.gz

3. copy hadoop-eclipse插件包,并解压到eclipse的plugins中

4. 启动eclipse

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

(代码 )

Wordcount代码如下:

package test;

 

import java.io.IOException;

 

import java.util.StringTokenizer;

 

import org.apache.hadoop.conf.Configuration;

 

import org.apache.hadoop.fs.Path;

 

import org.apache.hadoop.io.IntWritable;

 

import org.apache.hadoop.io.Text;

 

import org.apache.hadoop.mapreduce.Job;

 

import org.apache.hadoop.mapreduce.Mapper;

 

import org.apache.hadoop.mapreduce.Reducer;

 

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

import org.apache.hadoop.util.GenericOptionsParser;

 

public class wordcount {

 

         public static class TokenizerMapper

 

         extends Mapper<Object, Text, Text, IntWritable> {

 

                   private final static IntWritable one = new IntWritable(1);

 

                   private Text word = new Text();

 

                   public void map(Object key, Text value, Context context

 

                   ) throws IOException, InterruptedException {

 

                            StringTokenizer itr = new StringTokenizer(value.toString());

 

                            while (itr.hasMoreTokens()) {

 

                                     word.set(itr.nextToken());

 

                                     context.write(word, one);

 

                            }

 

                   }

 

         }

 

         public static class IntSumReducer

 

         extends Reducer<Text, IntWritable, Text, IntWritable> {

 

                   private IntWritable result = new IntWritable();

 

                   public void reduce(Text key, Iterable<IntWritable> values,

 

                   Context context

 

                   ) throws IOException, InterruptedException {

 

                            int sum = 0;

 

                            for (IntWritable val : values) {

 

                                     sum += val.get();

 

                            }

 

                            result.set(sum);

 

                            context.write(key, result);

 

                   }

 

         }

 

         public static void main(String[] args) throws Exception {

 

                   Configuration conf = new Configuration();

 

                   String[] otherArgs = new GenericOptionsParser(conf, args)

                                     .getRemainingArgs();

 

                   if (otherArgs.length != 2) {

 

                            System.err.println("Usage: wordcount <in> <out>");

 

                            System.exit(2);

 

                   }

 

                   Job job = new Job(conf, "word count");

 

                   job.setJarByClass(wordcount.class);

 

                   job.setMapperClass(TokenizerMapper.class);

 

                   job.setCombinerClass(IntSumReducer.class);

 

                   job.setReducerClass(IntSumReducer.class);

 

                   job.setOutputKeyClass(Text.class);

 

                   job.setOutputValueClass(IntWritable.class);

 

                   FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

 

                   FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

 

                   System.exit(job.waitForCompletion(true) ? 0 : 1);

 

         }

}

 

 

 

八:安装sqoop

1. 下载sqoop  sqoop-1.99.3  

地址:sudo wget http://mirrors.hust.edu.cn/apache/sqoop/1.99.3/sqoop-1.99.3.tar.gz

解压路径:/root/hadoop/sqoop

 

2. 修改/etc/profile

     Vim /etc/profile

 

Export SQOOP_HOME=/root/hadoop/sqoop

Export CATALINA_BASE=$SQOOP_HOME/server

 

3. 修改sqoop配置

 

 Vi server/conf/sqoop.properties

 

 将org.apache.sqoop.submission.engine.mapreduce.configuration.directory后面hadoop的位置修改为自己安装的  hadoop配置文件位置 /root/hadoop/hadoop-2.3.0/etc/hadoop/

 

4. 修改sqoop读取hadoop的jar包的路径:

 

vi /sqoop/server/conf/catalina.properties

 

将common.loader行后的/usr/lib/hadoop/lib/*.jar改成自己的hadoop jar 包目录,我的为:

 

/root/hadoop/hadoop-2.3.0/share/hadoop/common/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/common/lib/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/hdfs/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/hdfs/lib/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/mapreduce/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/mapreduce/lib/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/tools/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/tools/lib/*.jar,/root/hadoop/hadoop-2.3.0/share/hadoop/yarn/*.jar,/ro
21ad1
ot/hadoop/hadoop-2.3.0/share/hadoop/yarn/lib/*.jar

 

5. 下载mysql jdbc 的drvier,放在/root/hadoop/sqoop/server/lib

 

6. 启动 sqoop

./bin/sqoop.sh server start

7. 停止sqoop

./bin/sqoop.sh server stop

 

8. 进入客户端交互模式

./bin/sqoop.sh client

 

9. 创建数据库连接

sqoop:000> create connection --cid 1 

Creating connection for connector with id 1  

Please fill following values to create new connection object  

Name: Mysql-H216  

 

Connection configuration  

 

JDBC Driver Class: com.mysql.jdbc.Driver  

JDBC Connection String: jdbc:mysql://192.168.1.120:3306/test

Username: admin  

Password: *****  

JDBC Connection Properties:  

There are currently 0 values in the map:  

entry#  

 

Security related configuration options  

 

Max connections: 100  

 

New connection was successfully created with validation status FINE and persistent id 1 

 

10. 创建导入任务

sqoop:000> create job --xid 1 --type import  (xid: connection id)

Creating job for connection with id 1  

Please fill following values to create new job object  

Name: HeartBeat  

 

Database configuration  

 

Schema name: mic_db_out  

Table name: t_heart_beat  

Table SQL statement:  

Table column names:  

Partition column name:  

Nulls in partition column:  

Boundary query:  

 

Output configuration  

 

Storage type:  

 0 : HDFS  

Choose: 0  

Output format:  

 0 : TEXT_FILE  

 1 : SEQUENCE_FILE  

Choose: 0  

Compression format:  

 0 : NONE  

 1 : DEFAULT  

 2 : DEFLATE  

 3 : GZIP  

 4 : BZIP2  

 5 : LZO  

 6 : LZ4  

 7 : SNAPPY  

Choose: 0  

Output directory: /user/jarcec/users

 

Throttling resources  

 

Extractors:  

Loaders:  

New job was successfully created with validation status FINE  and persistent id 1  

 

11. 查看导入任务状态

sqoop:000> status job --jid 1  

Submission details  

Job ID: 1  

Server URL: :12000/sqoop/  

Created by: dev  

Creation date: 2014-04-19 18:54:25 CST  

Lastly updated by: dev  

External ID: job_local1638775039_0002  

2014-04-19 18:54:50 CST: UNKNOWN  

 

12. 启动任务

start job --jid 1 –s

 

13. 其他相关命令

显示任务:status job --jid 1

显示所有任务:show job -a

停止任务:stop job --jid 1

删除任务: delete job –jid 1

克隆连接:clone connection --xid 1

克隆任务:clone job --jid 1

删除连接:delete connection --xid 1

 

(生成导入文件默认目录:sqoop/output-sqoop2-history)

 

14. 导出

创建任务: crate job  --xid 1 --type export --fields-terminated-by "\0001"  

 

 

(备注:  目前只能把数据从mysql 导入到linux本地,再用hdfs dfs –put /input/* /input 上传到hdfs中, 问题待解决)

 

九:安装hive

1. 下载hive 最新版本

apache-hive-0.13.1-bin.tar.gz  (适用 hadoop 2.3)

 

2. 将hive 压缩包拷贝到服务器 /root/hadoop/ 下,并解压:

/root/hadoop/hive/

3. 配置环境变量

Vi /etc/profile

export HIVE_HOME=/root/hadoop/hive

export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/conf

source /etc/profile

 

4. 配置 hive-site.xml

(1) 创建hive-site.xml

Cd /root/hadoop/hive/conf

cp hive-default.xml.template hive-site.xml

(2) 在hdfs上创建目录

 Hdfs dfs –mkdir –p /hive/warehouse

  Hdfs dfs –mkdir /tmp

 (3) 配置参数

Hive 数据存储目录:

hive-metastore.warehourse.dir:/hive/warehouse

 

hive  数据临时目录

hive.exec.scratchdir:默认 /tmp/hive-${user.name}

 

(4)连接数据库配置

在默认情况下,Hive已经配置好了Derby数据库的连接参数,并且集成了Derby数据库及数据库连接驱动jar包。
参数javax.jdo.option.ConnectionURL

指定Hive连接的数据库:

jdbc:derby:;databaseName=metastore_db;create=true

参数javax.jdo.option.ConnectionDriverName

指定驱动的类入口名称:

org.apache.derby.jdbc.EmbeddedDriver

参数javax.jdo.option.ConnectionUserName

指定数据库的用户名:APP

参数javax.jdo.option.ConnectionPassword

指定数据库的密码:mime

Derby数据库驱动在目录$HIVE_HOME/lib/下,

Hive-0.13.1默认提供的驱动包:derby-10.10.1.1.jar

 

(5)配置mysql 数据库信息

将mysql-connector-java-5.1.22-bin.jar 拷贝至 $HIVE_HOME/lib

 

<property>

  <name>javax.jdo.option.ConnectionURL</name>

  <value>jdbc:mysql://103.6.220.91:3306/appStore?characterEncoding=utf8</value>

  <description>JDBC connect string for a JDBC metastore</description>

</property>

 

<property>

  <name>javax.jdo.option.ConnectionDriverName</name>

  <value>com.mysql.jdbc.Driver</value>

  <description>Driver class name for a JDBC metastore</description>

</property>

<property>

  <name>javax.jdo.option.ConnectionUserName</name>

  <value>appStore</value>

  <description>username to use against metastore database</description>

</property>

 

<property>

  <name>javax.jdo.option.ConnectionPassword</name>

  <value>bxl206</value>

  <description>password to use against metastore database</description>

</property>

(6)启动hive

启动:/root/hadoop/hive/hive

退出:Exit; 

(7) 清空表数据

insert overwrite table dc_app_20140820 select * from dc_app_20140820 where 1=0;

(注: (4) (5) 选其一)

 

5. 常见问题

(1) com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes

解决:ALTER DATABASE hadoopDB CHARACTER SET latin1;

(2) Name node is in safe mode.

解决:hadoop dfsadmin -safemode leave  (退出安全模式)

 

(3) 常用脚本

CREATE TABLE dc_app_20140822(id int,mid String,appv String,df_browser String,model String,project String,channel String,system_apps String,data_apps String,app_used String,df_launcher String,desktop_shortcut String,report_date String,create_date String,province_name String,province_code String,city_name String,city_code String)

   COMMENT "this is a dc_app22"

   ROW FORMAT DELIMITED FIELDS TERMINATED BY ','

   STORED AS TEXTFILE;

 

 

LOAD DATA INPATH '/hadoopDB/dc_app_20140820/*' INTO TABLE dc_app_20140822;

    

    

    insert overwrite table dc_app_20140820 select * from dc_app_20140820 where 1=0;

 

6. eclipse 集成hive

(1) 需要 jar

 

(2) hive --service hiveserver -p 50000 &
(3) 测试代码

1.  package hive;  

2.   

3. import java.sql.*;  

4. import java.sql.Date;  

5. import java.text.SimpleDateFormat;  

6. import java.util.*;  

7.   

8. public class HiveService {  

9.     private static final String URLHIVE = "jdbc:hive://192.168.1.9:50000/default";  

10.     private static Connection connection = null;  

11.   

12.     public static Connection getHiveConnection() {  

13.         if (null == connection) {  

14.             synchronized (HiveService.class) {  

15.                 if (null == connection) {  

16.                     try {  

17.                         Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");  

18.                         connection = DriverManager.getConnection(URLHIVE, "","");  

19.                     } catch (SQLException e) {  

20.                         e.printStackTrace();  

21.                     } catch (ClassNotFoundException e) {  

22.                         e.printStackTrace();  

23.                     }  

24.                 }  

25.             }  

26.         }  

27.         return connection;  

28.     }  

29.   

30.     public static void createTable() throws SQLException {  

31.         String tweetTableSql = "DROP TABLE IF EXISTS hive_crm_tweet2222";  

32.         String createTable1 = "CREATE EXTERNAL TABLE hive_crm_tweet2222(tweet_id string, cuser_id string, created_at bigint, year bigint, month bigint, day bigint, hour bigint, text string, comments_count bigint, reposts_count bigint, source string, retweeted_id string, post_type string, sentiment string, positive_tags_string string, predict_tags_string string, tags_string string) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES (\"dynamodb.table.name\" = \"crm_tweet\",\"dynamodb.column.mapping\" = \"tweet_id:tweet_id,cuser_id:cuser_id,created_at:created_at,year:year,month:month,day:day,hour:hour,text:text,comments_count:comments_count,reposts_count:reposts_count,source:source,retweeted_id:retweeted_id,post_type:post_type,sentiment:sentiment,positive_tags_string:positive_tags_string,predict_tags_string:predict_tags_string,tags_string:tags_string\")";  

33.         String commentTableSql = "DROP TABLE IF EXISTS hive_tweet_comment2222";  

34.         String createTable2 = "CREATE EXTERNAL TABLE hive_tweet_comment2222(tweet_id string,comment_id string, cuser_id string, user_id string, created_at bigint, year bigint, month bigint, day bigint, hour bigint, text string, comments_count bigint, reposts_count bigint, source string, topic_id string, post_type string, sentiment string) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES (\"dynamodb.table.name\" = \"crm_tweet_comment\",\"dynamodb.column.mapping\" = \"tweet_id:tweet_id,comment_id:comment_id,cuser_id:cuser_id,user_id:user_id,created_at:created_at,year:year,month:month,day:day,hour:hour,text:text,comments_count:comments_count,reposts_count:reposts_count,source:source,topic_id:tweet_id,post_type:post_type,sentiment:sentiment\")";  

35.         String retweetTableSql = "DROP TABLE IF EXISTS hive_tweet_retweet2222";  

36.         String createTable3 = "CREATE EXTERNAL TABLE hive_tweet_retweet2222(tweet_id string, cuser_id string, user_id string, retweet_id string, created_at BIGINT, year BIGINT, month BIGINT, day BIGINT, hour BIGINT, text string, comments_count BIGINT, reposts_count BIGINT, source string, topic_id string, verified_type BIGINT, post_type string, sentiment string) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES (\"dynamodb.table.name\" = \"crm_tweet_retweet\",\"dynamodb.column.mapping\" = \"tweet_id:tweet_id,cuser_id:cuser_id,user_id:user_id,retweet_id:retweet_id,created_at:created_at,year:year,month:month,day:day,hour:hour,text:text,comments_count:comments_count,reposts_count:reposts_count,source:source,topic_id:tweet_id,verified_type:verified_type,post_type:post_type,sentiment:sentiment\")";  

37.   

38.         Statement stmt = getHiveConnection().createStatement();  

39.         stmt.executeQuery(tweetTableSql);  

40.         stmt.executeQuery(createTable1);  

41.         stmt.executeQuery(commentTableSql);  

42.         stmt.executeQuery(createTable2);  

43.         stmt.executeQuery(retweetTableSql);  

44.         stmt.executeQuery(createTable3);  

45.     }  

46.   

47.     public static void selectTweet() throws SQLException {  

48.         long aaa = System.currentTimeMillis();  

49.         //long start = DateUtils.getNDaysAgo(DateUtils.getMidNight(), 15).getTime().getTime();  

50.         //long end = DateUtils.getNDaysAgo(DateUtils.getMidNight(), 13).getTime().getTime();  

51.         long start =new java.util.Date().getTime();  

52.         long end =start+2*60*60*1000;  

53.         String sql = "select cuser_id, count(*) as tw_hour, year, month, day from hive_crm_tweet2222 where created_at > ? and created_at < ? and cuser_id = ? group by cuser_id, year, month, day, hour";  

54.         PreparedStatement pstm = getHiveConnection().prepareStatement(sql);  

55.         pstm.setLong(1, start);  

56.         pstm.setLong(2, end);  

57.         pstm.setString(3, "2176270443");  

58.         ResultSet rss = pstm.executeQuery();  

59.         while (rss.next()) {  

60.             System.out.println("1: " + rss.getString("cuser_id") + "   2: "  

61.                     + rss.getInt("tw_hour") + "   3: " + rss.getInt("year")  

62.                     + "   4: " + rss.getInt("month") + "   5: "  

63.                     + rss.getInt("day"));  

64.         }  

65.   

66.         System.out.println(System.currentTimeMillis() - aaa);  

67.   

68.     }  

69.   

70.     public static void selectTweet22() throws SQLException {  

71.         long aaa = System.currentTimeMillis();  

72.         //long start = DateUtils.getNDaysAgo(DateUtils.getMidNight(), 15).getTime().getTime();  

73.         //long end = DateUtils.getNDaysAgo(DateUtils.getMidNight(), 13).getTime().getTime();  

74.         long start =new java.util.Date().getTime();  

75.         long end =start+2*60*60*1000;  

76.           

77.         String sql = "select cuser_id, created_at, tweet_id from hive_crm_tweet2222 where created_at > ? and created_at < ? and cuser_id = ?";  

78.         PreparedStatement pstm = getHiveConnection().prepareStatement(sql);  

79.         pstm.setLong(1, start);  

80.         pstm.setLong(2, end);  

81.         pstm.setString(3, "2176270443");  

82.         ResultSet rss = pstm.executeQuery();  

83.         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");  

84.         while (rss.next()) {  

85.             long cc = Long.valueOf(String.valueOf(rss.getInt("created_at"))  

86.                     + "000");  

87.             java.util.Date date = new java.util.Date(cc);  

88.             System.out.println(dateFormat.format(date));  

89.             System.out.println(rss.getString("cuser_id") + " "  

90.                     + rss.getString("tweet_id"));  

91.         }  

92.   

93.         System.out.println(System.currentTimeMillis() - aaa);  

94.   

95.     }  

96.   

97.     public static void main(String[] args) throws ClassNotFoundException,  

98.             SQLException {  

99.          Class.forName("org.apache.hadoop.hive.jdbc.HiveDriver");  

100.         String querySQL = "SELECT a.* FROM student a";  

101.         Connection con = DriverManager.getConnection(URLHIVE, "", "");  

102.         Statement stmt = con.createStatement();  

103.         ResultSet res = stmt.executeQuery(querySQL); // 执行查询语句  

104.         while (res.next()) {  

105.         System.out.println("Result: key:" + res.getString(1) + "  –>  value:"+ res.getString(2));  

106.         }  

107.         //createTable();  

108.         //selectTweet22();  

109.   

110.          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH");  

111.          System.out.println(dateFormat.format(new java.util.Date()));  

112.     }  

113. }  

 

十:安装Hbase

 

(一) 安装 zookeeper

1. 将zookeeper-3.4.6.tar.gz 解压至 /root/hadoop/zookeeper

2. cp conf/zoo_sample.cfg conf/zoo.cfg

3. vim conf/zoo.cfg

(1)tickTime=2000

(2)  dataDir=/home/ruifeng.shan/zookeeper

( 3)clientPort=2181

(4) initLimit=5

(5) syncLimit=2

4. 在你指定的dataDir目录下新建一个文件myid,在里面添加你指定的server编号。例如,你为hostname名为master的指定了如下配置
server.1=master:2888:3888 ,所以它的编号应该是1.

将zookeeper-3.4.5分发到其他机器。然后修改对应的myid文件内容为之前给其指定的编号。如node为2,slave为3.

(1) server.1=master:2888:3888

(2) server.2=node:2888:3888

(3) server.3=slave:2888:3888

5. 分别启动zookeeper服务。在每台机器上切换到zookeeper-3.4.5目录,执行

$bin/zkServer.sh start

 

(二) 安装 hbase-0.96

1. 解压 hbase-0.96.0-hadoop2-bin.tar.gz 至 /root/hadoop/hbase

2. vim conf/hbase-env.sh

export JAVA_HOME=/root/hadoop/jdk1.7.0_60

export HBASE_MANAGES_ZK=false

3. vi hbase-site.xml

 

1. <configuration>

2. <property>

3. <name>hbase.rootdir</name>

4. <value> hdfs://hadoop:9000/hbase</value>

5. </property>

6. 

7. <property>

8. <name>hbase.master</name>

9. <value>hdfs://hadoop:60000</value>

10. </property>

11. 

12. <property>

13. <name>hbase.cluster.distributed</name>

14. <value>true</value>

15. </property>

16. 

17. <property>

18. <name>hbase.zookeeper.property.clientPort</name>

19. <value>2181</value>

20. </property>

21. 

22. <property>

23. <name>hbase.zookeeper.quorum</name>

24. <value>hadoop</value>

25. </property>

26. 

27. <property>

28. <name>hbase.zookeeper.property.dataDir</name>

29. <value>/home/aaron/zookeeper</value>

30. </property>

31. 

32. <property>

33. <name>hbase.client.scanner.caching</name>

34. <value>200</value>

35. </property>

36. 

37. <property>

38. <name>hbase.balancer.period</name>

39. <value>300000</value>

40. </property>

41. 

42. <property>

43. <name>hbase.client.write.buffer</name>

44. <value>10485760</value>

45. </property>

46. 

47. <property>

48. <name>hbase.hregion.majorcompaction</name>

49. <value>7200000</value>

50. </property>

51. 

52. <property>

53. <name>hbase.hregion.max.filesize</name>

54. <value>67108864</value>

55. <description>

56. Maximum HStoreFile size. If any one of a column families' HStoreFiles has

57. grown to exceed this value, the hosting HRegion is split in two.</description>

58. </property>

59. 

60. <property>

61. <name>hbase.hregion.memstore.flush.size</name>

62. <value>1048576</value>

63. <description>

64. Memstore will be flushed to disk if size of the memstore

65. exceeds this number of bytes. Value is checked by a thread that runs

66. every hbase.server.thread.wakefrequency.</description>

67. </property>

68. 

69. <property>

70. <name>hbase.server.thread.wakefrequency</name>

71. <value>30000</value>

72. <description>Time to sleep in between searches for work (in milliseconds).

73. Used as sleep interval by service threads such as log roller.</description>

74. </property>

75. 

76. </configuration>

其中hbase.rootdir要保持与hadoop的core-site.xml文件中的fs.default.name中的值一致。

 

注意:为了兼容Hadoop-2.2.0,需要将hbase的lib包中的内容lib包中的hadoop-common-2.1.0-beta.jar替换成hadoop-2.2.0/share/hadoop/common目录下的hadoop-common-2.2.0.jar。

 

4. 启动hbase:

 

 start-hbase.sh 

 

验证:http://hadoop:60010/master-status

 

 

 进入shell

   ./bin/hbase shell

 

5. 集成eclipse 开发实例

1、搭建环境

  新建JAVA项目,添加的包有:

   有关Hadoop的hadoop-core-0.20.204.0.jar

   有关Hbase的hbase-0.90.4.jar、hbase-0.90.4-tests.jar以及Hbase资源包中lib目录下的所有jar包

 

2、主要程序

 

Java代码  

1. package com.wujintao.hbase.test;  

2.   

3. import java.io.IOException;  

4. import java.util.ArrayList;  

5. import java.util.List;  

6.   

7. import org.apache.hadoop.conf.Configuration;  

8. import org.apache.hadoop.hbase.HBaseConfiguration;  

9. import org.apache.hadoop.hbase.HColumnDescriptor;  

10. import org.apache.hadoop.hbase.HTableDescriptor;  

11. import org.apache.hadoop.hbase.KeyValue;  

12. import org.apache.hadoop.hbase.MasterNotRunningException;  

13. import org.apache.hadoop.hbase.ZooKeeperConnectionException;  

14. import org.apache.hadoop.hbase.client.Delete;  

15. import org.apache.hadoop.hbase.client.Get;  

16. import org.apache.hadoop.hbase.client.HBaseAdmin;  

17. import org.apache.hadoop.hbase.client.HTable;  

18. import org.apache.hadoop.hbase.client.HTablePool;  

19. import org.apache.hadoop.hbase.client.Put;  

20. import org.apache.hadoop.hbase.client.Result;  

21. import org.apache.hadoop.hbase.client.ResultScanner;  

22. import org.apache.hadoop.hbase.client.Scan;  

23. import org.apache.hadoop.hbase.filter.Filter;  

24. import org.apache.hadoop.hbase.filter.FilterList;  

25. import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;  

26. import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;  

27. import org.apache.hadoop.hbase.util.Bytes;  

28.   

29. public class JinTaoTest {  

30.   

31.     public static Configuration configuration;  

32.     static {  

33.         configuration = HBaseConfiguration.create();  

34.         configuration.set("hbase.zookeeper.property.clientPort", "2181");  

35.         configuration.set("hbase.zookeeper.quorum", "192.168.1.100");  

36.         configuration.set("hbase.master", "192.168.1.100:600000");  

37.     }  

38.   

39.     public static void main(String[] args) {  

40.         // createTable("wujintao");  

41.         // insertData("wujintao");  

42.         // QueryAll("wujintao");  

43.         // QueryByCondition1("wujintao");  

44.         // QueryByCondition2("wujintao");  

45.         //QueryByCondition3("wujintao");  

46.         //deleteRow("wujintao","abcdef");  

47.         deleteByCondition("wujintao","abcdef");  

48.     }  

49.   

50.     /** 

51.      * 创建表 

52.      * @param tableName 

53.      */  

54.     public static void createTable(String tableName) {  

55.         System.out.println("start create table ......");  

56.         try {  

57.             HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);  

58.             if (hBaseAdmin.tableExists(tableName)) {// 如果存在要创建的表,那么先删除,再创建  

59.                 hBaseAdmin.disableTable(tableName);  

60.                 hBaseAdmin.deleteTable(tableName);  

61.                 System.out.println(tableName + " is exist,detele....");  

62.             }  

63.             HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);  

64.             tableDescriptor.addFamily(new HColumnDescriptor("column1"));  

65.             tableDescriptor.addFamily(new HColumnDescriptor("column2"));  

66.             tableDescriptor.addFamily(new HColumnDescriptor("column3"));  

67.             hBaseAdmin.createTable(tableDescriptor);  

68.         } catch (MasterNotRunningException e) {  

69.             e.printStackTrace();  

70.         } catch (ZooKeeperConnectionException e) {  

71.             e.printStackTrace();  

72.         } catch (IOException e) {  

73.             e.printStackTrace();  

74.         }  

75.         System.out.println("end create table ......");  

76.     }  

77.   

78.     /** 

79.      * 插入数据 

80.      * @param tableName 

81.      */  

82.     public static void insertData(String tableName) {  

83.         System.out.println("start insert data ......");  

84.         HTablePool pool = new HTablePool(configuration, 1000);  

85.         HTable table = (HTable) pool.getTable(tableName);  

86.         Put put = new Put("112233bbbcccc".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  

87.         put.add("column1".getBytes(), null, "aaa".getBytes());// 本行数据的第一列  

88.         put.add("column2".getBytes(), null, "bbb".getBytes());// 本行数据的第三列  

89.         put.add("column3".getBytes(), null, "ccc".getBytes());// 本行数据的第三列  

90.         try {  

91.             table.put(put);  

92.         } catch (IOException e) {  

93.             e.printStackTrace();  

94.         }  

95.         System.out.println("end insert data ......");  

96.     }  

97.   

98.     /** 

99.      * 删除一张表 

100.      * @param tableName 

101.      */  

102.     public static void dropTable(String tableName) {  

103.         try {  

104.             HBaseAdmin admin = new HBaseAdmin(configuration);  

105.             admin.disableTable(tableName);  

106.             admin.deleteTable(tableName);  

107.         } catch (MasterNotRunningException e) {  

108.             e.printStackTrace();  

109.         } catch (ZooKeeperConnectionException e) {  

110.             e.printStackTrace();  

111.         } catch (IOException e) {  

112.             e.printStackTrace();  

113.         }  

114.   

115.     }  

116.     /** 

117.      * 根据 rowkey删除一条记录 

118.      * @param tablename 

119.      * @param rowkey 

120.      */  

121.      public static void deleteRow(String tablename, String rowkey)  {  

122.         try {  

123.             HTable table = new HTable(configuration, tablename);  

124.             List list = new ArrayList();  

125.             Delete d1 = new Delete(rowkey.getBytes());  

126.             list.add(d1);  

127.               

128.             table.delete(list);  

129.             System.out.println("删除行成功!");  

130.               

131.         } catch (IOException e) {  

132.             e.printStackTrace();  

133.         }  

134.           

135.   

136.     }  

137.   

138.      /** 

139.       * 组合条件删除 

140.       * @param tablename 

141.       * @param rowkey 

142.       */  

143.      public static void deleteByCondition(String tablename, String rowkey)  {  

144.             //目前还没有发现有效的API能够实现 根据非rowkey的条件删除 这个功能能,还有清空表全部数据的API操作  

145.   

146.     }  

147.   

148.   

149.     /** 

150.      * 查询所有数据 

151.      * @param tableName 

152.      */  

153.     public static void QueryAll(String tableName) {  

154.         HTablePool pool = new HTablePool(configuration, 1000);  

155.         HTable table = (HTable) pool.getTable(tableName);  

156.         try {  

157.             ResultScanner rs = table.getScanner(new Scan());  

158.             for (Result r : rs) {  

159.                 System.out.println("获得到rowkey:" + new String(r.getRow()));  

160.                 for (KeyValue keyValue : r.raw()) {  

161.                     System.out.println("列:" + new String(keyValue.getFamily())  

162.                             + "====值:" + new String(keyValue.getValue()));  

163.                 }  

164.             }  

165.         } catch (IOException e) {  

166.             e.printStackTrace();  

167.         }  

168.     }  

169.   

170.     /** 

171.      * 单条件查询,根据rowkey查询唯一一条记录 

172.      * @param tableName 

173.      */  

174.     public static void QueryByCondition1(String tableName) {  

175.   

176.         HTablePool pool = new HTablePool(configuration, 1000);  

177.         HTable table = (HTable) pool.getTable(tableName);  

178.         try {  

179.             Get scan = new Get("abcdef".getBytes());// 根据rowkey查询  

180.             Result r = table.get(scan);  

181.             System.out.println("获得到rowkey:" + new String(r.getRow()));  

182.             for (KeyValue keyValue : r.raw()) {  

183.                 System.out.println("列:" + new String(keyValue.getFamily())  

184.                         + "====值:" + new String(keyValue.getValue()));  

185.             }  

186.         } catch (IOException e) {  

187.             e.printStackTrace();  

188.         }  

189.     }  

190.   

191.     /** 

192.      * 单条件按查询,查询多条记录 

193.      * @param tableName 

194.      */  

195.     public static void QueryByCondition2(String tableName) {  

196.   

197.         try {  

198.             HTablePool pool = new HTablePool(configuration, 1000);  

199.             HTable table = (HTable) pool.getTable(tableName);  

200.             Filter filter = new SingleColumnValueFilter(Bytes  

201.                     .toBytes("column1"), null, CompareOp.EQUAL, Bytes  

202.                     .toBytes("aaa")); // 当列column1的值为aaa时进行查询  

203.             Scan s = new Scan();  

204.             s.setFilter(filter);  

205.             ResultScanner rs = table.getScanner(s);  

206.             for (Result r : rs) {  

207.                 System.out.println("获得到rowkey:" + new String(r.getRow()));  

208.                 for (KeyValue keyValue : r.raw()) {  

209.                     System.out.println("列:" + new String(keyValue.getFamily())  

210.                             + "====值:" + new String(keyValue.getValue()));  

211.                 }  

212.             }  

213.         } catch (Exception e) {  

214.             e.printStackTrace();  

215.         }  

216.   

217.     }  

218.   

219.     /** 

220.      * 组合条件查询 

221.      * @param tableName 

222.      */  

223.     public static void QueryByCondition3(String tableName) {  

224.   

225.         try {  

226.             HTablePool pool = new HTablePool(configuration, 1000);  

227.             HTable table = (HTable) pool.getTable(tableName);  

228.   

229.             List<Filter> filters = new ArrayList<Filter>();  

230.   

231.             Filter filter1 = new SingleColumnValueFilter(Bytes  

232.                     .toBytes("column1"), null, CompareOp.EQUAL, Bytes  

233.                     .toBytes("aaa"));  

234.             filters.add(filter1);  

235.   

236.             Filter filter2 = new SingleColumnValueFilter(Bytes  

237.                     .toBytes("column2"), null, CompareOp.EQUAL, Bytes  

238.                     .toBytes("bbb"));  

239.             filters.add(filter2);  

240.   

241.             Filter filter3 = new SingleColumnValueFilter(Bytes  

242.                     .toBytes("column3"), null, CompareOp.EQUAL, Bytes  

243.                     .toBytes("ccc"));  

244.             filters.add(filter3);  

245.   

246.             FilterList filterList1 = new FilterList(filters);  

247.   

248.             Scan scan = new Scan();  

249.             scan.setFilter(filterList1);  

250.             ResultScanner rs = table.getScanner(scan);  

251.             for (Result r : rs) {  

252.                 System.out.println("获得到rowkey:" + new String(r.getRow()));  

253.                 for (KeyValue keyValue : r.raw()) {  

254.                     System.out.println("列:" + new String(keyValue.getFamily())  

255.                             + "====值:" + new String(keyValue.getValue()));  

256.                 }  

257.             }  

258.             rs.close();  

259.   

260.         } catch (Exception e) {  

261.             e.printStackTrace();  

262.         }  

263.   

264.     }  

265.   

266. }  

 注意:可能大家没看到更新数据的操作,其实更新的操作跟添加完全一致,只不过是添加呢rowkey不存在,更新呢rowkey已经存在,并且timstamp相同的情况下,还有就是目前好像还没办法实现hbase数据的分页查询,不知道有没有人知道怎么做

 

HBase性能优化建议:

 针对前面的代码,有很多不足之处,在此我就不修改上面的代码了,只是提出建议的地方,大家自己加上

   1)配置

  当你调用create方法时将会加载两个配置文件:hbase-default.xml and hbase-site.xml,利用的是当前的java类路径, 代码中configuration设置的这些配置将会覆盖hbase-default.xml和hbase-site.xml中相同的配置,如果两个配置文件都存在并且都设置好了相应参上面的属性下面的属性即可

 

 2)关于建表

  

public void createTable(HTableDescriptor desc)

 

HTableDescriptor 代表的是表的schema, 提供的方法中比较有用的有

setMaxFileSize,指定最大的region size

setMemStoreFlushSize 指定memstore flush到HDFS上的文件大小

增加family通过 addFamily方法

 

public void addFamily(final HColumnDescriptor family)

 

HColumnDescriptor代表的是column的schema,提供的方法比较常用的有

setTimeToLive:指定最大的TTL,单位是ms,过期数据会被自动删除。

setInMemory:指定是否放在内存中,对小表有用,可用于提高效率。默认关闭

setBloomFilter:指定是否使用BloomFilter,可提高随机查询效率。默认关闭

setCompressionType:设定数据压缩类型。默认无压缩。

setMaxVersions:指定数据最大保存的版本个数。默认为3。

 

注意的是,一般我们不去setInMemory为true,默认是关闭的

 

3)关于入库

   官方建议

 table.setAutoFlush(false); //数据入库之前先设置此项为false

 table.setflushCommits();//入库完成后,手动刷入数据

注意:

  在入库过程中,put.setWriteToWAL(true/flase);

  关于这一项如果不希望大量数据在存储过程中丢失,建议设置为true,如果仅是在测试演练阶段,为了节省入库时间建议设置为false

 

4)关于获取表实例

HTablePool pool = new HTablePool(configuration, Integer.MAX_VALUE);

HTable table = (HTable) pool.getTable(tableName);

建议用表连接池的方式获取表,具体池有什么作用,我想用过数据库连接池的同学都知道,我就不再重复

不建议使用new HTable(configuration,tableName);的方式获取表

 

5)关于查询

 建议每个查询语句都放入try catch语句块,并且finally中要进行关闭ResultScanner实例以及将不使用的表重新放入到HTablePool中的操作,具体做法如下

Java代码  

1. public static void QueryAll(String tableName) {  

2.         HTablePool pool = new HTablePool(configuration, Integer.MAX_VALUE);  

3.         HTable table = null;  

4.         ResultScanner rs = null;  

5.         try {  

6.             Scan scan = new Scan();  

7.             table = (HTable) pool.getTable(tableName);  

8.             rs = table.getScanner(scan);  

9.             for (Result r : rs) {  

10.                 System.out.println("获得到rowkey:" + new String(r.getRow()));  

11.                 for (KeyValue keyValue : r.raw()) {  

12.                     System.out.println("列:" + new String(keyValue.getFamily())  

13.                             + "====值:" + new String(keyValue.getValue()));  

14.                 }  

15.             }  

16.         } catch (IOException e) {  

17.             e.printStackTrace();  

18.         }finally{  

19.             rs.close();// 最后还得关闭  

20.             pool.putTable(table); //实际应用过程中,pool获取实例的方式应该抽取为单例模式的,不应在每个方法都重新获取一次(单例明白?就是抽取到专门获取pool的逻辑类中,具体逻辑为如果pool存在着直接使用,如果不存在则new)  

21.         }  

22.     }  

 

 所以,以上代码有缺陷的地方,感兴趣的同学可以针对优化建议作出相应修改

 

十一:安装mahout

 1. 软件下载

http://mirror.bit.edu.cn/apache/mahout/

 

 2. 解压 至 /root/hadoop/mahout

Tar –zxvf mahout-distribution-0.9.tar.gz

 

 3. 配置环境变量

Vi /etc/profile

Export MAHOUT_HOME=/root/hadoop/mahout

Export PATH=$MAHOUT_HOME/bin:$PATH

    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

 

   4. 单击测试:

          数据准备:(验证未成功)

            cd /tmp
wget http://archive.ics.uci.edu/ml/databases/synthetic_control/synthetic_control.data
hadoop fs -mkdir testdata
hadoop fs -put synthetic_control.data testdata
hadoop fs -lsr testdata

hadoop集群来执行聚类算法
cd /usr/local/mahout

mahout org.apache.mahout.clustering.syntheticcontrol.canopy.Job
mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job
mahout org.apache.mahout.clustering.syntheticcontrol.fuzzykmeans.Job
mahout org.apache.mahout.clustering.syntheticcontrol.dirichlet.Job
mahout org.apache.mahout.clustering.syntheticcontrol.meanshift.Job

如果执行成功,在hdfs的/user/dev/output里面应该可以看到输出结果
GroupLens Data Sets http://www.grouplens.org/node/12,包括MovieLens Data Sets、Wikilens Data Set、Book-Crossing Data Set、Jester Joke Data Set、EachMovie Data Set
 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: