您的位置:首页 > 其它

Storm系列(二十)分区事务PartitionTransaction及示例

2015-11-13 15:12 429 查看
在Storm中分区事务的处理,Spout端需要实现IPartitionedTransactionalSpout接口,用于对批次及偏移量的控制,而Bolt都必须实现IBatchBolt接口,通常继承至BaseTransactionalBolt抽象类。

 

API详解

Spout端

IPartitionedTransactionalSpout接口

public interface IPartitionedTransactionalSpout<T> extends IComponent {
    public interface Coordinator {
        // 返回分区的个数,每开始一个事务调用一次
        int numPartitions();
        // 启动事务,true表示开始
        boolean isReady();
        // 主要用于结束时调时释放资源
        void close();
    }
10      
11      public interface Emitter<X> {
12          // 发射一个新的Batch
13          X emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
14      // 当Batch发送失败时,负责重发该Batch
15          void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X partitionMeta);
16   
17          void close();
18      }
19      
20      Coordinator getCoordinator(Map conf, TopologyContext context);
21   
22      Emitter<T> getEmitter(Map conf, TopologyContext context);      
23  }
 

Bolt

public interface IBatchBolt<T> extends Serializable, IComponent {
    // Batch预处理
    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
    // 
    void execute(Tuple tuple);
    // 
    void finishBatch();
}
 

说明:

在PartitionTransaction事务中,Spout端设置为多并行度时,Coordinator 实例只会有一个,而Emitter实例有对应的是多个,可通过增加Emitter实例来增加处理的数据的吞吐量。Coordinator 中主要用于定义分区个数和启动事务,Emitter用于发送Batch中的 Tuple和控制数据所在队列的偏移量,合理的情况下多少个分区就应该分配多少个Emitter实例。
Bolt被标记为Committer与不标记为Committer的区别:被标记为Committer的Bolt具有强制一致性,即根据事务ID大小有严格的先后执行顺序,而没有被标记为Committer的Bolt就没有顺序性,并且可以并行执行。
 

示例代码

入口类

public class TestTransactionTopology {
    public static void main(String[] args) {
 
        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
                "ttbId""spoutid"new TestPartitionTransaction(), 2);
        builder.setBolt("bolt1"new TestTransactionBolt1(), 2).shuffleGrouping(
                "spoutid");
        builder.setBolt("committer"new TestTransactionBolt2(), 1).shuffleGrouping(
                "bolt1");
10   
11          Config conf = new Config();
12          conf.setDebug(false);
13   
14          if (args.length > 0) {
15              try {
16                  StormSubmitter.submitTopology(args[0], conf,
17                          builder.buildTopology());
18              } catch (AlreadyAliveException e) {
19                  e.printStackTrace();
20              } catch (InvalidTopologyException e) {
21                  e.printStackTrace();
22              }
23          } else {
24              LocalCluster localCluster = new LocalCluster();
25              localCluster.submitTopology("mytopology", conf,
26                      builder.buildTopology());
27          }
28      }
29  }
 

Spout处理类

public class TestPartitionTransaction implements
        IPartitionedTransactionalSpout<TestMetaDate> {
 
    private static final long serialVersionUID = 1L;
    public static Map<Integer, Map<Long, String>> DATA_BASE = new HashMap<Integer, Map<Long, String>>();
    public static int PART_COUNT = 2;
    public static int BATCH_NUM = 7;
 
    public TestPartitionTransaction() {
10          Random random = new Random();
11   
12          for (int i = 0; i < PART_COUNT; i++) {
13              Map<Long, String> map = new HashMap<Long, String>();
14              if (2 == i){
15                  for (long j = 0; j < 60; j++) {
16                      map.put(j, "TestPartitionTransaction:" + random.nextInt(100));
17                  }
18              }else {
19                  for (long j = 0; j < 100; j++) {
20                      map.put(j, "TestPartitionTransaction:" + random.nextInt(100));
21                  }
22              }
23             
24              DATA_BASE.put(i, map);
25          }
26   
27          System.err.println("TestTransaction start");
28      }
29   
30      public void declareOutputFields(OutputFieldsDeclarer declarer) {
31          declarer.declare(new Fields("tx""content""partition"));
32      }
33   
34      @Override
35      public Map<String, Object> getComponentConfiguration() {
36          return null;
37      }
38   
39      @Override
40      public backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout.Coordinator getCoordinator(
41              @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
42          System.err.println("------------- TestPartitionCoord ");
43          return new TestPartitionCoord();
44      }
45   
46      @Override
47      public backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter<TestMetaDate> getEmitter(
48              @SuppressWarnings("rawtypes") Map conf, TopologyContext context) {
49          System.err.println("------------- TestPartitionEmitter ");
50          return new TestPartitionEmitter();
51      }
52   
53      public class TestPartitionCoord implements
54              IPartitionedTransactionalSpout.Coordinator {
55   
56          @Override
57          public int numPartitions() {
58              System.err.println("-------------numPartitions :"+PART_COUNT);
59              return PART_COUNT;
60          }
61   
62          @Override
63          public boolean isReady() {
64              Utils.sleep(1000);
65              return true;
66          }
67   
68          @Override
69          public void close() {
70          }
71   
72      }
73   
74      public class TestPartitionEmitter implements
75              IPartitionedTransactionalSpout.Emitter<TestMetaDate> {
76   
77          @Override
78          public TestMetaDate emitPartitionBatchNew(TransactionAttempt tx,
79                  BatchOutputCollector collector, int partition,
80                  TestMetaDate lastPartitionMeta) {
81              long index = 0;
82              if (null == lastPartitionMeta) {
83                  index = 0;
84              } else {
85                  index = lastPartitionMeta.get_index()
86                          + lastPartitionMeta.get_size();
87              }
88              TestMetaDate data = new TestMetaDate();
89              data.set_index(index);
90              data.set_size(BATCH_NUM);
91   
92              emitPartitionBatch(tx, collector, partition, data);
93              System.err.println("开始启动一个事务 partition:" + partition + ",data:"
94                      + data.toString());
95   
96              return data;
97          }
98   
99          @Override
100          public void emitPartitionBatch(TransactionAttempt tx,
101                  BatchOutputCollector collector, int partition,
102                  TestMetaDate partitionMeta) {
103              System.err
104                      .println("TestPartitionEmitter emitPartitionBatch partition:"
105                              + partition);
106   
107              Map<Long, String> map = DATA_BASE.get(partition);
108              long start = partitionMeta.get_index();
109              long size = partitionMeta.get_size();
110              for (long i = start; i < start + size; i++) {
111                  String content = map.get(i);
112                  if (null == content) {
113                      System.err
114                      .println("################ TestPartitionEmitter emitPartitionBatch content null");
115                      break;
116                  }
117                  collector.emit(new Values(tx, content, partition));
118              }
119          }
120   
121          @Override
122          public void close() {
123          }
124   
125      }
126   
127  }
 

