您的位置:首页 > 编程语言 > Java开发

HDPCD-Java-复习笔记(18) - lab

2017-10-17 21:43 399 查看
Java lab booklet

Understanding Pig

root@ubuntu:~/java/labs/demos# pig


grunt> copyFromLocal /root/java/labs/demos/pigdemo.txt demos/


grunt> cd demos


Define the employees relation, using a schema:

grunt> employees = LOAD 'pigdemo.txt'  AS (state,name);


grunt> describe employees;
employees: {state:bytearray,name: bytearray}

grunt> DUMP employees;


·        (SD,Rich)
·        (NV,Barry)
·        (CO,George)
·        (CA,Ulf)
·        (IL,Danielle)
·        (OH,Tom)
·        (CA,manish)
·        (CA,Brian)

·        (CO,Mark)

grunt> emp_group = GROUP employees BY state;

grunt> describe emp_group;
emp_group: {group:bytearray,employees: {(state: bytearray,name: bytearray)}}

grunt> DUMP emp_group;
·        The output is:
·        (CA,{(CA,Ulf),(CA,manish),(CA,Brian)})
·        (CO,{(CO,George),(CO,Mark)})
·        (IL,{(IL,Danielle)})
·        (NV,{(NV,Barry)})
·        (OH,{(OH,Tom)})
·        (SD,{(SD,Rich)})

grunt> STORE emp_group INTO 'emp_group_csv' USING
PigStorage(',');

grunt> cat emp_group_csv/part-r-00000

CA,{(CA,Brian),(CA,manish),(CA,Ulf)}

CO,{(CO,Mark),(CO,George)}

IL,{(IL,Danielle)}

NV,{(NV,Barry)}

OH,{(OH,Tom)}

SD,{(SD,Rich)}

The aliases command shows a list of currently defined aliases:
grunt> aliases

aliases: [1-41, ca_only, 1-39, emp_group, employees]

Lab: Writing a Pig User Defined Function (UDF)

A Pig script that computes the on-balance volume of a specified stock.

package stockudfs;

import java.io.IOException;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

/**
 * Pig eval function that computes a running on-balance volume (OBV).
 *
 * <p>Each call receives one record as {@code (volume, close)}. The volume is
 * added to the running total when the close is above the previous call's
 * close, subtracted when it is below, and ignored when the two are equal.
 *
 * <p>The running total and the previous close are kept in instance fields, so
 * the result depends on the order in which records are passed to
 * {@link #exec(Tuple)}.
 * NOTE(review): presumably the calling script sorts the records by date and
 * a single instance sees all of them -- confirm for multi-reducer jobs.
 */
public class OnBalanceVolume extends EvalFunc<Long> {
    /** OBV returned by the previous call; 0 before the first call. */
    private long previousObv = 0;
    /** Closing price seen by the previous call; 0 before the first call. */
    private double previousClose = 0;

    /**
     * Computes the OBV for one record and remembers it for the next call.
     *
     * @param input tuple whose field 0 is the volume and field 1 is the
     *              closing price; both are parsed from their string form
     * @return the updated on-balance volume, or {@code null} when the tuple
     *         is null, has fewer than two fields, or either field is null
     * @throws IOException if a field cannot be read from the tuple
     * @throws NumberFormatException if a field is not a parsable number
     */
    @Override
    public Long exec(Tuple input) throws IOException {
        if (input == null || input.size() < 2
                || input.get(0) == null || input.get(1) == null) {
            // Skip incomplete records without disturbing the running state.
            return null;
        }

        long volume = Long.parseLong(input.get(0).toString());
        double currentClose = Double.parseDouble(input.get(1).toString());

        long obv;
        if (currentClose > previousClose) {
            obv = previousObv + volume;
        } else if (currentClose < previousClose) {
            obv = previousObv - volume;
        } else {
            obv = previousObv;
        }

        // Carry the state forward; without this every record would be
        // compared against the initial zeros and OBV would never accumulate.
        previousObv = obv;
        previousClose = currentClose;
        return obv;
    }
}
stockvolume.pig
-- Computes the on-balance volume of one stock with the OnBalanceVolume UDF.
-- The stock to report on is supplied as the Pig parameter named "symbol".
register stockudfs.jar;

-- Load the comma-delimited stock records.
stockdata = LOAD 'stocksA' using PigStorage(',') AS (exchange:chararray,symbol:chararray,
date:chararray,open:float,high:float,low:float,close:float,volume:int);

-- Keep only the fields this script uses.
stock_all = FOREACH stockdata GENERATE symbol,date,close,volume;

-- Restrict to the requested stock.
stock_filter = FILTER stock_all BY symbol == '$symbol';

-- The UDF is applied to the records in ascending date order.
stock_sorted = ORDER stock_filter BY date ASC;

obv_result = FOREACH stock_sorted GENERATE symbol, date, stockudfs.OnBalanceVolume(volume, close) AS obv;

dump obv_result;

-- Pig Latin comments use "--" or /* ... */; a leading "//" is a syntax error.
-- STORE obv_result INTO 'obv_result';

结果输出:

(AVA,2009-12-02,245700)

(AVA,2009-12-03,426000)

