您的位置:首页 > 编程语言 > Java开发

spark java程序入门(三)外部程序调用

2016-10-30 15:02 351 查看
在编写spark程序时可能会需要调用已经写好的程序,而有时该外部程序的源码适用的语言可能并不适用于spark或者可能拿不到程序的源码,这时候就会需要进行外部程序的调用。

这里我们以spark的java独立应用调用hspice为例说明整个调用过程。

下面是调用hspice的java类

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.io.*;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class Toolcall {
public void h_invoke(ArrayList<String> data_line)  //data_line为后续调用hspice时需要用到的扫描数据
//h_invoke方法接受外部传入的hspice需要的使用的扫描数据
{
//initial spark,详见spark java程序入门(一)
SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("hspicecall").set("spark.testing.memory","2147480000");
JavaSparkContext sc = new JavaSparkContext(conf);

//invoke perl script,由于在调用hspice前需要对扫描数据进行一些处理,这里先调用Perl script 再从Perl script中调用hspice
String invoke_pl = "file:///home/nfs/input/amcsm/perl/outer.pl";
String plscriptName = "outer.pl";
sc.addFile(invoke_pl);//添加脚本文件供spark的节点下载,参数是该脚本的路径(可以是本地路径,HDFS路径或HTTP、FTP的URL都可以)

JavaRDD<String> ini_rdd = sc.parallelize(data_line);//把扫描数据转化成RDD

JavaRDD<String> p_rdd = ini_rdd.pipe(SparkFiles.get(plscriptName));//pipe()方法可以将RDD中的元素以标准输入的方式传入调用的脚本,调用的脚本只需再将结果写入标准输出就可以返回结果

p_rdd.saveAsTextFile("file:///home/nfs/input/rddrecord");//存储RDD
}

}


下面是被调用的Perl脚本

#!/usr/bin/perl
my @rdd;	<span style="white-space:pre">	</span>#rdd input
my @sp_name;		#store sp file name
my %data_content;	#store data content
my @cmd_list;		#store command

my $source_dir = "/home/nfs/input/amcsm/source/";
my $target_dir = "/home/nfs/input/amcsm/target/";
my $dname = "";
my $dname_info ="";

#从标准输入读入输入数据
while(<>)
{
$rdd[scalar @rdd] = $_;
}

#对输入进行处理,提取信息,格式整理
foreach $line(@rdd)
{
@tmp_a = split(/\|/,$line);
@tmp_name = split(/\//,$tmp_a[0]);
$dname = $tmp_name[-1];
$dname_info = $dname."|".$tmp_a[2];

$num = scalar @tmp_name;
for($i = 1; $i < $num - 1; $i++)
{
$tmp_name[0] = $tmp_name[0]."/".$tmp_name[$i];
}
$source_dir = $tmp_name[0]."/";

$flag = 0;
for $name(@data_name)
{
if($name eq $dname)
{
$flag = 1;
}
}
if($flag == 0)
{
$data_content{$dname_info} = [];
$data_name[scalar @data_name] = $dname;

$sp_tmp = $dname;
$sp_tmp =~ s/\..*/\.sp/;
$sp_tmp = $sp_tmp."|".$tmp_a[2];
$sp_name[scalar @sp_name] = $sp_tmp;
}

foreach $k(keys %data_content)
{
@tmp_x = split(/\|/,$k);
if($tmp_x[0] eq $dname)
{
${$data_content{$k}}[scalar @{$data_content{$k}}] = $tmp_a[1];
}
}
}

my @hsp_info;
#deal with the sp file
foreach $line(@sp_name)
{
$line =~ s/\n//;
@tmp_info = split(/\|/,$line);
$tmp_cmd = "mkdir ".$target_dir.$tmp_info[1];
`$tmp_cmd`;		#mkdir,根据数据的行号建立不同文件夹防止网表文件或数据文件重名

$tmp_cmd = "cp ".$source_dir.$tmp_info[0]." ".$target_dir.$tmp_info[1]."/".$tmp_info[0];
`$tmp_cmd`;		#copy sp file,<span style="font-family: Arial, Helvetica, sans-serif;">获取hspice所需的网表文件</span>

$abs_r = $target_dir.$tmp_info[1]."/".$tmp_info; #根据新产生的数据文件对网表做相应的修改
open SCAN,"<$abs_r";
@con = <SCAN>;
open NEW,">$abs_r";
foreach $i(@con)
{

$i =~ s/FILE='(.*)'/FILE='$target_dir$tmp_info[1]\/$1'/;
print NEW $i;

}

$cmd_list[scalar @cmd_list] = "hspice ".$target_dir.$tmp_info[1]."/".$tmp_info[0];		#store hspice command
$hsp_info[scalar @hsp_info] = $line;	#store the information for hspice data

}

foreach $line(keys %data_content)		#create data file
{
@tmp_info = split(/\|/,$line);
$tmp_info[1] =~ s/\n//;
$tmp_data = ">".$target_dir.$tmp_info[1]."/".$tmp_info[0];
open DATA,"$tmp_data";

foreach $data(@{$data_content{$line}})
{
print DATA "$data\n";

}

$tmp_test = ">".$target_dir.$tmp_info[1]."/test".$tmp_info[0];
}

#execute hspice command
my @s_content = ();

my $info_counter = 0;
foreach $cmd(@cmd_list)
{
my @all = `$cmd`; #执行调用hspice命令
my $start = 0;
my $end = 0;
=pod
open LIS,">>/home/nfs/input/lis";
foreach $x(@all)
{
print LIS $x;
}
=cut
my $line_num = 0;
my $len;
#从hspice的运行结果中提取数据并进行格式处理
foreach $line(@all)
{
if($line =~ /^x/)
{
$start = $line_num + 4;
}

if($line =~ /^y/)
{
$end = $line_num - 1;
}

if($end != 0)
{
$len = scalar @s_content;
for($i = $start; $i <= $end; $i++)
{
@tmp_a = split(/[\s]+/,$all[$i]);
$tmp_a[2] =~ s/f/E-15/;
$tmp_a[2] =~ s/p/E-12/;
$tmp_a[2] =~ s/n/E-9/;
$tmp_a[2] =~ s/u/E-6/;
$tmp_a[2] =~ s/m/E-3/;
$tmp_a[2] =~ s/a/E-10/;
$s_content[$len] = $tmp_a[2]."|".$hsp_info[$info_counter];
#print YS "$len\t$s_content[$len]\n";
$len++;
}
$start = 0;
$end = 0;
}

$line_num++;

}
$info_counter++;
}

#将处理结果写入标准输出,传回spark程序中
foreach $i(@s_content)
{
print "$i\n";
}


总结:spark程序可以通过pipe()方法调用可执行文件,通过标准输入输出流在spark程序和调用程序间传递数据。(注:对于Linux下的脚本程序第一行的表明解释器的语句必不可少,对于window下运行的spark程序,可以调用的是直接可执行的文件)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  RDD java spark hspice