设备接口层之数据包发送

这篇笔记记录了设备接口层发送数据包的过程。这里不会单独列举发送过程中使用到的一些数据结构,而是直接跟踪代码,因为发送过程中使用到的很多数据结构在接收部分的描述中已经介绍过了,这里可以对比参考:设备接口层之数据包接收.

1. 设备接口层发送接口

仔细看下dev_queue_xmit()的注释很有必要。

/**
 *	dev_queue_xmit - transmit a buffer
 *	@skb: buffer to transmit
 *
 *	Queue a buffer for transmission to a network device. The caller must
 *	have set the device and priority and built the buffer before calling
 *	this function. The function can be called from an interrupt.
 *
 *	A negative errno code is returned on a failure. A success does not
 *	guarantee the frame will be transmitted as it may be dropped due
 *	to congestion or traffic shaping.
 *
 * -----------------------------------------------------------------------------------
 *      I notice this method can also return errors from the queue disciplines,
 *      including NET_XMIT_DROP, which is a positive value.  So, errors can also
 *      be positive.
 *
 *      Regardless of the return value, the skb is consumed, so it is currently
 *      difficult to retry a send to this method.  (You can bump the ref count
 *      before sending to hold a reference for retry if you are careful.)
 *
 *      When calling this method, interrupts MUST be enabled.  This is because
 *      the BH enable code must have IRQs enabled so that it will not deadlock.
 *          --BLG
 */
