Librdkafka对Kafka Metadata的封装和操作
- 我们在之前的Kafka源码分析系列中介绍过kafka集群的metadata, 大家可以参考一下;
- 简单说, kafka集群的metadata包括:
- 所有broker的信息: ip和port;
- 所有topic的信息: topic name, partition数量, 每个partition的leader, isr, replica集合等
- kafka集群的每一台broker都缓存了整个集群的metadata, 当broker或某一个topic的metadata信息发生变化时, 集群的Controller都会感知到并作相应的状态转换, 同时把发生变化的新的metadata信息广播到所有的broker;
- 下面我们介绍一下librdkafka中对metadata的封装和操作,基本上就是metadata的获取,定时刷新以及引用的操作, 比如说partition leader的迁移, partition个数的变化, broker的上下线等等;
Metadata的获取
- 我们先来看一下metadata的定义, 在
rdkafka.h
中
- kafka集群整体metadata的定义, 包括broker, topic, partition三部分:
/**
 * Cluster-wide metadata: the broker array, the topic array, and the
 * identity of the broker this metadata originated from.
 */
typedef struct rd_kafka_metadata {
        int         broker_cnt;     /**< Number of brokers in \p brokers */
        struct rd_kafka_metadata_broker *brokers;  /**< Brokers */

        int         topic_cnt;      /**< Number of topics in \p topics */
        struct rd_kafka_metadata_topic *topics;    /**< Topics */

        int32_t     orig_broker_id; /**< Broker originating this metadata,
                                     *   i.e. the broker that returned it */
        char       *orig_broker_name; /**< Name of originating broker */
} rd_kafka_metadata_t;
- broker的metadata:
/**
 * Metadata for one broker: its id plus the host and port it listens on.
 */
typedef struct rd_kafka_metadata_broker {
        int32_t     id;             /**< Broker Id */
        char       *host;           /**< Broker hostname */
        int         port;           /**< Broker listening port */
} rd_kafka_metadata_broker_t;
- topic的metadata:
/**
 * Metadata for one topic: its name, its partition array and the
 * topic-level error reported by the broker.
 */
typedef struct rd_kafka_metadata_topic {
        char       *topic;          /**< Topic name */
        int         partition_cnt;  /**< Number of partitions in \p partitions*/
        struct rd_kafka_metadata_partition *partitions; /**< Partitions */
        rd_kafka_resp_err_t err;    /**< Topic error reported by broker */
} rd_kafka_metadata_topic_t;
- partition的metadata:
/**
 * Metadata for one partition: its id, leader broker, replica set and
 * in-sync replica set, plus the partition-level error reported by
 * the broker.
 */
