1.CEP(Complex Event Processing复杂事件处理)
事件(Event):一般情况下指的是一个系统中正在发生的事,事件可能发生在系统的各个层面上,它可以是某个动作,例如客户下单,发送消息,提交报告等,也可以是某种状态的改变,例如温度的变化,超时等等。事件处理(Event processing)指的是跟踪系统中发生的事件,分析事件中的信息并从中得到某种结论。而复杂事件处理,则是结合多个事件源中的事件,从中推断出更加复杂的情况下的事件。
实现一个CEP引擎我们需要思考的问题:(1)吞吐量;(2)低延迟;(3)复杂逻辑处理。
2.Esper
Esper是一个开源的复杂事件处理引擎,它的目的是让用户能够通过它提供的接口,构建一个用于处理复杂事件的应用程序
从Esper的架构图中我们可以看出,它包含三个部分:Input adapter,Esper engine,Output adapter。
Input adapter 和 Output adapter
输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。 目前,Esper提供的适配器包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。
Esper engine
Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。
Esper支持的事件表现形式
Esper支持多种事件表现形势:包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。
Esper事件处理模型
Esper事件处理模型主要包含两部分:(1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。(2)Listener用于监听事件的处理情况,接收事件处理的结果,通过UpdateListener接口来实现,它相当于一个回调函数,当事件处理完成之后,可以通过该回调函数向结果发送到目的地。
3.实例
举例一个用户注册事件:定义事件对象包含name和age属性实现get方法,实时统计注册用户的平均年龄。
定义事件类
public class MyEvent {
private String name;
private int age;
public MyEvent() {
}
public MyEvent(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
测试对事件的操作
public class EsperTest {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.addEventType("myEvent", MyEvent.class);
EPServiceProvider provider = EPServiceProviderManager.getDefaultProvider(conf);
EPAdministrator admin = provider.getEPAdministrator();
EPStatement statement = admin.createEPL("select name,age,avg(age) as avgAge from myEvent");
statement.addListener(new UpdateListener() {
@Override
public void update(EventBean[] newEvents, EventBean[] oldEvents) {
for (EventBean eb : newEvents) {
System.out.println("注册用户姓名:"+eb.get("name")+",注册用户年龄:"+eb.get("age")+",平均年龄:"+eb.get("avgAge"));
}
}
});
EPRuntime epr = provider.getEPRuntime();
epr.sendEvent(new MyEvent("sean", 30));
epr.sendEvent(new MyEvent("sime", 25));
epr.sendEvent(new MyEvent("jack",28));
epr.sendEvent(new MyEvent("lulcy",26));
epr.sendEvent(new MyEvent("duck", 35));
}
}
执行结果
注册用户姓名:sean,注册用户年龄:30,平均年龄:30.0
注册用户姓名:sime,注册用户年龄:25,平均年龄:27.5
注册用户姓名:jack,注册用户年龄:28,平均年龄:27.666666666666668
注册用户姓名:lulcy,注册用户年龄:26,平均年龄:27.25
注册用户姓名:duck,注册用户年龄:35,平均年龄:28.8
转载:https://www.cnblogs.com/yitudake/p/6747990.html