pulsar的介绍
Apache Pulsar is a cloud-native, distributed messaging and streaming platform originally created at Yahoo! and now a top-level Apache Software Foundation project
这是官网的一段介绍说Apache Pulsar是一个云原生分布式消息和流处理平台,最初创建于雅虎!现在是Apache软件基金会的顶级项目
关于具体pulsar的详细内容,大家可以通过官方文档来学习
需求
在更新主订单运输状态的时候,有个业务需求比如主订单的明细行有A,B,C,D四个物料,A,B物料行对应运输单号T0001,C,D物料对应运输单号T0002。后台业务有个逻辑就是只有当所有物料对应的运输单号被签收的时候,主订单运输状态才能关闭。
原有消息消费者是使用Shared(共享)模式消费的,官网对于该模式是这样解释的,消息通过round robin轮询机制分发给不同的消费者,并且每个消息仅会被分发给一个消费者。 当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。
那这种模式在需求中,有哪些问题呢?首先服务是多节点部署的(消费者假如有2个),现在上游服务推送一条消息(A,B物料运输单号T0001),接着推送一条消息(C,D物料运输单号T0002),这两个消费者同时消费这块逻辑,那我们的主订单状态判断的逻辑就会出现问题呢或者处理起来很麻烦。
那消息中间件pulsar怎么来简单完成实现这样的需求呢?我当时第一想法可以使用Exclusive独占模式,但是由于对于消费的吞吐量的影响来说,我也就直接放弃了。根据之前对于kafka的印象,我想了下pulsar有没有分区这样的内容呢?果然,我找到了分区topic来实现这个问题消息 · Apache Pulsar
生产者
通过admin API指定创建的topic分区并且创建的时候可以指明分区的数量。
生产者发送分区消息封装的类
@Slf4j
@Component
public class ProducerPartionFactory {
@Autowired
private PulsarClient client;
@Autowired
private PulsarAdmin pulsarAdmin;
// 消息生产者集合
private ConcurrentHashMap<String, Producer<byte[]>> producerMap = new ConcurrentHashMap<>();
private Producer<byte[]> getTheProducer(PulsarProducePartionRoter pulsarProducePartionRoter) throws Exception {
String topic = pulsarProducePartionRoter.getTopic();
Producer<byte[]> producer = producerMap.get(topic);
if (producer == null) {
synchronized (topic.intern()) {
producer = producerMap.get(topic);
if (producer == null) {
PartitionedTopicMetadata metadata = null;
String localIpAddress = null;
try {
metadata = pulsarAdmin.topics().getPartitionedTopicMetadata("persistent://public/test/" + topic);
if (metadata.partitions == 0) {
//使用pulsarAdmin来初始化创建分区消息,这里的分区建议按照消费者数量或者消费者数量*n来定义
pulsarAdmin.topics().createPartitionedTopic("persistent://public/test/" + topic, 4);
}
//因为我们这里是多个节点,这里使用ip地址来区别我们不同的生产者名称
localIpAddress = IPUtils.getLocalIpAddress();
if(StringUtils.isNotBlank(localIpAddress)){
localIpAddress.replaceAll("\\.","_");
}
} catch (Exception ex) {
log.error("创建分区主题异常:{}", ex);
throw ex;
}
try {
// topic 生产者消息写到哪个主题中
// producerName 生产者的名字,不唯一即可,如果不设置,会有默认值
// sendTimeout 超时时间
// 默认情况下,当队列已满时,所有对Send和SendAsync方法的调用都将失败,除非您将BlockIfQueueFull设置为true
producer = client.newProducer(Schema.BYTES)
.topic("persistent://public/test/" + topic)
.producerName(topic + localIpAddress)
.sendTimeout(10, TimeUnit.SECONDS)
// .blockIfQueueFull(true)
.create();
producerMap.putIfAbsent(topic, producer);
return producer;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
return producer;
}
public boolean send(PulsarProducePartionRoter pulsarProducePartionRoter, String message) {
boolean isSuccess = false;
try {
Producer<byte[]> producer = this.getTheProducer(pulsarProducePartionRoter);
MessageId messageId = producer.newMessage().key(pulsarProducePartionRoter.getKey()).value(message.getBytes()).send();
log.info("获取分区发送信息:{}", JSON.toJSONString(messageId));
if (messageId != null) {
isSuccess = true;
}
} catch (Exception e) {
log.error("pulsar发送异常:", e);
}
return isSuccess;
}
}
@Data
public class PulsarProducePartionRoter {
private String topic;
private String key;
}
IPUtils工具类
public class IPUtils {
/**
* 获取本地IP地址
*
* @return 本地IP地址
*/
public static String getLocalIpAddress() {
try {
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
while (interfaces.hasMoreElements()) {
NetworkInterface iface = interfaces.nextElement();
// 排除回环和虚拟接口
if (iface.isLoopback() || iface.isVirtual() || !iface.isUp()) {
continue;
}
Enumeration<InetAddress> addresses = iface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress addr = addresses.nextElement();
if (!addr.isLinkLocalAddress() && !addr.isLoopbackAddress() && addr.isSiteLocalAddress()) {
return addr.getHostAddress();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
生产者发送消息的一段逻辑
String topic ="KEY_NEW_ORDER_SIGN";
PulsarProducePartionRoter pulsarProducePartionRoter = new PulsarProducePartionRoter();
pulsarProducePartionRoter.setTopic(topic);
pulsarProducePartionRoter.setKey(requestBean.getStoreNo());
boolean isSuccess = producerPartionFactory.send(pulsarProducePartionRoter, JSON.toJSONString(requestBean));
我们可以看到相同的stroeNo 分到了相同的分区编号
消费者
我们在项目启动完成后,启动消费者线程池来消费分区消息业务逻辑
@Slf4j
@Component
public class SignKeyNewShardConsumerLister implements CommandLineRunner {
private volatile boolean isRunning = true;
private ThreadPoolExecutor threadPoolExecutor = null;
@Autowired
private PulsarClient pulsarClient;
private final EmitterProcessor<FailedMessage> exceptionEmitter = EmitterProcessor.create();
private Consumer<byte[]> consumer;
@Autowired
private PulsarAdmin pulsarAdmin;
@Override
public void run(String... args) throws Exception {
threadPoolExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
try {
PartitionedTopicMetadata metadata = null;
String topic = null;
try {
topic = "persistent://public/test/KEY_NEW_ORDER_SIGN" ;
metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topic);
if (metadata.partitions == 0) {
pulsarAdmin.topics().createPartitionedTopic(topic, 4);
}
} catch (Exception ex) {
if (ex.getMessage().contains("Topic not exist")) {
pulsarAdmin.topics().createPartitionedTopic(topic, 4);
} else {
log.error("创建分区主题异常:{}", ex);
throw ex;
}
}
consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName(Topic.KEY_NEW_ORDER_SIGN.getCode())
.receiverQueueSize(1)
.subscriptionType(SubscriptionType.Key_Shared)
.keySharedPolicy(KeySharedPolicy.KeySharedPolicySticky.autoSplitHashRange())
// .autoUpdatePartitions(true)
.subscribe();
} catch (Exception e) {
log.error("初始化客户端SignConsumerLister 异常", e);
}
threadPoolExecutor.submit(() -> {
Message<byte[]> receiveMsg = null;
while (isRunning) {
try {
receiveMsg = consumer.receive();
if (receiveMsg != null) {
try {
// todo 执行业务逻辑
} catch (Exception ex) {
log.error("处理签收业务异常:{}", ex);
}
consumer.acknowledge(receiveMsg);
}
} catch (Exception e) {
log.error("SignConsumerLister exec processMessage Exception: {}", e);
consumer.negativeAcknowledge(receiveMsg);
exceptionEmitter.onNext(new FailedMessage(e, consumer, receiveMsg));
}
}
});
}
@PreDestroy
public void doDestroyListen() {
isRunning = false;
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdown();
}
try {
consumer.close();
} catch (Exception e) {
log.error("SignConsumerLister exec destroy Exception: {}", e);
}
}
}
我们也看到了消费者业务在相同的一台实例节点处理了这条消息并且顺序消费