skaiuijing

引言

没有引言就是一种引言。

思考

笔者最近在看一些哲学书籍,来个哲学范式的开头:

存在永远不可能是静态的、孤立的。所以海德格尔在存在与时间一书中说:存在的意义是去存在

在本篇的开始,笔者想向各位说明本篇要说明的一种存在,但是这种存在本身又并非静态的存在,而是此在“去”而形成的存在。

正因为这种存在难以描述且不可捉摸,因此笔者也不知道如何描述这种存在,但这种存在具有生命力,而事物的生命力只在其动态瞬间显现。

总而言之,本篇描述的是协议栈的动态过程,即协议栈本身与内存等模块的动态交互,而并不单单局限于某个静态的题材本身,通过这种动态的过程,我们得以窥见其存在。

内存管理

Linux内核的伙伴算法、slab分配器这些,网上的教程已经讲得很多了,笔者就不过多赘述了。

我们关心的重点永远是协议栈,而不是内存的管理算法是否高效。

换而言之,协议栈如何与内存交互,从而显现其存在与作为?

但是我们有一个问题需要解决,

内存从何而来?

数据包的内存从何而来?

在前文,我们介绍了数据包接收的过程,提到了ringbuf,我们知道内核协议栈接收数据包时会写入ringbuf,但是,是把数据写入到了ringbuf了吗?

其实不是,ringbuf中的只是指针(entry),真正的数据并不在ringbuf中。

以intel的e1000网卡为例:

1.在 e1000_alloc_rx_buffers() 中,驱动分配了 skb,并将 skb->data 映射为 DMA 地址。

2.网卡收到数据包后,通过 DMA 写入 skb->data 指向的地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

/**
* e1000_alloc_rx_buffers - Replace used receive buffers
* @rx_ring: Rx descriptor ring
* @cleaned_count: number to reallocate
* @gfp: flags for allocation
**/
static void e1000_alloc_rx_buffers(struct e1000_ring *rx_ring,
int cleaned_count, gfp_t gfp)
{
struct e1000_adapter *adapter = rx_ring->adapter;
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
union e1000_rx_desc_extended *rx_desc;
struct e1000_buffer *buffer_info;
struct sk_buff *skb;
unsigned int i;
unsigned int bufsz = adapter->rx_buffer_len;

i = rx_ring->next_to_use;
//指向下一个entry
buffer_info = &rx_ring->buffer_info[i];

//复用或者是分配skb,但是复用是怎么来的呢?让我们后面揭晓
while (cleaned_count--) {
skb = buffer_info->skb;
if (skb) {
skb_trim(skb, 0);
goto map_skb;
}

//分配skb及内存空间
skb = __netdev_alloc_skb_ip_align(netdev, bufsz, gfp);
if (!skb) {
/* Better luck next round */
adapter->alloc_rx_buff_failed++;
break;
}

buffer_info->skb = skb;
map_skb:
//将 skb->data 指向的虚拟地址映射为物理地址,供网卡 DMA 写入
buffer_info->dma = dma_map_single(&pdev->dev, skb->data,
adapter->rx_buffer_len,
DMA_FROM_DEVICE);
if (dma_mapping_error(&pdev->dev, buffer_info->dma)) {
dev_err(&pdev->dev, "Rx DMA map failed\n");
adapter->rx_dma_failed++;
break;
}
//获取当前 entry 的 RX 描述符,并将刚刚映射好的 DMA 地址写入 buffer_addr 字段,告诉网卡“新数据请写到这里”
rx_desc = E1000_RX_DESC_EXT(*rx_ring, i);
//这就是ringbuf的实际entry
rx_desc->read.buffer_addr = cpu_to_le64(buffer_info->dma);

if (unlikely(!(i & (E1000_RX_BUFFER_WRITE - 1)))) {
/* Force memory writes to complete before letting h/w
* know there are new descriptors to fetch. (Only
* applicable for weak-ordered memory model archs,
* such as IA-64).
*/
wmb();
if (adapter->flags2 & FLAG2_PCIM2PCI_ARBITER_WA)
e1000e_update_rdt_wa(rx_ring, i);
else
writel(i, rx_ring->tail);
}
//移动到下一个 ring entry
i++;
if (i == rx_ring->count)
i = 0;
buffer_info = &rx_ring->buffer_info[i];
}

rx_ring->next_to_use = i;
}

因此,我们总结如下:

网卡初始化时的工作:

1.为每个接收描述符分配一个 sk_buff

2. skb->data 的物理地址映射为 DMA 地址

3.将 DMA 地址写入对应的接收描述符(Rx descriptor)

4.更新网卡硬件的接收环形缓冲区,使其准备好接收下一个数据包

这一切似乎看起来都很合理,但是,有一个意外的参数:cleaned_count。它正是串联了整个协议栈的sk_buff内存起始于结束相关的一个参数。

那么内存究竟从何而来呢?

这其实就是伙伴算法、slab分配器的事情了,但是,不管我们是使用alloc_page还是什么,我们的目的其实就是为了得到一块内存,而得到内存本身内存本身的事情,而不是协议栈与内存交互的事情,因为协议栈只关心如何得到一块内存。

得到内存本身,在乎的是得到内存本身的过程是否高效。

得到一块内存,仅仅在乎是否能得到一块内存。

协议栈子系统与内存子系统的交流就是这么简单。

数据包与内存的起始与结束

在黑格尔的小逻辑学里,任何事物本身逻辑是自洽的,也就是说,不管事情如何向着高处螺旋式发展,最终,终点也必定会成为起点。

而数据包也是一样。

初始化

驱动开机/重启时,e1000e_setup_rx_resources() 循环调用 e1000_alloc_rx_buffers(),给所有 entry 填满 skb->data映射的DMA 地址

结束时

e1000_clean_rx_irq() 处理完若干个 entry 之后,会带着 cleaned_count 再次调用 e1000_alloc_rx_buffers()

这里再次给用掉的 entry 重新分配 skb 并写入 descriptor

这样每次网卡想写包时,descriptor 都已经指向了一块 sk_buff 的 data 缓冲区。

这样,sk_buff分配的结束,却是sk_buff新一轮分配的开始。

sk_buff分配的结束,却是进入协议栈的开始,e1000_clean_rx_irq会将数据包送入协议栈。

数据包如何被送入协议栈?

buffer_info 就是驱动在 RX ring 结构里维护的一块“元数据”数组,用来记录每个 descriptor(ring entry)对应的实际接收缓冲区(sk_buff 或 page fragment)以及它的 DMA 映射信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165

/**
* e1000_clean_rx_irq - Send received data up the network stack
* @rx_ring: Rx descriptor ring
* @work_done: output parameter for indicating completed work
* @work_to_do: how many packets we can clean
*
* the return value indicates whether actual cleaning was done, there
* is no guarantee that everything was cleaned
**/
static bool e1000_clean_rx_irq(struct e1000_ring *rx_ring, int *work_done,
int work_to_do)
{
//初始化指针与状态,rx_desc 是网卡写入的描述符,包含状态位和数据长度
struct e1000_adapter *adapter = rx_ring->adapter;
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_hw *hw = &adapter->hw;
union e1000_rx_desc_extended *rx_desc, *next_rxd;
struct e1000_buffer *buffer_info, *next_buffer;
u32 length, staterr;
unsigned int i;
//清理了多少个的同时也意味着要重新构造多少个
int cleaned_count = 0;
bool cleaned = false;
unsigned int total_rx_bytes = 0, total_rx_packets = 0;

i = rx_ring->next_to_clean;
rx_desc = E1000_RX_DESC_EXT(*rx_ring, i);
staterr = le32_to_cpu(rx_desc->wb.upper.status_error);
buffer_info = &rx_ring->buffer_info[i];

while (staterr & E1000_RXD_STAT_DD) {
struct sk_buff *skb;

if (*work_done >= work_to_do)
break;
(*work_done)++;
dma_rmb(); /* read descriptor and rx_buffer_info after status DD */
//获取skb
skb = buffer_info->skb;
buffer_info->skb = NULL;

prefetch(skb->data - NET_IP_ALIGN);

i++;
if (i == rx_ring->count)
i = 0;
next_rxd = E1000_RX_DESC_EXT(*rx_ring, i);
prefetch(next_rxd);

next_buffer = &rx_ring->buffer_info[i];

cleaned = true;
cleaned_count++;
//清理DMA映射
dma_unmap_single(&pdev->dev, buffer_info->dma,
adapter->rx_buffer_len, DMA_FROM_DEVICE);
buffer_info->dma = 0;
//获取数据长度
length = le16_to_cpu(rx_desc->wb.upper.length);

/* !EOP means multiple descriptors were used to store a single
* packet, if that's the case we need to toss it. In fact, we
* need to toss every packet with the EOP bit clear and the
* next frame that _does_ have the EOP bit set, as it is by
* definition only a frame fragment
*/
if (unlikely(!(staterr & E1000_RXD_STAT_EOP)))
adapter->flags2 |= FLAG2_IS_DISCARDING;

if (adapter->flags2 & FLAG2_IS_DISCARDING) {
/* All receives must fit into a single buffer */
e_dbg("Receive packet consumed multiple buffers\n");
/* recycle */
buffer_info->skb = skb;
if (staterr & E1000_RXD_STAT_EOP)
adapter->flags2 &= ~FLAG2_IS_DISCARDING;
goto next_desc;
}

if (unlikely((staterr & E1000_RXDEXT_ERR_FRAME_ERR_MASK) &&
!(netdev->features & NETIF_F_RXALL))) {
/* recycle */
buffer_info->skb = skb;
goto next_desc;
}

/* adjust length to remove Ethernet CRC */
if (!(adapter->flags2 & FLAG2_CRC_STRIPPING)) {
/* If configured to store CRC, don't subtract FCS,
* but keep the FCS bytes out of the total_rx_bytes
* counter
*/
if (netdev->features & NETIF_F_RXFCS)
total_rx_bytes -= 4;
else
length -= 4;
}

total_rx_bytes += length;
total_rx_packets++;

/* code added for copybreak, this should improve
* performance for small packets with large amounts
* of reassembly being done in the stack
*/
if (length < copybreak) {
//长度过小
struct sk_buff *new_skb =
//如果数据包小于256字节,分配小包专用的skb
napi_alloc_skb(&adapter->napi, length);
if (new_skb) {
//拷贝数据(含 NET_IP_ALIGN 前缀对齐)
skb_copy_to_linear_data_offset(new_skb,
-NET_IP_ALIGN,
(skb->data -
NET_IP_ALIGN),
(length +
NET_IP_ALIGN));
/* save the skb in buffer_info as good */看看,这里又把大的skb重新放回去了
buffer_info->skb = skb;
//使用小包
skb = new_skb;
}
/* else just continue with the old one */
}
/* end copybreak code */
skb_put(skb, length);

/* Receive Checksum Offload */
e1000_rx_checksum(adapter, staterr, skb);

e1000_rx_hash(netdev, rx_desc->wb.lower.hi_dword.rss, skb);
//提交给协议栈
e1000_receive_skb(adapter, netdev, skb, staterr,
rx_desc->wb.upper.vlan);

next_desc:
rx_desc->wb.upper.status_error &= cpu_to_le32(~0xFF);

/* return some buffers to hardware, one at a time is too slow */
if (cleaned_count >= E1000_RX_BUFFER_WRITE) {
adapter->alloc_rx_buf(rx_ring, cleaned_count,
GFP_ATOMIC);
cleaned_count = 0;
}

/* use prefetched values */
rx_desc = next_rxd;
buffer_info = next_buffer;

staterr = le32_to_cpu(rx_desc->wb.upper.status_error);
}
rx_ring->next_to_clean = i;

cleaned_count = e1000_desc_unused(rx_ring);
if (cleaned_count)
adapter->alloc_rx_buf(rx_ring, cleaned_count, GFP_ATOMIC);

adapter->total_rx_bytes += total_rx_bytes;
adapter->total_rx_packets += total_rx_packets;
return cleaned;
}

