Producer (消息生产者)
Consumer (消息消费者)
Topic (消息的主题)
Partition (分区)
Message (消息)
Broker (就是meta的服务端)
Group (消费者分组)
Offset (消息偏移量)
下载地址 http://fnil.net/downloads/index.html
GitHub地址 https://github.com/killme2008/Metamorphosis
主要内容
1. Meta是什么,特征和适用场景
2. 在公司的应用状况
3. 原理和内部实现
4. Meta的使用
5. 性能
Metamorphosis是什么?
A distributed publish-subscribe messaging system
开源MQ-kafka的Java版本
Linkedin开源的MQ
《The metamorphosis》——卡夫卡的代表作
设计原则
消息都是持久的,保存在磁盘
吞吐量第一
消费状态保存在客户端
分布式,生产者、服务器和消费者都可分布。
跟kafka有什么不同?
• 添加了实时统计功能和协议
Meta有什么特性?
Meta能做什么
Meta应用
Meta的部署结构
Meta的系统结构
主要概念的对应关系
Producer和Broker之间的负载均衡
同一Group的Consumer和Broker之间的负载均衡
同一Group的Consumer和Broker之间的负载均衡
异步发送消息
异步发送消息 流量控制
异步发送消息 流量控制
服务端FAQ
发送端FAQ
消费者的FAQ
消费者的FAQ
消费者的FAQ
消费者的FAQ
Meta的使用 发送消息
// New session factory,强烈建议使用单例
MessageSessionFactory sessionFactory = new MetaMessageSessionFactory(new MetaClientConfig());
// create producer, 强烈建议使用单例
MessageProducer producer = sessionFactory.createProducer();
// publish topic
final String topic = “meta-test”;
//调一次就够了
producer.publish(topic);
byte []data=...
// send message
SendResult sendResult = producer.sendMessage(new Message(topic,data));
Meta的使用 异步发送
// New session factory,强烈建议使用单例
AsyncMessageSessionFactory sessionFactory = new AsyncMetaMessageSessionFactory(new MetaClientConfig());
// create producer, 强烈建议使用单例
MessageProducer producer = sessionFactory. createAsyncProducer();
// publish topic
final String topic = “meta-test”;
//调一次就够了
producer.publish(topic);
byte []data=...
// send message
SendResult sendResult = producer.sendMessage(new Message(topic,data));
Meta的使用 同步消费
// subscribed topic
final String topic = “meta-test”;
// consumer group
final String group = “meta-example”;
// create consumer
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
// start offset
long offset = 0;
MessageIterator it = null;
// fetch messages
while ((it = consumer.get(topic, new Partition("0-0"), offset, 1024 * 1024)) != null) {
while (it.hasNext()) {
final Message msg = it.next();
System.out.println("Receive message " + new String(msg.getData()));
}
// move offset forward
offset += it.getOffset();
}
Meta的使用 异步消费(推荐)
// subscribed topic
final String topic = "meta-test";
// consumer group
final String group = “meta-example”;
// create consumer , 强烈建议使用单例
MessageConsumer consumer = sessionFactory.createConsumer(new ConsumerConfig(group));
// subscribe topic
consumer.subscribe(topic, 1024 * 1024, new MessageListener() {
public void recieveMessages(Message message) {
System.out.println(“Receive message ” + new String(message.getData()));
}
public Executor getExecutor() {
// Thread pool to process messages,maybe null.
return null;
}
});
// complete subscribe,调用一次就够了
consumer.completeSubscribe();
Meta的使用 广播方式接收消息
// New session factory,强烈建议使用单例
BroadcastMessageSessionFactory sessionFactory = new MetaBroadcastMessageSessionFactory (new MetaClientConfig());
// create broadcast consumer , 强烈建议使用单例
MessageConsumer consumer = sessionFactory. createBroadcastConsumer(new ConsumerConfig(group));
Meta的使用 高级使用方式
Meta的性能
测试场景 |
刷盘阀值 |
消息大小 |
平均每次发送消耗时间 |
每秒钟发送的消息数 |
CPU Utilization (服务端) |
IO wait(服务端) |
Average Load(服务端) |
单台Meta,10个同组 |
默认 |
256 |
2.3ms |
45000 |
13% |
0.15 |
2.5 |
单台Meta,10个同组 |
默认 |
2k |
4ms |
26000 |
14% |
1 |
3.5 |
单台Meta,10个同组 |
一条一刷 |
2k |
20.1ms |
4600 |
5% |
0.17 |
10.8 |
单台Meta,10个同组 |
默认 |
4k |
6.4ms |
16000 |
12% |
1 |
2 |
单台Meta,10个不同组 |
默认 |
4k |
6.3ms |
15800 |
17% |
0.4 |
6 |
单台Meta,本地事务发送 |
事务日志由操作系统决定刷盘 |
4k |
3.5ms |
6750 |
15% |
0.59 |
7.5 |
单台Meta,异步发送 |
默认 |
4k |
0.015ms |
12483 |
10% |
0.55 |
2.1 |
单台Meta,异步发送 |
默认 |
4k |
3ms |
33000-39000 |
11% |
0.7 |
2.1 |