
Setting Up a Java Development Environment for Spark Programs

2019-03-18 16:23

Setting up a Java development environment for Spark programs mainly involves the following steps:

  1. Install the JDK
  2. Install Maven
  3. Install Eclipse
  4. Install Hadoop

 After completing the steps above, open Eclipse and import the demo project (a Maven-based Spark demo project).

The Spark-related dependencies below are added to Maven's pom.xml; choose dependency versions that match the Spark, Scala, Hadoop, and HBase versions you are actually using.

[code]<dependencies>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>

<!-- json -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>

<!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib -->
<!-- <dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
</dependency>
-->

<!-- HBase dependencies -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>1.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.3.1</version>
</dependency>

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>

<!-- Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.7.3</version>
</dependency>

<!-- Other dependencies -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
<version>1.6</version>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>12.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
<version>3.1.0-incubating</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>

</dependencies>
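
Once these dependencies resolve, it is worth verifying the setup with a trivial local Spark job before moving on to the streaming example below. The following is an illustrative sketch only (the class name and sample data are arbitrary):

[code]import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkEnvCheck {
    public static void main(String[] args) {
        // local[*] runs Spark inside the IDE using all available cores
        SparkConf conf = new SparkConf().setAppName("SparkEnvCheck").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // A tiny in-memory dataset: if this prints a count, the classpath is set up correctly
        JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello spark", "hello java"));
        long words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator()).count();
        System.out.println("word count = " + words);

        sc.close();
    }
}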

Below is a sample program in which Spark Streaming consumes data from Kafka and writes aggregated counts to HBase.

[code]import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import com.cbbs.utils.HbaseConfigUtil;
public class SmartFactoryOrder2 implements Serializable {

public static void main(String[] args) throws Exception {
String brokers = "192.168.1.221:9092";
// String brokers = args[0];
String topics = "test";// testsql3

// String topics = args[1];// testsql3
String groups = "test-consumer-group";
SparkConf sparkConf = new SparkConf().setAppName("AppName").set("spark.default.parallelism", "500")
.setMaster("local[*]");// setMaster("local[*]") is needed only for local runs; remove it when submitting to a cluster
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
// Kafka parameters
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("group.id", groups);
// "earliest" is convenient for testing; in production "latest" is usually preferred.
// auto.offset.reset controls where a consumer group starts reading when it has no committed offset;
// valid values are latest, earliest, and none.
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));
JavaDStream<String> lines = messages.map(x -> x.value());
lines.print();
// Return each record as an array of fields
JavaDStream<String[]> words = lines.map(new Function<String, String[]>() {
private static final long serialVersionUID = 1L;
@Override
public String[] call(String v1) throws Exception {
return v1.split(",");
}
});
String tableName = "AAAA"; // 表名
String rowkey = null;
String family = "O";
String lie = null; // 列名
words.foreachRDD(rdd -> {
rdd.foreach(x -> {
String[] arrf = x[3].replace("\"", "").split("-");
String str2 = getData(tableName, arrf[0] + "FSFOrderSum", family, arrf[0] + "y");
if (str2 == null) {
insertRow(tableName, arrf[0] + "FSFOrderSum", family, arrf[0] + "y", "1");
} else {
int i = Integer.parseInt(str2);
String str3 = String.valueOf(i + 1);
insertRow(tableName, arrf[0] + "FSFOrderSum", family, arrf[0] + "y", str3);
}
});
});
jssc.start();
try {
jssc.awaitTermination();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void insertRow(String tableName, String rowkey, String colFamily, String col, String val)
throws IOException {
HbaseConfigUtil.init();
Table table = HbaseConfigUtil.connection.getTable(TableName.valueOf(tableName));
Put put = new Put(Bytes.toBytes(rowkey));
put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));
table.put(put);
table.close();
HbaseConfigUtil.close();
}

public static List<String> dateFormat(String tableName, String rowkey, String family, String lie, String startYYMMDD) {
// Split the start date (yyyy-MM-dd)
String[] arrf = startYYMMDD.split("-");
// Year, month, and day of the start date
String fy = arrf[0];
String fm = arrf[1];
String fd = arrf[2];
rowkey = fy + "FSFOrderSum";
lie = fy + "y";
List<String> listReturn = new ArrayList<>();
listReturn.add(tableName);
listReturn.add(rowkey);
listReturn.add(family);
listReturn.add(lie);
return listReturn;
}

public static String getData(String tableName, String rowkey, String colFamily, String col) throws IOException {
HbaseConfigUtil.init();
Table table = HbaseConfigUtil.connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowkey));
// Restrict the Get to the given column family
get.addFamily(Bytes.toBytes(colFamily));
// Restrict the Get to the given column
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
Result result = table.get(get);

String str1 = showCell(result);

table.close();
HbaseConfigUtil.close();
return str1;
}
public static String showCell(Result result) {
Cell[] cells = result.rawCells();
String str = null;
for (Cell cell : cells) {
System.out.println("Row: " + new String(CellUtil.cloneRow(cell)));
System.out.println("Timestamp: " + cell.getTimestamp());
System.out.println("Column Family: " + new String(CellUtil.cloneFamily(cell)));
System.out.println("Qualifier: " + new String(CellUtil.cloneQualifier(cell)));
System.out.println("Value: " + new String(CellUtil.cloneValue(cell)));
str = new String(CellUtil.cloneValue(cell));
}
return str;
}
}
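
The example above relies on a project-specific helper class, HbaseConfigUtil, whose source is not shown. A minimal sketch of what such a helper might look like is given below; the class and field names follow the usage above, but the ZooKeeper quorum and port are placeholders and the real project's version may differ:

[code]import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class HbaseConfigUtil {

    // Shared connection, referenced as HbaseConfigUtil.connection in the example above
    public static Connection connection;

    // Open a connection if one is not already open
    public static void init() throws IOException {
        if (connection == null || connection.isClosed()) {
            Configuration configuration = HBaseConfiguration.create();
            // Placeholder addresses: replace with your cluster's ZooKeeper quorum
            configuration.set("hbase.zookeeper.quorum", "192.168.1.221");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(configuration);
        }
    }

    // Close the connection when it is no longer needed
    public static void close() throws IOException {
        if (connection != null && !connection.isClosed()) {
            connection.close();
        }
    }
}

Note that the streaming code calls init() and close() around every single record, which is expensive; in a real job the HBase connection would normally be created once per partition (for example inside foreachPartition) and reused.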

An environment problem you may run into when running the program:

 java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

This error is caused by the Hadoop environment variables not being configured. Set them up (HADOOP_HOME pointing at your Hadoop installation), then run the test again. If the problem persists, check whether winutils.exe actually exists in the Hadoop installation's bin directory.

 If it does not, download a copy of winutils.exe and place it in the bin directory.
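
If you would rather not set a system-wide environment variable just for local testing, one common workaround is to set hadoop.home.dir programmatically before Spark is initialized, pointing at a Hadoop directory whose bin folder contains winutils.exe. The path below is only an example:

[code]// Put this at the very top of main(), before creating SparkConf / JavaStreamingContext.
// The path is an example and must point at your local Hadoop installation.
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");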

 
