• 网络缓冲区


            windows下的体系,我不是特别了解。以下所有的内容都是在Linux下的理解,如果不对的地方,评论区欢迎留言。

    Linux收发数据

     接收数据

    大体的流程如上图所示,接下来我们对图中一些名词进行解释。

    -------------------------------------------------------------------------------------------------------------------------

    DMA:

    • 一种计算机系统中的技术,用于在外设和内存之间直接传输数据,而不需要CPU的干预。DMA技术通过在外设和内存之间建立直接的数据传输通路,绕过CPU直接进行数据传输。DMA控制器负责管理数据传输的过程,而CPU可以继续执行其他任务,提高了系统的并发性和效率。

    扩展1:

    • 在传统的计算机系统中,数据的传输通常需要通过CPU来完成。

    扩展2:

    • DMA技术可以应用于多种外设。例如:硬盘、音频设备等,当外设需要进行数据传输时,DMA控制器会向CPU发送请求,CPU将数据传输的任务交给DMA控制器处理。DMA控制器会直接读取或写入数据而不需要CPU干预。数据传输完成后,DMA控制器会向CPU发送中断信号,通知操作完成。

    -------------------------------------------------------------------------------------------------------------------------

    硬中断:

    • 向CPU发起硬中断的作用是通知CPU发生了一个重要的事件或需要CPU的处理。硬中断时由硬件设备触发的中断信号,用于引起CPU的注意并中断当前正在执行的程序。
    • 当硬件设备需要CPU的处理时,它会向CPU发送一个中断请求。CPU会暂停当前正在执行的程序,并跳转至一个预定义的中断处理程序来处理中断。中断处理程序是操作系统或设备驱动程序提供的,用于对中断事件进行处理。

    -------------------------------------------------------------------------------------------------------------------------

    屏蔽硬中断:

    • 处理器屏蔽硬中断是为了确保关键代码的执行不会被中断打断。

    -------------------------------------------------------------------------------------------------------------------------

    软中断

    • CPU发起软中断的作用是触发操作系统中的中断处理程序,从而实现与操作系统进行交互和执行特定的系统功能。

    -------------------------------------------------------------------------------------------------------------------------

    总述:

            网卡是计算机里的一个硬件,专门负责接收和发送网络包,当网卡接收到一个网络包后,会通过DNA技术将网络包写入到指定的内存地址,也就是写入到RingBuffer中,这是一个环形缓冲区。接着网卡向CPU发起硬中断,当CPU收到硬件中断请求后,根据中断注册表,调用已经注册的中断处理函数(中断处理函数:暂时屏蔽硬中断、发起软中断-->通知内核里的ksoftirqd线程进行轮询从ringbuffer中读数据、恢复中断)。ksoftirqd线程从ringbuffer中获取一个数据帧之后保存在sk_buffer中,并且交给网络协议栈进行数据帧处理,经过协议栈处理完后的数据被放到socket读缓冲区中,等待应用程序通过系统调用读取数据。


    发送数据

            这个过程没有好说的,跟接收数据的返过程。需要注意的是TCP保证可靠交付,需要超时重传,需要备份。UDP是不需要的,不需要备份。

    用户层角度对数据包的处理流程

    用户态网络缓冲区

    为什么需要用户态网络缓冲区?(为什么需要为每条连接准备一个发送缓冲区和一个接收缓冲区?)

    • 从业务层面来说,我们可以看到read和处理数据包,组成数据包和write写入之间存在处理速度上的差异,也就是说当需要产生数据的能力比处理数据的能力强时,防止数据丢失,需要将数据未处理的数据进行短暂的存储。(生产者的速度大于消费者的速度是)
    • 从posix api接口(read、write)层面(粘包),不能确定一次性接收数据,也不能保证一次性发送数据。

            由于存在这样的问题,我们需要设计一个缓冲区解决这样的问题...

    不同的网络模型会不会影响用户态网络缓冲区的设计?

            显然不会,上面两个矛盾发生在读取数据之后和处理数据之间,而不同的网络模型是对IO的不同处理方式。两个过程不是在同一阶段。换句话说就是一个是作用于读数据的方式不同,一个是作用于处理数据和读取快慢的问题上,而不是读取方式的问题上。

    UDP和TCP协议是否影响用户态网络缓冲区的设计?

            显然不会,很明显可以看出,UDP和TCP已经完成了将数据包写入socket缓冲区,而上面两个矛盾发生在处理数据和读写数据之间。两个不在通过阶段。换句话说就是UDP和TCP协议都存在上面的两个矛盾。

    用户态网络缓冲区的设计 

       这里的代码不要求完全自己实现,能看懂就行,知道其原理和缺点和优点就可以了。

    • 定长buffer:

            就是一个固定长度的数组,然后增加一个记录当前空闲的起始位置。

            优点:

            结构简单,易于实现。

            缺点:

            需要频繁的腾挪数据。

            需要实现扩缩容机制。

    • ringBuffer:

            就是一个环形队列,我们可以做到更好一点。就是采用2的能次幂大小的队列,这样可以把取余操作转成位操作,提高处理速度。

            具体的情况这里就不做分析了,为了方便大家理解代码,我把各种会出现的情况图罗列出来,不明白的话,评论区见。

            优点:

            不需要频繁的腾挪数据。

            缺点:

            需要实现扩缩容机制,扩缩容时需要腾挪数据。

            造成不连续的空间,可能引发多次系统调用。

    • chainBuffer:

            就是通过队列的将一块块空间连在一起,说白了就是把数组的连续空间变成了链表的方式,然后记录第一个块位置和最后一个块的位置,错觉上看起来像个连续的空间。

            优点:
            不腾挪数据
            动态扩缩容,不腾挪数据

            缺点:

            造成不连续的空间,可能引发多次系统调用

    RingBuffer代码 

    1. #ifndef _ringbuffer_h
    2. #define _ringbuffer_h
    3. #include
    4. #include
    5. #include
    6. // #include // for uint_max
    7. #include
    8. #include
    9. typedef struct ringbuffer_s buffer_t;
    10. buffer_t * buffer_new(uint32_t sz);
    11. uint32_t buffer_len(buffer_t *r);
    12. void buffer_free(buffer_t *r);
    13. int buffer_add(buffer_t *r, const void *data, uint32_t sz);
    14. int buffer_remove(buffer_t *r, void *data, uint32_t sz);
    15. int buffer_drain(buffer_t *r, uint32_t sz);
    16. int buffer_search(buffer_t *r, const char* sep, const int seplen);
    17. uint8_t * buffer_write_atmost(buffer_t *r);
    18. #endif
    1. #include
    2. #include
    3. #include
    4. #include
    5. #include "buffer.h"
    6. struct ringbuffer_s {
    7. uint32_t size;
    8. uint32_t tail;
    9. uint32_t head;
    10. uint8_t * buf;
    11. };
    12. #define min(lth, rth) ((lth)<(rth)?(lth):(rth))
    13. static inline int is_power_of_two(uint32_t num) {
    14. if (num < 2) return 0;
    15. return (num & (num - 1)) == 0;
    16. }
    17. static inline uint32_t roundup_power_of_two(uint32_t num) {
    18. if (num == 0) return 2;
    19. int i = 0;
    20. for (; num != 0; i++)
    21. num >>= 1;
    22. return 1U << i;
    23. }
    24. buffer_t * buffer_new(uint32_t sz) {
    25. if (!is_power_of_two(sz)) sz = roundup_power_of_two(sz);
    26. buffer_t * buf = (buffer_t *)malloc(sizeof(buffer_t) + sz);
    27. if (!buf) {
    28. return NULL;
    29. }
    30. buf->size = sz;
    31. buf->head = buf->tail = 0;
    32. buf->buf = (uint8_t *)(buf + 1);
    33. return buf;
    34. }
    35. void buffer_free(buffer_t *r) {
    36. free(r);
    37. r = NULL;
    38. }
    39. static uint32_t
    40. rb_isempty(buffer_t *r) {
    41. return r->head == r->tail;
    42. }
    43. static uint32_t
    44. rb_isfull(buffer_t *r) {
    45. return r->size == (r->tail - r->head);
    46. }
    47. static uint32_t
    48. rb_len(buffer_t *r) {
    49. return r->tail - r->head;
    50. }
    51. static uint32_t
    52. rb_remain(buffer_t *r) {
    53. return r->size - r->tail + r->head;
    54. }
    55. int buffer_add(buffer_t *r, const void *data, uint32_t sz) {
    56. if (sz > rb_remain(r)) {
    57. return -1;
    58. }
    59. uint32_t i;
    60. i = min(sz, r->size - (r->tail & (r->size - 1)));
    61. memcpy(r->buf + (r->tail & (r->size - 1)), data, i);
    62. memcpy(r->buf, data+i, sz-i);
    63. r->tail += sz;
    64. return 0;
    65. }
    66. int buffer_remove(buffer_t *r, void *data, uint32_t sz) {
    67. assert(!rb_isempty(r));
    68. uint32_t i;
    69. sz = min(sz, r->tail - r->head);
    70. i = min(sz, r->size - (r->head & (r->size - 1)));
    71. memcpy(data, r->buf+(r->head & (r->size - 1)), i);
    72. memcpy(data+i, r->buf, sz-i);
    73. r->head += sz;
    74. return sz;
    75. }
    76. int buffer_drain(buffer_t *r, uint32_t sz) {
    77. if (sz > rb_len(r))
    78. sz = rb_len(r);
    79. r->head += sz;
    80. return sz;
    81. }
    82. // 找 buffer 中 是否包含特殊字符串(界定数据包的)
    83. int buffer_search(buffer_t *r, const char* sep, const int seplen) {
    84. int i;
    85. for (i = 0; i <= rb_len(r)-seplen; i++) {
    86. int pos = (r->head + i) & (r->size - 1);
    87. if (pos + seplen > r->size) {
    88. if (memcmp(r->buf+pos, sep, r->size-pos))
    89. return 0;
    90. if (memcmp(r->buf, sep+r->size-pos, pos+seplen-r->size) == 0) {
    91. return i+seplen;
    92. }
    93. }
    94. if (memcmp(r->buf+pos, sep, seplen) == 0) {
    95. return i+seplen;
    96. }
    97. }
    98. return 0;
    99. }
    100. uint32_t buffer_len(buffer_t *r) {
    101. return rb_len(r);
    102. }
    103. uint8_t * buffer_write_atmost(buffer_t *r) {
    104. uint32_t rpos = r->head & (r->size - 1);
    105. uint32_t wpos = r->tail & (r->size - 1);
    106. if (wpos < rpos) {
    107. uint8_t* temp = (uint8_t *)malloc(r->size * sizeof(uint8_t));
    108. memcpy(temp, r->buf+rpos, r->size - rpos);
    109. memcpy(temp+r->size-rpos, r->buf, wpos);
    110. free(r->buf);
    111. r->buf = temp;
    112. return r->buf;
    113. }
    114. return r->buf + rpos;
    115. }

    chainBuffer代码 

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include "buffer.h"
    6. struct buf_chain_s {
    7. struct buf_chain_s *next;
    8. uint32_t buffer_len;
    9. uint32_t misalign;
    10. uint32_t off;
    11. uint8_t *buffer;
    12. };
    13. struct buffer_s {
    14. buf_chain_t *first;
    15. buf_chain_t *last;
    16. buf_chain_t **last_with_datap;
    17. uint32_t total_len;
    18. uint32_t last_read_pos; // for sep read
    19. };
    20. #define CHAIN_SPACE_LEN(ch) ((ch)->buffer_len - ((ch)->misalign + (ch)->off))
    21. #define MIN_BUFFER_SIZE 1024
    22. #define MAX_TO_COPY_IN_EXPAND 4096
    23. #define BUFFER_CHAIN_MAX_AUTO_SIZE 4096
    24. #define MAX_TO_REALIGN_IN_EXPAND 2048
    25. #define BUFFER_CHAIN_MAX 16*1024*1024 // 16M
    26. #define BUFFER_CHAIN_EXTRA(t, c) (t *)((buf_chain_t *)(c) + 1)
    27. #define BUFFER_CHAIN_SIZE sizeof(buf_chain_t)
    28. uint32_t
    29. buffer_len(buffer_t *buf) {
    30. return buf->total_len;
    31. }
    32. buffer_t *
    33. buffer_new(uint32_t sz) {
    34. (void)sz;
    35. buffer_t * buf = (buffer_t *) malloc(sizeof(buffer_t));
    36. if (!buf) {
    37. return NULL;
    38. }
    39. memset(buf, 0, sizeof(*buf));
    40. buf->last_with_datap = &buf->first;
    41. return buf;
    42. }
    43. static buf_chain_t *
    44. buf_chain_new(uint32_t size) {
    45. buf_chain_t *chain;
    46. uint32_t to_alloc;
    47. if (size > BUFFER_CHAIN_MAX - BUFFER_CHAIN_SIZE)
    48. return (NULL);
    49. size += BUFFER_CHAIN_SIZE;
    50. if (size < BUFFER_CHAIN_MAX / 2) {
    51. to_alloc = MIN_BUFFER_SIZE;
    52. while (to_alloc < size) {
    53. to_alloc <<= 1;
    54. }
    55. } else {
    56. to_alloc = size;
    57. }
    58. if ((chain = malloc(to_alloc)) == NULL)
    59. return (NULL);
    60. memset(chain, 0, BUFFER_CHAIN_SIZE);
    61. chain->buffer_len = to_alloc - BUFFER_CHAIN_SIZE;
    62. chain->buffer = BUFFER_CHAIN_EXTRA(uint8_t, chain);
    63. return (chain);
    64. }
    65. static void
    66. buf_chain_free_all(buf_chain_t *chain) {
    67. buf_chain_t *next;
    68. for (; chain; chain = next) {
    69. next = chain->next;
    70. free(chain);
    71. }
    72. }
    73. void
    74. buffer_free(buffer_t *buf) {
    75. buf_chain_free_all(buf->first);
    76. }
    77. static buf_chain_t **
    78. free_empty_chains(buffer_t *buf) {
    79. buf_chain_t **ch = buf->last_with_datap;
    80. while ((*ch) && (*ch)->off != 0)
    81. ch = &(*ch)->next;
    82. if (*ch) {
    83. buf_chain_free_all(*ch);
    84. *ch = NULL;
    85. }
    86. return ch;
    87. }
    88. static void
    89. buf_chain_insert(buffer_t *buf, buf_chain_t *chain) {
    90. if (*buf->last_with_datap == NULL) {
    91. buf->first = buf->last = chain;
    92. } else {
    93. buf_chain_t **chp;
    94. chp = free_empty_chains(buf);
    95. *chp = chain;
    96. if (chain->off)
    97. buf->last_with_datap = chp;
    98. buf->last = chain;
    99. }
    100. buf->total_len += chain->off;
    101. }
    102. static inline buf_chain_t *
    103. buf_chain_insert_new(buffer_t *buf, uint32_t datlen) {
    104. buf_chain_t *chain;
    105. if ((chain = buf_chain_new(datlen)) == NULL)
    106. return NULL;
    107. buf_chain_insert(buf, chain);
    108. return chain;
    109. }
    110. static int
    111. buf_chain_should_realign(buf_chain_t *chain, uint32_t datlen)
    112. {
    113. return chain->buffer_len - chain->off >= datlen &&
    114. (chain->off < chain->buffer_len / 2) &&
    115. (chain->off <= MAX_TO_REALIGN_IN_EXPAND);
    116. }
    117. static void
    118. buf_chain_align(buf_chain_t *chain) {
    119. memmove(chain->buffer, chain->buffer + chain->misalign, chain->off);
    120. chain->misalign = 0;
    121. }
    122. int buffer_add(buffer_t *buf, const void *data_in, uint32_t datlen) {
    123. buf_chain_t *chain, *tmp;
    124. const uint8_t *data = data_in;
    125. uint32_t remain, to_alloc;
    126. int result = -1;
    127. if (datlen > BUFFER_CHAIN_MAX - buf->total_len) {
    128. goto done;
    129. }
    130. if (*buf->last_with_datap == NULL) {
    131. chain = buf->last;
    132. } else {
    133. chain = *buf->last_with_datap;
    134. }
    135. if (chain == NULL) {
    136. chain = buf_chain_insert_new(buf, datlen);
    137. if (!chain)
    138. goto done;
    139. }
    140. remain = chain->buffer_len - chain->misalign - chain->off;
    141. if (remain >= datlen) {
    142. memcpy(chain->buffer + chain->misalign + chain->off, data, datlen);
    143. chain->off += datlen;
    144. buf->total_len += datlen;
    145. // buf->n_add_for_cb += datlen;
    146. goto out;
    147. } else if (buf_chain_should_realign(chain, datlen)) {
    148. buf_chain_align(chain);
    149. memcpy(chain->buffer + chain->off, data, datlen);
    150. chain->off += datlen;
    151. buf->total_len += datlen;
    152. // buf->n_add_for_cb += datlen;
    153. goto out;
    154. }
    155. to_alloc = chain->buffer_len;
    156. if (to_alloc <= BUFFER_CHAIN_MAX_AUTO_SIZE/2)
    157. to_alloc <<= 1;
    158. if (datlen > to_alloc)
    159. to_alloc = datlen;
    160. tmp = buf_chain_new(to_alloc);
    161. if (tmp == NULL)
    162. goto done;
    163. if (remain) {
    164. memcpy(chain->buffer + chain->misalign + chain->off, data, remain);
    165. chain->off += remain;
    166. buf->total_len += remain;
    167. // buf->n_add_for_cb += remain;
    168. }
    169. data += remain;
    170. datlen -= remain;
    171. memcpy(tmp->buffer, data, datlen);
    172. tmp->off = datlen;
    173. buf_chain_insert(buf, tmp);
    174. // buf->n_add_for_cb += datlen;
    175. out:
    176. result = 0;
    177. done:
    178. return result;
    179. }
    180. static uint32_t
    181. buf_copyout(buffer_t *buf, void *data_out, uint32_t datlen) {
    182. buf_chain_t *chain;
    183. char *data = data_out;
    184. uint32_t nread;
    185. chain = buf->first;
    186. if (datlen > buf->total_len)
    187. datlen = buf->total_len;
    188. if (datlen == 0)
    189. return 0;
    190. nread = datlen;
    191. while (datlen && datlen >= chain->off) {
    192. uint32_t copylen = chain->off;
    193. memcpy(data,
    194. chain->buffer + chain->misalign,
    195. copylen);
    196. data += copylen;
    197. datlen -= copylen;
    198. chain = chain->next;
    199. }
    200. if (datlen) {
    201. memcpy(data, chain->buffer + chain->misalign, datlen);
    202. }
    203. return nread;
    204. }
    205. static inline void
    206. ZERO_CHAIN(buffer_t *dst) {
    207. dst->first = NULL;
    208. dst->last = NULL;
    209. dst->last_with_datap = &(dst)->first;
    210. dst->total_len = 0;
    211. }
    212. int buffer_drain(buffer_t *buf, uint32_t len) {
    213. buf_chain_t *chain, *next;
    214. uint32_t remaining, old_len;
    215. old_len = buf->total_len;
    216. if (old_len == 0)
    217. return 0;
    218. if (len >= old_len) {
    219. len = old_len;
    220. for (chain = buf->first; chain != NULL; chain = next) {
    221. next = chain->next;
    222. free(chain);
    223. }
    224. ZERO_CHAIN(buf);
    225. } else {
    226. buf->total_len -= len;
    227. remaining = len;
    228. for (chain = buf->first; remaining >= chain->off; chain = next) {
    229. next = chain->next;
    230. remaining -= chain->off;
    231. if (chain == *buf->last_with_datap) {
    232. buf->last_with_datap = &buf->first;
    233. }
    234. if (&chain->next == buf->last_with_datap)
    235. buf->last_with_datap = &buf->first;
    236. free(chain);
    237. }
    238. buf->first = chain;
    239. chain->misalign += remaining;
    240. chain->off -= remaining;
    241. }
    242. // buf->n_del_for_cb += len;
    243. return len;
    244. }
    245. int buffer_remove(buffer_t *buf, void *data_out, uint32_t datlen) {
    246. uint32_t n = buf_copyout(buf, data_out, datlen);
    247. if (n > 0) {
    248. if (buffer_drain(buf, n) < 0)
    249. n = -1;
    250. }
    251. return (int)n;
    252. }
    253. static bool
    254. check_sep(buf_chain_t * chain, int from, const char *sep, int seplen) {
    255. for (;;) {
    256. int sz = chain->off - from;
    257. if (sz >= seplen) {
    258. return memcmp(chain->buffer + chain->misalign + from, sep, seplen) == 0;
    259. }
    260. if (sz > 0) {
    261. if (memcmp(chain->buffer + chain->misalign + from, sep, sz)) {
    262. return false;
    263. }
    264. }
    265. chain = chain->next;
    266. sep += sz;
    267. seplen -= sz;
    268. from = 0;
    269. }
    270. }
    271. int buffer_search(buffer_t *buf, const char* sep, const int seplen) {
    272. buf_chain_t *chain;
    273. int i;
    274. chain = buf->first;
    275. if (chain == NULL)
    276. return 0;
    277. int bytes = chain->off;
    278. while (bytes <= buf->last_read_pos) {
    279. chain = chain->next;
    280. if (chain == NULL)
    281. return 0;
    282. bytes += chain->off;
    283. }
    284. bytes -= buf->last_read_pos;
    285. int from = chain->off - bytes;
    286. for (i = buf->last_read_pos; i <= buf->total_len - seplen; i++) {
    287. if (check_sep(chain, from, sep, seplen)) {
    288. buf->last_read_pos = 0;
    289. return i+seplen;
    290. }
    291. ++from;
    292. --bytes;
    293. if (bytes == 0) {
    294. chain = chain->next;
    295. from = 0;
    296. if (chain == NULL)
    297. break;
    298. bytes = chain->off;
    299. }
    300. }
    301. buf->last_read_pos = i;
    302. return 0;
    303. }
    304. uint8_t * buffer_write_atmost(buffer_t *p) {
    305. buf_chain_t *chain, *next, *tmp, *last_with_data;
    306. uint8_t *buffer;
    307. uint32_t remaining;
    308. int removed_last_with_data = 0;
    309. int removed_last_with_datap = 0;
    310. chain = p->first;
    311. uint32_t size = p->total_len;
    312. if (chain->off >= size) {
    313. return chain->buffer + chain->misalign;
    314. }
    315. remaining = size - chain->off;
    316. for (tmp=chain->next; tmp; tmp=tmp->next) {
    317. if (tmp->off >= (size_t)remaining)
    318. break;
    319. remaining -= tmp->off;
    320. }
    321. if (chain->buffer_len - chain->misalign >= (size_t)size) {
    322. /* already have enough space in the first chain */
    323. size_t old_off = chain->off;
    324. buffer = chain->buffer + chain->misalign + chain->off;
    325. tmp = chain;
    326. tmp->off = size;
    327. size -= old_off;
    328. chain = chain->next;
    329. } else {
    330. if ((tmp = buf_chain_new(size)) == NULL) {
    331. return NULL;
    332. }
    333. buffer = tmp->buffer;
    334. tmp->off = size;
    335. p->first = tmp;
    336. }
    337. last_with_data = *p->last_with_datap;
    338. for (; chain != NULL && (size_t)size >= chain->off; chain = next) {
    339. next = chain->next;
    340. if (chain->buffer) {
    341. memcpy(buffer, chain->buffer + chain->misalign, chain->off);
    342. size -= chain->off;
    343. buffer += chain->off;
    344. }
    345. if (chain == last_with_data)
    346. removed_last_with_data = 1;
    347. if (&chain->next == p->last_with_datap)
    348. removed_last_with_datap = 1;
    349. free(chain);
    350. }
    351. if (chain != NULL) {
    352. memcpy(buffer, chain->buffer + chain->misalign, size);
    353. chain->misalign += size;
    354. chain->off -= size;
    355. } else {
    356. p->last = tmp;
    357. }
    358. tmp->next = chain;
    359. if (removed_last_with_data) {
    360. p->last_with_datap = &p->first;
    361. } else if (removed_last_with_datap) {
    362. if (p->first->next && p->first->next->off)
    363. p->last_with_datap = &p->first->next;
    364. else
    365. p->last_with_datap = &p->first;
    366. }
    367. return tmp->buffer + tmp->misalign;
    368. }
  • 相关阅读:
    Linux - 进程
    APP破解去广告
    多环境镜像晋级/复用最佳实践
    springBoot2笔记
    linux驱动34:tasklet小任务机制
    【图与网络数学模型】3.Ford-Fulkerson算法求解网络最大流问题
    【双指针】有效三角形的个数
    群狼调研(长沙学校满意度调查)开展长沙游客满意度调查
    Golang 中的静态类型和动态类型
    jar包加入本地maven仓库
  • 原文地址:https://blog.csdn.net/weixin_46120107/article/details/134236107