sparksql 从oracle读取数据然后整合到elasticsearch
2017-11-16 14:54
951 查看
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.ftms.spark</groupId>
<artifactId>ftms</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--<dependency>-->
<!-- <groupId>com.databricks</groupId>-->
<!-- <artifactId>spark-csv_2.10</artifactId>-->
<!-- <version>1.0.3</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!--<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>-->
<!--<dependency>-->
<!--<groupId>org.elasticsearch</groupId>-->
<!--<artifactId>elasticsearch-hadoop</artifactId>-->
<!--<version>5.5.2</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-core_2.10</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-sql_2.10</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>org.apache.storm</groupId>-->
<!--<artifactId>storm-core</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>cascading</groupId>-->
<!--<artifactId>cascading-hadoop</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<!-- Must use the Scala 2.11 artifact: this build compiles against
     scala-library 2.11.8 and spark-core_2.11/spark-sql_2.11. Mixing in a
     _2.10 artifact causes NoSuchMethodError/AbstractMethodError at runtime. -->
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.5.1</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.ftms.scala.result
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
 * One-off ETL job: reads the MTD_VEHICLE and MTD_CONTACTINFORMATION tables
 * from Oracle via Spark SQL JDBC, left-joins each vehicle with its contact
 * details (collapsed into a single comma-separated column), renames/casts the
 * columns to the Elasticsearch mapping, and bulk-writes the result to the
 * "mtd_vehicle_order_history/mtd_vehicle_type" index, keyed by VIN.
 *
 * Created by othc on 2017/10/10.
 */