小包优化

在检测到是小包后,仅仅设置了这么一句:

1
buffer_info->skb = skb;

那么这个大的skb到底是怎么被复用的呢?

还记得我们的e1000_alloc_rx_buffers开头分配skb有复用吗?

其实就在于这里了:

1
2
3
4
5
6
7
8
9
10
11
static void e1000_alloc_rx_buffers(struct e1000_ring *rx_ring,
int cleaned_count, gfp_t gfp)
{
省略一堆
//这里的复用就包含了这个大的skb(相对于小包来说太大了)
while (cleaned_count--) {
skb = buffer_info->skb;
if (skb) {
skb_trim(skb, 0);
goto map_skb;
}

总结

现在对内容进行总结:

1
2
3
4
graph LR
Aa(驱动初始化时调用alloc_rx_buffers方法分配rinfbuf)-->Ab(完成数据包分配及DMA映射)
Ba(网卡传输数据)-->Bb(直接DMA找到ringbuf映射的地址写入数据)
Ca(ksoftirqd进程工作)-->Cc(调用e1000_clean_rx_irq)-->Cb(把数据包丢给上层并复用skb)

ringbuf的实质是什么呢?其实就是一个存放指针及状态的环形数组而已。但是,它让分配内存与分配数据包实现了解耦。这就是中间件的哲学设计所在。

而在FreeBSD中,是直接使用队列完成的。数据包的内存也不并不是全是预先分配的,而是按需动态分配的,当然,虽然也有cluster这样的存在。

在FreeBSD中,buf结构有四种,并不都像Linux这样直接使用指针,而是使用混合策略。

其中有一种mbuf,是直接使用c语言的动态数组,把数据包内容复制到mbuf后面,然后再转交给应用层的。而使用cluster时,其实与Linux大差不差。

关于内核与内存的交互,是如何进行性能优化的,其实就两点:

  1. 一个是使用指针及中间数据结构,避免拷贝。

  2. 另一个就是:根据实际情况合理复用内存。

在实际开发中,我们可以通过ethtool等工具调整ringbuf大小,优化网络性能:

1
ethtool -G <iface> rx <SIZE> tx <SIZE>

我们也可以控制软中断接收队列最大积压包数:

1
echo 8000 > /proc/sys/net/core/netdev_max_backlog

也可以调整中断频率,这样就可以一次性处理多个包。

就留给读者慢慢理解了,笔者已经讲完了。