Setting Up a Java Development Environment for Spark Programs
Setting up a Java development environment for Spark boils down to the following steps:
- Install the JDK
- Install Maven
- Install Eclipse
- Install Hadoop
Once these are in place, open Eclipse and import the demo project (a Maven-based Spark demo). A quick way to confirm the toolchain is visible to the JVM is sketched below.
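Before going further, it can help to verify that the JDK and the Hadoop environment variable are actually picked up by the JVM. A minimal throwaway check (the class name `EnvCheck` is just for illustration):

```java
// Quick sanity check for the local toolchain; run as a plain Java program.
public class EnvCheck {
    public static void main(String[] args) {
        // JDK version the JVM is actually running on
        System.out.println("java.version = " + System.getProperty("java.version"));
        // HADOOP_HOME must point at a local Hadoop install (see the winutils.exe note below)
        System.out.println("HADOOP_HOME  = " + System.getenv("HADOOP_HOME"));
    }
}
```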
The Spark-related dependencies below go into the project's Maven pom.xml; pick the dependency versions that match the Spark, Scala, Hadoop, and HBase versions you are actually running.
```xml
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
    <exclusions>
      <exclusion>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.1.0</version>
  </dependency>
  <!-- Redis -->
  <dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
  </dependency>
  <!-- JSON -->
  <dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20160810</version>
  </dependency>
  <!-- https://mvnrepository.com/artifact/net.sf.json-lib/json-lib -->
  <!--
  <dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>2.4</version>
  </dependency>
  -->
  <!-- HBase dependencies -->
  <dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>2.5.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-protocol</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.3.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
  </dependency>
  <!-- Hadoop dependencies -->
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.7.3</version>
  </dependency>
  <!-- Other dependencies -->
  <dependency>
    <groupId>commons-logging</groupId>
    <artifactId>commons-logging</artifactId>
    <version>1.2</version>
  </dependency>
  <dependency>
    <groupId>commons-collections</groupId>
    <artifactId>commons-collections</artifactId>
    <version>3.2.2</version>
  </dependency>
  <dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
  </dependency>
  <dependency>
    <groupId>commons-configuration</groupId>
    <artifactId>commons-configuration</artifactId>
    <version>1.6</version>
  </dependency>
  <dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.2</version>
  </dependency>
  <dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>12.0.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.htrace</groupId>
    <artifactId>htrace-core</artifactId>
    <version>3.1.0-incubating</version>
  </dependency>
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
  </dependency>
  <dependency>
    <groupId>com.yammer.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>
```
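Note that all Spark artifacts above carry the `_2.11` suffix and must match the `scala-library` version (2.11.8 here); mixing Scala 2.10 and 2.11 artifacts is a common cause of `NoSuchMethodError` at runtime. Once the pom resolves, a short local job is a quick way to confirm that spark-core and the Scala runtime work together. This is just an illustrative smoke test, not part of the demo project:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// A throwaway local job to verify that the Spark dependencies resolve and run.
public class SparkSmokeTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SmokeTest").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Sum a small in-memory RDD; prints 15 if everything is wired up correctly
        int sum = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)).reduce((a, b) -> a + b);
        System.out.println("sum = " + sum);
        sc.stop();
    }
}
```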
Below is a sample program that reads from Kafka with Spark Streaming and maintains per-year order counts in HBase:
```java
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import com.cbbs.utils.HbaseConfigUtil;

public class SmartFactoryOrder2 implements Serializable {

    public static void main(String[] args) throws Exception {
        String brokers = "192.168.1.221:9092"; // or read from args[0]
        String topics = "test";                // or read from args[1]
        String groups = "test-consumer-group";

        SparkConf sparkConf = new SparkConf().setAppName("AppName")
                .set("spark.default.parallelism", "500")
                // setMaster("local[*]") is only needed when running locally;
                // remove it before submitting the job to a cluster
                .setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));

        // Kafka consumer parameters
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", groups);
        // "earliest" is convenient for testing; use "latest" in production.
        // Possible values: latest, earliest, none
        kafkaParams.put("auto.offset.reset", "earliest");
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // PreferBrokers assumes executors run on the same hosts as the Kafka
        // brokers; PreferConsistent is the usual choice otherwise
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(jssc,
                LocationStrategies.PreferBrokers(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        JavaDStream<String> lines = messages.map(record -> record.value());
        lines.print();

        // Split each message into an array of fields
        JavaDStream<String[]> words = lines.map(line -> line.split(","));

        String tableName = "AAAA"; // HBase table name
        String family = "O";       // column family

        words.foreachRDD(rdd -> {
            rdd.foreach(x -> {
                // Field 3 holds a quoted "yyyy-MM-dd" date; key the counter by year
                String[] arrf = x[3].replace("\"", "").split("-");
                String rowkey = arrf[0] + "FSFOrderSum";
                String col = arrf[0] + "y";
                String current = getData(tableName, rowkey, family, col);
                if (current == null) {
                    insertRow(tableName, rowkey, family, col, "1");
                } else {
                    insertRow(tableName, rowkey, family, col, String.valueOf(Integer.parseInt(current) + 1));
                }
            });
        });

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void insertRow(String tableName, String rowkey, String colFamily, String col, String val)
            throws IOException {
        HbaseConfigUtil.init();
        Table table = HbaseConfigUtil.connection.getTable(TableName.valueOf(tableName));
        Put put = new Put(Bytes.toBytes(rowkey));
        put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));
        table.put(put);
        table.close();
        HbaseConfigUtil.close();
    }

    public static List<String> dateFormat(String tableName, String rowkey, String family, String lie,
            String startYYMMDD) {
        // Split the start date ("yyyy-MM-dd") and build the rowkey/column from the year
        String[] arrf = startYYMMDD.split("-");
        String fy = arrf[0];
        rowkey = fy + "FSFOrderSum";
        lie = fy + "y";
        List<String> listReturn = new ArrayList<>();
        listReturn.add(tableName);
        listReturn.add(rowkey);
        listReturn.add(family);
        listReturn.add(lie);
        return listReturn;
    }

    public static String getData(String tableName, String rowkey, String colFamily, String col) throws IOException {
        HbaseConfigUtil.init();
        Table table = HbaseConfigUtil.connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowkey));
        // Restrict the Get to the given column family and column
        get.addFamily(Bytes.toBytes(colFamily));
        get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
        Result result = table.get(get);
        String str1 = showCell(result);
        table.close();
        HbaseConfigUtil.close();
        return str1;
    }

    public static String showCell(Result result) {
        Cell[] cells = result.rawCells();
        String str = null;
        for (Cell cell : cells) {
            System.out.println("RowName: " + new String(CellUtil.cloneRow(cell)));
            System.out.println("Timestamp: " + cell.getTimestamp());
            System.out.println("Column Family: " + new String(CellUtil.cloneFamily(cell)));
            System.out.println("Qualifier: " + new String(CellUtil.cloneQualifier(cell)));
            System.out.println("Value: " + new String(CellUtil.cloneValue(cell)));
            str = new String(CellUtil.cloneValue(cell));
        }
        return str;
    }
}
```
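The sample relies on a project-local helper, HbaseConfigUtil, that is not shown in the article. A minimal sketch of what such a helper could look like, assuming a shared HBase `Connection` and a ZooKeeper quorum on the same host as the Kafka broker (both assumptions):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

// Hypothetical reconstruction of the helper used above; the original class
// is not shown in the article, so the quorum address is an assumption.
public class HbaseConfigUtil {
    public static Connection connection;

    public static void init() throws IOException {
        if (connection == null || connection.isClosed()) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.1.221"); // assumed ZooKeeper host
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            connection = ConnectionFactory.createConnection(conf);
        }
    }

    public static void close() throws IOException {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
```

Note that opening and closing a Connection for every record, as `getData`/`insertRow` do, is expensive; in a real job you would keep one connection per executor for its lifetime.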
An environment problem you may hit when running the job:
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
This is caused by the Hadoop environment variables not being configured: set HADOOP_HOME to your Hadoop install directory (and add %HADOOP_HOME%\bin to PATH), then run the test again. If the error persists, check whether winutils.exe actually exists in the bin directory of your Hadoop installation; if it does not, download a copy and drop it into the bin directory.
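If editing the system environment variables is not an option (for example when launching from Eclipse), the `hadoop.home.dir` system property can be set at the top of `main()` instead; the path below is only an example, point it at your own Hadoop directory:

```java
// Must run before any Spark/Hadoop classes are initialized; the path is an
// assumption -- replace it with the local install that contains bin\winutils.exe
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");
```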