typedef struct rd_kafka_metadata_partition {
        int32_t     id;             /**< Partition Id */
        rd_kafka_resp_err_t err;    /**< Partition error reported by broker */
        int32_t     leader;         /**< Leader broker */
        int         replica_cnt;    /**< Number of brokers in \p replicas */
        int32_t    *replicas;       /**< Replica brokers */
        int         isr_cnt;        /**< Number of ISR brokers in \p isrs */
        int32_t    *isrs;           /**< In-Sync-Replica brokers */
} rd_kafka_metadata_partition_t;
- metadata的获取, 阻塞操作
rd_kafka_metadata
:
rd_kafka_metadata (rd_kafka_t *rk, int all_topics, rd_kafka_topic_t *only_rkt, const struct rd_kafka_metadata **metadatap, int timeout_ms) { rd_kafka_q_t *rkq; rd_kafka_broker_t *rkb; rd_kafka_op_t *rko; rd_ts_t ts_end = rd_timeout_init(timeout_ms); rd_list_t topics; // 在timeout_ms时长的超时时间内选择一台有效的broker, rkb = rd_kafka_broker_any_usable(rk, timeout_ms, 1); if (!rkb) return RD_KAFKA_RESP_ERR__TRANSPORT; // 创建一个queue, 作为metadata的response返回完生成的op的replay队列 rkq = rd_kafka_q_new(rk); rd_list_init(&topics, 0, rd_free); if (!all_topics) { if (only_rkt) rd_list_add(&topics, rd_strdup(rd_kafka_topic_a2i(only_rkt)-> rkt_topic->str)); else rd_kafka_local_topics_to_list(rkb->rkb_rk, &topics); } // 创建op, 里面会关联metadata request的buffer rko = rd_kafka_op_new(RD_KAFKA_OP_METADATA); rd_kafka_op_set_replyq(rko, rkq, 0); rko->rko_u.metadata.force = 1; /* Force metadata request regardless * of outstanding metadata requests. */ // 构造metadata request的二进制协议并最终放到broker的发送buffer队列里 rd_kafka_MetadataRequest(rkb, &topics, "application requested", rko); rd_list_destroy(&topics); rd_kafka_broker_destroy(rkb); // 等待metadata response返回或超时, 如果正常返回,在放入这个replay queue之前, 返回的response二进制协议会被parse到` rd_kafka_metadata`对象中 rko = rd_kafka_q_pop(rkq, rd_timeout_remains(ts_end), 0); rd_kafka_q_destroy_owner(rkq); /* Timeout */ if (!rko) return RD_KAFKA_RESP_ERR__TIMED_OUT; /* Error */ if (rko->rko_err) { rd_kafka_resp_err_t err = rko->rko_err; rd_kafka_op_destroy(rko); return err; } /* Reply: pass metadata pointer to application who now owns it*/ rd_kafka_assert(rk, rko->rko_u.metadata.md); *metadatap = rko->rko_u.metadata.md; rko->rko_u.metadata.md = NULL; rd_kafka_op_destroy(rko); return RD_KAFKA_RESP_ERR_NO_ERROR; }
- metadata的拷贝操作
rd_kafka_metadata_copy (const struct rd_kafka_metadata *src, size_t size)
: 执行一个深度copy, 使用rd_tmpabuf_t, 保证内存的对齐;
- 处理metadata的response
rd_kafka_parse_Metadata
:
/**
 * Handle a Metadata response: build a metadata object from \p rkbuf and
 * apply it to the client's broker list, topic state and metadata cache.
 *
 * NOTE: this excerpt elides the wire-format parsing of the broker and
 * topic/partition sections; the elisions are marked below. Several
 * locals (j, k, ApiVersion, controller_id, log_decode_errors) and the
 * err_parse label are only referenced from the elided code.
 *
 * @param rkb      Broker the response was received from.
 * @param request  The original request buffer; supplies the requested
 *                 topic list, the all_topics flag, the reason string and
 *                 the ApiVersion.
 * @param rkbuf    The response buffer.
 *
 * @returns the metadata object on success (also when the client is
 *          terminating, in which case no state is updated), or NULL on
 *          failure.
 */
struct rd_kafka_metadata *
rd_kafka_parse_Metadata (rd_kafka_broker_t *rkb,
                         rd_kafka_buf_t *request,
                         rd_kafka_buf_t *rkbuf) {
        rd_kafka_t *rk = rkb->rkb_rk;
        int i, j, k;
        rd_tmpabuf_t tbuf;
        struct rd_kafka_metadata *md;
        size_t rkb_namelen;
        const int log_decode_errors = LOG_ERR;
        rd_list_t *missing_topics = NULL;
        const rd_list_t *requested_topics = request->rkbuf_u.Metadata.topics;
        int all_topics = request->rkbuf_u.Metadata.all_topics;
        const char *reason = request->rkbuf_u.Metadata.reason ?
                request->rkbuf_u.Metadata.reason : "(no reason)";
        int ApiVersion = request->rkbuf_reqhdr.ApiVersion;
        rd_kafkap_str_t cluster_id = RD_ZERO_INIT;
        int32_t controller_id = -1;

        rd_kafka_assert(NULL, thrd_is_current(rk->rk_thread));

        /* Remove topics from missing_topics as they are seen in Metadata. */
        if (requested_topics)
                missing_topics = rd_list_copy(requested_topics,
                                              rd_list_string_copy, NULL);

        rd_kafka_broker_lock(rkb);
        rkb_namelen = strlen(rkb->rkb_name)+1;

        /* All memory for the parsed metadata is carved out of one
         * temporary buffer sized from the response length. */
        rd_tmpabuf_new(&tbuf,
                       sizeof(*md) + rkb_namelen + (rkbuf->rkbuf_totlen * 4),
                       0/*dont assert on fail*/);

        if (!(md = rd_tmpabuf_alloc(&tbuf, sizeof(*md)))) {
                /* Release the broker lock taken above before bailing out:
                 * the err path does not unlock it. */
                rd_kafka_broker_unlock(rkb);
                goto err;
        }
        md->orig_broker_id = rkb->rkb_nodeid;
        md->orig_broker_name = rd_tmpabuf_write(&tbuf,
                                                rkb->rkb_name, rkb_namelen);
        rd_kafka_broker_unlock(rkb);

        /* Parse and fill in the broker information. */
        /* . . . (elided in this excerpt) . . . */

        /* Parse and fill in the topic and partition information. */
        /* . . . (elided in this excerpt) . . . */

        /* Do not update any metadata state while shutting down. */
        if (rd_kafka_terminating(rkb->rkb_rk))
                goto done;

        /* Neither brokers nor topics were parsed: treat as a failure. */
        if (md->broker_cnt == 0 && md->topic_cnt == 0) {
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "No brokers or topics in metadata: retrying");
                goto err;
        }

        /* Update the broker list from the parsed broker entries. */
        for (i = 0 ; i < md->broker_cnt ; i++) {
                rd_kafka_broker_update(rkb->rkb_rk, rkb->rkb_proto,
                                       &md->brokers[i]);
        }

        /* Update partition count and leader for each topic we know about */
        for (i = 0 ; i < md->topic_cnt ; i++) {
                rd_kafka_metadata_topic_t *mdt = &md->topics[i];

                /* Ignore topics in blacklist */
                if (rkb->rkb_rk->rk_conf.topic_blacklist &&
                    rd_kafka_pattern_match(rkb->rkb_rk->rk_conf.topic_blacklist,
                                           mdt->topic)) {
                        rd_rkb_dbg(rkb, TOPIC, "BLACKLIST",
                                   "Ignoring blacklisted topic \"%s\" "
                                   "in metadata", mdt->topic);
                        continue;
                }

                /* Ignore metadata completely for temporary errors. (issue #513)
                 *   LEADER_NOT_AVAILABLE: Broker is rebalancing */
                if (mdt->err == RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE &&
                    mdt->partition_cnt == 0) {
                        /* missing_topics only exists when specific topics
                         * were requested, so guard against NULL. */
                        if (missing_topics)
                                rd_list_free_cb(
                                        missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           mdt->topic,
                                                           (void *)strcmp));
                        continue;
                }

                /* Apply the topic metadata to the local topic state. */
                rd_kafka_topic_metadata_update2(rkb, mdt);

                if (requested_topics) {
                        /* Topic was seen: it is no longer missing. */
                        rd_list_free_cb(missing_topics,
                                        rd_list_remove_cmp(missing_topics,
                                                           mdt->topic,
                                                           (void*)strcmp));
                        if (!all_topics) {
                                rd_kafka_wrlock(rk);
                                rd_kafka_metadata_cache_topic_update(rk, mdt);
                                rd_kafka_wrunlock(rk);
                        }
                }
        }

        /* Requested topics that did not show up in the response:
         * mark the locally known ones as having no metadata. */
        if (missing_topics) {
                char *topic;
                RD_LIST_FOREACH(topic, missing_topics, i) {
                        shptr_rd_kafka_itopic_t *s_rkt;

                        s_rkt = rd_kafka_topic_find(rkb->rkb_rk, topic,
                                                    1/*lock*/);
                        if (s_rkt) {
                                rd_kafka_topic_metadata_none(
                                        rd_kafka_topic_s2i(s_rkt));
                                rd_kafka_topic_destroy0(s_rkt);
                        }
                }
        }

        rd_kafka_wrlock(rkb->rkb_rk);
        rkb->rkb_rk->rk_ts_metadata = rd_clock();

        /* Update cached cluster id. */
        if (RD_KAFKAP_STR_LEN(&cluster_id) > 0 &&
            (!rkb->rkb_rk->rk_clusterid ||
             rd_kafkap_str_cmp_str(&cluster_id, rkb->rkb_rk->rk_clusterid))) {
                if (rkb->rkb_rk->rk_clusterid)
                        rd_free(rkb->rkb_rk->rk_clusterid);
                rkb->rkb_rk->rk_clusterid = RD_KAFKAP_STR_DUP(&cluster_id);
        }

        if (all_topics) {
                /* Full response: refresh the metadata cache and replace
                 * the stored full-metadata copy. */
                rd_kafka_metadata_cache_update(rkb->rkb_rk,
                                               md, 1/*abs update*/);

                if (rkb->rkb_rk->rk_full_metadata)
                        rd_kafka_metadata_destroy(rkb->rkb_rk->rk_full_metadata);
                rkb->rkb_rk->rk_full_metadata =
                        rd_kafka_metadata_copy(md, tbuf.of);
                rkb->rkb_rk->rk_ts_full_metadata = rkb->rkb_rk->rk_ts_metadata;
                rd_rkb_dbg(rkb, METADATA, "METADATA",
                           "Caching full metadata with "
                           "%d broker(s) and %d topic(s): %s",
                           md->broker_cnt, md->topic_cnt, reason);
        } else {
                rd_kafka_metadata_cache_expiry_start(rk);
        }

        /* Remove cache hints for the originally requested topics. */
        if (requested_topics)
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);

        rd_kafka_wrunlock(rkb->rkb_rk);

        /* Check if cgrp effective subscription is affected by
         * new metadata. */
        if (rkb->rkb_rk->rk_cgrp)
                rd_kafka_cgrp_metadata_update_check(
                        rkb->rkb_rk->rk_cgrp, 1/*do join*/);

