DPDK系列之二十六缓冲Cache的管理

一、Cache的用处

其实一直不想分析这个问题,主要是这个问题太多了。即使不学DPDK,计算机的原理和操作系统,内存型框架等等中都回避不了这个问题,包括多线程的伪共享也提到了这个问题。可以说这个问题是绕不开的,老生常谈谈得都糊了。
所以这里重点不谈Cache这个原理,书和网上都多得看不过来了。这里重点分析一下在DPDK是怎么样使用Cache的,也就是说DPDK的Cache有什么用处?
1、减少对内存锁的并发的冲突,目的也是为提高读写速度
2、提高读写速度

二、DPDK中的Cache处理

1、对Cache的支持
大页内存:命中率提高
DIDO:直接和硬件缓冲打交道,略过内存
TLB:TLB配合大页,仍然是提高Cache命中率的方法

2、预取指令
一般到Cache这个级别,都是硬件,最多OS操作处理一下,对上层一般是不开放的。但随着技术的发展,软件开发者也可以操作预取指令,同时底层也开放了这些软件预取的指令。DPDK就可以利用这个技术来处理Cache的数据加载,提高执行效率。但需要注意的是,如果自己也软件也要这样做,就得考虑好策略,别到时候儿画虎不成反类其犬。
预取指令一般是汇编命令,但有些程序也提供了封装的上层API库。
在DPDK中,为了和数据处理保持时钟周期的匹配,即达到最大效率,就必须保证数据都可以在Cache中,否则性能会严重下降。而这个预取指令就是为了配合这个命中采用的一种手段。当然,在DPDK中还有其它的手段同样也可以达到这个目的,结果只有一个,让处理和数据读取保持一致。

3、DPDK中预取一致性处理
现代计算机基本都是多核或者多CPU的,DPDK如何处理当不同的核去访问同一Cache的冲突呢。也就是说,如何保证数据的一致性呢?解决的方法很简单粗暴,直接每个核给单独一个Cache。这样读写只操作自己的数据队列,就不会有冲突的问题。但是,这也带来了数据的最终一致性问题,这个就需要设计来搞定了。尽量避免冲突,如果非要有,那么只好加锁或者用一些协议来解决。
另外为了保持冲突最小化Cache Line(Cache最小单元)直接在分配时对齐。这算是另外一个手段。换句话说,尽量保证能在一个Cache Line的不让他们分成两个。
那么一致性协议有几种呢:
目录协议( Directory-based protocol ) 和总线窥探协议 ( Bus snoopingprotocol),这里不展开,有兴趣可以去查查资料。

三、源码分析

下面看一下Cache的相关源码:

//Ring
struct rte_ring {
	/*
	 * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
	 * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
	 * next time the ABI changes
	 */
	char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
	int flags;               /**< Flags supplied at creation. */
	const struct rte_memzone *memzone;
			/**< Memzone, if any, containing the rte_ring */
	uint32_t size;           /**< Size of ring. */
	uint32_t mask;           /**< Mask (size-1) of ring. */
	uint32_t capacity;       /**< Usable size of ring */

	char pad0 __rte_cache_aligned; /**< empty cache line */

	/** Ring producer status. */
	struct rte_ring_headtail prod __rte_cache_aligned;
	char pad1 __rte_cache_aligned; /**< empty cache line */

	/** Ring consumer status. */
	struct rte_ring_headtail cons __rte_cache_aligned;
	char pad2 __rte_cache_aligned; /**< empty cache line */
};

// librte_eal/common/include/rte_common.h
/** Force alignment to cache line. */
#define __rte_cache_aligned __rte_aligned(RTE_CACHE_LINE_SIZE)
#define __rte_aligned(a) __attribute__((__aligned__(a)))  

在基础的数据结构中经常可以看到__rte_cache_aligned这个宏,它其实就是对Cache Line的一种处理对齐方式。
再看一下为每个核的配置数据结构定义:

struct lcore_conf {
	uint16_t nb_rx_queue;
	struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
	uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
	struct buffer tx_mbufs[RTE_MAX_ETHPORTS];
	struct ipsec_ctx inbound;
	struct ipsec_ctx outbound;
	struct rt_ctx *rt4_ctx;
	struct rt_ctx *rt6_ctx;
	struct {
		struct rte_ip_frag_tbl *tbl;
		struct rte_mempool *pool_dir;
		struct rte_mempool *pool_indir;
		struct rte_ip_frag_death_row dr;
	} frag;
} __rte_cache_aligned;//总是行对齐,防止跨Cache Line
static struct lcore_conf lcore_conf[RTE_MAX_LCORE];