object Test {
  def main(args: Array[String]): Unit = {
    // Suppress Spark's verbose INFO/WARN logging; show errors only.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("import").setMaster("local[2]")
      .set("spark.sql.warehouse.dir", "file:///C:/WorkSpace/IdeaProjects/FTMS_MAVEN/spark-warehouse")
      // .set("es.index.auto.create", "true")
      .set("es.nodes", "192.168.1.212:9200")
      // Use the VIN as the Elasticsearch document id so re-runs upsert
      // documents instead of duplicating them.
      .set("es.mapping.id", "VINNO")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Vehicles: MTD_VEHICLE loaded from Oracle over JDBC.
    // NOTE(review): credentials are hard-coded — move to external configuration.
    val jdbcMapv = Map(
      "url" -> "jdbc:oracle:thin:@//192.168.1.212:1521/ftms",
      "user" -> "ftms",
      "password" -> "ftms",
      "dbtable" -> "MTD_VEHICLE",
      "driver" -> "oracle.jdbc.driver.OracleDriver")
    val jdbcDFv = sqlContext.read.options(jdbcMapv).format("jdbc").load
    jdbcDFv.registerTempTable("vehicle")
    val data: DataFrame = sqlContext.sql("select * from vehicle")
    println(data.count())

    // Contact information: MTD_CONTACTINFORMATION from the same Oracle instance.
    val jdbcMapm = Map(
      "url" -> "jdbc:oracle:thin:@//192.168.1.212:1521/ftms",
      "user" -> "ftms",
      "password" -> "ftms",
      "dbtable" -> "MTD_CONTACTINFORMATION",
      "driver" -> "oracle.jdbc.driver.OracleDriver")
    val jdbcDFm = sqlContext.read.options(jdbcMapm).format("jdbc").load
    jdbcDFm.registerTempTable("information")
    val info: DataFrame = sqlContext.sql("select * from information")

    // Left-join each vehicle to its contacts (types 1-4 only) and collapse all
    // contact details into one comma-separated CONTACTDETAIL column; grouping
    // over every vehicle column keeps one output row per vehicle.
    val join: DataFrame = sqlContext.sql(
      """
        |select v.*,concat_ws(',', collect_set(n.CONTACTDETAIL)) as CONTACTDETAIL from vehicle v
        |left join (
        |select m.* from information m where m.CONTACTTYPE in ('1','2','3','4')
        |) n on v.customerid=n.customerid
        |group by v.VEHICLEID,v.VINNO,v.CUSTOMERID,v.VEHICLETYPE,v.VEHICLEUSETYPE,v.VEHICLENAMECODE,
        | v.VEHICLENAME,v.VEHICLEGRADE,v.MISSION,v.MODEL,v.MODELYEAR,v.SFX,v.COLORCODE,v.COLORNAME,
        | v.ENGINENO,v.EXHAUST,v.REGISTERNO,v.BUYDATE,v.SALESDEALERCODE,v.DEALERCODE,v.BILLTIME,v.BILLPRICE,
        | v.BILLNUMBER,v.DELIVERYENDDATE,v.BODYPRICE,v.BODYCUT,v.BODYPAY,v.TAXRATE,v.TAX,v.REMARK,v.LOANSTATE,
        | v.LOANGIVEOUTTIME,v.LOANORGRAN,v.PRODUCTYPE,v.YEARRATE,v.DOWNPAYMENT,v.LOANAMOUNT,v.INVALIDDATA,
        | v.CREDATE,v.CREATEUSER,v.UPDATETIME,v.UPDATEUSER,v.SALESDATE,v.CANCELDATE,v.BUYERID,
        | v.LOANPERIED,v.ISTTIME,v.UPTTIME,v.DELTIME
      """.stripMargin
    )
    // join.rdd.repartition(1).saveAsTextFile("C:\\data\\ttttt")
    val schema: StructType = join.schema
    println(schema)

    // Rename columns to the lower-case Elasticsearch field names and cast the
    // money/rate columns to concrete types.
    // FIX: VEHICLEGRADE was previously aliased "vehicleusetype", colliding
    // with the real VEHICLEUSETYPE column and clobbering it in the ES mapping;
    // it is now aliased "vehiclegrade".
    val res = join.select(
      join.col("VEHICLEID").cast(StringType).as("vehicleid"),
      join.col("VINNO").as("vinno"),
      join.col("CUSTOMERID").as("customerid"),
      join.col("VEHICLETYPE").as("vehicletype"),
      join.col("VEHICLEUSETYPE").as("vehicleusetype"),
      join.col("VEHICLENAMECODE").as("vehiclenamecode"),
      join.col("VEHICLENAME").as("vehiclename"),
      join.col("VEHICLEGRADE").as("vehiclegrade"),
      join.col("MISSION").as("mission"),
      join.col("MODEL").as("model"),
      join.col("MODELYEAR").as("modelyear"),
      join.col("SFX").as("sfx"),
      join.col("COLORCODE").as("colorcode"),
      join.col("COLORNAME").as("colorname"),
      join.col("ENGINENO").as("engineno"),
      join.col("EXHAUST").as("exhaust"),
      join.col("REGISTERNO").as("registerno"),
      join.col("BUYDATE").as("buydate"),
      join.col("SALESDEALERCODE").as("salesdealercode"),
      join.col("DEALERCODE").as("dealercode"),
      join.col("BILLTIME").as("billtime"),
      join.col("BILLPRICE").cast(StringType).as("billprice"),
      join.col("BILLNUMBER").as("billnumber"),
      // NOTE(review): "deliveryendddate" (triple d) looks like a typo, but it
      // is kept as-is because downstream ES consumers may depend on the name.
      join.col("DELIVERYENDDATE").as("deliveryendddate"),
      join.col("BODYPRICE").cast(IntegerType).as("bodyprice"),
      join.col("BODYCUT").cast(IntegerType).as("bodycut"),
      join.col("BODYPAY").cast(IntegerType).as("bodypay"),
      join.col("TAXRATE").cast(IntegerType).as("taxrate"),
      join.col("TAX").cast(IntegerType).as("tax"),
      join.col("REMARK").as("remark"),
      join.col("LOANSTATE"), join.col("LOANGIVEOUTTIME"), join.col("LOANORGRAN"), join.col("PRODUCTYPE"),
      join.col("YEARRATE").cast(IntegerType), join.col("DOWNPAYMENT").cast(IntegerType),
      join.col("LOANAMOUNT").cast(IntegerType), join.col("INVALIDDATA"),
      join.col("CREDATE"), join.col("CREATEUSER"), join.col("UPDATETIME"), join.col("UPDATEUSER"),
      join.col("SALESDATE"), join.col("CANCELDATE"), join.col("BUYERID"), join.col("LOANPERIED"),
      join.col("ISTTIME"), join.col("UPTTIME"), join.col("DELTIME"), join.col("CONTACTDETAIL")
    )

    // Bulk-write to Elasticsearch index/type, using es.mapping.id (VINNO).
    EsSparkSQL.saveToEs(res, "mtd_vehicle_order_history/mtd_vehicle_type")
    sc.stop()
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>com.ftms.spark</groupId>
<artifactId>ftms</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<spark.version>2.0.0</spark.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--<dependency>-->
<!-- <groupId>com.databricks</groupId>-->
<!-- <artifactId>spark-csv_2.10</artifactId>-->
<!-- <version>1.0.3</version>-->
<!--</dependency>-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-xml</artifactId>
<version>2.11.0-M4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.6</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.16</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.11</artifactId>
<version>1.6.3</version>
</dependency>-->
<!--<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>${spark.version}</version>
</dependency>-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!--<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>-->
<!--<dependency>-->
<!--<groupId>org.elasticsearch</groupId>-->
<!--<artifactId>elasticsearch-hadoop</artifactId>-->
<!--<version>5.5.2</version>-->
<!--<exclusions>-->
<!--<exclusion>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-core_2.10</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>org.apache.spark</groupId>-->
<!--<artifactId>spark-sql_2.10</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>org.apache.storm</groupId>-->
<!--<artifactId>storm-core</artifactId>-->
<!--</exclusion>-->
<!--<exclusion>-->
<!--<groupId>cascading</groupId>-->
<!--<artifactId>cascading-hadoop</artifactId>-->
<!--</exclusion>-->
<!--</exclusions>-->
<!--</dependency>-->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<!-- Must use the Scala 2.11 artifact: this build compiles against
     scala-library 2.11.8 and spark-core_2.11/spark-sql_2.11. Mixing in a
     _2.10 artifact causes NoSuchMethodError/AbstractMethodError at runtime. -->
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.5.1</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.ftms.scala.result
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql.EsSparkSQL
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
 * One-off ETL job: reads the MTD_VEHICLE and MTD_CONTACTINFORMATION tables
 * from Oracle via Spark SQL JDBC, left-joins each vehicle with its contact
 * details (collapsed into a single comma-separated column), renames/casts the
 * columns to the Elasticsearch mapping, and bulk-writes the result to the
 * "mtd_vehicle_order_history/mtd_vehicle_type" index, keyed by VIN.
 *
 * Created by othc on 2017/10/10.
 */
object Test {
  def main(args: Array[String]): Unit = {
    // Suppress Spark's verbose INFO/WARN logging; show errors only.
    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf().setAppName("import").setMaster("local[2]")
      .set("spark.sql.warehouse.dir", "file:///C:/WorkSpace/IdeaProjects/FTMS_MAVEN/spark-warehouse")
      // .set("es.index.auto.create", "true")
      .set("es.nodes", "192.168.1.212:9200")
      // Use the VIN as the Elasticsearch document id so re-runs upsert
      // documents instead of duplicating them.
      .set("es.mapping.id", "VINNO")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Vehicles: MTD_VEHICLE loaded from Oracle over JDBC.
    // NOTE(review): credentials are hard-coded — move to external configuration.
    val jdbcMapv = Map(
      "url" -> "jdbc:oracle:thin:@//192.168.1.212:1521/ftms",
      "user" -> "ftms",
      "password" -> "ftms",
      "dbtable" -> "MTD_VEHICLE",
      "driver" -> "oracle.jdbc.driver.OracleDriver")
    val jdbcDFv = sqlContext.read.options(jdbcMapv).format("jdbc").load
    jdbcDFv.registerTempTable("vehicle")
    val data: DataFrame = sqlContext.sql("select * from vehicle")
    println(data.count())

    // Contact information: MTD_CONTACTINFORMATION from the same Oracle instance.
    val jdbcMapm = Map(
      "url" -> "jdbc:oracle:thin:@//192.168.1.212:1521/ftms",
      "user" -> "ftms",
      "password" -> "ftms",
      "dbtable" -> "MTD_CONTACTINFORMATION",
      "driver" -> "oracle.jdbc.driver.OracleDriver")
    val jdbcDFm = sqlContext.read.options(jdbcMapm).format("jdbc").load
    jdbcDFm.registerTempTable("information")
    val info: DataFrame = sqlContext.sql("select * from information")

    // Left-join each vehicle to its contacts (types 1-4 only) and collapse all
    // contact details into one comma-separated CONTACTDETAIL column; grouping
    // over every vehicle column keeps one output row per vehicle.
    val join: DataFrame = sqlContext.sql(
      """
        |select v.*,concat_ws(',', collect_set(n.CONTACTDETAIL)) as CONTACTDETAIL from vehicle v
        |left join (
        |select m.* from information m where m.CONTACTTYPE in ('1','2','3','4')
        |) n on v.customerid=n.customerid
        |group by v.VEHICLEID,v.VINNO,v.CUSTOMERID,v.VEHICLETYPE,v.VEHICLEUSETYPE,v.VEHICLENAMECODE,
        | v.VEHICLENAME,v.VEHICLEGRADE,v.MISSION,v.MODEL,v.MODELYEAR,v.SFX,v.COLORCODE,v.COLORNAME,
        | v.ENGINENO,v.EXHAUST,v.REGISTERNO,v.BUYDATE,v.SALESDEALERCODE,v.DEALERCODE,v.BILLTIME,v.BILLPRICE,
        | v.BILLNUMBER,v.DELIVERYENDDATE,v.BODYPRICE,v.BODYCUT,v.BODYPAY,v.TAXRATE,v.TAX,v.REMARK,v.LOANSTATE,
        | v.LOANGIVEOUTTIME,v.LOANORGRAN,v.PRODUCTYPE,v.YEARRATE,v.DOWNPAYMENT,v.LOANAMOUNT,v.INVALIDDATA,
        | v.CREDATE,v.CREATEUSER,v.UPDATETIME,v.UPDATEUSER,v.SALESDATE,v.CANCELDATE,v.BUYERID,
        | v.LOANPERIED,v.ISTTIME,v.UPTTIME,v.DELTIME
      """.stripMargin
    )
    // join.rdd.repartition(1).saveAsTextFile("C:\\data\\ttttt")
    val schema: StructType = join.schema
    println(schema)

    // Rename columns to the lower-case Elasticsearch field names and cast the
    // money/rate columns to concrete types.
    // FIX: VEHICLEGRADE was previously aliased "vehicleusetype", colliding
    // with the real VEHICLEUSETYPE column and clobbering it in the ES mapping;
    // it is now aliased "vehiclegrade".
    val res = join.select(
      join.col("VEHICLEID").cast(StringType).as("vehicleid"),
      join.col("VINNO").as("vinno"),
      join.col("CUSTOMERID").as("customerid"),
      join.col("VEHICLETYPE").as("vehicletype"),
      join.col("VEHICLEUSETYPE").as("vehicleusetype"),
      join.col("VEHICLENAMECODE").as("vehiclenamecode"),
      join.col("VEHICLENAME").as("vehiclename"),
      join.col("VEHICLEGRADE").as("vehiclegrade"),
      join.col("MISSION").as("mission"),
      join.col("MODEL").as("model"),
      join.col("MODELYEAR").as("modelyear"),
      join.col("SFX").as("sfx"),
      join.col("COLORCODE").as("colorcode"),
      join.col("COLORNAME").as("colorname"),
      join.col("ENGINENO").as("engineno"),
      join.col("EXHAUST").as("exhaust"),
      join.col("REGISTERNO").as("registerno"),
      join.col("BUYDATE").as("buydate"),
      join.col("SALESDEALERCODE").as("salesdealercode"),
      join.col("DEALERCODE").as("dealercode"),
      join.col("BILLTIME").as("billtime"),
      join.col("BILLPRICE").cast(StringType).as("billprice"),
      join.col("BILLNUMBER").as("billnumber"),
      // NOTE(review): "deliveryendddate" (triple d) looks like a typo, but it
      // is kept as-is because downstream ES consumers may depend on the name.
      join.col("DELIVERYENDDATE").as("deliveryendddate"),
      join.col("BODYPRICE").cast(IntegerType).as("bodyprice"),
      join.col("BODYCUT").cast(IntegerType).as("bodycut"),
      join.col("BODYPAY").cast(IntegerType).as("bodypay"),
      join.col("TAXRATE").cast(IntegerType).as("taxrate"),
      join.col("TAX").cast(IntegerType).as("tax"),
      join.col("REMARK").as("remark"),
      join.col("LOANSTATE"), join.col("LOANGIVEOUTTIME"), join.col("LOANORGRAN"), join.col("PRODUCTYPE"),
      join.col("YEARRATE").cast(IntegerType), join.col("DOWNPAYMENT").cast(IntegerType),
      join.col("LOANAMOUNT").cast(IntegerType), join.col("INVALIDDATA"),
      join.col("CREDATE"), join.col("CREATEUSER"), join.col("UPDATETIME"), join.col("UPDATEUSER"),
      join.col("SALESDATE"), join.col("CANCELDATE"), join.col("BUYERID"), join.col("LOANPERIED"),
      join.col("ISTTIME"), join.col("UPTTIME"), join.col("DELTIME"), join.col("CONTACTDETAIL")
    )

    // Bulk-write to Elasticsearch index/type, using es.mapping.id (VINNO).
    EsSparkSQL.saveToEs(res, "mtd_vehicle_order_history/mtd_vehicle_type")
    sc.stop()
  }
}
相关文章推荐
- spark-sql与elasticsearch整合&测试
- Spark读取HDFS上的SQL语句,然后导入MySQL
- Spark SQL读取hive数据时报找不到mysql驱动
- Spark 整合hive 实现数据的读取输出
- spark中读取elasticsearch数据
- 关于spark读取elasticsearch中数据,但是无法实现过滤数据的问题
- Spark通过https的方式读取elasticsearch中的数据
- SparkSQL读取HBase数据,通过自定义外部数据源(hbase的Hive外关联表)
- spark-sql读取映射hbase数据的hive外部表
- SPark SQL 从 DB 读取数据方法和方式
- SparkSQL读取Hive中的数据
- oracle sql 随机读取N条数据
- spark 读取Oracle数据,做成文件并统计分析
- SQL 数据的导入导出,对远程(MSsql,OracleAccess,)数据库的操作以及读取Excel,txt文件中的数据
- Spark Streaming 读取Kafka数据写入Elasticsearch
- Oracle 使用PL/SQL 读取CSV文件,将数据拆分到表中丢失数据行
- spark-sql与elasticsearch整合&测试
- spark SQL (五)数据源 Data Source----json hive jdbc等数据的的读取与加载
- 数据库操作_连接SQL Server数据库示例;连接ACCESS数据库;连接到 Oracle 数据库示例;SqlCommand 执行SQL命令示例;SqlDataReader 读取数据示例;使用DataAdapter填充数据到DataSet;使用DataTable存储数据库表;将数据库数据填充到 XML 文件;10 使用带输入参数的存储过程;11 使用带输入、输出参数的存储过程示;12 获得数据库中表的数目和名称;13 保存图片到SQL Server数据库示例;14 获得插入记录标识号;Exce
- spark 读取elasticsearch中数据不完整问题