done:
        if (missing_topics)
                rd_list_destroy(missing_topics);

        /* md lives inside tbuf, so tbuf is deliberately not destroyed
         * on the success path. */
        return md;

 err_parse:
err:
        if (requested_topics) {
                /* Failed requests shall purge cache hints for
                 * the requested topics. */
                rd_kafka_wrlock(rkb->rkb_rk);
                rd_kafka_metadata_cache_purge_hints(rk, requested_topics);
                rd_kafka_wrunlock(rkb->rkb_rk);
        }

        if (missing_topics)
                rd_list_destroy(missing_topics);

        rd_tmpabuf_destroy(&tbuf);
        return NULL;
}
- 刷新给定的topic list的metadata
rd_kafka_metadata_refresh_topics
:
rd_kafka_metadata_refresh_topics (rd_kafka_t *rk, rd_kafka_broker_t *rkb, const rd_list_t *topics, int force, const char *reason) { rd_list_t q_topics; int destroy_rkb = 0; if (!rk) rk = rkb->rkb_rk; rd_kafka_wrlock(rk); // 确定一下发送metadata request的broker if (!rkb) { if (!(rkb = rd_kafka_broker_any_usable(rk, RD_POLL_NOWAIT, 0))){ rd_kafka_wrunlock(rk); rd_kafka_dbg(rk, METADATA, "METADATA", "Skipping metadata refresh of %d topic(s):" " no usable brokers", rd_list_cnt(topics)); return RD_KAFKA_RESP_ERR__TRANSPORT; } destroy_rkb = 1; } rd_list_init(&q_topics, rd_list_cnt(topics), rd_free); if (!force) { // 如查不是强制必须立刻刷新的话, 只是把这个topic对应在metadata cache中的状态改为RD_KAFKA_RESP_ERR__WAIT_CACHE, 设置新的刷新时间 rd_kafka_metadata_cache_hint(rk, topics, &q_topics, 0/*dont replace*/); rd_kafka_wrunlock(rk); if (rd_list_cnt(&q_topics) == 0) { rd_list_destroy(&q_topics); if (destroy_rkb) rd_kafka_broker_destroy(rkb); return RD_KAFKA_RESP_ERR_NO_ERROR; } } else { // 立即发送metadata request rd_kafka_wrunlock(rk); rd_list_copy_to(&q_topics, topics, rd_list_string_copy, NULL); } rd_kafka_MetadataRequest(rkb, &q_topics, reason, NULL); rd_list_destroy(&q_topics); if (destroy_rkb) rd_kafka_broker_destroy(rkb); return RD_KAFKA_RESP_ERR_NO_ERROR; }
- 只请求broker相关的metadata
/**
 * Request broker-only metadata.
 *
 * Thin wrapper around rd_kafka_metadata_request() that passes a NULL
 * topic list, which selects the brokers-only form of the request.
 *
 * @param rk      Client instance.
 * @param rkb     Broker handle, forwarded unchanged.
 * @param reason  Reason string, forwarded unchanged.
 *
 * @returns whatever rd_kafka_metadata_request() returns.
 */
rd_kafka_resp_err_t
rd_kafka_metadata_refresh_brokers (rd_kafka_t *rk, rd_kafka_broker_t *rkb,
                                   const char *reason) {
        rd_kafka_resp_err_t result;

        result = rd_kafka_metadata_request(rk, rkb, NULL /*brokers only*/,
                                           reason, NULL);

        return result;
}
- 一个快速地刷新partition leader的操作
rd_kafka_metadata_fast_leader_query
这个rkmc_query_tmr
是专门针对没有leader的topic的定期刷新
/**
 * Arm the fast leader-query timer (rkmc_query_tmr).
 *
 * The timer is (re)started with the configured
 * metadata_refresh_fast_interval_ms unless it is already running and
 * due to fire within that interval. On expiry it runs
 * rd_kafka_metadata_leader_query_tmr_cb().
 */
void rd_kafka_metadata_fast_leader_query (rd_kafka_t *rk) {
        rd_kafka_timer_t *query_tmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_ts_t until_fire;

        until_fire = rd_kafka_timer_next(&rk->rk_timers, query_tmr,
                                         1/*lock*/);

        /* Already started and firing soon enough: restarting would not
         * speed anything up. */
        if (until_fire != -1 /* started */ &&
            until_fire <= rk->rk_conf.metadata_refresh_fast_interval_ms*1000)
                return;

        rd_kafka_timer_start(&rk->rk_timers, query_tmr,
                             rk->rk_conf.
                             metadata_refresh_fast_interval_ms*1000,
                             rd_kafka_metadata_leader_query_tmr_cb,
                             NULL);
}
- leader定时刷新处理回调函数
rd_kafka_metadata_leader_query_tmr_cb
:
/**
 * Timer callback for the fast leader query.
 *
 * Collects every known topic that is flagged leader-unavailable, has at
 * least one partition with neither a leader nor a next leader, or has no
 * partitions at all (topics in state NOTEXISTS are skipped), and forces
 * a metadata refresh for them. If no such topic exists the timer is
 * stopped; otherwise the timer is backed off, or stopped once twice its
 * interval reaches the regular metadata refresh interval.
 *
 * @param rkts  Timers object the callback was fired from.
 * @param arg   Unused.
 */
static void rd_kafka_metadata_leader_query_tmr_cb (rd_kafka_timers_t *rkts,
                                                   void *arg) {
        rd_kafka_t *rk = rkts->rkts_rk;
        rd_kafka_timer_t *rtmr = &rk->rk_metadata_cache.rkmc_query_tmr;
        rd_kafka_itopic_t *rkt;
        rd_list_t leaderless;

        rd_kafka_wrlock(rk);
        rd_list_init(&leaderless, rk->rk_topic_cnt, rd_free);

        TAILQ_FOREACH(rkt, &rk->rk_topics, rkt_link) {
                int idx;
                int needs_query;

                rd_kafka_topic_rdlock(rkt);

                if (rkt->rkt_state != RD_KAFKA_TOPIC_S_NOTEXISTS) {
                        needs_query = rkt->rkt_flags &
                                RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

                        /* One leader-less partition is enough to make the
                         * whole topic count as leader-less. */
                        for (idx = 0 ;
                             idx < rkt->rkt_partition_cnt && !needs_query ;
                             idx++) {
                                rd_kafka_toppar_t *rktp =
                                        rd_kafka_toppar_s2i(rkt->rkt_p[idx]);

                                rd_kafka_toppar_lock(rktp);
                                needs_query = !(rktp->rktp_leader ||
                                                rktp->rktp_next_leader);
                                rd_kafka_toppar_unlock(rktp);
                        }

                        if (rkt->rkt_partition_cnt == 0 || needs_query)
                                rd_list_add(&leaderless,
                                            rd_strdup(rkt->rkt_topic->str));
                }
                /* else: topic is known to not exist, skip it. */

                rd_kafka_topic_rdunlock(rkt);
        }

        rd_kafka_wrunlock(rk);

        if (rd_list_cnt(&leaderless) != 0) {
                /* Force a metadata request for the collected topics. */
                rd_kafka_metadata_refresh_topics(rk, NULL, &leaderless,
                                                 1/*force*/,
                                                 "partition leader query");

                /* Exponential back-off until the regular refresh interval
                 * is reached; from then on the interval-based refresh
                 * covers it and this timer can stop. */
                if (rk->rk_conf.metadata_refresh_interval_ms > 0 &&
                    rtmr->rtmr_interval * 2 / 1000 >=
                    rk->rk_conf.metadata_refresh_interval_ms)
                        rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
                else
                        rd_kafka_timer_backoff(rkts, rtmr,
                                               (int)rtmr->rtmr_interval);
        } else {
                /* Nothing is leader-less any more: stop the timer. */
                rd_kafka_timer_stop(rkts, rtmr, 1/*lock*/);
        }

        rd_list_destroy(&leaderless);
}
Metadata在内存中的cache及其操作
- cache相关结构体
- cache entry:
/**
 * One metadata cache entry, holding the cached metadata of a single topic.
 * Each entry is linked both into the cache's AVL index (rkmc_avl) and
 * into its expiry list (rkmc_expiry).
 */
struct rd_kafka_metadata_cache_entry {
        rd_avl_node_t rkmce_avlnode;             /* rkmc_avl */
        TAILQ_ENTRY(rd_kafka_metadata_cache_entry) rkmce_link; /* rkmc_expiry */
        rd_ts_t rkmce_ts_expires;                /* Expire time: when this
                                                  * entry's metadata is next
                                                  * due for refresh */
        rd_ts_t rkmce_ts_insert;                 /* Insert time */
        rd_kafka_metadata_topic_t rkmce_mtopic;  /* Cached topic metadata:
                                                  * the metadata this entry
                                                  * stands for */
        /* rkmce_partitions memory points here. */
};
- cache结构体定义:
/**
 * The client's in-memory metadata cache: an index of entries, an expiry
 * list with its timer, bookkeeping for in-flight full requests, the
 * leader-query timer and a condition variable used to wait for changes.
 */
struct rd_kafka_metadata_cache {
        /* Index of all entries (rd_avl_t), used to speed up lookups;
         * the lookup code keys entries by topic name. */
        rd_avl_t         rkmc_avl;
        /* List of all cached entries. Entries that expire earlier are
         * meant to sit closer to the head of the list. */
        TAILQ_HEAD(, rd_kafka_metadata_cache_entry) rkmc_expiry;
        rd_kafka_timer_t rkmc_expiry_tmr;  /* Expiry timer */
        int              rkmc_cnt;         /* Number of entries */

        /* Protected by full_lock: */
        /* At most one full request (all topics, or all brokers) may be
         * outstanding until its response has arrived. */
        mtx_t            rkmc_full_lock;
        int              rkmc_full_topics_sent; /* Full MetadataRequest for
                                                 * all topics has been sent,
                                                 * awaiting response. */
        int              rkmc_full_brokers_sent; /* Full MetadataRequest for
                                                  * all brokers (but not topics)
                                                  * has been sent,
                                                  * awaiting response. */

        /* Drives the periodic metadata requests for leader-less topics. */
        rd_kafka_timer_t rkmc_query_tmr; /* Query timer for topic's without
                                          * leaders. */
        cnd_t            rkmc_cnd;       /* cache_wait_change() cond. */
        mtx_t            rkmc_cnd_lock;  /* lock for rkmc_cnd */
};
- cache的初始化
rd_kafka_metadata_cache_init
:
/**
 * Initialise the client's metadata cache: the AVL index (ordered by
 * rd_kafka_metadata_cache_entry_cmp), the empty expiry list, both
 * mutexes and the change-notification condition variable.
 */
void rd_kafka_metadata_cache_init (rd_kafka_t *rk) {
        /* Lookup index. */
        rd_avl_init(&rk->rk_metadata_cache.rkmc_avl,
                    rd_kafka_metadata_cache_entry_cmp,
                    0);

        /* Expiry list starts out empty. */
        TAILQ_INIT(&rk->rk_metadata_cache.rkmc_expiry);

        /* Synchronisation primitives. */
        mtx_init(&rk->rk_metadata_cache.rkmc_full_lock, mtx_plain);
        mtx_init(&rk->rk_metadata_cache.rkmc_cnd_lock, mtx_plain);
        cnd_init(&rk->rk_metadata_cache.rkmc_cnd);
}
- cache删除操作
rd_kafka_metadata_cache_delete
/**
 * Remove one entry from the metadata cache and free it.
 *
 * @param rk          Client instance owning the cache.
 * @param rkmce       Entry to remove; freed by this call.
 * @param unlink_avl  Non-zero: also unlink the entry from the AVL index.
 *                    Pass zero when the entry is no longer in the index
 *                    (e.g. it was just replaced by an insert).
 */
void
rd_kafka_metadata_cache_delete (rd_kafka_t *rk,
                                struct rd_kafka_metadata_cache_entry *rkmce,
                                int unlink_avl) {
        /* Take the entry off the index only if asked to. */
        if (unlink_avl)
                RD_AVL_REMOVE_ELM(&rk->rk_metadata_cache.rkmc_avl, rkmce);

        /* The entry is always on the expiry list. */
        TAILQ_REMOVE(&rk->rk_metadata_cache.rkmc_expiry, rkmce, rkmce_link);

        rd_kafka_assert(NULL, rk->rk_metadata_cache.rkmc_cnt > 0);
        rk->rk_metadata_cache.rkmc_cnt--;

        rd_free(rkmce);
}
- cache的查找操作
rd_kafka_metadata_cache_find
: 从AVL树(rd_avl_t)上查找metadata
/**
 * Look up the cache entry for \p topic in the AVL index.
 *
 * @param rk     Client instance owning the cache.
 * @param topic  Topic name to look up.
 * @param valid  Non-zero: only return the entry if it passes
 *               RD_KAFKA_METADATA_CACHE_VALID(). Zero: return any entry.
 *
 * @returns the matching entry, or NULL if there is none (or it is not
 *          valid and \p valid was set).
 */
struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_find (rd_kafka_t *rk, const char *topic, int valid) {
        struct rd_kafka_metadata_cache_entry key, *hit;

        /* Only the topic name of the search key is filled in. */
        key.rkmce_mtopic.topic = (char *)topic;

        hit = RD_AVL_FIND(&rk->rk_metadata_cache.rkmc_avl, &key);

        if (!hit)
                return NULL;

        if (valid && !RD_KAFKA_METADATA_CACHE_VALID(hit))
                return NULL;

        return hit;
}
- cache的插入操作
rd_kafka_metadata_cache_insert
/**
 * Build a new cache entry from \p mtopic and insert it into the cache,
 * replacing (and freeing) any existing entry for the same topic.
 *
 * The entry, its copy of the topic name and its copy of the partition
 * array are all allocated from one temporary buffer. Replica and ISR
 * information is not cached: those fields are cleared in the copy.
 *
 * @param rk          Client instance owning the cache.
 * @param mtopic      Topic metadata to cache; it is copied, not referenced.
 * @param now         Timestamp stored as the entry's insert time.
 * @param ts_expires  Timestamp stored as the entry's expiry time.
 *
 * @returns the newly inserted entry.
 */
static struct rd_kafka_metadata_cache_entry *
rd_kafka_metadata_cache_insert (rd_kafka_t *rk,
                                const rd_kafka_metadata_topic_t *mtopic,
                                rd_ts_t now, rd_ts_t ts_expires) {
        struct rd_kafka_metadata_cache_entry *rkmce, *old;
        size_t topic_len;
        rd_tmpabuf_t tbuf;
        int i;

        /* Construct the new entry inside a temporary buffer sized for
         * the entry itself, the topic name and the partition array
         * (each rounded up to a multiple of 8). */
        topic_len = strlen(mtopic->topic) + 1;
        rd_tmpabuf_new(&tbuf,
                       RD_ROUNDUP(sizeof(*rkmce), 8) +
                       RD_ROUNDUP(topic_len, 8) +
                       (mtopic->partition_cnt *
                        RD_ROUNDUP(sizeof(*mtopic->partitions), 8)),
                       1/*assert on fail*/);

        rkmce = rd_tmpabuf_alloc(&tbuf, sizeof(*rkmce));

        /* Shallow copy first; the pointers are redirected below. */
        rkmce->rkmce_mtopic = *mtopic;

        /* Copy topic name and update pointer */
        rkmce->rkmce_mtopic.topic = rd_tmpabuf_write_str(&tbuf, mtopic->topic);

        /* Copy partition array and update pointer */
        rkmce->rkmce_mtopic.partitions =
                rd_tmpabuf_write(&tbuf, mtopic->partitions,
                                 mtopic->partition_cnt *
                                 sizeof(*mtopic->partitions));

        /* Clear uncached fields. */
        for (i = 0 ; i < mtopic->partition_cnt ; i++) {
                rkmce->rkmce_mtopic.partitions[i].replicas = NULL;
                rkmce->rkmce_mtopic.partitions[i].replica_cnt = 0;
                rkmce->rkmce_mtopic.partitions[i].isrs = NULL;
                rkmce->rkmce_mtopic.partitions[i].isr_cnt = 0;
        }

        /* Sort partitions for future bsearch() lookups. */
        qsort(rkmce->rkmce_mtopic.partitions,
              rkmce->rkmce_mtopic.partition_cnt,
              sizeof(*rkmce->rkmce_mtopic.partitions),
              rd_kafka_metadata_partition_id_cmp);

        /* Append the entry to the tail of the expiry list. */
        TAILQ_INSERT_TAIL(&rk->rk_metadata_cache.rkmc_expiry,
                          rkmce, rkmce_link);
        rk->rk_metadata_cache.rkmc_cnt++;
        rkmce->rkmce_ts_expires = ts_expires;
        rkmce->rkmce_ts_insert = now;

        /* Insert (and replace existing) entry. */
        /* Insert into the AVL index; if an entry for this topic already
         * exists it is replaced and handed back as `old`. */
        old = RD_AVL_INSERT(&rk->rk_metadata_cache.rkmc_avl, rkmce,
                            rkmce_avlnode);
        /* `old` is already out of the index, so only take it off the
         * expiry list and free it (unlink_avl = 0). */
        if (old)
                rd_kafka_metadata_cache_delete(rk, old, 0);

        /* Explicitly not freeing the tmpabuf since rkmce points to its
         * memory. */
        return rkmce;
}
- 清空所有的cache
/**
 * Drop every entry from the metadata cache, stop the expiry timer and,
 * if the cache held anything beforehand, signal the change to waiters.
 */
static void rd_kafka_metadata_cache_purge (rd_kafka_t *rk) {
        struct rd_kafka_metadata_cache_entry *entry;
        int had_entries = !TAILQ_EMPTY(&rk->rk_metadata_cache.rkmc_expiry);

        /* Keep deleting the list head until the list is empty. */
        for (entry = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry) ;
             entry ;
             entry = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry))
                rd_kafka_metadata_cache_delete(rk, entry, 1);

        /* No entries left to expire. */
        rd_kafka_timer_stop(&rk->rk_timers,
                            &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);

        if (had_entries)
                rd_kafka_metadata_cache_propagate_changes(rk);
}
- cache的更新
rd_kafka_metadata_cache_topic_update
/**
 * Update the cache with the metadata of a single topic.
 *
 * Error-free metadata is inserted (replacing any previous entry) with
 * an expiry of now + metadata_max_age_ms. Metadata carrying an error
 * causes the topic's entry to be deleted by name instead. Waiters are
 * notified whenever the cache actually changed.
 *
 * @param rk   Client instance owning the cache.
 * @param mdt  Topic metadata to apply.
 */
void
rd_kafka_metadata_cache_topic_update (rd_kafka_t *rk,
                                      const rd_kafka_metadata_topic_t *mdt) {
        rd_ts_t now = rd_clock();
        rd_ts_t ts_expires = now + (rk->rk_conf.metadata_max_age_ms * 1000);
        int changed = 1; /* an insert always counts as a change */

        if (!mdt->err)
                rd_kafka_metadata_cache_insert(rk, mdt, now, ts_expires);
        else
                /* Deletion only counts as a change if it reports one. */
                changed = rd_kafka_metadata_cache_delete_by_name(rk,
                                                                 mdt->topic);

        if (changed)
                rd_kafka_metadata_cache_propagate_changes(rk);
}
- 开始或更新过期刷新metadata的timer: 如果调用时这个timer已经开始, 就更新它的expire时间
/**
 * (Re)start the cache expiry timer so that it fires when the entry at
 * the head of the expiry list expires. Does nothing if the cache is
 * empty.
 */
void rd_kafka_metadata_cache_expiry_start (rd_kafka_t *rk) {
        struct rd_kafka_metadata_cache_entry *head =
                TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);

        if (!head)
                return;

        rd_kafka_timer_start(&rk->rk_timers,
                             &rk->rk_metadata_cache.rkmc_expiry_tmr,
                             head->rkmce_ts_expires - rd_clock(),
                             rd_kafka_metadata_cache_evict_tmr_cb,
                             rk);
}
- 过期刷新时的回调函数
/**
 * Evict expired entries from the head of the expiry list.
 *
 * Entries are removed for as long as the list head has expired. The
 * expiry timer is then re-armed for the first remaining entry, or
 * stopped if the list is empty. Waiters are notified if anything was
 * evicted.
 *
 * @returns the number of entries evicted.
 */
static int rd_kafka_metadata_cache_evict (rd_kafka_t *rk) {
        struct rd_kafka_metadata_cache_entry *head;
        rd_ts_t now = rd_clock();
        int evicted = 0;

        for (;;) {
                head = TAILQ_FIRST(&rk->rk_metadata_cache.rkmc_expiry);
                if (!head || head->rkmce_ts_expires > now)
                        break;

                rd_kafka_metadata_cache_delete(rk, head, 1);
                evicted++;
        }

        if (!head)
                /* Nothing left in the cache: no need for the timer. */
                rd_kafka_timer_stop(&rk->rk_timers,
                                    &rk->rk_metadata_cache.rkmc_expiry_tmr, 1);
        else
                /* Re-arm for the next entry that will expire. */
                rd_kafka_timer_start(&rk->rk_timers,
                                     &rk->rk_metadata_cache.rkmc_expiry_tmr,
                                     head->rkmce_ts_expires - now,
                                     rd_kafka_metadata_cache_evict_tmr_cb,
                                     rk);

        if (evicted)
                rd_kafka_metadata_cache_propagate_changes(rk);

        return evicted;
}
- 根据topic名字获取对应的topic metadata
/**
 * Fetch the cached metadata for \p topic.
 *
 * @param rk     Client instance owning the cache.
 * @param topic  Topic name.
 * @param valid  Passed through to rd_kafka_metadata_cache_find().
 *
 * @returns a pointer to the topic metadata stored in the cache entry,
 *          or NULL if no matching entry was found.
 */
const rd_kafka_metadata_topic_t *
rd_kafka_metadata_cache_topic_get (rd_kafka_t *rk, const char *topic,
                                   int valid) {
        struct rd_kafka_metadata_cache_entry *entry =
                rd_kafka_metadata_cache_find(rk, topic, valid);

        return entry ? &entry->rkmce_mtopic : NULL;
}
- 根据topic-parition获取对应的partition metadata
/**
 * Fetch the cached metadata for one partition of a topic.
 *
 * @param rk         Client instance owning the cache.
 * @param mtopicp    Out: set to the cached topic metadata, or NULL if the
 *                   topic is not in the cache.
 * @param mpartp     Out: set to the cached partition metadata, or NULL if
 *                   the topic or the partition was not found.
 * @param topic      Topic name.
 * @param partition  Partition id.
 * @param valid      Passed through to the topic lookup.
 *
 * @returns -1 if the topic was not found,
 *           0 if the topic was found but the partition was not,
 *           1 if both were found.
 */
int rd_kafka_metadata_cache_topic_partition_get (
        rd_kafka_t *rk,
        const rd_kafka_metadata_topic_t **mtopicp,
        const rd_kafka_metadata_partition_t **mpartp,
        const char *topic, int32_t partition, int valid) {

        rd_kafka_metadata_partition_t key = { .id = partition };
        const rd_kafka_metadata_topic_t *cached_topic;
        const rd_kafka_metadata_partition_t *cached_part;

        *mtopicp = NULL;
        *mpartp = NULL;

        cached_topic = rd_kafka_metadata_cache_topic_get(rk, topic, valid);
        if (!cached_topic)
                return -1;

        *mtopicp = cached_topic;

        /* Partitions array may be sparse so use bsearch lookup. */
        cached_part = bsearch(&key, cached_topic->partitions,
                              cached_topic->partition_cnt,
                              sizeof(*cached_topic->partitions),
                              rd_kafka_metadata_partition_id_cmp);

        if (cached_part) {
                *mpartp = cached_part;
                return 1;
        }

        return 0;
}
- 广播cache内容的变化
/**
 * Broadcast a change of the cache contents: wakes every thread waiting
 * on the cache's condition variable. The broadcast is issued while
 * holding rkmc_cnd_lock.
 */
static void rd_kafka_metadata_cache_propagate_changes (rd_kafka_t *rk) {
        mtx_t *cnd_lock = &rk->rk_metadata_cache.rkmc_cnd_lock;

        mtx_lock(cnd_lock);
        cnd_broadcast(&rk->rk_metadata_cache.rkmc_cnd);
        mtx_unlock(cnd_lock);
}

/**
 * Wait for the cache contents to change.
 *
 * Blocks on the cache's condition variable for at most \p timeout_ms.
 *
 * @returns 1 if the timed wait returned thrd_success, else 0.
 */
int rd_kafka_metadata_cache_wait_change (rd_kafka_t *rk, int timeout_ms) {
        mtx_t *cnd_lock = &rk->rk_metadata_cache.rkmc_cnd_lock;
        int wait_res;

        mtx_lock(cnd_lock);
        wait_res = cnd_timedwait_ms(&rk->rk_metadata_cache.rkmc_cnd,
                                    cnd_lock, timeout_ms);
        mtx_unlock(cnd_lock);

        return wait_res == thrd_success;
}
- 清除给定的topic列表里对应的cache hint: 在AVL树中找不到的, 或者找到了但仍然有效(RD_KAFKA_METADATA_CACHE_VALID为真)的entry都不需要删除; 只删除仍处于hint状态(RD_KAFKA_RESP_ERR__WAIT_CACHE)的entry
/**
 * Purge the cache entries of the given topics that are not valid.
 *
 * For each topic name in \p topics the entry is looked up regardless of
 * validity. Topics without an entry, and entries for which
 * RD_KAFKA_METADATA_CACHE_VALID() holds, are left alone; every other
 * entry is deleted. Waiters are notified if at least one entry was
 * removed.
 *
 * @param rk      Client instance owning the cache.
 * @param topics  List of topic names.
 */
void rd_kafka_metadata_cache_purge_hints (rd_kafka_t *rk,
                                          const rd_list_t *topics) {
        const char *topic;
        int idx;
        int purged = 0;

        RD_LIST_FOREACH(topic, topics, idx) {
                struct rd_kafka_metadata_cache_entry *entry =
                        rd_kafka_metadata_cache_find(rk, topic, 0/*any*/);

                /* Only existing, non-valid entries are removed. */
                if (entry && !RD_KAFKA_METADATA_CACHE_VALID(entry)) {
                        rd_kafka_metadata_cache_delete(rk, entry,
                                                       1/*unlink avl*/);
                        purged++;
                }
        }

        if (purged > 0)
                rd_kafka_metadata_cache_propagate_changes(rk);
}
---

### [Librdkafka源码分析-Content Table](https://www.jianshu.com/p/1a94bb09a6e6)