1.美图
2.概述
Flink 集成 Siddhi 后,作业运行时报错,日志如下:
2020-04-14 20:32:45,587 INFO com.dbapp.baas.engine.common.siddhi.operator.AbstractSiddhiControlOperator - siddhioperator #^# start rule : sumBytesInSrcAddressAppProtocol ,exp : from cepStatLogStream[((deviceSendProductName == "安恒全流量深度威胁检测系统" AND securityEyeLogType == "4") OR dataType == "flow") AND not (appProtocol is null) AND not (srcAddress is null) AND not (bytesIn is null)]#window.timeBatch(60 sec,0) select 'metric_55d3ed6e8fdb5c55' as datasetId,custom:now() as endTime,custom:uniqueEventId() as eventId,'metric' as modelType,custom:mkarray('sumBytesInSrcAddressAppProtocol') as metricIds,srcGeoCountry,sum(bytesIn) as stat_sum_bytesIn_sumBytesInSrcAddressAppProtocol,srcGeoCity,deviceSendProductName,securityEyeLogType,appProtocol,dataType,srcAddress,srcGeoRegion,'6a01945641db02c6' as filterHash,'8ec8dbefa2461d34' as groupHash,'0' as customRule,true as saveToEs group by appProtocol,srcAddress order by stat_sum_bytesIn_sumBytesInSrcAddressAppProtocol desc limit 2000 insert into outPutSecurityEvent;
2020-04-14 20:32:45,587 INFO com.dbapp.baas.engine.common.siddhi.operator.AbstractSiddhiControlOperator - siddhioperator #^# start rule : sumDeviceAddress ,exp : from cepStatLogStream[not (deviceAddress is null) AND not (eventCount is null)]#window.timeBatch(60 sec,0) select 'metric_d73a562fa17279f8' as datasetId,custom:now() as endTime,custom:uniqueEventId() as eventId,'metric' as modelType,custom:mkarray('sumDeviceAddress','countDeviceAddress') as metricIds,deviceAddress,count() as stat_count__countDeviceAddress,sum(eventCount) as stat_sum_eventCount_sumDeviceAddress,'8f00b204e9800998' as filterHash,'c5fac874db35b04a' as groupHash,'0' as customRule,true as saveToEs group by deviceAddress order by stat_count__countDeviceAddress,stat_sum_eventCount_sumDeviceAddress desc limit 2000 insert into outPutSecurityEvent;
2020-04-14 20:33:00,158 ERROR org.wso2.siddhi.core.stream.output.StreamCallback - Error on sending events[Event{timestamp=1586867531460, data=[metric_c6536da6187a18e2, 2020-04-14 20:33:00, 5370483738347311424, metric, ["sumBytesOutMonitor","sumBytesInMonitor"], 32650956, null, 8898605, flow, 7288767324f8d900, 8f00b204e9800998, 0, true], isExpired=false}]
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:106)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:82)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:74)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.process(TimeBatchWindowProcessor.java:251)
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:66)
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:50)
at org.wso2.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:140)
at org.wso2.siddhi.core.util.SystemTimeBasedScheduler$EventCaller.run(SystemTimeBasedScheduler.java:95)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 27 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 33 more
2020-04-14 20:33:00,176 ERROR org.wso2.siddhi.core.stream.output.StreamCallback - Error on sending events[Event{timestamp=1586867531249, data=[metric_3910b0767be2c97e, 2020-04-14 20:33:00, 5370483733496040616, metric, ["innerZoneSumVisitCount","innerZoneSumVisitByteOut"], http, inner_886da035-c033-46b8-8ce8-b31e03db7ab2_1537173568979, 2532077, inner_886da035-c033-46b8-8ce8-b31e03db7ab2_1537173568979, 2442, 00, 30626f692cfaf26c, fcdf5c91c4aa2f98, 0, true], isExpired=false}]
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
at org.wso2.siddhi.core.stream.output.StreamCallback.receiveEvents(StreamCallback.java:106)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:82)
at org.wso2.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:74)
at org.wso2.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:129)
at org.wso2.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:353)
at org.wso2.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at org.wso2.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:88)
at org.wso2.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:46)
at org.wso2.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at org.wso2.siddhi.core.query.processor.stream.window.TimeBatchWindowProcessor.process(TimeBatchWindowProcessor.java:251)
at org.wso2.siddhi.core.query.processor.stream.window.WindowProcessor.processEventChunk(WindowProcessor.java:66)
at org.wso2.siddhi.core.query.processor.stream.AbstractStreamProcessor.process(AbstractStreamProcessor.java:123)
at org.wso2.siddhi.core.query.input.stream.single.EntryValveProcessor.process(EntryValveProcessor.java:50)
at org.wso2.siddhi.core.util.Scheduler.sendTimerEvents(Scheduler.java:140)
at org.wso2.siddhi.core.util.SystemTimeBasedScheduler$EventCaller.run(SystemTimeBasedScheduler.java:95)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 27 more
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:239)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:213)
at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:181)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:256)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.getBufferBuilder(RecordWriter.java:249)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:169)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:154)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:120)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
... 33 more
报错的这一行是 at com.dbapp.baas.engine.common.siddhi.operator.StreamOutputHandler.receive(StreamOutputHandler.java:69)
这一行对应的代码是 out.collect(xxx),Collector 接口的源码如下:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.util;
import org.apache.flink.annotation.Public;
/**
 * A sink for records: each record handed to the collector is passed onward. Where a
 * {@link java.util.Iterator} lets a consumer "pull" data in, a collector is the
 * matching "push" side.
 *
 * @param <T> The type of the records accepted by this collector.
 */
@Public
public interface Collector<T> {

    /**
     * Emits the given record.
     *
     * @param record The record to emit.
     */
    void collect(T record);

    /**
     * Closes this collector. Data that was buffered, if any, is flushed.
     */
    void close();
}
也就是说,Siddhi 处理完事件之后,要通过 out.collect 把结果交给 Flink 的后续算子继续处理,就在这一步抛出了上面的异常(Buffer pool is destroyed)。