
Spark SQL: reading data from Oracle and loading it into Elasticsearch

2017-11-16 14:54
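This post shows how to pull two Oracle tables into Spark SQL over JDBC, join vehicle records with their customers' aggregated contact details, and write the result into an Elasticsearch index with the elasticsearch-spark connector.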
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.ftms.spark</groupId>
<artifactId>ftms</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--<dependency>-->
<!--    <groupId>com.databricks</groupId>-->
<!--    <artifactId>spark-csv_2.10</artifactId>-->
<!--    <version>1.0.3</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>

<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->

<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!--<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>-->

<!--<dependency>-->
<!--<groupId>org.elasticsearch</groupId>-->
<!--<artifactId>elasticsearch-hadoop</artifactId>-->
<!--<version>5.5.2</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-core_2.10</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-sql_2.10</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>org.apache.storm</groupId>-->
<!--<artifactId>storm-core</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>cascading</groupId>-->
<!--<artifactId>cascading-hadoop</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.5.1</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
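With the build in place, the driver below reads the MTD_VEHICLE and MTD_CONTACTINFORMATION tables from Oracle over JDBC, registers them as temp tables, joins vehicles with their aggregated contact details, normalizes column names and types, and writes the result to Elasticsearch: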

package com.ftms.scala.result

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

/**
* Created by othc on 2017/10/10.
*/
object Test {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf().setAppName("import").setMaster("local[2]")
.set("spark.sql.warehouse.dir","file:///C:/WorkSpace/IdeaProjects/FTMS_MAVEN/spark-warehouse")
// .set("es.index.auto.create", "true")
.set("es.nodes", "192.168.1.212:9200")
.set("es.mapping.id", "VINNO")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Vehicle table
val jdbcMapv = Map("url" -> "jdbc:oracle:thin:@//192.168.1.212:1521/ftms",
"user" -> "ftms",
"password" -> "ftms",
"dbtable" -> "MTD_VEHICLE",
"driver" -> "oracle.jdbc.driver.OracleDriver")
val jdbcDFv = sqlContext.read.options(jdbcMapv).format("jdbc").load
jdbcDFv.registerTempTable("vehicle")
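// registerTempTable is deprecated since Spark 2.0 in favor of createOrReplaceTempView, but still works here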
val data: DataFrame = sqlContext.sql("select * from vehicle")
println(data.count())

// Contact information table
val jdbcMapm = Map("url" -> "jdbc:oracle:thin:@//192.168.1.212:1521/ftms",
"user" -> "ftms",
"password" -> "ftms",
"dbtable" -> "MTD_CONTACTINFORMATION",
"driver" -> "oracle.jdbc.driver.OracleDriver")
val jdbcDFm = sqlContext.read.options(jdbcMapm).format("jdbc").load
jdbcDFm.registerTempTable("information")
val info: DataFrame = sqlContext.sql("select * from information")

// Join: collapse each customer's contact numbers (types 1-4) into one comma-separated CONTACTDETAIL via collect_set + concat_ws
val join: DataFrame = sqlContext.sql(
"""
|select v.*,concat_ws(',', collect_set(n.CONTACTDETAIL)) as CONTACTDETAIL from vehicle v
|left join (
|select m.* from information m where m.CONTACTTYPE in ('1','2','3','4')
|) n on v.customerid=n.customerid
|group by v.VEHICLEID,v.VINNO,v.CUSTOMERID,v.VEHICLETYPE,v.VEHICLEUSETYPE,v.VEHICLENAMECODE,
| v.VEHICLENAME,v.VEHICLEGRADE,v.MISSION,v.MODEL,v.MODELYEAR,v.SFX,v.COLORCODE,v.COLORNAME,
| v.ENGINENO,v.EXHAUST,v.REGISTERNO,v.BUYDATE,v.SALESDEALERCODE,v.DEALERCODE,v.BILLTIME,v.BILLPRICE,
| v.BILLNUMBER,v.DELIVERYENDDATE,v.BODYPRICE,v.BODYCUT,v.BODYPAY,v.TAXRATE,v.TAX,v.REMARK,v.LOANSTATE,
| v.LOANGIVEOUTTIME,v.LOANORGRAN,v.PRODUCTYPE,v.YEARRATE,v.DOWNPAYMENT,v.LOANAMOUNT,v.INVALIDDATA,
| v.CREDATE,v.CREATEUSER,v.UPDATETIME,v.UPDATEUSER,v.SALESDATE,v.CANCELDATE,v.BUYERID,
| v.LOANPERIED,v.ISTTIME,v.UPTTIME,v.DELTIME
""".stripMargin
)

// join.rdd.repartition(1).saveAsTextFile("C:\\data\\ttttt")

val schema: StructType = join.schema
println(schema)

val res = join.select(join.col("VEHICLEID").cast(StringType).as("vehicleid"),
join.col("VINNO").as("vinno"),join.col("CUSTOMERID").as("customerid"),join.col("VEHICLETYPE").as("vehicletype"),
join.col("VEHICLEUSETYPE").as("vehicleusetype"),join.col("VEHICLENAMECODE").as("vehiclenamecode"),join.col("VEHICLENAME").as("vehiclename"),
join.col("VEHICLEGRADE").as("vehicleusetype"),join.col("MISSION").as("mission"),join.col("MODEL").as("model"),
join.col("MODELYEAR").as("modelyear"),join.col("SFX").as("sfx"),join.col("COLORCODE").as("colorcode"),join.col("COLORNAME").as("colorname"),
join.col("ENGINENO").as("engineno"),join.col("EXHAUST").as("exhaust"),join.col("REGISTERNO").as("registerno"),join.col("BUYDATE").as("buydate"),
join.col("SALESDEALERCODE").as("salesdealercode"),join.col("DEALERCODE").as("dealercode"),join.col("BILLTIME").as("billtime"),join.col("BILLPRICE").cast(StringType).as("billprice"),
join.col("BILLNUMBER").as("billnumber"),join.col("DELIVERYENDDATE").as("deliveryendddate"),join.col("BODYPRICE").cast(IntegerType).as("bodyprice"),join.col("BODYCUT").cast(IntegerType).as("bodycut"),
join.col("BODYPAY").cast(IntegerType).as("bodypay"),join.col("TAXRATE").cast(IntegerType).as("taxrate"),join.col("TAX").cast(IntegerType).as("tax"),join.col("REMARK").as("remark"),
join.col("LOANSTATE"),join.col("LOANGIVEOUTTIME"),join.col("LOANORGRAN"),join.col("PRODUCTYPE"),
join.col("YEARRATE").cast(IntegerType),join.col("DOWNPAYMENT").cast(IntegerType),join.col("LOANAMOUNT").cast(IntegerType),join.col("INVALIDDATA"),
join.col("CREDATE"),join.col("CREATEUSER"),join.col("UPDATETIME"),join.col("UPDATEUSER"),
join.col("SALESDATE"),join.col("CANCELDATE"),join.col("BUYERID"),join.col("LOANPERIED"),
join.col("ISTTIME"),join.col("UPTTIME"),join.col("DELTIME"),join.col("CONTACTDETAIL")
)

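// saveToEs indexes each Row as a JSON document into index mtd_vehicle_order_history, type mtd_vehicle_type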
EsSparkSQL.saveToEs(res,"mtd_vehicle_order_history/mtd_vehicle_type")
sc.stop()

}
}
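
To sanity-check the load, the same connector can read the index back into a DataFrame. A minimal sketch, assuming the cluster settings used above (the Verify object and app name are illustrative, not part of the original project):

package com.ftms.scala.result

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql.EsSparkSQL

object Verify {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("verify").setMaster("local[2]")
      .set("es.nodes", "192.168.1.212:9200")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // esDF reads the given index/type back as a DataFrame
    val fromEs = EsSparkSQL.esDF(sqlContext, "mtd_vehicle_order_history/mtd_vehicle_type")
    println(fromEs.count()) // should match the joined row count printed by the job above
    fromEs.show(5)

    sc.stop()
  }
}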