TestTransactionBolt1实现类

public class TestTransactionBolt1 extends BaseTransactionalBolt {
 
    private static final long serialVersionUID = 1L;
    private BatchOutputCollector _collector = null;
    private TopologyContext _context = null;
    private Map<Integer, Integer> statMap = null;
    private TransactionAttempt _txAttempt = null;
 
    @Override
10      public void prepare(@SuppressWarnings("rawtypes") Map conf,
11              TopologyContext context, BatchOutputCollector collector,
12              TransactionAttempt id) {
13          this._context = context;
14          this._collector = collector;
15          this._txAttempt = id;
16          if (null == statMap) {
17              statMap = new HashMap<Integer, Integer>();
18          }
19   
20          System.err.println("TestTransactionBolt1 prepare TransactionAttempt:"
21                  + id.toString() + ",TaskId:" + _context.getThisTaskId());
22      }
23   
24      @Override
25      public void execute(Tuple tuple) {
26   
27          String context = tuple.getStringByField("content");
28          if (null == context || context.isEmpty()) {
29              return;
30          }
31          int nPart = tuple.getIntegerByField("partition");
32          TransactionAttempt tx = (TransactionAttempt) tuple
33                  .getValueByField("tx");
34   
35          Integer count = statMap.get(nPart);
36          if (null == count) {
37              count = 0;
38          }
39          count++;
40          statMap.put(nPart, count);
41   
42          System.err.println("TestTransactionBolt1 execute tx:" + tx.toString()
43                  + ",nPart:" + nPart + ",count:" + count + ",TaskId:"
44                  + _context.getThisTaskId());
45      }
46   
47      @Override
48      public void finishBatch() {
49          String jsonString = JSON.toJSONString(statMap, true);
50   
51          System.err.println("jsonString:" + jsonString + ",TaskId:"
52                  + _context.getThisTaskId());
53   
54          _collector.emit(new Values(_txAttempt, jsonString));
55      }
56   
57      @Override
58      public void declareOutputFields(OutputFieldsDeclarer declarer) {
59          declarer.declare(new Fields("tx""result"));
60      }
61   
62  }
 

TestTransactionBolt2 实现类

public class TestTransactionBolt2 extends BaseTransactionalBolt implements
        ICommitter {
 
    private static final long serialVersionUID = 1L;
 
    private TransactionAttempt _txAttempt = null;
    private TopologyContext _context = null;
    private static Map<String, Integer> statMap = null;
    private static boolean ifbatch = false;
10   
11      @Override
12      public void prepare(@SuppressWarnings("rawtypes") Map conf,
13              TopologyContext context, BatchOutputCollector collector,
14              TransactionAttempt id) {
15          this._context = context;
16          this._txAttempt = id;
17          if (null == statMap) {
18              statMap = new HashMap<String, Integer>();
19              ifbatch = true;
20          }
21          System.err.println("TestTransactionBolt2 prepare _txAttempt:"
22                  + _txAttempt.toString());
23      }
24   
25      @Override
26      public void execute(Tuple tuple) {
27          TransactionAttempt id = (TransactionAttempt) tuple
28                  .getValueByField("tx");
29          String sResult = tuple.getStringByField("result");
30          if (null == sResult || sResult.isEmpty()) {
31              return;
32          }
33          JSONObject obj = JSON.parseObject(sResult);
34          for (Map.Entry<String, Object> entry:obj.entrySet()){
35              Integer count =statMap.get(entry.getKey());
36              if (null == count){
37                  count = 0;
38              }
39              Integer value = (Integer)entry.getValue();
40              count += value;
41              statMap.put(entry.getKey(), count);
42              ifbatch = true;
43          }
44         
45          System.err.println("TestTransactionBolt2 execute id:"
46                  + id.toString()+",statMap:"+statMap.toString());
47      }
48   
49      @Override
50      public void finishBatch() {
51          if (statMap.size() > 0 && ifbatch){
52              System.err.println("TestTransactionBolt2 finishBatch statMap:"+statMap.toString());
53              ifbatch = false;
54          }
55      }
56   
57      @Override
58      public void declareOutputFields(OutputFieldsDeclarer declarer) {
59   
60      }
61   
62  }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: