一、Pulsar架构和组件
1、分层架构
Pulsar的整体架构采用了分层设计,这种分层架构为可伸缩性、可靠性和灵活性提供了很大的优势。
1.1 Pulsar的整体架构
Pulsar的整体架构包含多个关键组件,其中包括:
-
Broker(代理): Brokers是Pulsar的核心组件,负责接收、存储和传递消息。Pulsar集群包含多个Broker,它们分布在整个系统中,形成一个高度可伸缩的消息传递网络。
-
ZooKeeper: Pulsar使用ZooKeeper来进行集群协调和元数据管理。ZooKeeper维护了Pulsar集群的配置信息、主题信息以及活跃的Broker列表。
-
BookKeeper: BookKeeper是Pulsar的存储层,用于持久化消息。它提供了高度可靠的分布式日志存储,确保消息的持久性和可靠性。
-
Pulsar Proxy: Pulsar Proxy是一个可选的组件,允许客户端通过代理访问Pulsar集群。它提供了负载均衡、安全性和协议转换等功能。
-
Pulsar Functions: Pulsar Functions是一个用于处理和转换消息的服务器less计算框架。它允许用户以简单的方式编写和部署函数来处理消息流。
1.2 分层架构的优势
-
可伸缩性: 分层架构使得Pulsar能够轻松扩展,通过增加Broker来提高消息处理能力。每个Broker负责特定范围的主题分区,实现了水平扩展。
-
可靠性: 使用BookKeeper作为存储层确保了消息的可靠性和持久性。分层设计还使得系统能够更好地应对部分组件的故障,从而提高整体可用性。
-
灵活性: 分层设计使得Pulsar能够灵活地适应不同的场景和需求。例如,可以通过添加代理层实现负载均衡,通过引入函数计算层实现消息处理逻辑的扩展。
2、BookKeeper和ZooKeeper
Pulsar的架构中涉及两个重要的组件:BookKeeper和ZooKeeper。它们分别扮演着存储层和协调层的角色,为Pulsar提供了可靠性和协同管理的支持。
2.1 BookKeeper的角色
-
存储层: BookKeeper是Pulsar的存储层组件,负责持久化消息。它采用分布式日志存储的方式,将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性,即使在节点故障的情况下也能保证消息不丢失。
-
分布式日志: BookKeeper使用分布式日志来记录消息的写入操作,这使得多个Broker能够协同工作,同时确保消息的有序性。每个主题的消息都被分割成多个分区,并在BookKeeper中进行存储。
2.2 ZooKeeper的角色
-
协调层: ZooKeeper是Pulsar的协调层组件,用于进行集群协调和元数据管理。Pulsar集群的配置信息、主题信息以及活跃的Broker列表等关键元数据都由ZooKeeper进行管理和维护。
-
故障检测与恢复: ZooKeeper负责检测Pulsar集群中的节点故障,并进行相应的故障恢复。如果某个Broker节点出现故障,ZooKeeper将通知其他节点,并协助进行重新分配分区,确保系统的高可用性。
2.3 示例
在Go语言中,与BookKeeper和ZooKeeper交互的示例通常涉及使用相应的客户端库。
示例演示如何使用Go语言创建一个ZooKeeper连接和一个BookKeeper客户端:
package main
import (
"fmt"
"github.com/samuel/go-zookeeper/zk"
"github.com/apache/bookkeeper/bkclient-go/bkclient"
)
func main() {
// 连接到ZooKeeper
zkConn, _, err := zk.Connect([]string{
"localhost:2181"}, 5000)
if err != nil {
fmt.Println("Error connecting to ZooKeeper:", err)
return
}
defer zkConn.Close()
// 创建BookKeeper客户端
bkClient, err := bkclient.New(&bkclient.ClientConfig{
ZkServers: []string{
"localhost:2181"},
BkEnsemble: 3,
BkWriteQuorum: 2,
BkAckQuorum: 2,
})
if err != nil {
fmt.Println("Error creating BookKeeper client:", err)
return
}
defer bkClient.Close()
// 可以使用ZooKeeper和BookKeeper客户端进行更多的操作,例如创建主题、发布消息等。
fmt.Println("ZooKeeper and BookKeeper connection established successfully.")
}
3、Broker服务
Pulsar的架构中,Broker是核心组件之一,承担着接收、存储和传递消息的关键角色。
3.1 Broker的功能
-
消息接收: Broker负责接收生产者发送的消息。生产者将消息发送到特定的主题(Topic),而Broker负责接收这些消息并进行处理。
-
消息存储: 接收到的消息会被Broker存储在本地或通过BookKeeper进行持久化。这样即使Broker节点发生故障,消息也能够得到保留,确保消息的可靠性和持久性。
-
消息传递: 一旦消息被存储,Broker负责将消息传递给订阅了相关主题的消费者。这涉及到消息的路由、分发和传输。
-
主题和分区管理: Broker管理主题的创建、删除以及与分区的关联。主题可以被分为多个分区,每个分区由不同的Broker进行管理,以实现水平扩展。
-
消费者协调: Broker与消费者进行协调,确保消息按照正确的顺序和适当的策略被传递给消费者。这包括负载均衡、故障恢复等方面的工作。
3.2 Broker的工作流程
-
接收消息: 当生产者发送消息到Pulsar集群时,消息首先到达Broker。每个Broker负责接收属于自己管理范围内的主题和分区的消息。
-
消息存储: 接收到的消息会被存储在本地,同时也可以选择通过BookKeeper进行持久化。这确保了消息在发生Broker节点故障时仍然可用。
-
消息传递: 一旦消息被存储,Broker负责将消息传递给相应主题的订阅者。这可能涉及到消息的路由,以确保消息被正确地传递给订阅了相应主题的消费者。
-
主题和分区管理: Broker管理主题的创建、删除和分区的分配。它维护了主题和分区的元数据,以便在系统中进行正确的资源分配。
-
消费者协调: Broker与消费者协调,确保消息以正确的顺序和策略传递给消费者。这包括处理消费者的加入、退出、负载均衡以及故障恢复等情况。
4、Pulsar IO(连接器)
Pulsar IO是Pulsar的一个重要组件,也称为连接器(Connectors),它提供了一种灵活的方式来集成Pulsar与外部系统,实现数据的输入和输出。这使得Pulsar能够与各种数据源和目标系统进行集成,从而支持更广泛的应用场景。
4.1 Pulsar IO的概念
-
连接器: Pulsar IO连接器是用于将Pulsar与其他数据存储、处理系统连接起来的组件。它们可以用于将数据从外部系统导入Pulsar,也可以将Pulsar中的数据导出到其他系统中。
-
Source连接器: Source连接器负责从外部系统读取数据并将其推送到Pulsar的主题中。这样可以实现将外部系统的数据引入到Pulsar中,以便进行进一步的处理和分发。
-
Sink连接器: Sink连接器负责将Pulsar中的数据导出到外部系统中。通过Sink连接器,可以将Pulsar中的消息传输到目标系统,实现数据的持久化、分析或其他处理。
4.2 Pulsar IO的使用场景
-
数据集成: Pulsar IO连接器可用于将数据从各种数据源引入Pulsar,实现不同系统之间的数据集成。这对于构建数据湖、数据仓库或数据流处理平台非常有用。
-
异构系统集成: Pulsar IO连接器支持与各种异构系统的集成,包括关系型数据库、NoSQL数据库、消息队列等。这使得 Pulsar 成为一个通用的中间件,能够在不同系统之间进行数据交换。
-
实时数据处理: 使用Source连接器,可以将实时产生的数据源接入 Pulsar,并通过 Pulsar 的分布式消息传递机制进行高效处理。Sink连接器则可以将处理结果输出到其他系统,实现实时数据处理流程。
4.3 示例
假设我们有一个 Kafka 主题 “kafka-source-topic”,并且我们想要将其消息导入到 Pulsar 中的主题 “pulsar-destination-topic”。
示例
:
使用 Pulsar IO 的 Kafka Source 连接器:
-
准备工作:
- 安装 Pulsar 和 Kafka。
- 确保 Pulsar IO 的 Kafka Source 连接器已经安装和配置。
-
创建 Kafka 生产者:
package main import ( "github.com/confluentinc/confluent-kafka-go/kafka" "fmt" ) func main() { p, err := kafka.NewProducer(&kafka.ConfigMap{ "bootstrap.servers": "localhost:9092", }) if err != nil { fmt.Printf("Failed to create Kafka producer: %s\n", err) return } defer p.Close() // 生产者发送消息到 Kafka 主题 err = p.Produce(&kafka.Message{ TopicPartition: kafka.TopicPartition{ Topic: &"kafka-source-topic", Partition: kafka.PartitionAny}, Value: []byte("Message from Kafka"), }, nil) if err != nil { fmt.Printf("Failed to produce message: %s\n", err) } // 等待消息发送完成 p.Flush(5000) }
以上代码使用 Confluent Go 客户端创建了一个 Kafka 生产者,并发送了一条消息到 “kafka-source-topic” 主题。
-
配置并启动 Kafka Source 连接器:
-
配置文件示例(kafka-source-config.yaml):
tenant: public namespace: default name: kafka-source className: org.apache.pulsar.io.kafka.KafkaSource topic: pulsar-destination-topic kafkaTopic: kafka-source-topic bootstrapServers: localhost:9092
-
启动连接器:
bin/pulsar-admin source create --name kafka-source \ --destination-topic-name pulsar-destination-topic \ --source-type org.apache.pulsar.io.kafka.KafkaSource \ --source-config-file kafka-source-config.yaml
-
-
观察结果:
- 在 Pulsar 中消费者订阅 “pulsar-destination-topic” 主题,以接收从 Kafka 导入的消息。
// 创建 Pulsar 消费者 consumer, _ := client.Subscribe(pulsar.ConsumerOptions{ Topic: "pulsar-destination-topic", SubscriptionName: "my-subscription", }) defer consumer.Close() // 接收消息 msg, _ := consumer.Receive(context.Background()) fmt.Printf("Received message from Pulsar: %s\n", string(msg.Payload())) // 手动确认消息 consumer.Ack(msg)
这个示例演示了使用 Pulsar IO 的 Kafka Source 连接器将消息从 Kafka 主题导入到 Pulsar 中。
二、Pulsar的安装和配置
1、安装步骤
下面说明在 Windows、Linux 和 macOS 上进行 Pulsar 的单节点安装和集群安装。
1.1 单节点安装
-
下载 Pulsar:
- 从 Apache Pulsar 的官方网站(https://pulsar.apache.org/download/)下载最新版本的 Pulsar 发行版。选择适合您操作系统的二进制发行版并下载。
-
解压 Pulsar:
- 解压下载的 Pulsar 发行版文件到您选择的目录。您会得到一个包含 Pulsar 文件的文件夹。
-
配置 Pulsar:
- 进入 Pulsar 解压目录,找到
conf
目录,然后编辑standalone.conf
文件。配置文件中包含了 Pulsar 的各种参数,例如网络端口、存储路径等。至少需要配置brokerServicePort
和webServicePort
。
# standalone.conf # Pulsar broker service port brokerServicePort=6650 # Pulsar web service port webServicePort=8080
- 进入 Pulsar 解压目录,找到
-
启动 Pulsar:
-
在命令行中进入 Pulsar 解压目录,运行以下命令启动 Pulsar Standalone:
-
Windows:
bin\pulsar standalone
-
Linux/Mac:
bin/pulsar standalone
-
-
验证安装:
- 使用 Pulsar 提供的命令行工具或客户端 API 连接到 Pulsar 实例,确保它正常运行并可访问。
1.2 集群安装
-
复制 Pulsar:
- 将单节点安装中的 Pulsar 目录复制到所有集群节点。确保所有节点上的 Pulsar 版本一致。
-
配置集群:
- 在每个节点上编辑
broker.conf
文件,配置集群的相关信息,包括advertisedAddress
、brokerServicePort
、webServicePort
等。确保所有节点的配置相似,但advertisedAddress
需要设置为每个节点的实际 IP 地址。
# broker.conf # Advertised address for the broker advertisedAddress=<Node_IP_Address> # Pulsar broker service port brokerServicePort=6650 # Pulsar web service port webServicePort=8080
- 在每个节点上编辑
-
启动集群:
-
在每个节点上运行以下命令启动 Pulsar Broker:
-
Windows:
bin\pulsar broker
-
Linux/Mac:
bin/pulsar broker
-
-
验证集群:
- 使用 Pulsar 提供的命令行工具或客户端 API 连接到集群中的任何节点,确保它正常运行并可访问。您可以使用任何节点的 IP 地址和端口(默认为 6650)连接到集群。
2、配置文件
2.1 配置文件结构
Pulsar 的配置文件通常采用 .conf
格式,是键值对的集合,用于指定 Pulsar 的各项参数。
以下是一个配置文件结构示例:
# 注释以 '#' 开头
key1=value1
key2=value2
# ...
[Section1]
key3=value3
key4=value4
# ...
[Section2]
key5=value5
key6=value6
# ...
- 注释行以
#
开头,用于注释说明。 - 键值对用于设置配置项的值。
- 方括号用于定义配置项的分组,称为部分(Section)。
2.2 常见配置项解析
1. Broker 配置 (broker.conf
)
-
advertisedAddress
:- 用于设置 Pulsar Broker 广告的地址,即客户端连接到 Broker 使用的地址。可以是 IP 地址或主机名。
-
brokerServicePort
:- 设置 Pulsar Broker 提供服务的端口号,客户端通过该端口连接到 Broker。
-
webServicePort
:- 设置 Pulsar Web 服务的端口号,用于提供管理和监控功能的 Web 界面。
-
zookeeperServers
:- 指定 ZooKeeper 服务器的地址,Pulsar 使用 ZooKeeper 来协调集群中的各个节点。
2. Standalone 模式配置 (standalone.conf
)
-
brokerServicePort
:- 同上,设置 Pulsar Standalone 模式下提供服务的端口号。
-
webServicePort
:- 同上,设置 Pulsar Standalone 模式下 Web 服务的端口号。
3. 其他常见配置项
-
zookeeperServers
:- 在集群中,此项也可以在 Broker 配置中用于指定 ZooKeeper 服务器的地址。
-
configurationStoreServers
:- 用于指定 Pulsar 配置信息的存储服务器地址,Pulsar 使用这些信息来共享配置。
-
managedLedgerDefaultEnsembleSize
和managedLedgerDefaultWriteQuorum
:- 配置 Managed Ledger 的默认副本数量和写入的最小数量。
-
bookkeeperMetadataServiceUri
:- 指定 BookKeeper Metadata Service 的 URI,用于存储 BookKeeper 的元数据。
2.3 补充说明
-
配置文件位置:
- 配置文件通常位于 Pulsar 安装目录下的
conf
子目录中。
- 配置文件通常位于 Pulsar 安装目录下的
-
文档参考:
- 建议查阅 Apache Pulsar 官方文档,其中包含了详细的配置项说明和最佳实践建议。
-
动态配置:
- 在运行时,您也可以通过 Pulsar 的命令行工具或 API 动态更改配置,而无需重新启动 Broker。