//经过层三路由,该数据包已经确定要从该设备发送出去,所以唯一的入参就是要发送的数据包
int dev_queue_xmit(struct sk_buff *skb)
{
	//dev指向要发送该数据包的设备
	struct net_device *dev = skb->dev;
	struct Qdisc *q;
	int rc = -ENOMEM;

	/* GSO will handle the following emulations directly. */
	//如果网络设备本身可以处理GSO数据包,那么后面可能的线性化、校验和计算都可以由硬件执行
	if (netif_needs_gso(dev, skb))
		goto gso;

	//数据包有分片但是硬件不支持自动处理分片,那么需要软件将数据包线性化(即将所有数据放
	//到一个地址连续的缓存区中)
	if (skb_shinfo(skb)->frag_list &&
	    !(dev->features & NETIF_F_FRAGLIST) &&
	    __skb_linearize(skb))
		goto out_kfree_skb;

	/* Fragmented skb is linearized if device does not support SG,
	 * or if at least one of fragments is in highmem and device
	 * does not support DMA from it.
	 */
	//硬件不支持SG,或者不支持非连续的DMA(不懂),同样要软件进行线性化处理
	if (skb_shinfo(skb)->nr_frags &&
	    (!(dev->features & NETIF_F_SG) || illegal_highdma(dev, skb)) &&
	    __skb_linearize(skb))
		goto out_kfree_skb;

	/* If packet is not checksummed and device does not support
	 * checksumming for this protocol, complete checksumming here.
	 */
	//高层协议没有进行完整的校验和计算,那么需要根据硬件是否支持自动校验,调用
	//skb_checksum_help()进行软件校验
	if (skb->ip_summed == CHECKSUM_PARTIAL) {
		skb_set_transport_header(skb, skb->csum_start -
					      skb_headroom(skb));

		if (!(dev->features & NETIF_F_GEN_CSUM) &&
		    !((dev->features & NETIF_F_IP_CSUM) &&
		      skb->protocol == htons(ETH_P_IP)) &&
		    !((dev->features & NETIF_F_IPV6_CSUM) &&
		      skb->protocol == htons(ETH_P_IPV6)))
			if (skb_checksum_help(skb))
				goto out_kfree_skb;
	}

gso:
	spin_lock_prefetch(&dev->queue_lock);
	/* Disable soft irqs for various locks below. Also
	 * stops preemption for RCU.
	 */
	rcu_read_lock_bh();

	/* Updates of qdisc are serialized by queue_lock.
	 * The struct Qdisc which is pointed to by qdisc is now a
	 * rcu structure - it may be accessed without acquiring
	 * a lock (but the structure may be stale.) The freeing of the
	 * qdisc will be deferred until it's known that there are no
	 * more references to it.
	 *
	 * If the qdisc has an enqueue function, we still need to
	 * hold the queue_lock before calling it, since queue_lock
	 * also serializes access to the device queue.
	 */
	//获取到Qdisc
	q = rcu_dereference(dev->qdisc);
#ifdef CONFIG_NET_CLS_ACT
	skb->tc_verd = SET_TC_AT(skb->tc_verd,AT_EGRESS);
#endif
	//如果设备定义了输出排队规则,说明支持Qos,需要使用流量控制机制进行数据包发送
	//实际上,从目前的代码实现上看,排队规则是一定会设置的,层二框架总是会为设备指定一个默认的排队规则
	if (q->enqueue) {
		//持有Qdisc的保护锁
		spin_lock(&dev->queue_lock);
		q = dev->qdisc;
		if (q->enqueue) {
			/* reset queue_mapping to zero */
			//这步忽略
			skb_set_queue_mapping(skb, 0);
			//把该数据包入队列,具体何时发送由流量控制机制决定
			rc = q->enqueue(skb, q);
			//调用流控机制发送。可以这么理解这次发送,这里仅仅是一次尝试,能成功固然是好,如果
			//不满足发送条件(流控限速、队列状态不对等等),也没有关系,还会由发送软中断发送
			qdisc_run(dev);
			spin_unlock(&dev->queue_lock);
			//根据入队列的结果返回是否发送成功。再次强调,入队列只是代表网络设备接收了这包数据,
			//并不能说明该数据已经被发送了,发送时机以及发送哪一包数据由流量控制机制决定
			rc = rc == NET_XMIT_BYPASS ? NET_XMIT_SUCCESS : rc;
			goto out;
		}
		spin_unlock(&dev->queue_lock);
	}

	/* The device has no queue. Common case for software devices:
	   loopback, all the sorts of tunnels...

	   Really, it is unlikely that netif_tx_lock protection is necessary
	   here.  (f.e. loopback and IP tunnels are clean ignoring statistics
	   counters.)
	   However, it is possible, that they rely on protection
	   made by us here.

	   Check this and shot the lock. It is not prone from deadlocks.
	   Either shot noqueue qdisc, it is even simpler 8)
	 */
	//这里处理网络设备没有开启流量控制的情形。这时只能尝试直接发送,如果发送时机不对
	//或者是别的其它导致没有发送成功,那么只能丢弃数据包,因为设备没有队列保存数据

	//设备必须已经别打开,即执行过dev_open()
	if (dev->flags & IFF_UP) {
		int cpu = smp_processor_id(); /* ok because BHs are off */
		//dev_queue_xmit()函数在同一CPU上不能被递归调用
		if (dev->xmit_lock_owner != cpu) {
			HARD_TX_LOCK(dev, cpu);
			//设备状态ok,可以发送数据
			if (!netif_queue_stopped(dev) &&
			    !netif_subqueue_stopped(dev, skb)) {
				rc = 0;
				//调用驱动提供的dev_hard_start_xmit()接口发送数据,此外还处理GSO的情况
				if (!dev_hard_start_xmit(skb, dev)) {
					HARD_TX_UNLOCK(dev);
					goto out;
				}
			}
			HARD_TX_UNLOCK(dev);
			if (net_ratelimit())
				printk(KERN_CRIT "Virtual device %s asks to "
				       "queue packet!\n", dev->name);
		} else {
			/* Recursion is detected! It is possible,
			 * unfortunately */
			if (net_ratelimit())
				printk(KERN_CRIT "Dead loop on virtual device "
				       "%s, fix it urgently!\n", dev->name);
		}
	}

	rc = -ENETDOWN;
	rcu_read_unlock_bh();

out_kfree_skb:
	kfree_skb(skb);
	return rc;
out:
	rcu_read_unlock_bh();
	return rc;
}

从dev_queue_xmit()的实现来看,发送过程根据设备是否有队列(即有流量控制机制)分为两种情况,下面分别看这两种情况的发送过程。

1.1 两种情况的分别

可以看到,区别使用哪一种发送过程很简单,就是根据dev->qdics中是否指定了入队列函数enqueue(),如果指定了就是有队列发送,要使用流量控制机制;否则就是无队列发送。

1.2 RCU锁

这里还要注意的是在整个dev_queue_xmit()期间,使用了rcu_read_lock_bh()和rcu_read_unlock_bh()保护了对dev->qdics指针的访问,确保在函数执行期间该指针不会被修改。修改这个指针的地方见其它笔记介绍。

2. 有队列设备发送数据

这种情况下,首先调用qdisc->enqueue()将当前要发送的数据包入队列,然后调用qdisc_run()函数。enqueue()并不会真正发送,而且该函数与流量控制机制强相关,不同的流控机制使用的入队列函数不一样,我们暂时不分析该函数的实现,这里重点看下瑞队列之后的qdisc_run()到底干了些什么。

