
java.lang.IllegalArgumentException: Tuple created with wrong number of fields. Expected 1 fields but

2017-12-06 20:03, 1246 views
I used to work with Storm 0.9.8, and my new company uses 1.1.1. Starting Storm in local mode failed with the following error:

8747 [Thread-32-spout-executor[4 4]] ERROR o.a.s.util - Async loop died!
java.lang.IllegalArgumentException: Tuple created with wrong number of fields. Expected 1 fields but got 2 fields
at org.apache.storm.tuple.TupleImpl.<init>(TupleImpl.java:58) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__4962$fn__4977$send_spout_msg__4983.invoke(executor.clj:570) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__4962$fn$reify__4993.emit(executor.clj:611) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:50) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:65) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:204) ~[storm-kafka-1.1.1.jar:1.1.1]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:136) ~[storm-kafka-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__4962$fn__4977$fn__5008.invoke(executor.clj:646) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
8748 [Thread-32-spout-executor[4 4]] ERROR o.a.s.d.executor -
java.lang.IllegalArgumentException: Tuple created with wrong number of fields. Expected 1 fields but got 2 fields
at org.apache.storm.tuple.TupleImpl.<init>(TupleImpl.java:58) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__4962$fn__4977$send_spout_msg__4983.invoke(executor.clj:570) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__4962$fn$reify__4993.emit(executor.clj:611) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:50) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:65) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:204) ~[storm-kafka-1.1.1.jar:1.1.1]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:136) ~[storm-kafka-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__4962$fn__4977$fn__5008.invoke(executor.clj:646) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
8832 [Thread-32-spout-executor[4 4]] ERROR o.a.s.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn__5628$fn__5629.invoke(worker.clj:759) [storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$mk_executor_data$fn__4848$fn__4849.invoke(executor.clj:276) [storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:494) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]

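An aside: this error took me a whole day to fix.

The material I found said that the number of fields and the number of values do not match, but mine clearly matched: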

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // A single output field is declared.
    declarer.declare(new Fields("end"));
}

@Override
public void execute(Tuple input) {
    Room room = (Room) input.getValueByField("room");
    System.out.println(room);
    // One value, matching the single declared field "end".
    collector.emit(new Values("Aaaa"));
}
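
The rule those sources describe can be sketched in plain Java with no Storm dependency. The names below (`TupleArityCheck`, `check`) are hypothetical stand-ins and not Storm's own code; the sketch only mirrors the comparison behind the message in the log above, namely that the number of emitted values must equal the number of fields the emitting component declared.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the arity check behind the error message above;
// this is not Storm's implementation.
final class TupleArityCheck {

    // Throws if the emitted values do not line up one-to-one with the declared fields.
    static void check(List<String> declaredFields, List<Object> values) {
        if (declaredFields.size() != values.size()) {
            throw new IllegalArgumentException(
                "Tuple created with wrong number of fields. Expected "
                    + declaredFields.size() + " fields but got "
                    + values.size() + " fields");
        }
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("line");

        // One declared field, one value: passes silently.
        check(fields, Arrays.<Object>asList("some message"));

        // One declared field, two values: rejected with the message from the log.
        try {
            check(fields, Arrays.<Object>asList("key", "some message"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the stack trace above runs through KafkaSpout.nextTuple, so the mismatch is raised while the spout emits, not inside this bolt.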

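Looking closer, I found that the problem was in the Scheme. In Storm 0.9.8 the deserialize method of the Scheme interface takes a byte[], whereas in Storm 1.1.1 it takes a ByteBuffer.
The 0.9.8 version looked like this: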
package logAnalyze.storm.spout;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.List;

public class StringScheme implements Scheme {
    public List<Object> deserialize(byte[] bytes) {
        try {
            return new Values(new String(bytes));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public Fields getOutputFields() {
        return new Fields("line");
    }
}
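
One way to carry byte[]-based deserialization logic over to a ByteBuffer parameter is to copy the buffer's remaining bytes into an array first. The following is a minimal sketch in plain Java; `ByteBufferBytes` and `toByteArray` are hypothetical names chosen for illustration, and UTF-8 is assumed as the message encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class ByteBufferBytes {

    // Copies the bytes between position and limit into a new array.
    // Reading through duplicate() leaves the caller's position unchanged.
    static byte[] toByteArray(ByteBuffer buffer) {
        byte[] out = new byte[buffer.remaining()];
        buffer.duplicate().get(out);
        return out;
    }

    public static void main(String[] args) {
        // Assumed sample payload; real messages come from the Kafka topic.
        ByteBuffer message = ByteBuffer.wrap("order-42".getBytes(StandardCharsets.UTF_8));
        String line = new String(toByteArray(message), StandardCharsets.UTF_8);
        System.out.println(line);
    }
}
```

Calling `buffer.array()` directly is tempting, but it only works for array-backed buffers and ignores the buffer's offset, position and limit, so copying the remaining bytes is the safer default.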


So all that is needed is to rewrite the class that implements Scheme, like this:
package com.qk365.report.storm.spout;

import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.nio.ByteBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Storm 1.1.1 version: deserialize receives a ByteBuffer instead of a byte[].
public class StringScheme implements Scheme {
    @Override
    public List<Object> deserialize(ByteBuffer bytes) {
        try {
            // Emit exactly one value, matching the single field declared below.
            return new Values(getStringByByteBuffer(bytes));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("line");
    }

    // Decodes the buffer as UTF-8; returns an empty string if decoding fails.
    private String getStringByByteBuffer(ByteBuffer buffer) {
        try {
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            // Decode a read-only view so the original buffer's position is not advanced.
            return decoder.decode(buffer.asReadOnlyBuffer()).toString();
        } catch (Exception e) {
            return "";
        }
    }
}
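
The try/catch around the decoding step has a visible effect: a decoder obtained from `newDecoder()` reports malformed input by throwing `CharacterCodingException`, whereas `Charset.decode` substitutes the replacement character instead. The sketch below contrasts the two in plain Java; `Utf8DecodeModes`, `strict` and `lenient` are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper contrasting two JDK ways of decoding UTF-8.
final class Utf8DecodeModes {

    // Strict: a fresh decoder reports malformed input with an exception.
    static String strict(ByteBuffer buffer) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder()
                .decode(buffer.asReadOnlyBuffer())
                .toString();
    }

    // Lenient: malformed bytes are replaced with U+FFFD instead of failing.
    static String lenient(ByteBuffer buffer) {
        return StandardCharsets.UTF_8.decode(buffer.asReadOnlyBuffer()).toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        ByteBuffer good = ByteBuffer.wrap("line".getBytes(StandardCharsets.UTF_8));
        ByteBuffer bad = ByteBuffer.wrap(new byte[] {(byte) 0xFF}); // never valid in UTF-8

        System.out.println(strict(good));
        try {
            strict(bad);
        } catch (CharacterCodingException e) {
            System.out.println("strict decoding rejected the input");
        }
        System.out.println(lenient(bad).indexOf('\uFFFD') >= 0);
    }
}
```

With the strict form, a caller that returns an empty string on failure emits an empty "line" value for any message that is not valid UTF-8; whether that or a replacement character is preferable depends on what the downstream bolts expect.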


Finally, do not forget to set the scheme in the main method:
package com.qk365.report.storm;

import com.qk365.report.storm.bolt.ParseRecordBolt;
import com.qk365.report.storm.spout.StringScheme;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

import java.util.HashMap;
import java.util.Map;

public class KafkaAndStormTopologyMain {
    public static void main(String[] args) throws Exception {
        // ZooKeeper hosts as host:port
        BrokerHosts brokerHosts = new ZkHosts("qkt-datacenter1:2181,qkt-datacenter2:2181,qkt-datacenter3:2181");
        // Kafka topic to consume
        String topic = "orderMq_room";
        // ZooKeeper root path under which the spout stores its consumer offsets
        String zkRoot = "/orderMq";
        // Any id that uniquely identifies this spout
        String spoutId = "input";
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        // How messages read from Kafka are turned into tuples
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        Config conf = new Config();
        // Debug output is enabled here; pass false to turn it off
        conf.setDebug(true);
        // Maximum number of pending tuples per spout task
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        Map<String, String> map = new HashMap<String, String>();
        // Kafka broker addresses
        map.put("metadata.broker.list", "qkt-datacenter3:9092,qkt-datacenter5:9092,qkt-datacenter6:9092");
        // serializer.class is the class used to serialize messages
        map.put("serializer.class", "kafka.serializer.StringEncoder");
        conf.put("kafka.broker.properties", map);
        // Topic that KafkaBolt writes to
        conf.put("topic", "kafkaSpout");

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);

        builder.setBolt("bolt1", new ParseRecordBolt(), 1).setNumTasks(1).shuffleGrouping("spout");
        // builder.setBolt("bolt2", new HandleRecordBolt(), 1).setNumTasks(1).shuffleGrouping("bolt1");
        conf.setNumWorkers(2);

        // Submit the topology: cluster mode when a name is passed, local mode otherwise
        if (args.length > 0) {
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("test", conf, builder.createTopology());
        }
    }
}



                                            