JasonWang's Blog

The Linux Network Stack: the Data Send Path

Word count: 7.8k · Reading time: 39 min
2021/04/08

I have recently been spending spare time studying the Linux network protocol stack. While reading the source, I would work through the code one day only to find, a couple of days later, that I had forgotten most of it and had to go through it again; I never built a systematic mental model of the stack. So I decided to write down the code paths I have traced, both as a summary of this period of study and as a reference for later.

An earlier article on networking, 从NAPI说一说Linux内核数据的接收流程 (a look at the Linux kernel receive path, starting from NAPI), briefly covered how Linux receives data, but it focused on the interaction between the data link layer and the device driver and did not touch the IP (network) layer or the TCP (transport) layer. On Linux, sending TCP data roughly goes through the following steps:

  • A user process creates a socket, fills a buffer with data, and sends it to the server with send() (a minimal user-space sketch of this step follows the list)
  • The kernel copies the data from the user-space buffer into the kernel protocol stack
  • The kernel stack passes the data through the TCP layer (L4), the IP layer (L3) and the data link layer (L2), and finally hands it to the NIC
  • Data moves between main memory and the NIC via DMA; when the transfer completes, the NIC raises an interrupt to notify the CPU, and the kernel frees the buffers allocated earlier
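The first step is ordinary BSD socket code in user space. A minimal sketch (error handling omitted; the address 192.0.2.1:8080 is only a placeholder):

// send_demo.c: minimal user-space sender
#include <arpa/inet.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void)
{
	int fd = socket(AF_INET, SOCK_STREAM, 0);	/* handled by inet_create() in the kernel */

	struct sockaddr_in srv = { .sin_family = AF_INET, .sin_port = htons(8080) };
	inet_pton(AF_INET, "192.0.2.1", &srv.sin_addr);
	connect(fd, (struct sockaddr *)&srv, sizeof(srv));	/* tcp_v4_connect() */

	const char *msg = "hello";
	send(fd, msg, strlen(msg), 0);	/* enters inet_sendmsg()/tcp_sendmsg() below */

	close(fd);
	return 0;
}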

In this article we focus on the kernel part of the transmit path and sketch the overall structure and initialization of the Linux protocol stack, in two parts:

  • How the Linux kernel protocol stack is initialized
  • How data travels from the TCP transport layer down to the device driver

This article is based on the IPv4 code of Linux 4.14; the IPv6 implementation is not covered.

Initialization of the kernel protocol stack

The initialization logic of the inet protocol stack lives in af_inet.c (kernel/net/ipv4). At boot the kernel calls the init function inet_init, which registers the socket-layer interface, initializes the TCP/IP layers, and registers the interface between the IP layer and the data link layer.

  • proto_register registers the TCP/UDP/RAW/ping protocols and allocates their slab caches; these transport-layer protocols interact with the socket layer: data the user writes to a socket is handed to them, and data they receive from the network layer is copied into the user-space buffer
  • sock_register registers the interface used to create socket objects
  • inet_add_protocol then registers the transport-layer protocols (TCP/UDP/ICMP and so on) with the network layer, so that packets coming back up from the network layer can be dispatched through these callbacks and written into the right application's socket buffer (the TCP entry is sketched right after this list)
  • inet_register_protosw registers the socket-layer interfaces; the kernel provides three kinds of sockets, SOCK_STREAM/SOCK_DGRAM/SOCK_RAW; TCP normally uses a SOCK_STREAM byte-stream socket, while UDP/ICMP use SOCK_DGRAM datagram sockets
  • the xxx_init functions initialize the ARP/IP/TCP and other modules in turn, allocating their slab caches
  • finally, dev_add_pack registers ip_packet_type, the receive hook for IP packets, with the link layer (also sketched below)
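For reference, the hooks registered here look roughly like this (abridged from the 4.14-era af_inet.c; the exact field list may differ slightly in your tree):

// af_inet.c (abridged)
static struct net_protocol tcp_protocol = {
	.handler	= tcp_v4_rcv,	/* the network layer hands TCP segments here */
	.err_handler	= tcp_v4_err,
	.no_policy	= 1,
	.netns_ok	= 1,
};

static struct packet_type ip_packet_type __read_mostly = {
	.type = cpu_to_be16(ETH_P_IP),	/* registered with the link layer via dev_add_pack */
	.func = ip_rcv,			/* entry point for incoming IP packets */
};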

// af_inet.c

static int __init inet_init(void)
{
struct inet_protosw *q;
struct list_head *r;
int rc = -EINVAL;

sock_skb_cb_check_size(sizeof(struct inet_skb_parm));

rc = proto_register(&tcp_prot, 1);
if (rc)
goto out;

rc = proto_register(&udp_prot, 1);
if (rc)
goto out_unregister_tcp_proto;

rc = proto_register(&raw_prot, 1);
if (rc)
goto out_unregister_udp_proto;

rc = proto_register(&ping_prot, 1);
if (rc)
goto out_unregister_raw_proto;

/*
* Tell SOCKET that we are alive...
*/

(void)sock_register(&inet_family_ops);

#ifdef CONFIG_SYSCTL
ip_static_sysctl_init();
#endif

/*
* Add all the base protocols.
*/

if (inet_add_protocol(&icmp_protocol, IPPROTO_ICMP) < 0)
pr_crit("%s: Cannot add ICMP protocol\n", __func__);
if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0)
pr_crit("%s: Cannot add UDP protocol\n", __func__);
if (inet_add_protocol(&tcp_protocol, IPPROTO_TCP) < 0)
pr_crit("%s: Cannot add TCP protocol\n", __func__);
#ifdef CONFIG_IP_MULTICAST
if (inet_add_protocol(&igmp_protocol, IPPROTO_IGMP) < 0)
pr_crit("%s: Cannot add IGMP protocol\n", __func__);
#endif

/* Register the socket-side information for inet_create. */
for (r = &inetsw[0]; r < &inetsw[SOCK_MAX]; ++r)
INIT_LIST_HEAD(r);

for (q = inetsw_array; q < &inetsw_array[INETSW_ARRAY_LEN]; ++q)
inet_register_protosw(q);

/*
* Set the ARP module up
*/

arp_init();

/*
* Set the IP module up
*/

ip_init();

/* Setup TCP slab cache for open requests. */
tcp_init();

/* Setup UDP memory threshold */
udp_init();

/* Add UDP-Lite (RFC 3828) */
udplite4_register();

ping_init();

/*
* Set the ICMP layer up
*/

if (icmp_init() < 0)
panic("Failed to create the ICMP control socket.\n");

/*
* Initialise the multicast router
*/
#if defined(CONFIG_IP_MROUTE)
if (ip_mr_init())
pr_crit("%s: Cannot init ipv4 mroute\n", __func__);
#endif

if (init_inet_pernet_ops())
pr_crit("%s: Cannot init ipv4 inet pernet ops\n", __func__);
/*
* Initialise per-cpu ipv4 mibs
*/

if (init_ipv4_mibs())
pr_crit("%s: Cannot init ipv4 mibs\n", __func__);

ipv4_proc_init();

ipfrag_init();

dev_add_pack(&ip_packet_type);

...

}

fs_initcall(inet_init);

Overall, the Linux kernel divides the protocol stack implementation into roughly four levels:

  • Socket layer: exposes the system-call interface to user space and interacts with the TCP transport layer below
  • TCP transport layer: the transport layer of the familiar TCP/IP stack; it provides the interface for the socket layer on one side and the receive entry points for the network layer on the other
  • IP network layer: the network layer of the TCP/IP stack; it fragments and routes data from the transport layer and sends it to the right network device, and dispatches received packets to the matching transport protocol
  • Data link layer: maps between IP addresses and physical devices, sends data to the physical device, and receives data coming from it

With this layered picture in mind, walking through the stack's code becomes much clearer. Before getting into the details, here is a rough diagram of the send/receive flow:

Linux Network stack flow

Following this simplified diagram, let's take a close look at the transmit path (for TCP).
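In text form, the transmit call chain traced in the rest of this article is:

send()/sendmsg()                                   (user space)
  -> inet_sendmsg -> tcp_sendmsg -> tcp_sendmsg_locked
  -> tcp_push -> __tcp_push_pending_frames -> tcp_write_xmit
  -> tcp_transmit_skb / __tcp_transmit_skb          (TCP header built here)
  -> ip_queue_xmit -> ip_local_out -> dst_output -> ip_output
  -> ip_finish_output -> ip_finish_output2          (netfilter, fragmentation)
  -> neigh_output / neigh_hh_output                 (neighbour/ARP)
  -> dev_queue_xmit -> __dev_xmit_skb -> __qdisc_run
  -> sch_direct_xmit -> dev_hard_start_xmit -> driver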

The data send flow

We analyze the transmit path in three parts, following the TCP/IP stack:

  • the TCP transport layer
  • the IP network layer
  • the data link layer

The TCP transport layer

Before digging into the code, here is a simplified diagram of the TCP-layer transmit path, useful for keeping your bearings:

TCP data flow

For simplicity we skip socket creation and connection establishment (see kernel/net/socket.c if you are interested) and concentrate on sending data. As we saw in the initialization part, the interface between the socket layer and the TCP transport layer is tcp_prot, defined in tcp_ipv4.c; its operations include connecting (connect), disconnecting (disconnect), accepting connections (accept), receiving (recvmsg) and sending (sendmsg), among others.


//tcp_ipv4.c

struct proto tcp_prot = {
.name = "TCP",
.owner = THIS_MODULE,
.close = tcp_close,
.connect = tcp_v4_connect,
.disconnect = tcp_disconnect,
.accept = inet_csk_accept,
.ioctl = tcp_ioctl,
.init = tcp_v4_init_sock,
.destroy = tcp_v4_destroy_sock,
.shutdown = tcp_shutdown,
.setsockopt = tcp_setsockopt,
.getsockopt = tcp_getsockopt,
.keepalive = tcp_set_keepalive,
.recvmsg = tcp_recvmsg,
.sendmsg = tcp_sendmsg,
.sendpage = tcp_sendpage,
.backlog_rcv = tcp_v4_do_rcv,
.release_cb = tcp_release_cb,
.hash = inet_hash,
.unhash = inet_unhash,
.get_port = inet_csk_get_port,
.enter_memory_pressure = tcp_enter_memory_pressure,
.leave_memory_pressure = tcp_leave_memory_pressure,
.stream_memory_free = tcp_stream_memory_free,
.sockets_allocated = &tcp_sockets_allocated,
.orphan_count = &tcp_orphan_count,
.memory_allocated = &tcp_memory_allocated,
.memory_pressure = &tcp_memory_pressure,
.sysctl_mem = sysctl_tcp_mem,
.sysctl_wmem = sysctl_tcp_wmem,
.sysctl_rmem = sysctl_tcp_rmem,
.max_header = MAX_TCP_HEADER,
.obj_size = sizeof(struct tcp_sock),
.slab_flags = SLAB_TYPESAFE_BY_RCU,
.twsk_prot = &tcp_timewait_sock_ops,
.rsk_prot = &tcp_request_sock_ops,
...
};
EXPORT_SYMBOL(tcp_prot);

When the socket layer receives a send request from a user process, inet_sendmsg simply calls tcp_prot->sendmsg, i.e. tcp_sendmsg, to transmit the data:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

//af_inet.c
int inet_sendmsg(struct socket *sock, struct msghdr *msg, size_t size)
{
struct sock *sk = sock->sk;

sock_rps_record_flow(sk);

/* We may need to bind the socket. */
if (!inet_sk(sk)->inet_num && !sk->sk_prot->no_autobind &&
inet_autobind(sk))
return -EAGAIN;

return sk->sk_prot->sendmsg(sk, msg, size);
}

// tcp.c
int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
{
int ret;

lock_sock(sk);
ret = tcp_sendmsg_locked(sk, msg, size);
release_sock(sk);

return ret;
}


Before sending any data, tcp_sendmsg takes the socket lock via lock_sock and then calls tcp_sendmsg_locked, which mainly does the following (the mss_now/size_goal values driving its copy loop are explained right after this list):

  • check the socket flags for MSG_ZEROCOPY and MSG_FASTOPEN (which lets data be carried during the three-way handshake)
  • check that the TCP connection is in the TCPF_ESTABLISHED (or CLOSE_WAIT) state, otherwise wait until the connection completes before sending
  • copy the data into skbs; if anything was copied, call tcp_push to actually send it
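The copy loop below is driven by two values: mss_now, the current MSS, and size_goal, how large each skb should ideally grow (a multiple of the MSS when TSO/GSO is available). Both come from tcp_send_mss; quoted from the 4.14-era tcp.c for reference:

// tcp.c
static int tcp_send_mss(struct sock *sk, int *size_goal, int flags)
{
	int mss_now;

	mss_now = tcp_current_mss(sk);
	*size_goal = tcp_xmit_size_goal(sk, mss_now, !(flags & MSG_OOB));

	return mss_now;
}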

// tcp.c
int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
{
struct tcp_sock *tp = tcp_sk(sk);
struct ubuf_info *uarg = NULL;
struct sk_buff *skb;
struct sockcm_cookie sockc;
int flags, err, copied = 0;
int mss_now = 0, size_goal, copied_syn = 0;
bool process_backlog = false;
bool sg;
long timeo;

flags = msg->msg_flags;

if (flags & MSG_ZEROCOPY && size && sock_flag(sk, SOCK_ZEROCOPY)) {
if ((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) {
err = -EINVAL;
goto out_err;
}

skb = tcp_send_head(sk) ? tcp_write_queue_tail(sk) : NULL;
uarg = sock_zerocopy_realloc(sk, size, skb_zcopy(skb));
if (!uarg) {
err = -ENOBUFS;
goto out_err;
}

if (!(sk_check_csum_caps(sk) && sk->sk_route_caps & NETIF_F_SG))
uarg->zerocopy = 0;
}

if (unlikely(flags & MSG_FASTOPEN || inet_sk(sk)->defer_connect) &&
!tp->repair) {
err = tcp_sendmsg_fastopen(sk, msg, &copied_syn, size);
if (err == -EINPROGRESS && copied_syn > 0)
goto out;
else if (err)
goto out_err;
}
...

/* Wait for a connection to finish. One exception is TCP Fast Open
* (passive side) where data is allowed to be sent before a connection
* is fully established.
*/
if (((1 << sk->sk_state) & ~(TCPF_ESTABLISHED | TCPF_CLOSE_WAIT)) &&
!tcp_passive_fastopen(sk)) {
err = sk_stream_wait_connect(sk, &timeo);
if (err != 0)
goto do_error;
}
...

/* Ok commence sending. */
copied = 0;

restart:
mss_now = tcp_send_mss(sk, &size_goal, flags);
...

// keep copying and sending until all data in msg has been consumed
while (msg_data_left(msg)) {
int copy = 0;
int max = size_goal;

skb = tcp_write_queue_tail(sk);
if (tcp_send_head(sk)) {
if (skb->ip_summed == CHECKSUM_NONE)
max = mss_now;
copy = max - skb->len;
}

if (copy <= 0 || !tcp_skb_can_collapse_to(skb)) {
bool first_skb;

new_segment:
/* Allocate new segment. If the interface is SG,
* allocate skb fitting to single page.
*/
if (!sk_stream_memory_free(sk))
goto wait_for_sndbuf;

if (process_backlog && sk_flush_backlog(sk)) {
process_backlog = false;
goto restart;
}
first_skb = skb_queue_empty(&sk->sk_write_queue);
skb = sk_stream_alloc_skb(sk,
select_size(sk, sg, first_skb),
sk->sk_allocation,
first_skb);
if (!skb)
goto wait_for_memory;

process_backlog = true;
/*
* Check whether we can use HW checksum.
*/
if (sk_check_csum_caps(sk))
skb->ip_summed = CHECKSUM_PARTIAL;

skb_entail(sk, skb);
copy = size_goal;
max = size_goal;
...
/* Try to append data to the end of skb. */
if (copy > msg_data_left(msg))
copy = msg_data_left(msg);

/* Where to copy to? */
if (skb_availroom(skb) > 0) {
/* We have some space in skb head. Superb! */
copy = min_t(int, copy, skb_availroom(skb));
err = skb_add_data_nocache(sk, skb, &msg->msg_iter, copy);
if (err)
goto do_fault;
} else if (!uarg || !uarg->zerocopy) {
bool merge = true;
int i = skb_shinfo(skb)->nr_frags;
struct page_frag *pfrag = sk_page_frag(sk);

if (!sk_page_frag_refill(sk, pfrag))
goto wait_for_memory;

if (!skb_can_coalesce(skb, i, pfrag->page,
pfrag->offset)) {
if (i >= sysctl_max_skb_frags || !sg) {
tcp_mark_push(tp, skb);
goto new_segment;
}
merge = false;
}

copy = min_t(int, copy, pfrag->size - pfrag->offset);

if (!sk_wmem_schedule(sk, copy))
goto wait_for_memory;

err = skb_copy_to_page_nocache(sk, &msg->msg_iter, skb,
pfrag->page,
pfrag->offset,
copy);
if (err)
goto do_error;

/* Update the skb. */
if (merge) {
skb_frag_size_add(&skb_shinfo(skb)->frags[i - 1], copy);
} else {
skb_fill_page_desc(skb, i, pfrag->page,
pfrag->offset, copy);
page_ref_inc(pfrag->page);
}
pfrag->offset += copy;
} else {
err = skb_zerocopy_iter_stream(sk, skb, msg, copy, uarg);
if (err == -EMSGSIZE || err == -EEXIST)
goto new_segment;
if (err < 0)
goto do_error;
copy = err;
}

if (!copied)
TCP_SKB_CB(skb)->tcp_flags &= ~TCPHDR_PSH;

tp->write_seq += copy;
TCP_SKB_CB(skb)->end_seq += copy;
tcp_skb_pcount_set(skb, 0);

copied += copy;
// nothing left to copy: stop
if (!msg_data_left(msg)) {
if (unlikely(flags & MSG_EOR))
TCP_SKB_CB(skb)->eor = 1;
goto out;
}

if (skb->len < max || (flags & MSG_OOB) || unlikely(tp->repair))
continue;

// if the data written since the last push exceeds half the peer's max window, send immediately
if (forced_push(tp)) {
tcp_mark_push(tp, skb);
__tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_PUSH);
} else if (skb == tcp_send_head(sk))
tcp_push_one(sk, mss_now);
continue;

wait_for_sndbuf:
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
wait_for_memory:
if (copied)
tcp_push(sk, flags & ~MSG_MORE, mss_now,
TCP_NAGLE_PUSH, size_goal);

err = sk_stream_wait_memory(sk, &timeo);
if (err != 0)
goto do_error;

mss_now = tcp_send_mss(sk, &size_goal, flags);
}

out:
// some data was copied: push it out
if (copied) {
tcp_tx_timestamp(sk, sockc.tsflags, tcp_write_queue_tail(sk));
tcp_push(sk, flags, mss_now, tp->nonagle, size_goal);
}

....
}
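For reference, the two small helpers used above come from tcp.c (quoted from the 4.14-era source); note that forced_push triggers once unpushed data exceeds half of the largest window the peer has ever advertised:

// tcp.c
static void tcp_mark_push(struct tcp_sock *tp, struct sk_buff *skb)
{
	TCP_SKB_CB(skb)->tcp_flags |= TCPHDR_PSH;
	tp->pushed_seq = tp->write_seq;
}

static bool forced_push(const struct tcp_sock *tp)
{
	/* push if we have not pushed for at least half the max window */
	return after(tp->write_seq, tp->pushed_seq + (tp->max_window >> 1));
}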

tcp_push first checks whether there is anything on the send queue and returns immediately if not; it then checks the flags for MSG_MORE/MSG_OOB, and tests whether the auto-corking conditions are met (in short, auto corking is a TCP optimization for small packets; see https://lwn.net/Articles/576263/ for details). Finally it hands the pending data to __tcp_push_pending_frames.


// tcp.c
static void tcp_push(struct sock *sk, int flags, int mss_now,
int nonagle, int size_goal)
{
struct tcp_sock *tp = tcp_sk(sk);
struct sk_buff *skb;

if (!tcp_send_head(sk))
return;

skb = tcp_write_queue_tail(sk);
if (!(flags & MSG_MORE) || forced_push(tp))
tcp_mark_push(tp, skb);

// mark OOB (out-of-band) urgent data if requested
tcp_mark_urg(tp, flags);

if (tcp_should_autocork(sk, skb, size_goal)) {

/* avoid atomic op if TSQ_THROTTLED bit is already set */
if (!test_bit(TSQ_THROTTLED, &sk->sk_tsq_flags)) {
NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPAUTOCORKING);
set_bit(TSQ_THROTTLED, &sk->sk_tsq_flags);
}
/* It is possible TX completion already happened
* before we set TSQ_THROTTLED.
*/
if (refcount_read(&sk->sk_wmem_alloc) > skb->truesize)
return;
}

if (flags & MSG_MORE)
nonagle = TCP_NAGLE_CORK;

__tcp_push_pending_frames(sk, mss_now, nonagle);
}


__tcp_push_pending_frames first checks whether the socket has been closed; if not, it calls tcp_write_xmit to continue sending:


// tcp_output.c
void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss,
int nonagle)
{
/* If we are closed, the bytes will have to remain here.
* In time closedown will finish, we empty the write queue and
* all will be happy.
*/
if (unlikely(sk->sk_state == TCP_CLOSE))
return;

if (tcp_write_xmit(sk, cur_mss, nonagle, 0,
sk_gfp_mask(sk, GFP_ATOMIC)))
tcp_check_probe_timer(sk);
}


The core loop of tcp_write_xmit keeps taking sk_buffs off the socket's send queue and transmitting them with tcp_transmit_skb; tcp_transmit_skb itself is just a thin wrapper around __tcp_transmit_skb:


// tcp_output.c
static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle,
int push_one, gfp_t gfp)
{
struct tcp_sock *tp = tcp_sk(sk);
struct sk_buff *skb;
unsigned int tso_segs, sent_pkts;
int cwnd_quota;
int result;
bool is_cwnd_limited = false, is_rwnd_limited = false;
u32 max_segs;

sent_pkts = 0;

tcp_mstamp_refresh(tp);
// unless we are pushing a single skb, run path-MTU probing first
if (!push_one) {
/* Do MTU probing. */
result = tcp_mtu_probe(sk);
if (!result) {
return false;
} else if (result > 0) {
sent_pkts = 1;
}
}

max_segs = tcp_tso_segs(sk, mss_now);
while ((skb = tcp_send_head(sk))) {
unsigned int limit;

if (tcp_pacing_check(sk))
break;

tso_segs = tcp_init_tso_segs(skb, mss_now);
BUG_ON(!tso_segs);

if (unlikely(tp->repair) && tp->repair_queue == TCP_SEND_QUEUE) {
/* "skb_mstamp" is used as a start point for the retransmit timer */
skb->skb_mstamp = tp->tcp_mstamp;
goto repair; /* Skip network transmission */
}

cwnd_quota = tcp_cwnd_test(tp, skb);
if (!cwnd_quota) {
if (push_one == 2)
/* Force out a loss probe pkt. */
cwnd_quota = 1;
else
break;
}

if (unlikely(!tcp_snd_wnd_test(tp, skb, mss_now))) {
is_rwnd_limited = true;
break;
}

if (tso_segs == 1) {
if (unlikely(!tcp_nagle_test(tp, skb, mss_now,
(tcp_skb_is_last(sk, skb) ?
nonagle : TCP_NAGLE_PUSH))))
break;
} else {
if (!push_one &&
tcp_tso_should_defer(sk, skb, &is_cwnd_limited,
&is_rwnd_limited, max_segs))
break;
}

limit = mss_now;
if (tso_segs > 1 && !tcp_urg_mode(tp))
limit = tcp_mss_split_point(sk, skb, mss_now,
min_t(unsigned int,
cwnd_quota,
max_segs),
nonagle);

if (skb->len > limit &&
unlikely(tso_fragment(sk, skb, limit, mss_now, gfp)))
break;

if (test_bit(TCP_TSQ_DEFERRED, &sk->sk_tsq_flags))
clear_bit(TCP_TSQ_DEFERRED, &sk->sk_tsq_flags);
if (tcp_small_queue_check(sk, skb, 0))
break;

/* Argh, we hit an empty skb(), presumably a thread
* is sleeping in sendmsg()/sk_stream_wait_memory().
* We do not want to send a pure-ack packet and have
* a strange looking rtx queue with empty packet(s).
*/
if (TCP_SKB_CB(skb)->end_seq == TCP_SKB_CB(skb)->seq)
break;

// transmit the data
if (unlikely(tcp_transmit_skb(sk, skb, 1, gfp)))
break;

repair:
/* Advance the send_head. This one is sent out.
* This call will increment packets_out.
*/
tcp_event_new_data_sent(sk, skb);

tcp_minshall_update(tp, mss_now, skb);
sent_pkts += tcp_skb_pcount(skb);

if (push_one)
break;
}

if (is_rwnd_limited)
tcp_chrono_start(sk, TCP_CHRONO_RWND_LIMITED);
else
tcp_chrono_stop(sk, TCP_CHRONO_RWND_LIMITED);

if (likely(sent_pkts)) {
if (tcp_in_cwnd_reduction(sk))
tp->prr_out += sent_pkts;

/* Send one loss probe per tail loss episode. */
if (push_one != 2)
tcp_schedule_loss_probe(sk, false);
is_cwnd_limited |= (tcp_packets_in_flight(tp) >= tp->snd_cwnd);
tcp_cwnd_validate(sk, is_cwnd_limited);
return false;
}
return !tp->packets_out && tcp_send_head(sk);
}

__tcp_transmit_skb is the boundary between the TCP transport layer and the IP network layer: once the TCP header has been assembled, it calls the IP layer's queue_xmit interface to push the data further down the stack:


// tcp_output.c
static int __tcp_transmit_skb(struct sock *sk, struct sk_buff *skb,
int clone_it, gfp_t gfp_mask, u32 rcv_nxt)
{
const struct inet_connection_sock *icsk = inet_csk(sk);
struct inet_sock *inet;
struct tcp_sock *tp;
struct tcp_skb_cb *tcb;
struct tcp_out_options opts;
unsigned int tcp_options_size, tcp_header_size;
struct sk_buff *oskb = NULL;
struct tcp_md5sig_key *md5;
struct tcphdr *th;
int err;

BUG_ON(!skb || !tcp_skb_pcount(skb));
tp = tcp_sk(sk);

if (clone_it) {
TCP_SKB_CB(skb)->tx.in_flight = TCP_SKB_CB(skb)->end_seq
- tp->snd_una;
oskb = skb;
if (unlikely(skb_cloned(skb)))
skb = pskb_copy(skb, gfp_mask);
else
skb = skb_clone(skb, gfp_mask);
if (unlikely(!skb))
return -ENOBUFS;
}
skb->skb_mstamp = tp->tcp_mstamp;

inet = inet_sk(sk);
tcb = TCP_SKB_CB(skb);
memset(&opts, 0, sizeof(opts));

if (unlikely(tcb->tcp_flags & TCPHDR_SYN))
tcp_options_size = tcp_syn_options(sk, skb, &opts, &md5);
else
tcp_options_size = tcp_established_options(sk, skb, &opts,
&md5);
tcp_header_size = tcp_options_size + sizeof(struct tcphdr);

/* if no packet is in qdisc/device queue, then allow XPS to select
* another queue. We can be called from tcp_tsq_handler()
* which holds one reference to sk_wmem_alloc.
*
* TODO: Ideally, in-flight pure ACK packets should not matter here.
* One way to get this would be to set skb->truesize = 2 on them.
*/
skb->ooo_okay = sk_wmem_alloc_get(sk) < SKB_TRUESIZE(1);

/* If we had to use memory reserve to allocate this skb,
* this might cause drops if packet is looped back :
* Other socket might not have SOCK_MEMALLOC.
* Packets not looped back do not care about pfmemalloc.
*/
skb->pfmemalloc = 0;

skb_push(skb, tcp_header_size);
skb_reset_transport_header(skb);

skb_orphan(skb);
skb->sk = sk;
skb->destructor = skb_is_tcp_pure_ack(skb) ? __sock_wfree : tcp_wfree;
skb_set_hash_from_sk(skb, sk);
refcount_add(skb->truesize, &sk->sk_wmem_alloc);

skb_set_dst_pending_confirm(skb, sk->sk_dst_pending_confirm);

/* Build TCP header and checksum it. */
th = (struct tcphdr *)skb->data;
th->source = inet->inet_sport;
th->dest = inet->inet_dport;
th->seq = htonl(tcb->seq);
th->ack_seq = htonl(rcv_nxt);
*(((__be16 *)th) + 6) = htons(((tcp_header_size >> 2) << 12) |
tcb->tcp_flags);

th->check = 0;
th->urg_ptr = 0;

/* The urg_mode check is necessary during a below snd_una win probe */
if (unlikely(tcp_urg_mode(tp) && before(tcb->seq, tp->snd_up))) {
if (before(tp->snd_up, tcb->seq + 0x10000)) {
th->urg_ptr = htons(tp->snd_up - tcb->seq);
th->urg = 1;
} else if (after(tcb->seq + 0xFFFF, tp->snd_nxt)) {
th->urg_ptr = htons(0xFFFF);
th->urg = 1;
}
}

tcp_options_write((__be32 *)(th + 1), tp, &opts);
skb_shinfo(skb)->gso_type = sk->sk_gso_type;
if (likely(!(tcb->tcp_flags & TCPHDR_SYN))) {
th->window = htons(tcp_select_window(sk));
tcp_ecn_send(sk, skb, th, tcp_header_size);
} else {
/* RFC1323: The window in SYN & SYN/ACK segments
* is never scaled.
*/
th->window = htons(min(tp->rcv_wnd, 65535U));
}
....

// compute the checksum
icsk->icsk_af_ops->send_check(sk, skb);

if (likely(tcb->tcp_flags & TCPHDR_ACK))
tcp_event_ack_sent(sk, tcp_skb_pcount(skb), rcv_nxt);

if (skb->len != tcp_header_size) {
tcp_event_data_sent(tp, sk);
tp->data_segs_out += tcp_skb_pcount(skb);
tcp_internal_pacing(sk, skb);
}

if (after(tcb->end_seq, tp->snd_nxt) || tcb->seq == tcb->end_seq)
TCP_ADD_STATS(sock_net(sk), TCP_MIB_OUTSEGS,
tcp_skb_pcount(skb));

tp->segs_out += tcp_skb_pcount(skb);
/* OK, its time to fill skb_shinfo(skb)->gso_{segs|size} */
skb_shinfo(skb)->gso_segs = tcp_skb_pcount(skb);
skb_shinfo(skb)->gso_size = tcp_skb_mss(skb);

/* Our usage of tstamp should remain private */
skb->tstamp = 0;

/* Cleanup our debris for IP stacks */
memset(skb->cb, 0, max(sizeof(struct inet_skb_parm),
sizeof(struct inet6_skb_parm)));

// hand the packet down to the IP network layer
err = icsk->icsk_af_ops->queue_xmit(sk, skb, &inet->cork.fl);

if (unlikely(err > 0)) {
tcp_enter_cwr(sk);
err = net_xmit_eval(err);
}
if (!err && oskb) {
oskb->skb_mstamp = tp->tcp_mstamp;
tcp_rate_skb_sent(sk, oskb);
}
return err;
}
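One line above deserves a note: *(((__be16 *)th) + 6) = htons(((tcp_header_size >> 2) << 12) | tcb->tcp_flags) writes the 16-bit word at byte offset 12 of the TCP header, packing the 4-bit data offset (header length in 32-bit words) together with the flag bits. A small user-space illustration (not kernel code):

// doff_demo.c: how __tcp_transmit_skb packs data offset + flags
#include <stdint.h>
#include <stdio.h>

int main(void)
{
	unsigned int tcp_header_size = 32;	/* 20-byte base header + 12 bytes of options */
	uint16_t tcp_flags = 0x18;		/* PSH|ACK */

	uint16_t word = ((tcp_header_size >> 2) << 12) | tcp_flags;

	/* prints: doff=8 (32-bit words), flags=0x018, wire word=0x8018 */
	printf("doff=%u (32-bit words), flags=0x%03x, wire word=0x%04x\n",
	       word >> 12, word & 0x1ff, word);
	return 0;
}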

So when does the queue_xmit interface get assigned? When the TCP part of a socket is initialized at creation time, tcp_v4_init_sock wires in the network-layer operations:


// tcp_ipv4.c
static int tcp_v4_init_sock(struct sock *sk)
{
struct inet_connection_sock *icsk = inet_csk(sk);

tcp_init_sock(sk);

icsk->icsk_af_ops = &ipv4_specific;

return 0;
}

const struct inet_connection_sock_af_ops ipv4_specific = {
.queue_xmit = ip_queue_xmit,
.send_check = tcp_v4_send_check,
.rebuild_header = inet_sk_rebuild_header,
.sk_rx_dst_set = inet_sk_rx_dst_set,
.conn_request = tcp_v4_conn_request,
.syn_recv_sock = tcp_v4_syn_recv_sock,
.net_header_len = sizeof(struct iphdr),
.setsockopt = ip_setsockopt,
.getsockopt = ip_getsockopt,
.addr2sockaddr = inet_csk_addr2sockaddr,
.sockaddr_len = sizeof(struct sockaddr_in),
#ifdef CONFIG_COMPAT
.compat_setsockopt = compat_ip_setsockopt,
.compat_getsockopt = compat_ip_getsockopt,
#endif
.mtu_reduced = tcp_v4_mtu_reduced,
};
EXPORT_SYMBOL(ipv4_specific);

IP network layer: routing and addressing

The IP layer is responsible for routing and addressing, and it also filters packets against the configured iptables rules. ip_queue_xmit first has to find the outgoing route (struct rtable) for the data; if no route is found, it returns an EHOSTUNREACH error. With the routing information in hand, it builds the IP header and then runs the packet through the local netfilter hook via ip_local_out. Here is a simplified diagram of the flow:

IP data flow

For the principles behind the Linux kernel routing implementation, see the article Linux路由实现原理.


// ip_output.c
int ip_queue_xmit(struct sock *sk, struct sk_buff *skb, struct flowi *fl)
{
struct inet_sock *inet = inet_sk(sk);
struct net *net = sock_net(sk);
struct ip_options_rcu *inet_opt;
struct flowi4 *fl4;
struct rtable *rt;
struct iphdr *iph;
int res;

/* Skip all of this if the packet is already routed,
* f.e. by something like SCTP.
*/
rcu_read_lock();
inet_opt = rcu_dereference(inet->inet_opt);
fl4 = &fl->u.ip4;
rt = skb_rtable(skb);
if (rt)
goto packet_routed;

/* Make sure we can route this packet. */
rt = (struct rtable *)__sk_dst_check(sk, 0);
if (!rt) {
__be32 daddr;

/* Use correct destination address if we have options. */
daddr = inet->inet_daddr;
if (inet_opt && inet_opt->opt.srr)
daddr = inet_opt->opt.faddr;

/* If this fails, retransmit mechanism of transport layer will
* keep trying until route appears or the connection times
* itself out.
*/
rt = ip_route_output_ports(net, fl4, sk,
daddr, inet->inet_saddr,
inet->inet_dport,
inet->inet_sport,
sk->sk_protocol,
RT_CONN_FLAGS(sk),
sk->sk_bound_dev_if);
if (IS_ERR(rt))
goto no_route;
sk_setup_caps(sk, &rt->dst);
}
skb_dst_set_noref(skb, &rt->dst);

packet_routed:
if (inet_opt && inet_opt->opt.is_strictroute && rt->rt_uses_gateway)
goto no_route;

/* OK, we know where to send it, allocate and build IP header. */
skb_push(skb, sizeof(struct iphdr) + (inet_opt ? inet_opt->opt.optlen : 0));
skb_reset_network_header(skb);
iph = ip_hdr(skb);
*((__be16 *)iph) = htons((4 << 12) | (5 << 8) | (inet->tos & 0xff));
if (ip_dont_fragment(sk, &rt->dst) && !skb->ignore_df)
iph->frag_off = htons(IP_DF);
else
iph->frag_off = 0;
iph->ttl = ip_select_ttl(inet, &rt->dst);
iph->protocol = sk->sk_protocol;
ip_copy_addrs(iph, fl4);

/* Transport layer set skb->h.foo itself. */

if (inet_opt && inet_opt->opt.optlen) {
iph->ihl += inet_opt->opt.optlen >> 2;
ip_options_build(skb, &inet_opt->opt, inet->inet_daddr, rt, 0);
}

ip_select_ident_segs(net, skb, sk,
skb_shinfo(skb)->gso_segs ?: 1);

/* TODO : should we use skb->sk here instead of sk ? */
skb->priority = sk->sk_priority;
skb->mark = sk->sk_mark;

res = ip_local_out(net, sk, skb);
rcu_read_unlock();
return res;

no_route:
rcu_read_unlock();
IP_INC_STATS(net, IPSTATS_MIB_OUTNOROUTES);
kfree_skb(skb);
return -EHOSTUNREACH;
}

ip_local_out first runs the packet through the LOCAL_OUT netfilter hook; if the packet passes the filter, it is sent straight on via dst_output.

For an introduction to the Linux Netfilter framework, see the Wikipedia article https://en.wikipedia.org/wiki/Netfilter


// ip_output.c
int ip_local_out(struct net *net, struct sock *sk, struct sk_buff *skb)
{
int err;

err = __ip_local_out(net, sk, skb);
if (likely(err == 1))
err = dst_output(net, sk, skb);

return err;
}


int __ip_local_out(struct net *net, struct sock *sk, struct sk_buff *skb)
{
struct iphdr *iph = ip_hdr(skb);

iph->tot_len = htons(skb->len);
ip_send_check(iph);

/* if egress device is enslaved to an L3 master device pass the
* skb to its handler for processing
*/
skb = l3mdev_ip_out(sk, skb);
if (unlikely(!skb))
return 0;

skb->protocol = htons(ETH_P_IP);

// if nf_hook returns 1, the packet passed the filter; otherwise it is rejected and never sent
return nf_hook(NFPROTO_IPV4, NF_INET_LOCAL_OUT,
net, sk, skb, NULL, skb_dst(skb)->dev,
dst_output);
}


dst_output simply calls the output function of the skb's dst_entry to send the packet on. So when is this struct dst_entry interface initialized? From its definition, struct dst_entry describes the outgoing route of a struct sk_buff, so we can infer that dst_entry->output is assigned during route lookup.


// dst.h
static inline int dst_output(struct net *net, struct sock *sk, struct sk_buff *skb)
{
return skb_dst(skb)->output(net, sk, skb);
}


Looking further at rt_dst_alloc in route.c, output is indeed assigned when the route is created: it is set to ip_output by default, while for local destinations (RTCF_LOCAL) the input function is set to ip_local_deliver; for broadcast/multicast routes, callers elsewhere in route.c override output with ip_mc_output:


// route.c
struct rtable *rt_dst_alloc(struct net_device *dev,
unsigned int flags, u16 type,
bool nopolicy, bool noxfrm, bool will_cache)
{
struct rtable *rt;

rt = dst_alloc(&ipv4_dst_ops, dev, 1, DST_OBSOLETE_FORCE_CHK,
(will_cache ? 0 : DST_HOST) |
(nopolicy ? DST_NOPOLICY : 0) |
(noxfrm ? DST_NOXFRM : 0));

if (rt) {
rt->rt_genid = rt_genid_ipv4(dev_net(dev));
rt->rt_flags = flags;
rt->rt_type = type;
rt->rt_is_input = 0;
rt->rt_iif = 0;
rt->rt_pmtu = 0;
rt->rt_mtu_locked = 0;
rt->rt_gateway = 0;
rt->rt_uses_gateway = 0;
rt->rt_table_id = 0;
INIT_LIST_HEAD(&rt->rt_uncached);

rt->dst.output = ip_output;
if (flags & RTCF_LOCAL)
rt->dst.input = ip_local_deliver;
}

return rt;
}

ip_output fixes the link-layer protocol to ETH_P_IP, then enters the POSTROUTING netfilter hook via NF_HOOK_COND; if the packet passes the filter, ip_finish_output is invoked:


// ip_output.c
int ip_output(struct net *net, struct sock *sk, struct sk_buff *skb)
{
struct net_device *dev = skb_dst(skb)->dev;

IP_UPD_PO_STATS(net, IPSTATS_MIB_OUT, skb->len);

skb->dev = dev;
skb->protocol = htons(ETH_P_IP);

return NF_HOOK_COND(NFPROTO_IPV4, NF_INET_POST_ROUTING,
net, sk, skb, NULL, dev,
ip_finish_output,
!(IPCB(skb)->flags & IPSKB_REROUTED));
}

ip_finish_output mainly does two things:

  • check whether SNAT yielded a new policy (skb_dst(skb)->xfrm); if so, reprocess the packet through dst_output (SNAT is normally applied in the POSTROUTING hook)
  • check whether the skb is longer than the current path's MTU; if so it must be fragmented first, otherwise it is sent out through ip_finish_output2

// ip_output.c
static int ip_finish_output(struct net *net, struct sock *sk, struct sk_buff *skb)
{
unsigned int mtu;
int ret;

ret = BPF_CGROUP_RUN_PROG_INET_EGRESS(sk, skb);
if (ret) {
kfree_skb(skb);
return ret;
}

#if defined(CONFIG_NETFILTER) && defined(CONFIG_XFRM)
/* Policy lookup after SNAT yielded a new policy */
if (skb_dst(skb)->xfrm) {
IPCB(skb)->flags |= IPSKB_REROUTED;
return dst_output(net, sk, skb);
}
#endif
mtu = ip_skb_dst_mtu(sk, skb);
if (skb_is_gso(skb))
return ip_finish_output_gso(net, sk, skb, mtu);

if (skb->len > mtu || (IPCB(skb)->flags & IPSKB_FRAG_PMTU))
return ip_fragment(net, sk, skb, mtu, ip_finish_output2);

return ip_finish_output2(net, sk, skb);
}

With ip_finish_output2 we have effectively reached the data link layer. Its main job is to find the neighbour the packet must be sent to next: using the next-hop routing information it resolves the MAC address of the next node, e.g. the default gateway or a router. For IPv4, the mapping between IP and MAC addresses is maintained by ARP (Address Resolution Protocol); IPv6 uses NDP (Neighbour Discovery Protocol) instead.


// ip_output.c
static int ip_finish_output2(struct net *net, struct sock *sk, struct sk_buff *skb)
{
struct dst_entry *dst = skb_dst(skb);
struct rtable *rt = (struct rtable *)dst;
struct net_device *dev = dst->dev;
unsigned int hh_len = LL_RESERVED_SPACE(dev);
struct neighbour *neigh;
u32 nexthop;

if (rt->rt_type == RTN_MULTICAST) {
IP_UPD_PO_STATS(net, IPSTATS_MIB_OUTMCAST, skb->len);
} else if (rt->rt_type == RTN_BROADCAST)
IP_UPD_PO_STATS(net, IPSTATS_MIB_OUTBCAST, skb->len);

/* Be paranoid, rather than too clever. */
if (unlikely(skb_headroom(skb) < hh_len && dev->header_ops)) {
struct sk_buff *skb2;

skb2 = skb_realloc_headroom(skb, LL_RESERVED_SPACE(dev));
if (!skb2) {
kfree_skb(skb);
return -ENOMEM;
}
if (skb->sk)
skb_set_owner_w(skb2, skb->sk);
consume_skb(skb);
skb = skb2;
}

if (lwtunnel_xmit_redirect(dst->lwtstate)) {
int res = lwtunnel_xmit(skb);

if (res < 0 || res == LWTUNNEL_XMIT_DONE)
return res;
}

rcu_read_lock_bh();
// look up the neighbour for the next hop; if none exists, create one (which will trigger an ARP request)
nexthop = (__force u32) rt_nexthop(rt, ip_hdr(skb)->daddr);
neigh = __ipv4_neigh_lookup_noref(dev, nexthop);
if (unlikely(!neigh))
neigh = __neigh_create(&arp_tbl, &nexthop, dev, false);
if (!IS_ERR(neigh)) {
int res;

sock_confirm_neigh(skb, neigh);
res = neigh_output(neigh, skb);

rcu_read_unlock_bh();
return res;
}
rcu_read_unlock_bh();

kfree_skb(skb);
return -EINVAL;
}

If the struct neighbour has already been resolved via ARP and is in a usable state, neigh_hh_output sends the data straight to the corresponding network device; otherwise n->output must perform ARP resolution first before the packet can be sent. See arp.c for how the struct neighbour operations are initialized.


// neighbour.h
static inline int neigh_output(struct neighbour *n, struct sk_buff *skb)
{
const struct hh_cache *hh = &n->hh;

if ((n->nud_state & NUD_CONNECTED) && hh->hh_len)
return neigh_hh_output(hh, skb);
else
return n->output(n, skb);
}

neigh_hh_output simply copies the cached link-layer header (which carries the next hop's MAC address) onto the front of the packet and then calls dev_queue_xmit to hand the data to the driver layer. From here the data still has to pass through queueing-discipline (QoS) processing before the driver can finally send it out.


// neighbour.h
static inline int neigh_hh_output(const struct hh_cache *hh, struct sk_buff *skb)
{
unsigned int hh_alen = 0;
unsigned int seq;
unsigned int hh_len;

do {
seq = read_seqbegin(&hh->hh_lock);
hh_len = READ_ONCE(hh->hh_len);
if (likely(hh_len <= HH_DATA_MOD)) {
hh_alen = HH_DATA_MOD;

/* skb_push() would proceed silently if we have room for
* the unaligned size but not for the aligned size:
* check headroom explicitly.
*/
if (likely(skb_headroom(skb) >= HH_DATA_MOD)) {
/* this is inlined by gcc */
memcpy(skb->data - HH_DATA_MOD, hh->hh_data,
HH_DATA_MOD);
}
} else {
hh_alen = HH_DATA_ALIGN(hh_len);

if (likely(skb_headroom(skb) >= hh_alen)) {
memcpy(skb->data - hh_alen, hh->hh_data,
hh_alen);
}
}
} while (read_seqretry(&hh->hh_lock, seq));

if (WARN_ON_ONCE(skb_headroom(skb) < hh_alen)) {
kfree_skb(skb);
return NET_XMIT_DROP;
}

__skb_push(skb, hh_len);
return dev_queue_xmit(skb);
}


The last leg: queueing disciplines (QoS) and softirqs

Before an outgoing packet reaches the network device it still has to pass through a queueing discipline for traffic control and scheduling, and it is ultimately sent out via a softirq. The queueing discipline shapes and schedules outgoing traffic, making sure packets leave according to the configured policy (based on priority, packet marks and similar attributes).

By default the Linux kernel uses the pfifo_fast qdisc for traffic control. pfifo_fast is like a plain FIFO (first in, first out), except that packets are sorted into three priority bands according to their priority (which is mapped from the TOS field in the IP header), and dequeueing honours the band order.
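The band selection in pfifo_fast is a one-line table lookup; quoted from the 4.14-era sch_generic.c for reference:

// sch_generic.c
static const u8 prio2band[TC_PRIO_MAX + 1] = {
	1, 2, 2, 2, 1, 2, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1
};

/* in pfifo_fast_enqueue(): band 0 is serviced first */
int band = prio2band[skb->priority & TC_PRIO_MAX];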

dev_queue_xmit is just a wrapper around __dev_queue_xmit, which does the actual work. __dev_queue_xmit first checks whether the qdisc defines an enqueue function: if it does, the skb is queued via __dev_xmit_skb; if not (the common case for software devices such as loopback and tunnels), the skb goes straight to the network device driver via dev_hard_start_xmit.


// dev.c
int dev_queue_xmit(struct sk_buff *skb)
{
return __dev_queue_xmit(skb, NULL);
}


static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{
struct net_device *dev = skb->dev;
struct netdev_queue *txq;
struct Qdisc *q;
int rc = -ENOMEM;

skb_reset_mac_header(skb);

if (unlikely(skb_shinfo(skb)->tx_flags & SKBTX_SCHED_TSTAMP))
__skb_tstamp_tx(skb, NULL, skb->sk, SCM_TSTAMP_SCHED);

/* Disable soft irqs for various locks below. Also
* stops preemption for RCU.
*/
rcu_read_lock_bh();

// update the skb priority (from the priority configured for its cgroup)
skb_update_prio(skb);

qdisc_pkt_len_init(skb);

/* If device/qdisc don't need skb->dst, release it right now while
* its hot in this cpu cache.
*/
if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
skb_dst_drop(skb);
else
skb_dst_force(skb);

txq = netdev_pick_tx(dev, skb, accel_priv);
q = rcu_dereference_bh(txq->qdisc);

trace_net_dev_queue(skb);
if (q->enqueue) {
rc = __dev_xmit_skb(skb, q, dev, txq);
goto out;
}

/* The device has no queue. Common case for software devices:
* loopback, all the sorts of tunnels...

* Really, it is unlikely that netif_tx_lock protection is necessary
* here. (f.e. loopback and IP tunnels are clean ignoring statistics
* counters.)
* However, it is possible, that they rely on protection
* made by us here.

* Check this and shot the lock. It is not prone from deadlocks.
*Either shot noqueue qdisc, it is even simpler 8)
*/
if (dev->flags & IFF_UP) {
int cpu = smp_processor_id(); /* ok because BHs are off */

if (txq->xmit_lock_owner != cpu) {
if (unlikely(__this_cpu_read(xmit_recursion) >
XMIT_RECURSION_LIMIT))
goto recursion_alert;

skb = validate_xmit_skb(skb, dev);
if (!skb)
goto out;

HARD_TX_LOCK(dev, txq, cpu);

if (!netif_xmit_stopped(txq)) {
__this_cpu_inc(xmit_recursion);
skb = dev_hard_start_xmit(skb, dev, txq, &rc);
__this_cpu_dec(xmit_recursion);
if (dev_xmit_complete(rc)) {
HARD_TX_UNLOCK(dev, txq);
goto out;
}
}
HARD_TX_UNLOCK(dev, txq);
net_crit_ratelimited("Virtual device %s asks to queue packet!\n",
dev->name);
} else {
/* Recursion is detected! It is possible,
* unfortunately
*/
recursion_alert:
net_crit_ratelimited("Dead loop on virtual device %s, fix it urgently!\n",
dev->name);
}
}

rc = -ENETDOWN;
rcu_read_unlock_bh();

atomic_long_inc(&dev->tx_dropped);
kfree_skb_list(skb);
return rc;
out:
rcu_read_unlock_bh();
return rc;
}
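dev_hard_start_xmit itself walks the skb list and ends up calling the driver's ndo_start_xmit; a condensed sketch of the 4.14 code (error paths trimmed, so treat the details as approximate):

// dev.c (condensed)
struct sk_buff *dev_hard_start_xmit(struct sk_buff *first, struct net_device *dev,
				    struct netdev_queue *txq, int *ret)
{
	struct sk_buff *skb = first;
	int rc = NETDEV_TX_OK;

	while (skb) {
		struct sk_buff *next = skb->next;

		skb->next = NULL;
		/* xmit_one() -> netdev_start_xmit() -> ops->ndo_start_xmit() */
		rc = xmit_one(skb, dev, txq, next != NULL);
		if (unlikely(!dev_xmit_complete(rc))) {
			skb->next = next;
			goto out;
		}

		skb = next;
		if (unlikely(skb && netif_xmit_stopped(txq))) {
			rc = NETDEV_TX_BUSY;
			break;
		}
	}

out:
	*ret = rc;
	return skb;
}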


__dev_xmit_skb then acts according to the state of the queue:

  • if the qdisc has been deactivated, e.g. the NIC is down, the data is simply dropped
  • if the queue is empty, allows bypass and is not already running, the skb is transmitted directly to the NIC driver
  • otherwise the skb is enqueued first and __qdisc_run is invoked to drain the queue one packet at a time, deferring to the TX softirq when needed

// dev.c
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
struct net_device *dev,
struct netdev_queue *txq)
{
spinlock_t *root_lock = qdisc_lock(q);
struct sk_buff *to_free = NULL;
bool contended;
int rc;

qdisc_calculate_pkt_len(skb, q);
/*
* Heuristic to force contended enqueues to serialize on a
* separate lock before trying to get qdisc main lock.
* This permits qdisc->running owner to get the lock more
* often and dequeue packets faster.
*/
contended = qdisc_is_running(q);
if (unlikely(contended))
spin_lock(&q->busylock);

spin_lock(root_lock);
if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
__qdisc_drop(skb, &to_free);
rc = NET_XMIT_DROP;
} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
qdisc_run_begin(q)) {
/*
* This is a work-conserving queue; there are no old skbs
* waiting to be sent out; and the qdisc is not running -
* xmit the skb directly.
*/

qdisc_bstats_update(q, skb);

if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
if (unlikely(contended)) {
spin_unlock(&q->busylock);
contended = false;
}
__qdisc_run(q);
} else
qdisc_run_end(q);

rc = NET_XMIT_SUCCESS;
} else {
rc = q->enqueue(skb, q, &to_free) & NET_XMIT_MASK;
if (qdisc_run_begin(q)) {
if (unlikely(contended)) {
spin_unlock(&q->busylock);
contended = false;
}
__qdisc_run(q);
}
}
spin_unlock(root_lock);
if (unlikely(to_free))
kfree_skb_list(to_free);
if (unlikely(contended))
spin_unlock(&q->busylock);
return rc;
}

__qdisc_run just keeps pulling packets off the qdisc and sending them straight to the driver via sch_direct_xmit (the code is not shown here; if interested, see the implementations in dev.c and sch_generic.c). If the amount of data sent reaches the configured quota (the default NIC TX weight in Linux is 64), or the CPU is needed by a higher-priority task, __netif_schedule is called to defer the rest of the work to the softirq.


// sch_generic.c
void __qdisc_run(struct Qdisc *q)
{
int quota = dev_tx_weight;
int packets;

while (qdisc_restart(q, &packets)) {
/*
* Ordered by possible occurrence: Postpone processing if
* 1. we've exceeded packet quota
* 2. another process needs the CPU;
*/
quota -= packets;
if (quota <= 0 || need_resched()) {
__netif_schedule(q);
break;
}
}

qdisc_run_end(q);
}
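qdisc_restart, the loop body above, dequeues one batch and hands it to sch_direct_xmit; roughly, from the 4.14-era sch_generic.c:

// sch_generic.c
static inline int qdisc_restart(struct Qdisc *q, int *packets)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;
	bool validate;

	/* Dequeue packet */
	skb = dequeue_skb(q, &validate, packets);
	if (unlikely(!skb))
		return 0;

	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = skb_get_tx_queue(dev, skb);

	return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
}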

__netif_schedule first checks that the qdisc is not already scheduled, and then triggers the softirq via __netif_reschedule:

  • save and disable local interrupts with local_irq_save
  • take the current CPU's struct softnet_data and append the qdisc to the tail of its output queue
  • raise the softirq with raise_softirq_irqoff(NET_TX_SOFTIRQ) and restore local interrupts

// dev.c
void __netif_schedule(struct Qdisc *q)
{
if (!test_and_set_bit(__QDISC_STATE_SCHED, &q->state))
__netif_reschedule(q);
}

static void __netif_reschedule(struct Qdisc *q)
{
struct softnet_data *sd;
unsigned long flags;

local_irq_save(flags);
sd = this_cpu_ptr(&softnet_data);
q->next_sched = NULL;
*sd->output_queue_tailp = q;
sd->output_queue_tailp = &q->next_sched;
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}

The softirq handlers are registered during kernel boot (see dev.c; the registration is shown right after the list below). net_tx_action is the handler for the TX softirq, and it does two things:

  • check the completion queue (completion_queue) for skbs whose transmission has finished, and free their memory
  • take the output_queue from the local CPU's struct softnet_data and keep feeding packets to the driver via qdisc_run
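For reference, the registration happens at boot in net_dev_init (dev.c):

// dev.c, in net_dev_init()
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);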

// dev.c
static __latent_entropy void net_tx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);

if (sd->completion_queue) {
struct sk_buff *clist;

local_irq_disable();
clist = sd->completion_queue;
sd->completion_queue = NULL;
local_irq_enable();

while (clist) {
struct sk_buff *skb = clist;

clist = clist->next;

WARN_ON(refcount_read(&skb->users));
if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
trace_consume_skb(skb);
else
trace_kfree_skb(skb, net_tx_action);

if (skb->fclone != SKB_FCLONE_UNAVAILABLE)
__kfree_skb(skb);
else
__kfree_skb_defer(skb);
}

__kfree_skb_flush();
}

if (sd->output_queue) {
struct Qdisc *head;

local_irq_disable();
head = sd->output_queue;
sd->output_queue = NULL;
sd->output_queue_tailp = &sd->output_queue;
local_irq_enable();

while (head) {
struct Qdisc *q = head;
spinlock_t *root_lock;

head = head->next_sched;

root_lock = qdisc_lock(q);
spin_lock(root_lock);
/* We need to make sure head->next_sched is read
* before clearing __QDISC_STATE_SCHED
*/
smp_mb__before_atomic();
clear_bit(__QDISC_STATE_SCHED, &q->state);
qdisc_run(q);
spin_unlock(root_lock);
}
}
}


And with that, the transmit path is complete. If you are interested, pick a particular device driver and look at how it handles transmission on its side. Having followed the flow through the Linux source, we now have a rough picture of the kernel protocol stack; to go deeper into any one module, reading the source alongside the relevant protocol documents will yield even more.


Author: Jason Wang

Last updated: 2022-01-30, 11:10:36

License: this post is licensed under the Creative Commons Attribution-NonCommercial 4.0 International license