2.1 qdisc_run()

static inline void qdisc_run(struct net_device *dev)
{
	//1. 设备的发送队列开启(__LINK_STATE_XOFF标志位没有设置)
	//2. 设备没有被其它CPU调度发送(__LINK_STATE_QDISC_RUNNING标志位没有置位)
	//如果满足上述两个条件,那么设置调度标记,然后调用__qdisc_run()
	if (!netif_queue_stopped(dev) &&
	    !test_and_set_bit(__LINK_STATE_QDISC_RUNNING, &dev->state))
		__qdisc_run(dev);
}

qdisc_run()只是一个包裹函数,判断条件后调用__qdisc_run()。

void __qdisc_run(struct net_device *dev)
{
	unsigned long start_time = jiffies;

	while (qdisc_restart(dev)) {
		//如果设备的发送队列被关闭了,结束本次发送
		if (netif_queue_stopped(dev))
			break;

		/*
		 * Postpone processing if
		 * 1. another process needs the CPU;
		 * 2. we've been doing it for too long.
		 */
		//如注释所述,这里是为了避免发送过程持续太长。这种场景下,
		//设备还有数据要发,所以调用netif_schedule()激活发送软中断继续处理
		if (need_resched() || jiffies != start_time) {
			netif_schedule(dev);
			break;
		}
	}
	//退出该函数时清除__LINK_STATE_QDISC_RUNNING标志位
	clear_bit(__LINK_STATE_QDISC_RUNNING, &dev->state);
}

同样的,核心在qdisc_restart()的实现上。

/*
 * NOTE: Called under dev->queue_lock with locally disabled BH.
 *
 * __LINK_STATE_QDISC_RUNNING guarantees only one CPU can process this
 * device at a time. dev->queue_lock serializes queue accesses for
 * this device AND dev->qdisc pointer itself.
 *
 *  netif_tx_lock serializes accesses to device driver.
 *
 *  dev->queue_lock and netif_tx_lock are mutually exclusive,
 *  if one is grabbed, another must be free.
 *
 * Note, that this procedure can be called by a watchdog timer
 *
 * Returns to the caller:
 *				0  - queue is empty or throttled.
 *				>0 - queue is not empty.
 *
 */
//仔细看注释
static inline int qdisc_restart(struct net_device *dev)
{
	struct Qdisc *q = dev->qdisc;
	struct sk_buff *skb;
	int ret = NETDEV_TX_BUSY;

	//从发送队列中出队列一个数据包
	if (unlikely((skb = dev_dequeue_skb(dev, q)) == NULL))
		return 0;

	//关于锁见后面解释
	spin_unlock(&dev->queue_lock);

	HARD_TX_LOCK(dev, smp_processor_id());
	//只有配置支持使用多个发送队列的时候该函数才可能返回非0值,这里不考虑
	//调用dev_hard_start_xmit()使用驱动提供的发送回调发送这包数据
	if (!netif_subqueue_stopped(dev, skb))
		ret = dev_hard_start_xmit(skb, dev);
	HARD_TX_UNLOCK(dev);

	spin_lock(&dev->queue_lock);
	q = dev->qdisc;
	//根据发送返回结果进行处理
	switch (ret) {
	case NETDEV_TX_OK:
		//发送成功,返回当前发送队列中剩余数据包数目
		ret = qdisc_qlen(q);
		break;
	case NETDEV_TX_LOCKED:
		//设备持锁失败,那么进行CPU冲突相关处理。主要是进行统计,并将
		//数据包重新入队列,然后返回队列中剩余数据包数目
		ret = handle_dev_cpu_collision(skb, dev, q);
		break;
	default:
		/* Driver returned NETDEV_TX_BUSY - requeue skb */
		if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))
			printk(KERN_WARNING "BUG %s code %d qlen %d\n",
			       dev->name, ret, q->q.qlen);
		//其它情况都属于本次发送失败,那么将数据包重新入队列,然后返回队列中剩余数据包数目
		ret = dev_requeue_skb(skb, dev, q);
		break;
	}
	return ret;
}

2.1.1 锁

有队列设备发送过程还涉及另外两个锁:dev_queue_lock和HARD_TX_LOCK().

自旋锁dev_queue_lock用于控制同时只有一个流程在调用设备的流控机制发送,即保证入队列、出队列的操作序列化。

HARD_TX_LOCK()持有期间,要调用驱动提供的发送回调进行发送,这个过程要操作硬件,相对来讲比较耗时。为了保证:

扫描二维码关注公众号,回复: 4118635 查看本文章
1) 发送过程必须序列化,不能同时有两个流程调用驱动提供的发送回调;
2) 设备驱动发送过程中不能休眠;
3) 设备驱动发送期间,可以执行入队列操作。

框架设计了这个HARD_TX_LOCK(),先来看看这个锁的操作:

#define HARD_TX_LOCK(dev, cpu) {			\
	if ((dev->features & NETIF_F_LLTX) == 0) {	\
		__netif_tx_lock(dev, cpu);			\
	}						\
}

#define HARD_TX_UNLOCK(dev) {				\
	if ((dev->features & NETIF_F_LLTX) == 0) {	\
		netif_tx_unlock(dev);			\
	}						\
}

/**
 *	netif_tx_lock - grab network device transmit lock
 *	@dev: network device
 *	@cpu: cpu number of lock owner
 *
 * Get network device transmit lock
 */
static inline void __netif_tx_lock(struct net_device *dev, int cpu)
{
	spin_lock(&dev->_xmit_lock);
	dev->xmit_lock_owner = cpu;
}

static inline void netif_tx_unlock(struct net_device *dev)
{
	dev->xmit_lock_owner = -1;
	spin_unlock(&dev->_xmit_lock);
}

如果驱动程序在dev->feature字段中设置了NETIF_F_LLTX标记,表示驱动自己会实现这种锁,这时驱动会保证在提供的发送回调内部实现时,正确的持有该锁。

如果驱动没有设置NETIF_F_LLTX标记,那么就是用net_device中提供的自旋锁__xmit_lock,持有该锁时,将dev->xmit_lock_owner为当前持锁的CPU的ID。

需要强调下这种设计为什么能够实现上面列举的三点:首先自旋锁(无论是dev->_xmit_lock还是驱动自己提供的)可以保证1和2;因为在HARD_TX_LOCK()之前已经释放了dev->dev_queue_lock,所以可以执行入队列操作,但是因为没有清除标记__LINK_STATE_QDISC_RUNNING标志位,所以其它CPU根本无法调用qdisc_run()启动流控机制来执行出队列操作。

2.2 dev_hard_start_xmit()

看上面代码,可以看到最终的发送是dev_hard_start_xmit(),该函数处理GSO、支持勾包、调用驱动的发送回调发送数据包。

int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev)
{
	if (likely(!skb->next)) {
		//这里提供了一种在设备接口层的勾包机制,向注册在
		//ptype_all中的所有接口到发送一份数据包
		if (!list_empty(&ptype_all))
			dev_queue_xmit_nit(skb, dev);

		if (netif_needs_gso(dev, skb)) {
			if (unlikely(dev_gso_segment(skb)))
				goto out_kfree_skb;
			if (skb->next)
				goto gso;
		}
		//调用驱动的发送回调发送数据包
		return dev->hard_start_xmit(skb, dev);
	}

gso:
	//处理GSO情形
	do {
		struct sk_buff *nskb = skb->next;
		int rc;

		skb->next = nskb->next;
		nskb->next = NULL;
		//调用驱动的发送回调分别发送每一片
		rc = dev->hard_start_xmit(nskb, dev);
		if (unlikely(rc)) {
			nskb->next = skb->next;
			skb->next = nskb;
			return rc;
		}
		if (unlikely((netif_queue_stopped(dev) ||
			     netif_subqueue_stopped(dev, skb)) &&
			     skb->next))
			return NETDEV_TX_BUSY;
	} while (skb->next);

	skb->destructor = DEV_GSO_CB(skb)->destructor;

out_kfree_skb:
	kfree_skb(skb);
	return 0;
}

2.3 激活发送软中断

在__qdisc_run()中看到,如果设备还有数据要发送,那么就会调用netif_schedule(),看下这个过程的实现:

static inline void netif_schedule(struct net_device *dev)
{
	//发送队列没有关闭的情况下执行调度
	if (!test_bit(__LINK_STATE_XOFF, &dev->state))
		__netif_schedule(dev);
}

void __netif_schedule(struct net_device *dev)
{
	//如果__LINK_STATE_SCHED没有置位,说明设备还没有被发送软中断调度,
	//那么设置标志位,然后将设备放入当前CPU的输出队列中并激活软中断
	if (!test_and_set_bit(__LINK_STATE_SCHED, &dev->state)) {
		unsigned long flags;
		struct softnet_data *sd;

		local_irq_save(flags);
		//获取当前CPU的收发队列,将设备接入该队列
		sd = &__get_cpu_var(softnet_data);
		dev->next_sched = sd->output_queue;
		sd->output_queue = dev;
		//激活发送软中断
		raise_softirq_irqoff(NET_TX_SOFTIRQ);
		local_irq_restore(flags);
	}
}
EXPORT_SYMBOL(__netif_schedule);

