1.美图
2.背景
因为这个问题(点击查看),我把下面这段代码:
synchronized (output){
output.collect(record)
}
改成了如下写法(注释掉了 synchronized 同步块),然后运行时就报错了:
// synchronized (output){
output.collect(record)
// }
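错误详情如下(日志中出现了两种不同的异常:Kryo Output 残留数据的 IllegalStateException,以及 Kryo 拷贝时的 ArrayIndexOutOfBoundsException):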
2020-04-15 10:46:00,970 ERROR org.wso2.siddhi.core.stream.output.StreamCallback - Error on sending events[Event{timestamp=1586918759506, data=[metric_3910b0767be2c97e, 2020-04-15 10:46:00, 5370913091060203300, metric, ["innerZoneSumVisitCount","innerZoneSumVisitByteOut"], http, inner_886da035-c033-46b8-8ce8-b31e03db7ab2_1537173568979, 35037498, inner_886da035-c033-46b8-8ce8-b31e03db7ab2_1537173568979, 9087, 00, 30626f692cfaf26c, fcdf5c91c4aa2f98, 0, true], isExpired=false}]
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:106)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:82)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:74)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.process(TimeBatchWindowProcessor.java:251)
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:66)
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:50)
at org.wso2.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:140)
at org.wso2.siddhi.core.util.SystemTimeBasedScheduler$EventCaller.run(SystemTimeBasedScheduler.java:95)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 27 more
Caused by: java.lang.IllegalStateException: The Kryo Output still contains data from a previous serialize call. It has to be flushed or cleared at the end of the serialize call.
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:300)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:152)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 33 more
2020-04-15 10:46:01,002 ERROR org.wso2.siddhi.core.stream.output.StreamCallback - Error on sending events[Event{timestamp=1586918759507, data=[metric_f00436b8ce6df0b6, 2020-04-15 10:46:00, 5370913091060929569, metric,
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:106)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:76)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.process(TimeBatchWindowProcessor.java:251)
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:66)
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:50)
at org.wso2.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:140)
at org.wso2.siddhi.core.util.SystemTimeBasedScheduler$EventCaller.run(SystemTimeBasedScheduler.java:95)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:157)
at com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
... 26 more