(AVA,2009-12-04,262200)

(AVA,2009-12-07,131300)

(AVA,2009-12-08,190900)

(AVA,2009-12-09,138700)

(AVA,2009-12-10,217300)

(AVA,2009-12-11,139200)

(AVA,2009-12-14,165500)

(AVA,2009-12-15,311900)

(AVA,2009-12-16,177800)

(AVA,2009-12-17,177900)

(AVA,2009-12-18,532500)

(AVA,2009-12-21,177100)

Lab: Writing an Accumulator User Defined Function (UDF)


A Pig script that outputs a stock’s highest closing price, along with the closing prices that immediately follow it, up to a window size of 4.

package stockudfs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.AccumulatorEvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

/**
 * Pig accumulator function that reports the highest closing price seen in a
 * bag together with the rows that directly follow it.
 *
 * <p>The reported window holds at most {@code windowSize} rows: the row with
 * the highest close first, then the rows that came after it in input order.
 * Whenever a strictly higher close is seen the window is restarted from that
 * row.
 *
 * <p>Each tuple in the bag is read as: field 1 = date, field 2 = close.
 */
public class HighestClosingPriceWindow extends AccumulatorEvalFunc<String> {
    /** Maximum number of rows kept, including the row holding the high. */
    private final int windowSize;
    /** Number of rows currently held in the window. */
    private int accumulatedPrices;
    /** Dates of the rows in the window, parallel to {@link #highCloses}. */
    private final List<String> highDates;
    /** Closing prices of the rows in the window. */
    private final List<Float> highCloses;
    /** Highest close seen so far; 0 until a positive close is seen. */
    private float highClose;

    /**
     * Creates the function with the given window size.
     *
     * @param size window size as a decimal string; values of 0 or less fall
     *             back to a window of 1
     * @throws NumberFormatException if {@code size} is not a parsable integer
     */
    public HighestClosingPriceWindow(String size) {
        int winSize = Integer.parseInt(size);
        windowSize = winSize > 0 ? winSize : 1;
        highDates = new ArrayList<String>();
        highCloses = new ArrayList<Float>();
    }

    /**
     * Folds one batch of rows into the window.
     *
     * @param b tuple whose field 0 is the bag of rows to process
     * @throws IOException if a field cannot be read from a tuple
     */
    @Override
    public void accumulate(Tuple b) throws IOException {
        DataBag values = (DataBag) b.get(0);
        for (Tuple tuple : values) {
            float currentClose = Float.parseFloat(tuple.get(2).toString());
            if (currentClose > highClose) {
                // New high: discard the previous window before starting a
                // new one, otherwise rows of older windows stay in the lists
                // and leak into the result.
                highClose = currentClose;
                highCloses.clear();
                highDates.clear();
                highCloses.add(currentClose);
                highDates.add(tuple.get(1).toString());
                accumulatedPrices = 1;
            } else if (accumulatedPrices < windowSize) {
                // Still filling the window that follows the current high.
                highCloses.add(currentClose);
                highDates.add(tuple.get(1).toString());
                accumulatedPrices++;
            }
        }
    }

    /** Resets all accumulated state so the instance can process another group. */
    @Override
    public void cleanup() {
        highClose = 0;
        highCloses.clear();
        highDates.clear();
        accumulatedPrices = 0;
    }

    /**
     * Renders the current window.
     *
     * @return one {@code "<date> <close>\n"} line per row in the window,
     *         highest close first; empty when no row was accumulated
     */
    @Override
    public String getValue() {
        StringBuilder builder = new StringBuilder();
        for (int i = 0; i < highCloses.size(); i++) {
            builder.append(highDates.get(i))
                    .append(" ")
                    .append(highCloses.get(i))
                    .append("\n");
        }
        return builder.toString();
    }

}
highclose.pig
-- Prints, for every stock symbol, the result of HighestClosingPriceWindow
-- applied to that symbol's rows in ascending date order.
REGISTER stockudfs.jar;

-- Bind the UDF with its constructor argument (the window size).
DEFINE HighestClosingPriceWindow stockudfs.HighestClosingPriceWindow('4');

-- Load the comma-delimited stock records.
raw_stocks = LOAD 'stocksA' USING PigStorage(',') AS (
    exchange:chararray, symbol:chararray, date:chararray,
    open:float, high:float, low:float, close:float, volume:int
);

-- The UDF reads field 1 as the date and field 2 as the close, so the
-- projection order matters.
closes = FOREACH raw_stocks GENERATE symbol, date, close;

by_symbol = GROUP closes BY symbol;

high_windows = FOREACH by_symbol {
    by_date = ORDER closes BY date ASC;
    GENERATE group AS symbol, HighestClosingPriceWindow(by_date) AS result;
}

DUMP high_windows;

结果:

(AVT,2000-04-28 78.62

2000-05-01 78.44

2000-05-02 78.31

2000-05-03 74.06

2000-04-27 76.25

2000-04-26 74.19

2000-04-25 73.44

1997-12-08 72.16)

(AXE,2007-07-24 86.11

2007-07-25 83.8

2007-07-26 82.25

2007-07-27 81.3)




内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  hdp