这里,output_queue的使用和接收过程中poll_list的使用完全相同。

3. 无队列设备发送数据

看完有队列发送数据,再回头看看dev_queue_xmit()中无队列情况的处理,会发现所有的细节都已经涉及到,这里不再赘述。

4. 发送软中断net_tx_action()

从上面的代码中看到,当dev_queue_xmit()没有将队列中的数据发送完毕时,就会激活发送软中断继续发送。

实际上,大多数的数据发送都是由发送软中断驱动的,如果一次发送不完,会再次激活,在下一次软中断处理中继续发送,直到所有数据都发送完毕。

static void net_tx_action(struct softirq_action *h)
{
	//获取本CPU上面的发送队列
	struct softnet_data *sd = &__get_cpu_var(softnet_data);

	//completion_queue队列的目的是让驱动程序将数据发送完毕后,如果想要快速退出,
	//延后执行数据包的free动作,那么可以将待释放的数据包放入该队列,将释放动作放到
	//发送软中断中处理

	if (sd->completion_queue) {
		struct sk_buff *clist;

		//注意这种优化写法:先关中断,把数据从队列中摘下来,再开中断,由于free动作比较耗时,
		//而该队列是所有设备共享的,这种设计可以保证关中断的时间尽可能的短
		local_irq_disable();
		clist = sd->completion_queue;
		sd->completion_queue = NULL;
		local_irq_enable();

		//释放这些SKB
		while (clist) {
			struct sk_buff *skb = clist;
			clist = clist->next;
			//注意,放入完成队列的SKB的引用计数应该已经不为0
			BUG_TRAP(!atomic_read(&skb->users));
			__kfree_skb(skb);
		}
	}

	//处理output_queue,尝试发送
	if (sd->output_queue) {
		struct net_device *head;

		//和上面completetion_queue类似的处理办法,只是这里的队列元素是net_device
		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		local_irq_enable();

		while (head) {
			struct net_device *dev = head;
			//head指向待轮询列表的下一个设备
			head = head->next_sched;

			smp_mb__before_clear_bit();
			//清除当前设备的正在轮询状态
			clear_bit(__LINK_STATE_SCHED, &dev->state);
			//尝试通过流量控制机制发送,这里之所以能肯定使用了流量控制机制,是因为如果不使用,
			//那么在dev_queue_xmit()中就直接调用驱动的发送接口发送了,根本不会激活发送软中断
			//另外,注意这里为了尽可能的块,使用的是trylock()版本的持锁方式,也表达了这里只是
			//尽最大努力发送的意思,如果本次不能发送,那么就再次调度,等待下次软中断机会
			if (spin_trylock(&dev->queue_lock)) {
				qdisc_run(dev);
				spin_unlock(&dev->queue_lock);
			} else {
				//本次未进行发送,再次将该设备加入到output_queue,并激活发送软中断
				netif_schedule(dev);
			}
		}
	}
}

关于发送队列的描述,可以参考设备接口层之数据包接收.

5. 小结

下面来回顾下整个设备接口层的整个数据包发送过程:

  1. 高层协议将数据包准备好后,调用dev_queue_xmit()将数据包发送给设备接口层,这里面有个非常重要的前提是高层协议已经确定设备要从哪一个网络设备发出(设置了skb->dev字段);
  2. 在dev_queue_xmit()中处理线性化、校验和等处理后,根据设备上是否配置了发送队列,将发送过程分为两种:有队列发送和无队列发送,下面我们在只看有队列发送;
  3. 对于有队列发送,先将数据包入队列,然后调用qdisc_run()启动流量控制机制进行一次数据包发送尝试,如果发送失败,会将该设备加入到当前CPU的发送轮询队列output_queue中,然后激活网络发送软中断;
  4. 在网络发送软中断中,依次处理发送轮询队列ouput_queue中的待发送设备,处理时同样是启动流量控制机制进行数据包发送,如果一次软中断中无法发送完毕,那么就继续启动下一次软中断,如此反复,知道该设备的发送队列情况为止。

猜你喜欢

转载自blog.csdn.net/fanxiaoyu321/article/details/84184915