RTE_MAX_LCORE是当前的最大核心数量,通过编号来控制对核心的访问,避免出现多个核心访问同一个数据结构的问题。同样,现代的网卡一般都支持多队列网卡,DPDK为其准备多个读写队列来应对这些问题,如果从前面的安装过来的读者可能会想起在安装时的配置。
看一下收的预取:

uint16_t
ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
		uint16_t nb_pkts)
{
	struct ixgbe_rx_queue *rxq;
	volatile union ixgbe_adv_rx_desc *rx_ring;
	volatile union ixgbe_adv_rx_desc *rxdp;
	struct ixgbe_rx_entry *sw_ring;
	struct ixgbe_rx_entry *rxe;
	struct rte_mbuf *rxm;
	struct rte_mbuf *nmb;
	union ixgbe_adv_rx_desc rxd;
	uint64_t dma_addr;
	uint32_t staterr;
	uint32_t pkt_info;
	uint16_t pkt_len;
	uint16_t rx_id;
	uint16_t nb_rx;
	uint16_t nb_hold;
	uint64_t pkt_flags;
	uint64_t vlan_flags;

	nb_rx = 0;
	nb_hold = 0;
	rxq = rx_queue;
	rx_id = rxq->rx_tail;
	rx_ring = rxq->rx_ring;
	sw_ring = rxq->sw_ring;
	vlan_flags = rxq->vlan_flags;
	while (nb_rx < nb_pkts) {
		/*
		 * The order of operations here is important as the DD status
		 * bit must not be read after any other descriptor fields.
		 * rx_ring and rxdp are pointing to volatile data so the order
		 * of accesses cannot be reordered by the compiler. If they were
		 * not volatile, they could be reordered which could lead to
		 * using invalid descriptor fields when read from rxd.
		 */
		rxdp = &rx_ring[rx_id];
		staterr = rxdp->wb.upper.status_error;
		if (!(staterr & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
			break;
		rxd = *rxdp;

		/*
		 * End of packet.
		 *
		 * If the IXGBE_RXDADV_STAT_EOP flag is not set, the RX packet
		 * is likely to be invalid and to be dropped by the various
		 * validation checks performed by the network stack.
		 *
		 * Allocate a new mbuf to replenish the RX ring descriptor.
		 * If the allocation fails:
		 *    - arrange for that RX descriptor to be the first one
		 *      being parsed the next time the receive function is
		 *      invoked [on the same queue].
		 *
		 *    - Stop parsing the RX ring and return immediately.
		 *
		 * This policy do not drop the packet received in the RX
		 * descriptor for which the allocation of a new mbuf failed.
		 * Thus, it allows that packet to be later retrieved if
		 * mbuf have been freed in the mean time.
		 * As a side effect, holding RX descriptors instead of
		 * systematically giving them back to the NIC may lead to
		 * RX ring exhaustion situations.
		 * However, the NIC can gracefully prevent such situations
		 * to happen by sending specific "back-pressure" flow control
		 * frames to its peer(s).
		 */
		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
			   "ext_err_stat=0x%08x pkt_len=%u",
			   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
			   (unsigned) rx_id, (unsigned) staterr,
			   (unsigned) rte_le_to_cpu_16(rxd.wb.upper.length));

		nmb = rte_mbuf_raw_alloc(rxq->mb_pool);
		if (nmb == NULL) {
			PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
				   "queue_id=%u", (unsigned) rxq->port_id,
				   (unsigned) rxq->queue_id);
			rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
			break;
		}

		nb_hold++;
		rxe = &sw_ring[rx_id];
		rx_id++;
		if (rx_id == rxq->nb_rx_desc)
			rx_id = 0;

		/* Prefetch next mbuf while processing current one. */
		rte_ixgbe_prefetch(sw_ring[rx_id].mbuf);

		/*
		 * When next RX descriptor is on a cache-line boundary,
		 * prefetch the next 4 RX descriptors and the next 8 pointers
		 * to mbufs.
		 */
		if ((rx_id & 0x3) == 0) {
			rte_ixgbe_prefetch(&rx_ring[rx_id]);
			rte_ixgbe_prefetch(&sw_ring[rx_id]);
		}

		rxm = rxe->mbuf;
		rxe->mbuf = nmb;
		dma_addr =
			rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
		rxdp->read.hdr_addr = 0;
		rxdp->read.pkt_addr = dma_addr;

		/*
		 * Initialize the returned mbuf.
		 * 1) setup generic mbuf fields:
		 *    - number of segments,
		 *    - next segment,
		 *    - packet length,
		 *    - RX port identifier.
		 * 2) integrate hardware offload data, if any:
		 *    - RSS flag & hash,
		 *    - IP checksum flag,
		 *    - VLAN TCI, if any,
		 *    - error flags.
		 */
		pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.wb.upper.length) -
				      rxq->crc_len);
		rxm->data_off = RTE_PKTMBUF_HEADROOM;
		rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);
		rxm->nb_segs = 1;
		rxm->next = NULL;
		rxm->pkt_len = pkt_len;
		rxm->data_len = pkt_len;
		rxm->port = rxq->port_id;

		pkt_info = rte_le_to_cpu_32(rxd.wb.lower.lo_dword.data);
		/* Only valid if PKT_RX_VLAN set in pkt_flags */
		rxm->vlan_tci = rte_le_to_cpu_16(rxd.wb.upper.vlan);

		pkt_flags = rx_desc_status_to_pkt_flags(staterr, vlan_flags);
		pkt_flags = pkt_flags |
			rx_desc_error_to_pkt_flags(staterr, (uint16_t)pkt_info,
						   rxq->rx_udp_csum_zero_err);
		pkt_flags = pkt_flags |
			ixgbe_rxd_pkt_info_to_pkt_flags((uint16_t)pkt_info);
		rxm->ol_flags = pkt_flags;
		rxm->packet_type =
			ixgbe_rxd_pkt_info_to_pkt_type(pkt_info,
						       rxq->pkt_type_mask);

		if (likely(pkt_flags & PKT_RX_RSS_HASH))
			rxm->hash.rss = rte_le_to_cpu_32(
						rxd.wb.lower.hi_dword.rss);
		else if (pkt_flags & PKT_RX_FDIR) {
			rxm->hash.fdir.hash = rte_le_to_cpu_16(
					rxd.wb.lower.hi_dword.csum_ip.csum) &
					IXGBE_ATR_HASH_MASK;
			rxm->hash.fdir.id = rte_le_to_cpu_16(
					rxd.wb.lower.hi_dword.csum_ip.ip_id);
		}
		/*
		 * Store the mbuf address into the next entry of the array
		 * of returned packets.
		 */
		rx_pkts[nb_rx++] = rxm;
	}
	rxq->rx_tail = rx_id;

	/*
	 * If the number of free RX descriptors is greater than the RX free
	 * threshold of the queue, advance the Receive Descriptor Tail (RDT)
	 * register.
	 * Update the RDT with the value of the last processed RX descriptor
	 * minus 1, to guarantee that the RDT register is never equal to the
	 * RDH register, which creates a "full" ring situtation from the
	 * hardware point of view...
	 */
	nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
	if (nb_hold > rxq->rx_free_thresh) {
		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
			   "nb_hold=%u nb_rx=%u",
			   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
			   (unsigned) rx_id, (unsigned) nb_hold,
			   (unsigned) nb_rx);
		rx_id = (uint16_t) ((rx_id == 0) ?
				     (rxq->nb_rx_desc - 1) : (rx_id - 1));
		IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
		nb_hold = 0;
	}
	rxq->nb_rx_hold = nb_hold;
	return nb_rx;
}

