Trident OpaqueKafkaSpout: transactions not committed

I'm using storm-kafka-client to subscribe my topology to a Kafka topic. Since I need exactly-once semantics, I'm using the opaque spout. However, the first transaction is never committed: it gets saved to the state and is then replayed periodically, over and over. If I push more data into Kafka, that data isn't saved to the state either, because the previous transaction hasn't been committed. The state I'm using is the Kafka state as well.



I'm using Storm 1.2.1 and Kafka 1.0.1, if that matters.



edit: this is the topology I'm using.



Config conf = new Config();
conf.setNumWorkers(NUM_WORKERS);
conf.setDebug(true);
conf.setMaxSpoutPending(MAX_SPOUT_PENDING);

KafkaSpoutConfig.Builder<String, String> spoutConfig = KafkaSpoutConfig.builder(KAFKA_BROKERS, KAFKA_TOPIC);
spoutConfig.setProp(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SCHEMA_REGISTRY_URL);
spoutConfig.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
spoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
spoutConfig.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
spoutConfig.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
spoutConfig.setOffsetCommitPeriodMs(5000);
KafkaTridentSpoutOpaque kafkaSpout = new KafkaTridentSpoutOpaque<> (spoutConfig.build());

TridentTopology topology = new TridentTopology();

Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_BROKERS);
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("parsed_value", "parsed_value"));

Stream tramasRedis = topology.newStream("KAFKA_TRAMAS", kafkaSpout).parallelismHint(KAFKA_SPOUT_PARALELLISM)
        .each(new Fields("key", "value"), new ReadRedisMessage(), new Fields("parsed_value"));
tramasRedis.partitionPersist(stateFactory, new Fields("parsed_value"), new TridentKafkaStateUpdater());

LocalCluster lc = new LocalCluster();
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, topology.build());


And this is what I get in the logs when I receive a message from Kafka in debug mode.



2018-11-26 08:20:16.166 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS s1 [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.167 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]]
2018-11-26 08:20:16.168 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting direct: 5; spout-KAFKA_TRAMAS $coord-bg0 [13:2, 1]
2018-11-26 08:20:16.168 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]]
2018-11-26 08:20:16.169 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS __ack_ack [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 3 tuple: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] BOLT ack TASK: 6 TIME: -1 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Execute done TUPLE source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]] TASK: 6 DELTA: -1
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Processing received message FOR 6 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-1505053285028712763=8306088157808961249}, [14:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831] TASK: 3 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: 6 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}] TASK: 5 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.175 o.a.s.d.task Thread-14-b-0-executor[5 5] [INFO] Emitting: b-0 __ack_ack [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] TRANSFERING tuple [dest: 3 tuple: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1] TASK: 5 DELTA: -1
2018-11-26 08:20:16.177 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.task Thread-6-__acker-executor[3 3] [INFO] Emitting direct: 1; __acker __ack_ack [-7985297489562004428 37198]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] TRANSFERING tuple [dest: 1 tuple: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309] TASK: 3 DELTA: -1
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Processing received message FOR 1 TUPLE: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Acking message -7985297489562004428 13:2
2018-11-26 08:20:16.561 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 $batch [88:1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 2 tuple: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]]
2018-11-26 08:20:16.562 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 __ack_init [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 3 tuple: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]]
2018-11-26 08:20:16.563 o.a.s.d.executor Thread-4-$spoutcoord-spout-KAFKA_TRAMAS-executor[2 2] [INFO] Processing received message FOR 2 TUPLE: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1] TASK: 3 DELTA: -1


After enabling DEBUG logging for the MasterBatchCoordinator, I get these errors periodically.



2018-11-28 10:57:44.614 o.a.s.t.t.MasterBatchCoordinator Thread-8-$mastercoord-bg0-executor[1 1] [DEBUG] Fail. [tx_attempt = 18:4], [tx_status = null], [MasterBatchCoordinator{_states=[org.apache.storm.trident.topology.state.TransactionalState@6168a033], _activeTx={3=3:3 <PROCESSING>, 4=4:3 <PROCESSING>, 5=5:3 <PROCESSING>, 6=6:3 <PROCESSING>}, _attemptIds={3=3, 4=3, 5=3, 6=3, 7=3, 8=3, 9=3, 10=3, 11=3, 12=3, 13=3, 14=4, 15=4, 16=3, 17=3, 18=4, 19=7, 20=7, 21=7, 22=7, 23=7, 24=7, 25=7, 26=7, 27=7, 28=7, 29=7, 30=7, 31=7, 32=7, 33=7, 34=7, 35=7, 36=7, 37=7, 38=7, 39=7, 40=7, 41=7, 42=7, 43=6, 44=6, 45=5, 46=5, 47=5, 48=5, 49=5, 50=6, 51=5, 52=5, 53=5, 54=5, 55=5, 56=5, 57=5, 58=5, 59=5, 60=5, 61=5, 62=5, 63=5, 64=5, 65=5, 66=5, 67=5, 68=4, 69=4, 70=4, 71=4, 72=4, 73=4, 74=4, 75=4, 76=3, 77=3, 78=1, 79=1, 80=1, 81=1, 82=1, 83=1, 84=2, 85=1, 86=1, 87=1, 88=1, 89=1, 90=1, 91=1, 92=1, 93=1, 94=1, 95=1, 96=1, 97=1, 98=1, 99=1, 100=1, 101=0, 102=0}, _collector=org.apache.storm.spout.SpoutOutputCollector@1f58e2fd, _currTransaction=3, _maxTransactionActive=100, _coordinators=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator@6faffb26], _managedSpoutIds=[KAFKA_TRAMAS], _spouts=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor@59e6917f], _throttler=org.apache.storm.utils.WindowedTimeThrottler@7d2bdb48, _active=true}]
2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Failing -2574568553084235376: {:stream "$batch", :values [#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 "19:6"]]} REASON: TIMEOUT MSG-ID: 19:6
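
A note for anyone reading the failure lines above: as far as I can tell, the MSG-ID printed as "19:6" is a Trident TransactionAttempt rendered as txid:attemptId, so that line reports attempt 6 of transaction 19 timing out. Below is a minimal sketch, in plain Java with no Storm dependency, that pulls those fields out of "SPOUT Failing" lines and keeps the highest failed attempt per transaction. The class name TimeoutTally, its methods, and the regular expression are my own assumptions for illustration; they are not part of Storm, and the pattern is only fitted to the single log line shown here.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Storm): summarises "SPOUT Failing" log lines.
public class TimeoutTally {

    // Assumed line shape: "... SPOUT Failing ... REASON: <reason> MSG-ID: <txid>:<attempt>"
    private static final Pattern FAILING =
            Pattern.compile("SPOUT Failing .*REASON: (\\w+) MSG-ID: (\\d+):(\\d+)");

    static final class Failure {
        final String reason;
        final long txid;
        final int attempt;

        Failure(String reason, long txid, int attempt) {
            this.reason = reason;
            this.txid = txid;
            this.attempt = attempt;
        }
    }

    // Returns the parsed failure, or null if the line is not a "SPOUT Failing" line.
    static Failure parse(String line) {
        Matcher m = FAILING.matcher(line);
        if (!m.find()) {
            return null;
        }
        return new Failure(m.group(1), Long.parseLong(m.group(2)), Integer.parseInt(m.group(3)));
    }

    // Maps each txid to the highest attempt number seen failing for it.
    static Map<Long, Integer> highestFailedAttempt(List<String> lines) {
        Map<Long, Integer> result = new LinkedHashMap<>();
        for (String line : lines) {
            Failure f = parse(line);
            if (f != null) {
                result.merge(f.txid, f.attempt, Math::max);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String line = "2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] "
                + "[INFO] SPOUT Failing -2574568553084235376: {:stream \"$batch\", :values "
                + "[#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 \"19:6\"]]} "
                + "REASON: TIMEOUT MSG-ID: 19:6";
        Map<Long, Integer> tally = highestFailedAttempt(Arrays.asList(line, "some unrelated line"));
        System.out.println(tally); // prints {19=6}
    }
}
```

Running this over a whole worker log should make it easier to see whether the same few transactions keep timing out with growing attempt numbers, which is what the _attemptIds map in the coordinator dump suggests.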


Apparently the failing batches are empty. If there is data in the topic when the topology starts, it works, because there are no empty batches beforehand. However, after the first batch (the one with data) is committed, the empty batches start failing, and new Kafka data is not processed because there are uncommitted transactions. If the topic is empty at startup, the batches just fail constantly and nothing is pushed to Kafka.

  • Is it the storm-kafka or storm-kafka-client spout?
    – Stig Rohde Døssing
    Nov 23 '18 at 13:23

  • @StigRohdeDøssing it's the storm-kafka-client
    – Algeel
    Nov 23 '18 at 13:31

  • Are you seeing anything in the worker logs? The State should log if it fails to write to Kafka. Also try posting your topology setup.
    – Stig Rohde Døssing
    Nov 23 '18 at 13:37

  • @StigRohdeDøssing updated the post with more info. I don't get any writing failure. Actually it only saves the first batch if there's already data in the topic when i run the topology. If the topic is empty and I push data when the topology is running, it doesn't write that batch either.
    – Algeel
    Nov 26 '18 at 8:48

  • The log you posted is helpful, but I think we need to enable debug logging for the master batch coordinator (the component that controls emit/commit behavior in Trident, you can see it in the log named something like $mastercoord). Could you enable debug logging for org.apache.storm.trident.topology.MasterBatchCoordinator and try again?
    – Stig Rohde Døssing
    Nov 26 '18 at 18:34
















0














So I'm using the storm-kafka-client to subscribe my topology to a Kafka topic. Since I need exactly-once semantics I'm using the Opaque Spout, however the first transaction is never committed, it gets saved to the state and then it's replayed over and over again periodically. If I push more data into Kafka, this data is not being saved to the state either because the previous transaction hasn't been commited. The State I'm using is the Kafka State as well.



I'm using Strom 1.2.1 and Kafka 1.0.1 if that matters.



edit: this is the topology I'm using.



Config conf = new Config();
conf.setNumWorkers(NUM_WORKERS);
conf.setDebug(true);
conf.setMaxSpoutPending(MAX_SPOUT_PENDING);

KafkaSpoutConfig.Builder<String, String> spoutConfig = KafkaSpoutConfig.builder(KAFKA_BROKERS, KAFKA_TOPIC);
spoutConfig.setProp(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SCHEMA_REGISTRY_URL);
spoutConfig.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
spoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
spoutConfig.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
spoutConfig.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
spoutConfig.setOffsetCommitPeriodMs(5000);
KafkaTridentSpoutOpaque kafkaSpout = new KafkaTridentSpoutOpaque<> (spoutConfig.build());

TridentTopology topology = new TridentTopology();

Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_BROKERS);
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("parsed_value", "parsed_value"));

Stream tramasRedis = topology.newStream("KAFKA_TRAMAS", kafkaSpout).parallelismHint(KAFKA_SPOUT_PARALELLISM)
.each(new Fields("key", "value"), new ReadRedisMessage(), new Fields("parsed_value"));
tramasRedis.partitionPersist(stateFactory, new Fields("parsed_value"), new TridentKafkaStateUpdater());

LocalCluster lc = new LocalCluster();
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, topology.build());


And this is what I get in the logs when I receive a message from Kafka in debug mode.



2018-11-26 08:20:16.166 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS s1 [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.167 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]]
2018-11-26 08:20:16.168 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting direct: 5; spout-KAFKA_TRAMAS $coord-bg0 [13:2, 1]
2018-11-26 08:20:16.168 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]]
2018-11-26 08:20:16.169 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS __ack_ack [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 3 tuple: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] BOLT ack TASK: 6 TIME: -1 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Execute done TUPLE source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]] TASK: 6 DELTA: -1
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Processing received message FOR 6 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-1505053285028712763=8306088157808961249}, [14:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831] TASK: 3 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: 6 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}] TASK: 5 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.175 o.a.s.d.task Thread-14-b-0-executor[5 5] [INFO] Emitting: b-0 __ack_ack [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] TRANSFERING tuple [dest: 3 tuple: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1] TASK: 5 DELTA: -1
2018-11-26 08:20:16.177 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.task Thread-6-__acker-executor[3 3] [INFO] Emitting direct: 1; __acker __ack_ack [-7985297489562004428 37198]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] TRANSFERING tuple [dest: 1 tuple: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309] TASK: 3 DELTA: -1
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Processing received message FOR 1 TUPLE: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Acking message -7985297489562004428 13:2
2018-11-26 08:20:16.561 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 $batch [88:1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 2 tuple: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]]
2018-11-26 08:20:16.562 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 __ack_init [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 3 tuple: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]]
2018-11-26 08:20:16.563 o.a.s.d.executor Thread-4-$spoutcoord-spout-KAFKA_TRAMAS-executor[2 2] [INFO] Processing received message FOR 2 TUPLE: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1] TASK: 3 DELTA: -1


Activating the DEBUG logging on the MasterCoordinator I get these errors periodically.



2018-11-28 10:57:44.614 o.a.s.t.t.MasterBatchCoordinator Thread-8-$mastercoord-bg0-executor[1 1] [DEBUG] Fail. [tx_attempt = 18:4], [tx_status = null], [MasterBatchCoordinator{_states=[org.apache.storm.trident.topology.state.TransactionalState@6168a033], _activeTx={3=3:3 <PROCESSING>, 4=4:3 <PROCESSING>, 5=5:3 <PROCESSING>, 6=6:3 <PROCESSING>}, _attemptIds={3=3, 4=3, 5=3, 6=3, 7=3, 8=3, 9=3, 10=3, 11=3, 12=3, 13=3, 14=4, 15=4, 16=3, 17=3, 18=4, 19=7, 20=7, 21=7, 22=7, 23=7, 24=7, 25=7, 26=7, 27=7, 28=7, 29=7, 30=7, 31=7, 32=7, 33=7, 34=7, 35=7, 36=7, 37=7, 38=7, 39=7, 40=7, 41=7, 42=7, 43=6, 44=6, 45=5, 46=5, 47=5, 48=5, 49=5, 50=6, 51=5, 52=5, 53=5, 54=5, 55=5, 56=5, 57=5, 58=5, 59=5, 60=5, 61=5, 62=5, 63=5, 64=5, 65=5, 66=5, 67=5, 68=4, 69=4, 70=4, 71=4, 72=4, 73=4, 74=4, 75=4, 76=3, 77=3, 78=1, 79=1, 80=1, 81=1, 82=1, 83=1, 84=2, 85=1, 86=1, 87=1, 88=1, 89=1, 90=1, 91=1, 92=1, 93=1, 94=1, 95=1, 96=1, 97=1, 98=1, 99=1, 100=1, 101=0, 102=0}, _collector=org.apache.storm.spout.SpoutOutputCollector@1f58e2fd, _currTransaction=3, _maxTransactionActive=100, _coordinators=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator@6faffb26], _managedSpoutIds=[KAFKA_TRAMAS], _spouts=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor@59e6917f], _throttler=org.apache.storm.utils.WindowedTimeThrottler@7d2bdb48, _active=true}]
2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Failing -2574568553084235376: {:stream "$batch", :values [#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 "19:6"]]} REASON: TIMEOUT MSG-ID: 19:6


Aparently the batches failing are empty. If there is data in the topic when the topology starts it works because there's no empty batches beforehand, however after the first batch is committed (the one with data), the empty batches start failing and new Kafka data is not being processed because there's uncommitted transactions. If the topic is empty, it just fails constantly and it doesn't push anything to Kafka.










share|improve this question
























  • Is it the storm-kafka or storm-kafka-client spout?
    – Stig Rohde Døssing
    Nov 23 '18 at 13:23










  • @StigRohdeDøssing it's the storm-kafka-client
    – Algeel
    Nov 23 '18 at 13:31










  • Are you seeing anything in the worker logs? The State should log if it fails to write to Kafka. Also try posting your topology setup.
    – Stig Rohde Døssing
    Nov 23 '18 at 13:37










  • @StigRohdeDøssing updated the post with more info. I don't get any writing failure. Actually it only saves the first batch if there's already data in the topic when i run the topology. If the topic is empty and I push data when the topology is running, it doesn't write that batch either.
    – Algeel
    Nov 26 '18 at 8:48










  • The log you posted is helpful, but I think we need to enable debug logging for the master batch coordinator (the component that controls emit/commit behavior in Trident, you can see it in the log named something like $mastercoord). Could you enable debug logging for org.apache.storm.trident.topology.MasterBatchCoordinator and try again?
    – Stig Rohde Døssing
    Nov 26 '18 at 18:34














0












0








0







So I'm using the storm-kafka-client to subscribe my topology to a Kafka topic. Since I need exactly-once semantics I'm using the Opaque Spout, however the first transaction is never committed, it gets saved to the state and then it's replayed over and over again periodically. If I push more data into Kafka, this data is not being saved to the state either because the previous transaction hasn't been commited. The State I'm using is the Kafka State as well.



I'm using Strom 1.2.1 and Kafka 1.0.1 if that matters.



edit: this is the topology I'm using.



Config conf = new Config();
conf.setNumWorkers(NUM_WORKERS);
conf.setDebug(true);
conf.setMaxSpoutPending(MAX_SPOUT_PENDING);

KafkaSpoutConfig.Builder<String, String> spoutConfig = KafkaSpoutConfig.builder(KAFKA_BROKERS, KAFKA_TOPIC);
spoutConfig.setProp(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SCHEMA_REGISTRY_URL);
spoutConfig.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
spoutConfig.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
spoutConfig.setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE);
spoutConfig.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
spoutConfig.setOffsetCommitPeriodMs(5000);
KafkaTridentSpoutOpaque kafkaSpout = new KafkaTridentSpoutOpaque<> (spoutConfig.build());

TridentTopology topology = new TridentTopology();

Properties props = new Properties();
props.put("bootstrap.servers", KAFKA_BROKERS);
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withProducerProperties(props)
    .withKafkaTopicSelector(new DefaultTopicSelector("test"))
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("parsed_value", "parsed_value"));

Stream tramasRedis = topology.newStream("KAFKA_TRAMAS", kafkaSpout).parallelismHint(KAFKA_SPOUT_PARALELLISM)
    .each(new Fields("key", "value"), new ReadRedisMessage(), new Fields("parsed_value"));
tramasRedis.partitionPersist(stateFactory, new Fields("parsed_value"), new TridentKafkaStateUpdater());

LocalCluster lc = new LocalCluster();
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, topology.build());


And this is what I get in the logs when I receive a message from Kafka in debug mode.



2018-11-26 08:20:16.166 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS s1 [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.167 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]]
2018-11-26 08:20:16.168 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting direct: 5; spout-KAFKA_TRAMAS $coord-bg0 [13:2, 1]
2018-11-26 08:20:16.168 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 5 tuple: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]]
2018-11-26 08:20:16.169 o.a.s.d.task Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Emitting: spout-KAFKA_TRAMAS __ack_ack [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] TRANSFERING tuple [dest: 3 tuple: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] BOLT ack TASK: 6 TIME: -1 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Execute done TUPLE source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-7985297489562004428=4123209423349964546}, [13:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]] TASK: 6 DELTA: -1
2018-11-26 08:20:16.169 o.a.s.d.executor Thread-10-spout-KAFKA_TRAMAS-executor[6 6] [INFO] Processing received message FOR 6 TUPLE: source: $spoutcoord-spout-KAFKA_TRAMAS:2, stream: $batch, id: {-1505053285028712763=8306088157808961249}, [14:2, [{partition=3, topic=redis-tramas-fnet}, {partition=2, topic=redis-tramas-fnet}, {partition=1, topic=redis-tramas-fnet}, {partition=0, topic=redis-tramas-fnet}, {partition=7, topic=redis-tramas-fnet}, {partition=6, topic=redis-tramas-fnet}, {partition=5, topic=redis-tramas-fnet}, {partition=4, topic=redis-tramas-fnet}, {partition=11, topic=redis-tramas-fnet}, {partition=10, topic=redis-tramas-fnet}, {partition=9, topic=redis-tramas-fnet}, {partition=8, topic=redis-tramas-fnet}, {partition=15, topic=redis-tramas-fnet}, {partition=14, topic=redis-tramas-fnet}, {partition=13, topic=redis-tramas-fnet}, {partition=12, topic=redis-tramas-fnet}]]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831]
2018-11-26 08:20:16.171 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: __ack_ack, id: {}, [-7985297489562004428 3421159403193702831] TASK: 3 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: 6 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}]
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: s1, id: {}, [13:2, redis-tramas-fnet, 1, 0, null, {"original": "test1"}] TASK: 5 DELTA: -1
2018-11-26 08:20:16.175 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Processing received message FOR 5 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.175 o.a.s.d.task Thread-14-b-0-executor[5 5] [INFO] Emitting: b-0 __ack_ack [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] TRANSFERING tuple [dest: 3 tuple: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] BOLT ack TASK: 5 TIME: -1 TUPLE: source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1]
2018-11-26 08:20:16.176 o.a.s.d.executor Thread-14-b-0-executor[5 5] [INFO] Execute done TUPLE source: spout-KAFKA_TRAMAS:6, stream: $coord-bg0, id: {-7985297489562004428=1604111899640219309}, [13:2, 1] TASK: 5 DELTA: -1
2018-11-26 08:20:16.177 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.task Thread-6-__acker-executor[3 3] [INFO] Emitting direct: 1; __acker __ack_ack [-7985297489562004428 37198]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] TRANSFERING tuple [dest: 1 tuple: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309]
2018-11-26 08:20:16.178 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: b-0:5, stream: __ack_ack, id: {}, [-7985297489562004428 1604111899640219309] TASK: 3 DELTA: -1
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Processing received message FOR 1 TUPLE: source: __acker:3, stream: __ack_ack, id: {}, [-7985297489562004428 37198]
2018-11-26 08:20:16.181 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Acking message -7985297489562004428 13:2
2018-11-26 08:20:16.561 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 $batch [88:1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 2 tuple: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]]
2018-11-26 08:20:16.562 o.a.s.d.task Thread-8-$mastercoord-bg0-executor[1 1] [INFO] Emitting: $mastercoord-bg0 __ack_init [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.562 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] TRANSFERING tuple [dest: 3 tuple: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]]
2018-11-26 08:20:16.563 o.a.s.d.executor Thread-4-$spoutcoord-spout-KAFKA_TRAMAS-executor[2 2] [INFO] Processing received message FOR 2 TUPLE: source: $mastercoord-bg0:1, stream: $batch, id: {-5934215212694827955=1827497614943510587}, [88:1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Processing received message FOR 3 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] BOLT ack TASK: 3 TIME: -1 TUPLE: source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1]
2018-11-26 08:20:16.565 o.a.s.d.executor Thread-6-__acker-executor[3 3] [INFO] Execute done TUPLE source: $mastercoord-bg0:1, stream: __ack_init, id: {}, [-5934215212694827955 1827497614943510587 1] TASK: 3 DELTA: -1


With DEBUG logging enabled for the MasterBatchCoordinator, I periodically get these errors:



2018-11-28 10:57:44.614 o.a.s.t.t.MasterBatchCoordinator Thread-8-$mastercoord-bg0-executor[1 1] [DEBUG] Fail. [tx_attempt = 18:4], [tx_status = null], [MasterBatchCoordinator{_states=[org.apache.storm.trident.topology.state.TransactionalState@6168a033], _activeTx={3=3:3 <PROCESSING>, 4=4:3 <PROCESSING>, 5=5:3 <PROCESSING>, 6=6:3 <PROCESSING>}, _attemptIds={3=3, 4=3, 5=3, 6=3, 7=3, 8=3, 9=3, 10=3, 11=3, 12=3, 13=3, 14=4, 15=4, 16=3, 17=3, 18=4, 19=7, 20=7, 21=7, 22=7, 23=7, 24=7, 25=7, 26=7, 27=7, 28=7, 29=7, 30=7, 31=7, 32=7, 33=7, 34=7, 35=7, 36=7, 37=7, 38=7, 39=7, 40=7, 41=7, 42=7, 43=6, 44=6, 45=5, 46=5, 47=5, 48=5, 49=5, 50=6, 51=5, 52=5, 53=5, 54=5, 55=5, 56=5, 57=5, 58=5, 59=5, 60=5, 61=5, 62=5, 63=5, 64=5, 65=5, 66=5, 67=5, 68=4, 69=4, 70=4, 71=4, 72=4, 73=4, 74=4, 75=4, 76=3, 77=3, 78=1, 79=1, 80=1, 81=1, 82=1, 83=1, 84=2, 85=1, 86=1, 87=1, 88=1, 89=1, 90=1, 91=1, 92=1, 93=1, 94=1, 95=1, 96=1, 97=1, 98=1, 99=1, 100=1, 101=0, 102=0}, _collector=org.apache.storm.spout.SpoutOutputCollector@1f58e2fd, _currTransaction=3, _maxTransactionActive=100, _coordinators=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Coordinator@6faffb26], _managedSpoutIds=[KAFKA_TRAMAS], _spouts=[org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor@59e6917f], _throttler=org.apache.storm.utils.WindowedTimeThrottler@7d2bdb48, _active=true}]
2018-11-28 10:57:44.616 o.a.s.d.executor Thread-8-$mastercoord-bg0-executor[1 1] [INFO] SPOUT Failing -2574568553084235376: {:stream "$batch", :values [#object[org.apache.storm.trident.topology.TransactionAttempt 0x4fce1854 "19:6"]]} REASON: TIMEOUT MSG-ID: 19:6
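The `_attemptIds` map in the debug line above is hard to read by eye. As a reading aid, here is a minimal sketch that extracts it and counts how many transaction ids have been attempted more than once. It assumes the `txid=attemptId` layout visible in the log and that attempt ids start at 0 (as the `101=0, 102=0` entries suggest); the class and method names are hypothetical and are not part of Storm.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading MasterBatchCoordinator debug lines; not a Storm API.
class AttemptIdSummary {

    // Extracts the "_attemptIds={3=3, 4=3, ...}" fragment into a txid -> attemptId map.
    static Map<Long, Integer> parseAttemptIds(String logLine) {
        Map<Long, Integer> result = new LinkedHashMap<>();
        Matcher block = Pattern.compile("_attemptIds=\\{([^}]*)\\}").matcher(logLine);
        if (!block.find()) {
            return result; // no _attemptIds map on this line
        }
        Matcher entry = Pattern.compile("(\\d+)=(\\d+)").matcher(block.group(1));
        while (entry.find()) {
            result.put(Long.parseLong(entry.group(1)), Integer.parseInt(entry.group(2)));
        }
        return result;
    }

    // Assumption: attempt id 0 is the first attempt, so anything above 0 was retried.
    static long countRetried(Map<Long, Integer> attemptIds) {
        return attemptIds.values().stream().filter(a -> a > 0).count();
    }

    public static void main(String[] args) {
        // Shortened sample in the same shape as the log line above.
        String line = "[DEBUG] Fail. [tx_attempt = 18:4], _attemptIds={3=3, 18=4, 19=7, 101=0, 102=0}, _currTransaction=3";
        Map<Long, Integer> attempts = parseAttemptIds(line);
        System.out.println("txids seen: " + attempts.size());
        System.out.println("txids retried at least once: " + countRetried(attempts));
    }
}
```

Run against the full line from the log, this makes it easier to see that almost every outstanding transaction id has already been retried several times.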


Apparently the failing batches are empty. If there is data in the topic when the topology starts, it works, because there are no empty batches beforehand. However, after the first batch (the one with data) is committed, the empty batches start failing, and new Kafka data is not processed because there are uncommitted transactions. If the topic is empty at startup, the batches just fail constantly and nothing is pushed to Kafka.
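To picture why one stuck batch holds back everything behind it: Trident commits batches strictly in transaction-id order, so later batches cannot commit until the lowest outstanding one has finished. The following is a toy model of that ordering rule only. It is not Storm's MasterBatchCoordinator code, all names are hypothetical, and the real coordinator's separate commit phase and retry handling are deliberately left out.

```java
import java.util.TreeMap;

// Toy model only: hypothetical names, not Storm's MasterBatchCoordinator.
class OrderedCommitSketch {
    enum Status { PROCESSING, PROCESSED }

    private final TreeMap<Long, Status> activeTx = new TreeMap<>();
    private long currTransaction = 1; // lowest txid that has not been committed yet

    // A newly emitted batch starts out in PROCESSING.
    void emit(long txid) {
        activeTx.put(txid, Status.PROCESSING);
    }

    // The batch was fully acked; commit whatever has become committable.
    void ack(long txid) {
        if (activeTx.get(txid) == Status.PROCESSING) {
            activeTx.put(txid, Status.PROCESSED);
        }
        commitInOrder();
    }

    // Only the lowest outstanding txid may commit, and only once it is PROCESSED.
    private void commitInOrder() {
        while (activeTx.get(currTransaction) == Status.PROCESSED) {
            activeTx.remove(currTransaction);
            currTransaction++;
        }
    }

    long nextToCommit() {
        return currTransaction;
    }

    public static void main(String[] args) {
        OrderedCommitSketch coord = new OrderedCommitSketch();
        coord.emit(1);
        coord.emit(2);
        coord.emit(3);
        coord.ack(2);
        coord.ack(3);
        // Batches 2 and 3 are done, but batch 1 is not, so nothing has committed.
        System.out.println("next to commit: " + coord.nextToCommit());
        coord.ack(1);
        // Once batch 1 finishes, 1, 2 and 3 all commit in order.
        System.out.println("next to commit: " + coord.nextToCommit());
    }
}
```

In this model, a batch that never leaves PROCESSING (for example because it keeps timing out) pins `currTransaction` in place, which matches the `_currTransaction=3` with transactions 3 to 6 all in `<PROCESSING>` shown in the debug output.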










java apache-kafka apache-storm














edited Nov 28 '18 at 11:32







Algeel

















asked Nov 23 '18 at 11:06









Algeel












  • Is it the storm-kafka or storm-kafka-client spout?
    – Stig Rohde Døssing
    Nov 23 '18 at 13:23










  • @StigRohdeDøssing it's the storm-kafka-client
    – Algeel
    Nov 23 '18 at 13:31










  • Are you seeing anything in the worker logs? The State should log if it fails to write to Kafka. Also try posting your topology setup.
    – Stig Rohde Døssing
    Nov 23 '18 at 13:37










  • @StigRohdeDøssing I updated the post with more info. I don't get any write failures. Actually, it only saves the first batch if there's already data in the topic when I run the topology. If the topic is empty and I push data while the topology is running, it doesn't write that batch either.
    – Algeel
    Nov 26 '18 at 8:48










  • The log you posted is helpful, but I think we need debug logging for the master batch coordinator (the component that controls emit/commit behavior in Trident; it shows up in the log as something like $mastercoord). Could you enable debug logging for org.apache.storm.trident.topology.MasterBatchCoordinator and try again?
    – Stig Rohde Døssing
    Nov 26 '18 at 18:34





























