【Flink】The Kryo Output still contains data from a previous serialize call. It has to be flushed or

1.美图

在这里插入图片描述

2.背景

因为这个问题点击,我把

synchronized (output){
	output.collect(record)
}

改成如下,然后报错

// synchronized (output){
	output.collect(record)
// }

错误详情如下

2020-04-15 10:46:00,970 ERROR org.wso2.siddhi.core.stream.output.StreamCallback             - Error on sending events[Event{timestamp=1586918759506, data=[metric_3910b0767be2c97e, 2020-04-15 10:46:00, 5370913091060203300, metric, ["innerZoneSumVisitCount","innerZoneSumVisitByteOut"], http, inner_886da035-c033-46b8-8ce8-b31e03db7ab2_1537173568979, 35037498, inner_886da035-c033-46b8-8ce8-b31e03db7ab2_1537173568979, 9087, 00, 30626f692cfaf26c, fcdf5c91c4aa2f98, 0, true], isExpired=false}]
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
	at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:106)
	at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:82)
	at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:74)
	at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
	at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
	at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
	at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
	at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
	at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
	at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.process(TimeBatchWindowProcessor.java:251)
	at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:66)
	at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
	at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:50)
	at org.wso2.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:140)
	at org.wso2.siddhi.core.util.SystemTimeBasedScheduler$EventCaller.run(SystemTimeBasedScheduler.java:95)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	... 27 more
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:300)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 33 more
2020-04-15 10:46:01,002 ERROR org.wso2.siddhi.core.stream.output.StreamCallback             - Error on sending events[Event{timestamp=1586918759507, data=[metric_f00436b8ce6df0b6, 2020-04-15 10:46:00, 5370913091060929569, metric, 




org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
	at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:106)
	at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:76)
	at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
	at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
	at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
	at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
	at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
	at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
	at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.process(TimeBatchWindowProcessor.java:251)
	at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:66)
	at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
	at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:50)
	at org.wso2.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:140)
	at org.wso2.siddhi.core.util.SystemTimeBasedScheduler$EventCaller.run(SystemTimeBasedScheduler.java:95)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
	at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
	at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
	at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
	at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
	at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
	... 26 more
发布了1236 篇原创文章 · 获赞 464 · 访问量 157万+

猜你喜欢

转载自blog.csdn.net/qq_21383435/article/details/105532214