上面的预取写得很清楚,看一下定义,源码中有三类平台的,这里只看X86的:

static inline void rte_prefetch0(const volatile void *p)
{
    
    
	asm volatile ("prefetcht0 %[p]" : : [p] "m" (*(const volatile char *)p));
}

static inline void rte_prefetch1(const volatile void *p)
{
    
    
	asm volatile ("prefetcht1 %[p]" : : [p] "m" (*(const volatile char *)p));
}

static inline void rte_prefetch2(const volatile void *p)
{
    
    
	asm volatile ("prefetcht2 %[p]" : : [p] "m" (*(const volatile char *)p));
}

static inline void rte_prefetch_non_temporal(const volatile void *p)
{
    
    
	asm volatile ("prefetchnta %[p]" : : [p] "m" (*(const volatile char *)p));
}

其它预取的可以搜索rte_packet_prefetch,都OK了。这里不再赘述。

四、总结

其实从上面分析来看,不管采用何种手段,目的只有一个,流水线作业要尽量保证流水不停不乱。这样,就可以高效率的生产和处理数据。这种数据处理型的软件框架,最重视的就是这些,只要数据保持了流水按照意图前进,就达到了设计目的。
毕竟,只要有了外在的干预,这个干预时间对CPU来说就是一个超长的时间周期,那么效率已经就不再乎了。而在没有干预的情况下,就必须保证数据最大的流水。比如下载网络数据,在线观看视频,在线视频会议等等。
其实这也说明了一点,软件设计重点看应用场景,包括一些框架,比如Redis这种,它也清晰的定位了自己在应用中的场景,所以才如此之火。那么,我们从中可以学习到什么呢?不言而喻吧。

猜你喜欢

转载自blog.csdn.net/fpcc/article/details/132004583