您的位置:首页 > 其它

Spark日志分析项目Demo(5)--自定义Accumulator

2017-09-03 14:48 501 查看
累加器(accumulator)

我们传递给Spark的函数,如map(),或者filter()的判断条件函数,能够利用定义在函数之外的变量,但是集群中的每一个task都会得到变量的一个副本,并且task在对变量进行的更新不会被返回给driver。而Spark的两种共享变量:累加器(accumulator)和广播变量(broadcast variable),在广播和结果聚合这两种常见类型的通信模式上放宽了这种限制。

使用累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。例如,当我们统计输入文件信息时,有时需要统计空白行的数量。下面的scala程序描述了这个过程。

import org.apache.spark.{SparkContext, SparkConf}

object AccumulatorTest{
def main(args: Array[String]) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("accumulatrorTest")
.setJars(List("E:\\IdeaProjects\\SparkExercise\\out\\artifacts\\SparkExercise_jar\\SparkExercise.jar"))
val sc = new SparkContext(conf)
val file = sc.textFile("E:\\file.txt")
val blankLines = sc.accumulator(0) // Create an Accumulator[Int] initialized to 0,task会得到blankLines的一个副本,且每个task对它的更新不会返回给driver
val callSigns = file.flatMap(line => {
if (line == "") {
blankLines += 1 // Add to the accumulator
}
line.split(" ")
})
callSigns.saveAsTextFile("E:\\output.txt")
println("Blank lines: " + blankLines)
sc.stop()
}
}


在上面的例子中,我们创建了一个名为blankLines的整型累加器(Accumulator[Int]),初始化为0,然后再每次读到一个空白行的时候blankLines加一。因此,累加器使我们可以用一种更简便的方式,在一个RDD的转换过程中对值进行聚合,而不用额外使用一个filter()或reduce()操作。

需要注意的是,由于Spark的lazy机制,只有在saveAsTestFile这个action算子执行后我们才能得到blankLines的正确结果。

由于对于worker节点来说,累加器的值是不可访问的,所有对于worker上的task,累加器是write-only的。这使得累加器可以被更高效的实现,而不需要在每次更新时都进行通信。

自定义累加器

上面的例子中使用的是Spark内建的Integer类型累加器。同时,Spark还支持Double,Long,和Float类型的累加器。除此之外,Spark还提供了自定义累加器类型和聚合操作(如查找最大值等加操作以外的操作)的API,但要保证定义的操作满足交换律和结合律。

下载介绍项目中用到的自定义累加器,重载了方法zero(),add()

public class SessionAggrStatAccumulator implements AccumulatorParam<String> {
private static final long serialVersionUID = 6311074555136039130L;
/**
* zero方法,其实主要用于数据的初始化
* 那么,我们这里,就返回一个值,就是初始化中,所有范围区间的数量,都是0
* 各个范围区间的统计数量的拼接,还是采用一如既往的key=value|key=value的连接串的格式
*/
public String zero(String initialValue) {
return Constants.SESSION_COUNT + "=0|"
+ Constants.TIME_PERIOD_1s_3s + "=0|"
+ Constants.TIME_PERIOD_4s_6s + "=0|"
+ Constants.TIME_PERIOD_7s_9s + "=0|"
+ Constants.TIME_PERIOD_10s_30s + "=0|"
+ Constants.TIME_PERIOD_30s_60s + "=0|"
+ Constants.TIME_PERIOD_1m_3m + "=0|"
+ Constants.TIME_PERIOD_3m_10m + "=0|"
+ Constants.TIME_PERIOD_10m_30m + "=0|"
+ Constants.TIME_PERIOD_30m + "=0|"
+ Constants.STEP_PERIOD_1_3 + "=0|"
+ Constants.STEP_PERIOD_4_6 + "=0|"
+ Constants.STEP_PERIOD_7_9 + "=0|"
+ Constants.STEP_PERIOD_10_30 + "=0|"
+ Constants.STEP_PERIOD_30_60 + "=0|"
+ Constants.STEP_PERIOD_60 + "=0";
}
/**
* addInPlace和addAccumulator
* 可以理解为是一样的
*
* 这两个方法,其实主要就是实现,v1可能就是我们初始化的那个连接串
* v2,就是我们在遍历session的时候,判断出某个session对应的区间,然后会用Constants.TIME_PERIOD_1s_3s
* 所以,我们,要做的事情就是
* 在v1中,找到v2对应的value,累加1,然后再更新回连接串里面去
*
*/
public String addAccumulator(String v1, String v2) {
return add(v1,v2);
}

public String addInPlace(String v1, String v2) {
return add(v1,v2);
}
/**
* session统计计算逻辑
* @param v1 连接串
* @param v2 范围区间
* @return 更新以后的连接串
*/
private String add(String v1, String v2){
// 校验:v1为空的话,直接返回v2
if(StringUtils.isEmpty(v1)) {
return v2;
}

// 使用StringUtils工具类,从v1中,提取v2对应的值,并累加1
String oldValue = StringUtils.getFieldFromConcatString(v1, "\\|", v2);
if(oldValue != null) {
// 将范围区间原有的值,累加1
int newValue = Integer.valueOf(oldValue) + 1;
// 使用StringUtils工具类,将v1中,v2对应的值,设置成新的累加后的值
return StringUtils.setFieldInConcatString(v1, "\\|", v2, String.valueOf(newValue));
}
return v1;
}

}


