您的位置:首页 > 运维架构 > Shell

IgniteRDD学习笔记(四)在SparkShell上部署测试IgniteRDD

2015-10-27 10:15 525 查看
启动服务

1)下载Spark到每个节点

2)下载Ignite到每个节点

3)在Master节点进入$SPARK_HOME执行下面的脚本

sbin/start-master.sh


master URL检查SPARK日志:spark://master_host:master_port

Web UI url 检查SPARK日志:http://master_host:8080

4)在每个Worker节点上进入$SPARK_HOME执行以下脚本

bin/spark-class org.apache.spark.deploy.worker.Worker spark://master_host:master_port


所有节点启动后,在WEB UI检查状态,所有子节点应该都是ALIVE的

5)在每个worker节点进入$IGNITE_HOME执行下面的脚本启动igniteNodes

bin/ignite.sh


在SPARK_SHELL里测试IgniteRDD

1)启动spark shell

i)通过提供maven地址来启动

<span style="font-size:14px;">./bin/spark-shell
--packages org.apache.ignite:ignite-spark:1.3.0
--master spark://master_host:master_port
--repositories http://www.gridgainsystems.com/nexus/content/repositories/external</span>[/code] 
ii)通过jar包来启动

<span style="font-size:14px;">./bin/spark-shell --jars path/to/ignite-core.jar,path/to/ignite-spark.jar,path/to/cache-api.jar,path/to/ignite-log4j.jar,path/to/log4j.jar --master spark://master_host:master_port</span>


iii)如果需要spring启动(暂时不懂是什么意思)

<span style="font-size:14px;">./bin/spark-shell
--packages org.apache.ignite:ignite-spark:1.3.0,org.apache.ignite:ignite-spring:1.3.0
--master spark://master_host:master_port
--repositories http://www.gridgainsystems.com/nexus/content/repositories/external</span>[/code] 
2)通过默认配置来创建一个IgniteContext实例

<span style="font-size:14px;">import org.apache.ignite.spark._
import org.apache.ignite.configuration._

val ic = new IgniteContext[Integer, Integer](sc, () => new IgniteConfiguration())</span>
可以看到下面的提示信息:

<span style="font-size:14px;">ic: org.apache.ignite.spark.IgniteContext[Integer,Integer] = org.apache.ignite.spark.IgniteContext@62be2836</span>
也可以用配置文件来创造Context实例(需要添加$IGNITE_HOME环境变量):

import org.apache.ignite.spark._
import org.apache.ignite.configuration._

val ic = new IgniteContext[Integer, Integer](sc, "config/default-config.xml")


3)通过默认配置和名为"partitioned"缓存来创建IgniteRDD实例

val sharedRDD = ic.fromCache("partitioned")
可以看到以下信息:

shareRDD: org.apache.ignite.spark.IgniteRDD[Integer,Integer] = IgniteRDD[0] at RDD at IgniteAbstractRDD.scala:27
需要注意的是创建RDD是一个本地操作,不会在集群中缓存

4)现在让spark来使用刚创建的IgniteRDD,(例:获取所有小于10的键值对)

sharedRDD.filter(_._2 < 10).collect()
因为此时的缓存是空的,所以得到一个空的Array

res0: Array[(Integer, Integer)] = Array()


5)现在给IgniteRDD里写入一些值

sharedRDD.savePairs(sc.parallelize(1 to 100000, 10).map(i => (i, i)))
执行后,缓存里有100000个元素

6)现在我们关闭spark shell,然后重新执行1->3的步骤,在另一个spark job中读取刚才的IgniteRDD

来测试一下有多少大于50000的值

sharedRDD.filter(_._2 > 50000).count
结果为

res0: Long = 50000
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: