矢量包处理(VPP) 一. 简介 VPP是专门为极限性能 和灵活网络功能 而生的用户态软件转发 引擎.
本文介绍它的两个核心设计思路
当然还有一些其他的设计技巧,本文暂不细说
内存和缓存优化(自己管理内存,NUMA近核内存)
无锁化设计避免多线程锁竞争
二. 设计思路 2.1 批量处理
VPP helps FD.io push extreme limits of performance and scale. Independent testing shows that, at scale, VPP-powered FD.io is two orders of magnitude faster than currently available technologies.
批量处理是VPP能够在高负载情况下,比现有技术速度提升两个数量级的关键.
一个粗略的日常类比是考虑一堆木材的问题,每块木材都需要切割、打磨和钻孔。你一次只能拿着一种工具(类似于指令缓存)。如果你先拿起锯子完成所有的切割工作,然后拿起砂光机完成所有的打磨工作,最后拿起钻头完成所有的钻孔工作,你就能更快地完成木材的切割、打磨和钻孔。如果按顺序为每块木材拿起每个工具,速度就会慢得多。
在传统linux网络协议栈中,一个数据包从触发中断,到__netif_receive_skb
,到ip_rcv
,到ip_forward
等等,会经历一个完整的处理流程后,才会继续处理下一个数据包。如果网络环境复杂时,这期间的函数调用链极为复杂,但很多情况下,下一个数据包的处理路径也极为相似。
且,当代码路径长度超过微处理器指令缓存(I-cache)的大小时,由于微处理器不断加载新指令,就会发生指令抖动。在这种模型下,每个数据包都会引发一组相同的I-cache未命中。
1 2 3 4 +---> fooA(packet1) +---> fooB(packet1) +---> fooC(packet1) +---> fooA(packet2) +---> fooB(packet2) +---> fooC(packet2) ... +---> fooA(packet3) +---> fooB(packet3) +---> fooC(packet3)
VPP利用了这一特性,一次处理多个数据包,此时,第一个数据包会预热指令缓存,因此剩余的数据包的处理性能往往会达到极致。
1 2 3 4 +---> fooA([packet1, +---> fooB([packet1, +---> fooC([packet1, +---> packet2, packet2, packet2, ... ... ... packet256]) packet256]) packet256])
2.2 图节点调度 个人认为,成熟的图节点调度框架使得开发人员可以轻易地在两个业务节点之间插入定制化的处理,每两个图节点间都可以添加广义上的HOOK节点。
惯常的业务处理逻辑是通过逻辑判断
加函数嵌套
完成的。要完成一个业务逻辑,会从顶层的入口函数,一直调用内部的函数,直到返回。这使得在处理复杂业务时,调用栈会极深,有使用过trace工具的同学一定很有画面。
1 2 3 4 5 6 7 8 generate { dispatch { if (A) func1(); if (B) func2(); } }
VPP使用图节点,将每一个业务抽象为一个节点,将业务逻辑抽象为有向边。这样,在一个业务处理过程中,不再使用函数嵌套的方法来执行子业务。而是通过节点之前跳转的方式,极大的减小调用栈深度,同时又使得业务逻辑变得极为清晰(之所以清晰,是因为在每个节点中,都显式地指明了next节点的名字。这种方法太牛了,我感觉应当扩展到其他应用软件)。
1 2 3 4 5 |------>[func1 node] | [generate node] --->[dispatch node] --->| | |------>[func2 node]
请注意,这每一个node在都运行在图节点调度函数dispatch_node()
下,属于并列关系,没有嵌套调用关系,这一点我会在后续的小demo程序中做更为形象的展示。
1 2 3 4 5 dispatch_node { /* generate(); dispatch(); func1(); func2();*/ node->function(); }
每一个节点可以看作一个小线程(并不真是),在节点执行期间,会依据业务情况,明确图节点调度框架应该调用的下一个节点。这一点我会在后续的demo程序中详细展开。
三.源码分析 本文旨在详解批处理数据包 和图节点调度 ,过高或过低层级的细节,这里暂不提及。
我们可以先假设VPP是按照标量包处理逻辑(每次只处理一个数据包)来在有向图之间进行节点调度,理解了以后,把每个节点入参扩展为一个向量(包含多个数据包),就可以很快理解VPP的核心设计思路了。
3.1 demo业务逻辑 为了简便理解,不妨丢掉复杂的网络协议栈处理业务,把场景简单化,我们不再处理网络数据包,而是处理一个个的字符串,每个字符串中的内容为buffer %d
,其中%d是这个buffer的唯一标识。
设计要求: 我需要每10s生成一个或多个字符串 依据字符串的唯一标识按照奇偶分类 奇数标识字符串进入逻辑func1;偶数标识字符串进入逻辑func2
1 2 3 4 5 |-----奇数--->[func1] | [生成buffer] --->[奇偶分类] --->| | |-----偶数--->[func2]
这个demo逻辑简单,其实则也对应了接收(拷贝)数据包,解析数据包,处理数据包等复杂了网络协议栈动作。
3.2 图节点调度 1. 孤立节点 我们首先需要创建四个节点,分别为[生成buffer]、[奇偶分类]、[奇数处理]、[偶数处理].
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 VLIB_REGISTER_NODE (gen_buffer_node,static ) = { .function = gen_buffer, .name = "gen-buffer" , }; VLIB_REGISTER_NODE (my_dispatch_node) = { .function = my_dispatch, .name = "my-dispatch" , }; VLIB_REGISTER_NODE (func1) = { .function = nodex_fn, .name = "func1" , }; VLIB_REGISTER_NODE (node2) = { .function = nodex_fn, .name = "func2" , };
此时,节点会在vpp初始化时,注册到图节点调度器中,他们此时是孤立的四个节点:
1 [gen-buffer] [my-dispatch] [func1] [func2]
2. 业务为边 在本节首部,我们介绍了demo程序想要做什么。根据这个业务逻辑,可以知道,我们希望生成buffer后选中my-dispatch节点,my-disaptch根据奇数偶数进行分发,如果是奇数,则选中func1节点,如果是偶数,选中func2节点。
所以,我们可以以业务为边,画出这个有向图了:
1 2 3 4 5 |-----奇数--->[func1] | [gen-buffer] --->[my-dispatch] --->| | |-----偶数--->[func2]
可以看到,gen-buffer和my-dispatch两个节点是有next node的。且my-dispatch有两个next node。
在VPP中注册节点时,我们可以使用next_nodes注册这种业务调用关系。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 VLIB_REGISTER_NODE (gen_buffer_node,static ) = { .function = gen_buffer, .name = "gen-buffer" , .next_nodes = { [0 ] = "my-dispatch" , }, }; VLIB_REGISTER_NODE (my_dispatch_node) = { .function = my_dispatch, .name = "my-dispatch" , .next_nodes = { [0 ] = "func1" , [1 ] = "func2" , }, };
3. 驱动进程 我将gen-buffer节点类型配置为VLIB_NODE_TYPE_PROCESS
,简单解释,这个节点将被注册为PROCESS类型,可以把VPP理解为操作系统,这个gen-buffer节点理解为一个会被尽可能频繁调用的一个线程,我会在这个节点的funcion回调中每10s生成一个/多个buffer。并将这个buffer推送进入图节点调度器。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static uword gen_buffer (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f) { int i = 0 ; while (1 ) { vlib_frame_t *nf = GRN_BUFFER(i++); vlib_put_frame_to_node (vm, my_dispatch_node.index, nf); vlib_process_suspend (vm, 10e0 ); } return i; }
4. 待办队列 上一节提到,gen-buffer节点将生成的buffer推送进入了图节点调度器。似乎有点抽象,实际中,是把下一个节点和下一个节点的入参做关联,把他们注册到待办队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 typedef struct { vlib_frame_t *frame; u32 node_runtime_index; u32 next_frame_index; #define VLIB_PENDING_FRAME_NO_NEXT_FRAME ((u32) ~0) } vlib_pending_frame_t ; void vlib_put_frame_to_node (vlib_main_t * vm, u32 to_node_index, vlib_frame_t * f) { vlib_pending_frame_t *p; vlib_node_t *to_node; to_node = vlib_get_node (vm, to_node_index); p->frame = f; p->node_runtime_index = to_node->runtime_index; vec_add2 (vm->node_main.pending_frames, p, 1 ); }
frame:可以先理解为下一个node的入参(此例中存放着生成的buffer)
node_runtime_index:节点的运行时状态索引,可以快速找到节点的业务函数
5. 执行待办 在主循环vlib_main_or_worker_loop()
中,每次循环,都会检查待办队列,如果有节点,则执行dispatch_node()
1 2 3 4 5 6 7 8 vlib_main_or_worker_loop() { while (1 ) { for (i = 0 ; i < _vec_len (nm->pending_frames); i++) cpu_time_now = dispatch_pending_node (vm, i, cpu_time_now); } }
在dispatch_node()
中,就会调用我们在注册节点时,注册的function。
1 2 3 4 5 6 7 8 9 10 11 12 dispatch_node () { n = node->function (vm, node, frame); }
而在我们的例子中,就会执行my_dispatch()
。
所以这个图节点调度框架只是不停地在执行待办队列中的节点回调函数。调度的任务其实是在我们的回调函数中,依据业务来指定下一个节点。
在这个过程中,执行回调其实简单,但是如何通过待办来找到节点的回调函数值的一看,也就说,如何通过vlib_panding_frame.node_runtime_index,快速找到节点的运行时状态vlib_node_runtime_t 。
6. 节点注册 本节将更细节的分析我们注册的节点在VPP中的存在形式。
回到我们注册节点时的动作:
1 2 3 4 5 6 7 8 9 VLIB_REGISTER_NODE (gen_buffer_node,static ) = { .function = gen_buffer, .type = VLIB_NODE_TYPE_PROCESS, .name = "gen-buffer" , .n_next_nodes = 1 , .next_nodes = { [0 ] = "my-dispatch" , }, };
这个宏定义展开还挺长,我截取一部分,它所作的事情就是创建一个static vlib_node_registration_t gen_buffer_node
,对它进行初始化,并加入到vgm->node_registerations中,可以看出,通过这种注册方式,vgm->node_registerations这个链表中会保存着所有节点的注册信息。
1 2 3 4 5 6 7 8 static vlib_node_registration_t gen_buffer_node;static void __vlib_add_node_registration_gen_buffer_node (void ) __attribute__ ((__constructor__)) { vlib_global_main_t *vgm = vlib_get_global_main (); gen_buffer_node.next_registration = vgm->node_registrations; vgm->node_registrations = &gen_buffer_node; }
在VPP初始化阶段,vlib_register_all_static_nodes()
是,会遍历这个链表,也就是说每个节点都会执行vlib_register_node()
;
在这个函数vlib_register_node()
中,会创建vlib_node_t类型的结构体来存储每个节点的信息,并且为每个节点分配单独的idnex。这些节点会存储在vm->node_main.nodes
vector中。而这些vlib_node_t实例的index,也会依据节点的名字,存在vm->node_main.node_by_name
哈希表中。
7. 节点运行时状态 在vlib_register_node()
中,除了为vlib_node_t进行实例化外,还会为每个node创建一个运行时状态的实例vlib_node_runtime_t
。这个运行时状态存储在vm->node_main.nodes_by_types[n->type]中。
同时,这个运行时状态的偏移(或者叫下标)也会保存在n->runtime_index中。以便快速通过node找到相关的运行时状态。
1 2 3 4 5 6 vlib_node_runtime_t *rt;vec_add2_aligned (nm->nodes_by_type[n->type], rt, 1 , CLIB_CACHE_LINE_BYTES); n->runtime_index = rt - nm->nodes_by_type[n->type];
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 typedef struct vlib_node_runtime_t { vlib_node_function_t *function; u32 next_frame_index; vlib_node_state_t state; vlib_node_dispatch_reason_t dispatch_reason; u16 cached_next_index; } vlib_node_runtime_t ;
在节点调度期间,会更新运行时状态,还会缓存下一个调度节点的index。
8. next_frame_t 在gen-buffer节点中,我们使用vlib_put_frame_to_node (vm, my_dispatch_node.index, nf);
指明下一个应当被调度的节点my-dispatch
。本节会从my-dispatch节点开始,介绍另一种指定下一节点的方法vlib_put_next_frame()
。
我们回顾下my-dispatch
节点是怎么进行注册的:
1 2 3 4 5 6 7 8 9 10 VLIB_REGISTER_NODE (my_dispatch_node, static ) = { .function = my_dispatch, .name = "my-dispatch" , .vector_size = sizeof (u32), .n_next_nodes = 2 , .next_nodes = { [0 ] = "func1" , [1 ] = "func2" , }, };
我们使用next_nodes记录了my-dispatch
节点所有可能的下一个节点。就可以使用0/1来指明下一个应当被调度的节点:
1 2 3 4 5 vlib_put_next_frame (vm, rt, 0 , n_left_to_next_node1); vlib_put_next_frame (vm, rt, 1 , n_left_to_next_node2);
vlib_put_frame_to_node (vm, my_dispatch_node.index, nf);
的第三个参数nf,暂时可以理解为下一个节点的入参(存储buffer)。但是在vlib_put_next_frame()
中,并没有显式地把下一个节点所需要处理的入参传入。
此时,就要引入另一个关键的结构体vlib_next_frame_t
:
1 2 3 4 5 6 7 8 9 10 11 12 13 typedef struct { vlib_frame_t *frame; u32 node_runtime_index; u32 flags; } vlib_next_frame_t ;
这个结构体中的frame字段,就存储着下一个节点的入参起始地址。
所以在vlib_put_next_frame()
中,会通过nf = vlib_node_runtime_get_next_frame (vm, r, next_index);
来找到入参,然后继续构造pending帧,加入待办队列。
3.3 批量处理 在了解了各个节点是如何进行调度的条件下(依据业务逻辑,指明下一节点,并把下一节点加入待办队列)。我们继续分析VPP是怎么做到在一个节点中(业务环节)同时处理多个数据包的。
其实,说来也简单,在linux网络协议栈中,一个业务函数通常只处理一个skb,例如int ip_forward(struct sk_buffer *skb)
,但是如果把入参扩展为一个skb数组,是不是就可以达成在一个调用栈中处理多个数据包?int ip_forward(struct sk_buffer **vecter)
。
VPP就是这么做的。但是考虑通用的业务逻辑,VPP为一个矢量入参创建了一个抽象度较高的结构体vlib_frame_t
。
1. 节点回调函数 我们重新观察一下节点回调函数:
1 2 3 4 5 6 7 8 9 10 11 typedef uword (vlib_node_function_t ) (struct vlib_main_t * vm, struct vlib_node_runtime_t * node, struct vlib_frame_t * frame) ;typedef struct _vlib_node_registration { vlib_node_function_t *function; }
vlib_node_runtime_t *node我们在上一节已经介绍过了,其中包含节点运行时状态,例如回调函数、被调度的原因、下一个节点的缓存,当前节点的各种信息等。
这一节,我们重点关注第三个入参vlib_frame_t *frame
。目前,我们可以把它理解为类似struct sk_buffer **vector
这种,指向多个数据包的首地址信息即可。
2. 节点帧 每个函数调用栈,都有自己的栈帧,其中栈帧记录返回地址,入参等。
在VPP中,每个节点也有类似的对象,这里简称为节点帧。在这个结构中,描述了一个节点的入参信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 typedef struct vlib_frame_t { u16 frame_flags; u16 flags; u16 scalar_offset, vector_offset, aux_offset; u16 n_vectors; u16 frame_size_index; u8 arguments[0 ]; } vlib_frame_t ;
说来,我们总能把入参划分为两种状态,一个是固定大小的标量型入参(只会存在一份),另一个是可以有多份的矢量型入参(例如,多个数据包)。
我们总能预先分配一段连续内存,以vlib_frame_t
为标头,紧接着存放标量型入参,再存放矢量型入参。
而vlib_frame_t
就是记录这一段连续内存的标量入参的偏移、矢量入参的偏移、矢量个数的一个结构体。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 +----------------------------------------------------------+ | vlib_frame_t 头部固定字段 (frame_flags, flags, ...) | +----------------------------------------------------------+ | arguments[] 数组区域 | | | | +-----------------------------------------------+ | | | scalar 参数们(例如:下一个节点的特定控制信息) | <--- scalar_offset | +-----------------------------------------------+ | | | vector 参数们(例如:packet index 列表) | <--- vector_offset | | - pkt_index_0 | | | | - pkt_index_1 | | | | - ... | | | | - pkt_index_(n_vectors-1) | | | +-----------------------------------------------+ | | | +----------------------------------------------------------+ 说明: - n_vectors = vector参数中有效元素的个数
3. 节点帧大小 我们又需要回到注册节点时的那个函数vlib_register_node()
此节,我们重点关注n->frame_size
1 2 3 4 5 6 7 n->frame_size = size = round_pow2 (size, CLIB_CACHE_LINE_BYTES); vlib_frame_size_t *fs = 0 ;vec_add2 (nm->frame_sizes, fs, 1 ); fs->frame_size = size; n->frame_size_index = fs - nm->frame_sizes;
在注册节点时,会依据标量结构体大小,矢量结构体大小*VLIB_FRAME_SIZE(256),计算出一个最终的入参大小,然后创建一个节点帧大小的实例,注册到vm->node_main.frame_sizes中,节点可以通过n->frame_size_index来快速访问这个节点的帧大小。请注意,此时并没有真正为这个节点分配fs->frame_size大小的空间,只是为了届时创建帧入参时方便,注册了一个帧大小的管理实例。
4. 创建节点入参 在我们的gen-buffer节点中,通过调用vlib_frame_t *nf = vlib_get_frame_to_node (vm, my_dispatch_node.index);
来创建了一个入参实例。
其实不难想象,这个函数中会在帧大小列表中,查找my-dispatch节点需要的帧大小,然后创建一块连续内存,并拷贝标量偏移、矢量偏移,最后返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 static vlib_frame_t *vlib_frame_alloc_to_node (vlib_main_t * vm, u32 to_node_index, u32 frame_flags) { vlib_node_main_t *nm = &vm->node_main; vlib_frame_size_t *fs; vlib_node_t *to_node; vlib_frame_t *f; u32 n; to_node = vlib_get_node (vm, to_node_index); fs = vec_elt_at_index (nm->frame_sizes, to_node->frame_size_index); n = fs->frame_size; f = clib_mem_alloc_aligned_no_fail (n, CLIB_CACHE_LINE_BYTES); f->n_vectors = 0 ; f->scalar_offset = to_node->scalar_offset; f->vector_offset = to_node->vector_offset; f->frame_size_index = to_node->frame_size_index; return f; }
5. 节点帧的所有权 这里有一个细节,我们在gen-buffer
节点中创建的帧的所有权是下一个节点my-dispatch
的。
这也就解释了,为什么我们在图节点调度中,似乎并没有显式地将当前节点的处理结果传递给下一个节点作为入参。原因就是,我们从一开始,就是在下一个节点的入参的内存空间中做业务处理 。
四. demo源码 为了简便理解,不妨丢掉复杂的网络协议栈处理业务,把场景简单化,我们不再处理网络数据包,而出处理一个个的字符串,每个字符串中的内容为buffer %d
,其中%d是这个buffer的唯一标识。
设计要求: 我需要每10s生成一个或多个字符串 依据字符串的唯一标识按照奇偶分类 奇数标识字符串进入逻辑func1;偶数标识字符串进入逻辑func2
1 2 3 4 5 |-----奇数--->[func1] | [生成buffer] --->[奇偶分类] --->| | |-----偶数--->[func2]
这个demo逻辑简单,其实则也对应了接收(拷贝)数据包,解析数据包,处理数据包等复杂了网络协议栈动作。
4.1 源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 static uwordnodex_fn (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f) { vlib_node_t *node = vlib_get_node (vm, rt->node_index); u32 *from = vlib_frame_vector_args (f); u32 n_left_from = f->n_vectors; u32 pkts_processed = 0 ; while (n_left_from > 0 ) { u32 bi0 = from[0 ]; from += 1 ; n_left_from -= 1 ; vlib_buffer_t *b0 = vlib_get_buffer (vm, bi0); vlib_cli_output (vm, "%s processing buffer %s" , node->name, b0->data); pkts_processed++; } return pkts_processed; } VLIB_REGISTER_NODE (func1) = { .function = nodex_fn, .name = "func-1" , .vector_size = sizeof (u32), .format_trace = NULL , .type = VLIB_NODE_TYPE_INTERNAL, }; VLIB_REGISTER_NODE (func2) = { .function = nodex_fn, .name = "func-2" , .vector_size = sizeof (u32), .format_trace = NULL , .type = VLIB_NODE_TYPE_INTERNAL, }; static uwordmy_dispatch (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f) { u32 *from = vlib_frame_vector_args (f); u32 n_left_from = f->n_vectors; u32 *to_next_node1, *to_next_node2; u32 n_left_to_next_node1, n_left_to_next_node2; vlib_cli_output (vm, "Dispatching %d buffers" , n_left_from); vlib_get_next_frame (vm, rt, 0 , to_next_node1, n_left_to_next_node1); vlib_get_next_frame (vm, rt, 1 , to_next_node2, n_left_to_next_node2); while (n_left_from > 0 ) { u32 bi0 = from[0 ]; from += 1 ; n_left_from -= 1 ; if (bi0 % 2 == 0 ) { to_next_node2[0 ] = bi0; to_next_node2 += 1 ; n_left_to_next_node2 -= 1 ; } else { to_next_node1[0 ] = bi0; to_next_node1 += 1 ; n_left_to_next_node1 -= 1 ; } } vlib_put_next_frame (vm, rt, 0 , n_left_to_next_node1); vlib_put_next_frame (vm, rt, 1 , n_left_to_next_node2); return f->n_vectors; } VLIB_REGISTER_NODE (my_dispatch_node, static ) = { .function = my_dispatch, .name = "my-dispatch" , .vector_size = sizeof (u32), .n_next_nodes = 2 , .next_nodes = { [0 ] = "func-1" , [1 ] = "func-2" , }, }; static uwordgen_buffer (vlib_main_t *vm, vlib_node_runtime_t *rt, vlib_frame_t *f) { vlib_node_t *node; u32 i = 0 ; char tmpbuf[64 ]; node = vlib_get_node (vm, rt->node_index); vlib_cli_output (vm, "%v: call frame %p" , node->name, f); while (1 ) { vlib_buffer_t *b[4 ]; u32 bi[4 ]; u32 allocated = vlib_buffer_alloc (vm, bi, 4 ); if (allocated != 4 ) { vlib_cli_output (vm, "Buffer allocation failed" ); break ; } for (u32 j = 0 ; j < 4 ; j++) { b[j] = vlib_get_buffer (vm, bi[j]); snprintf (tmpbuf, sizeof (tmpbuf), "buffer %d" , i + j); clib_memcpy_fast (b[j]->data, tmpbuf, sizeof (tmpbuf)); } vlib_frame_t *nf = vlib_get_frame_to_node (vm, my_dispatch_node.index); u32 *to_next = vlib_frame_vector_args (nf); for (u32 j = 0 ; j < 4 ; j++) { to_next[j] = bi[j]; } nf->n_vectors = 4 ; vlib_put_frame_to_node (vm, my_dispatch_node.index, nf); vlib_cli_output (vm, "%v: Buffer %d-%d sent" , node->name, i, i + 3 ); i += 4 ; vlib_process_suspend (vm, 10e0 ); } vlib_cli_output (vm, "%v: return frame %p" , node->name, f); return i; } VLIB_REGISTER_NODE (gen_buffer_node,static ) = { .function = gen_buffer, .type = VLIB_NODE_TYPE_PROCESS, .name = "gen-buffer" , .n_next_nodes = 1 , .next_nodes = { [0 ] = "my-dispatch" , }, };
4.2 运行log 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 gen-buffer: call frame 0x0 gen-buffer: Buffer 0-3 sent Dispatching 4 buffers func-1 processing buffer buffer 1 func-1 processing buffer buffer 3 func-2 processing buffer buffer 0 func-2 processing buffer buffer 2 gen-buffer: Buffer 4-7 sent Dispatching 4 buffers func-1 processing buffer buffer 5 func-1 processing buffer buffer 7 func-2 processing buffer buffer 4 func-2 processing buffer buffer 6 gen-buffer: Buffer 8-11 sent Dispatching 4 buffers func-1 processing buffer buffer 9 func-1 processing buffer buffer 11 func-2 processing buffer buffer 8 func-2 processing buffer buffer 10
五. vnet 我们的测试程序就只涉及四个节点,下图是VPP网络协议栈的所有节点
1 2 3 4 # 以下命令行可以导出有向图 DBGvpp# show vlib graphviz file vlib vlib graph dumped into `/tmp/vlib'. Run eg. `fdp -Tsvg -O /tmp/vlib'. DBGvpp#