
Spark Standalone Cluster with TiDB/MySQL Support

2018-02-07 16:38



Spark can speed up some large queries against TiDB and MySQL severalfold.

Cluster environment:

CentOS 7.4

Master: 192.168.106.42

Slaves: 192.168.106.43, 192.168.106.46, 192.168.106.47

Install Java 1.8

hostnamectl set-hostname master-42

mkdir /usr/java

cp /mnt/jdk-8u111-linux-x64.gz /usr/java/

cd /usr/java

tar -zxvf jdk-8u111-linux-x64.gz 
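The next step sources /etc/profile without showing the lines that have to be added to it first. Assuming the archive unpacks to /usr/java/jdk1.8.0_111 (the default directory name for this JDK build), a typical addition to /etc/profile would be:

# append to /etc/profile (path assumed from the JDK archive above)
export JAVA_HOME=/usr/java/jdk1.8.0_111
export PATH=$JAVA_HOME/bin:$PATH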

source /etc/profile

java -version

Install Spark on the master node (192.168.106.42)

cd /opt/

tar -xzvf spark-2.1.1-bin-hadoop2.7.tgz 

cd spark-2.1.1-bin-hadoop2.7

./sbin/start-master.sh -h 192.168.106.42

TiDB support:

Copy tispark-SNAPSHOT-jar-with-dependencies.jar to /opt/spark-2.1.1-bin-hadoop2.7/jars

MySQL support:

Copy mysql-connector-java-5.1.44-bin.jar to /opt/spark-2.1.1-bin-hadoop2.7/jars

Restart the master so it picks up the new jars (stop-master.sh takes no host argument):

./sbin/stop-master.sh

./sbin/start-master.sh -h 192.168.106.42
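Jars copied into jars/ are only picked up on restart. As an alternative (a standard spark-shell option, not part of the original walkthrough), a driver jar can also be supplied at launch time:

# pass the connector explicitly instead of copying it into jars/
./bin/spark-shell --jars /path/to/mysql-connector-java-5.1.44-bin.jar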

Note: disable the firewall so the nodes can reach each other:

iptables -F
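On CentOS 7.4 the default firewall service is firewalld rather than raw iptables, so (an assumption based on the OS version stated above) the following may be needed instead:

systemctl stop firewalld
systemctl disable firewalld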

Install Spark on the slave node (192.168.106.43)

cd /opt/

tar -xzvf spark-2.1.1-bin-hadoop2.7.tgz

cd /opt/spark-2.1.1-bin-hadoop2.7

TiDB support:

Copy tispark-SNAPSHOT-jar-with-dependencies.jar to /opt/spark-2.1.1-bin-hadoop2.7/jars

MySQL support:

Copy mysql-connector-java-5.1.44-bin.jar to /opt/spark-2.1.1-bin-hadoop2.7/jars

./sbin/start-slave.sh spark://192.168.106.42:7077

Install Spark on the remaining slave nodes, 192.168.106.46 and 192.168.106.47, in the same way.

Each node should now be visible in the web UI:
http://192.168.106.42:8080/
Spark Master at spark://192.168.106.42:7077

URL: spark://192.168.106.42:7077

REST URL: spark://192.168.106.42:6066 (cluster mode)

Alive Workers: 3

Cores in use: 12 Total, 12 Used

Memory in use: 19.8 GB Total, 3.0 GB Used

Applications: 1 Running, 0 Completed

Drivers: 0 Running, 0 Completed

Status: ALIVE

Workers

Worker Id                                    Address                State   Cores        Memory
worker-20180205145117-192.168.106.43-35632   192.168.106.43:35632   ALIVE   4 (4 Used)   6.6 GB (1024.0 MB Used)
worker-20180205145204-192.168.106.46-36288   192.168.106.46:36288   ALIVE   4 (4 Used)   6.6 GB (1024.0 MB Used)
worker-20180205145314-192.168.106.47-33544   192.168.106.47:33544   ALIVE   4 (4 Used)   6.6 GB (1024.0 MB Used)

Running Applications

Application ID                   Name          Cores   Memory per Node   Submitted Time        User   State     Duration
app-20180205145503-0000 (kill)   Spark shell   12      1024.0 MB         2018/02/05 14:55:03   root   RUNNING   12 min

Completed Applications

Application ID            Name          Cores   Memory per Node   Submitted Time        User   State      Duration
app-20180205145503-0000   Spark shell   12      1024.0 MB         2018/02/05 14:55:03   root   FINISHED   27 min

Testing Spark in local (single-machine) mode

[root@master-42 bin]# ./spark-shell 

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/opt/spark-2.1.1-bin-hadoop2.7/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/opt/spark-2.1.1-bin-hadoop2.7/jars/tispark-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

18/01/31 15:53:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

18/01/31 15:53:10 WARN Utils: Your hostname, master-42 resolves to a loopback address: 127.0.0.1; using 192.168.106.42 instead (on interface eth0)

18/01/31 15:53:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

18/01/31 15:53:18 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

Spark context Web UI available at http://192.168.106.42:4040
Spark context available as 'sc' (master = local[*], app id = local-1517385191540).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1

      /_/

         

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)

Type in expressions to have them evaluated.

Type :help for more information.

scala> 
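A quick sanity check in the local shell (not in the original transcript; a minimal example whose result follows from the arithmetic):

scala> sc.parallelize(1 to 100).sum()
res0: Double = 5050.0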

Testing Spark in cluster mode

[root@slave-43 spark-2.1.1-bin-hadoop2.7]# ./bin/spark-shell --master spark://192.168.106.42:7077

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

Setting default log level to "WARN".

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/opt/spark-2.1.1-bin-hadoop2.7/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/opt/spark-2.1.1-bin-hadoop2.7/jars/tispark-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

18/02/05 14:55:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

18/02/05 14:55:10 WARN Utils: Your hostname, slave-43 resolves to a loopback address: 127.0.0.1; using 192.168.106.43 instead (on interface eth0)

18/02/05 14:55:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

18/02/05 14:55:18 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0

18/02/05 14:55:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException

18/02/05 14:55:19 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException

Spark context Web UI available at http://192.168.106.43:4040
Spark context available as 'sc' (master = spark://192.168.106.42:7077, app id = app-20180205145503-0000).

Spark session available as 'spark'.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 2.1.1

      /_/

         

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)

Type in expressions to have them evaluated.

Type :help for more information.

Reading TiDB data

scala> import org.apache.spark.sql.TiContext

import org.apache.spark.sql.TiContext

scala> val ti = new TiContext(spark,List("192.168.40.161:2379"))

ti: org.apache.spark.sql.TiContext = org.apache.spark.sql.TiContext@1b7648f

scala> ti.tidbMapDatabase("cqgs")

scala> spark.sql("select count(*) from ZJ_XK_714_DOWN_L1_GR").show

+--------+                                                                      

|count(1)|

+--------+

| 1550177|

+--------+
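Once tidbMapDatabase has registered the database, its tables behave like ordinary Spark SQL tables, so DataFrame operations apply as well (a minimal sketch against the same table as above):

scala> val gr = spark.sql("select * from ZJ_XK_714_DOWN_L1_GR")
scala> gr.printSchema()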

Reading MySQL data

scala>  import org.apache.spark.sql.SQLContext

import org.apache.spark.sql.SQLContext

scala> val sqlContext = new SQLContext(sc)

warning: there was one deprecation warning; re-run with -deprecation for details

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@136cad3e

scala> val url ="jdbc:mysql://192.168.106.139:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull"

url: String = jdbc:mysql://192.168.106.139:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull

scala> val prop = new java.util.Properties

prop: java.util.Properties = {}

scala> prop.setProperty("user","hhj")

res0: Object = null

scala> prop.setProperty("password","hhj.123")

res1: Object = null

scala> val stud_scoreDF = sqlContext.read.jdbc(url,"t1",prop)

Mon Feb 05 16:57:56 CST 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set.
For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

stud_scoreDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> stud_scoreDF.show()

Mon Feb 05 16:58:19 CST 2018 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set.
For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

+---+----+

| id|name|

+---+----+

|  1| HHJ|

|  2| ABC|

|  3| HXW|

|  5| HHJ|

|  7| AHJ|

| 10| ABC|

|101| ABC|

+---+----+

scala> 
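The SSL warning above can be silenced the way the message itself suggests, by appending useSSL=false to the JDBC URL:

scala> val url = "jdbc:mysql://192.168.106.139:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false"

For larger tables, the read can also be parallelized on a numeric column via Spark's partitioned jdbc overload (the bounds and partition count here are illustrative, not from the original):

scala> val partitionedDF = sqlContext.read.jdbc(url, "t1", "id", 0L, 1000L, 4, prop)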

Reading local text data

scala> val textfile=sc.textFile("file:///opt/spark-2.1.1-bin-hadoop2.7/README.md")

textfile: org.apache.spark.rdd.RDD[String] = file:///opt/spark-2.1.1-bin-hadoop2.7/README.md MapPartitionsRDD[11] at textFile at <console>:26

scala> textfile.count()

res6: Long = 104                                                                

scala> textfile.first()

res7: String = # Apache Spark
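A classic follow-on for an RDD like this is a word count (a minimal sketch, not part of the original transcript):

scala> textfile.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).take(5)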

Reading local JSON data

scala> 

scala> val df=spark.read.json("file:///opt/spark-2.1.1-bin-hadoop2.7/examples/src/main/resources/employees.json")

df: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

scala> df.show()

+-------+------+

|   name|salary|

+-------+------+

|Michael|  3000|

|   Andy|  4500|

| Justin|  3500|

|  Berta|  4000|

+-------+------+

scala> df.select($"name",$"salary"+1000).show()

+-------+---------------+

|   name|(salary + 1000)|

+-------+---------------+

|Michael|           4000|

|   Andy|           5500|

| Justin|           4500|

|  Berta|           5000|

+-------+---------------+

scala> df.filter($"salary">=4500).show()

+----+------+

|name|salary|

+----+------+

|Andy|  4500|

+----+------+

Querying with SQL statements

scala> df.createOrReplaceTempView("employees")

scala> val sqldf=spark.sql("select * from employees")

sqldf: org.apache.spark.sql.DataFrame = [name: string, salary: bigint]

scala> sqldf.show()

+-------+------+

|   name|salary|

+-------+------+

|Michael|  3000|

|   Andy|  4500|

| Justin|  3500|

|  Berta|  4000|

+-------+------+
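Aggregations go through the same SQL interface; for example, against the temp view registered above (a minimal sketch; the value follows from the four salaries shown):

scala> spark.sql("select avg(salary) from employees").show()
+-----------+
|avg(salary)|
+-----------+
|     3750.0|
+-----------+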