/**
* 字符串工具类
* @author Administrator
*
*/
public class StringUtils {

/**
* 判断字符串是否为空
* @param str 字符串
* @return 是否为空
*/
public static boolean isEmpty(String str) {
return str == null || "".equals(str);
}

/**
* 判断字符串是否不为空
* @param str 字符串
* @return 是否不为空
*/
public static boolean isNotEmpty(String str) {
return str != null && !"".equals(str);
}

/**
* 截断字符串两侧的逗号
* @param str 字符串
* @return 字符串
*/
public static String trimComma(String str) {
if(str.startsWith(",")) {
str = str.substring(1);
}
if(str.endsWith(",")) {
str = str.substring(0, str.length() - 1);
}
return str;
}

/**
* 补全两位数字
* @param str
* @return
*/
public static String fulfuill(String str) {
if(str.length() == 2) {
return str;
} else {
return "0" + str;
}
}

/**
* 从拼接的字符串中提取字段
* @param str 字符串
* @param delimiter 分隔符
* @param field 字段
* @return 字段值
*/
public static String getFieldFromConcatString(String str,
String delimiter, String field) {
try {
String[] fields = str.split(delimiter);
for(String concatField : fields) {
// searchKeywords=|clickCategoryIds=1,2,3
if(concatField.split("=").length == 2) {
String fieldName = concatField.split("=")[0];
String fieldValue = concatField.split("=")[1];
if(fieldName.equals(field)) {
return fieldValue;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}

/**
* 从拼接的字符串中给字段设置值
* @param str 字符串
* @param delimiter 分隔符
* @param field 字段名
* @param newFieldValue 新的field值
* @return 字段值
*/
public static String setFieldInConcatString(String str,
String delimiter, String field, String newFieldValue) {
String[] fields = str.split(delimiter);

for(int i = 0; i < fields.length; i++) {
String fieldName = fields[i].split("=")[0]
c61c
;
if(fieldName.equals(field)) {
String concatField = fieldName + "=" + newFieldValue;
fields[i] = concatField;
break;
}
}

StringBuffer buffer = new StringBuffer("");
for(int i = 0; i < fields.length; i++) {
buffer.append(fields[i]);
if(i < fields.length - 1) {
buffer.append("|");
}
}

return buffer.toString();
}

}


public class AccumuletorTest {
public static void main(String[] args) {
// 构建Spark上下文
SparkConf conf = new SparkConf()
.setAppName("AccumuletorTest")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
final Accumulator<String> sessionAggrStatAccumulator = sc.accumulator(
"", new SessionAggrStatAccumulator());
List<Long> seq= new ArrayList<Long>();
for (int i = 0; i < 1000; i++) {
seq.add(Long.valueOf((int) (Math.random() * 1000)));
}
JavaRDD<Long> rdd=sc.parallelize(seq);
//必须用RDD执行
rdd.foreach(new VoidFunction<Long>() {
public void call(Long aLong) throws Exception {
sessionAggrStatAccumulator.add(Constants.SESSION_COUNT);
calculateVisitLength(aLong);
calculateStepLength(aLong);
}
/**
* 计算访问时长范围
* @param visitLength
*/
private void calculateVisitLength(long visitLength) {
if (visitLength >= 1 && visitLength <= 3) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1s_3s);
} else if (visitLength >= 4 && visitLength <= 6) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_4s_6s);
} else if (visitLength >= 7 && visitLength <= 9) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_7s_9s);
} else if (visitLength >= 10 && visitLength <= 30) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10s_30s);
} else if (visitLength > 30 && visitLength <= 60) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30s_60s);
} else if (visitLength > 60 && visitLength <= 180) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_1m_3m);
} else if (visitLength > 180 && visitLength <= 600) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_3m_10m);
} else if (visitLength > 600 && visitLength <= 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_10m_30m);
} else if (visitLength > 1800) {
sessionAggrStatAccumulator.add(Constants.TIME_PERIOD_30m);
}
}

/**
* 计算访问步长范围
* @param stepLength
*/
private  void calculateStepLength(long stepLength) {
if (stepLength >= 1 && stepLength <= 3) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_1_3);
} else if (stepLength >= 4 && stepLength <= 6) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_4_6);
} else if (stepLength >= 7 && stepLength <= 9) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_7_9);
} else if (stepLength >= 10 && stepLength <= 30) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_10_30);
} else if (stepLength > 30 && stepLength <= 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_30_60);
} else if (stepLength > 60) {
sessionAggrStatAccumulator.add(Constants.STEP_PERIOD_60);
}
}
});
System.out.println("accumuletor2:"+sessionAggrStatAccumulator.value());
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: