IO倒退历程1

从同步IO谈起

acceptforksocketforkaccept
readwrite

异步IO降临

​ 所谓异步IO,就是发动IO申请的线程不等IO操作实现,就继续执行随后的代码,IO后果用其余形式告诉发动IO申请的程序。

O_NONBLOCKreadwriteEAGAIN
selectpollepollselectpollepoll
EventLoopstd::function

协程退场

​ 有种说法是协程是一种轻量级的线程,这是针对它不须要操作系统调度染指、开销小而言,然而我感觉不够精确。实质上,协程就是一个能够暂停的函数,遇到阻塞的时候,就让出cpu,当条件满足了就持续复原运行。

​ 从另一个角度来说,协程和过程、线程的区别就是两点 – 是否隔离变量,是否主动切换运行上下文。满足这两点区别的,就是协程。

  • 过程:变量隔离,主动切换运行上下文
  • 线程:不变量隔离,主动切换运行上下文切换
  • 协程:不变量隔离,不主动切换运行上下文切换

在不隔离变量的状况下,主动切换运行上下文,几乎就是 bug 之源,这两件事基本就不该同时反对。当前线程这个货色,应该被扫进历史的垃圾堆,线程带来的便当,比起它挖出的坑相比,有余一提。4

​ 之所以在协程的总结小文中上来就谈到这么多IO相干内容,是因为C++中的协程和IO有很大关系。协程实用于IO密集型利用,而非CPU密集型利用。协程的 暂停期待条件满足后复原执行 的特点人造适宜IO。当文件描述符不可读/不可写时,让出cpu;可读/可写后被复原执行,持续解决相应逻辑。让出cpu的动作与同步IO中过程/线程被阻塞而挂起相似,然而防止了过程/线程调度开销,而保留了同步IO的同步开发方式。这正是协程的弱小之处: 并发 + 同步5


协程分类6

协程能够依照以下 3 个角度进行分类

  • 对称/不对称:前者提供让出执行权的原语,后者提供唤起协程和把执行权转交给调用者的原语
  • 第一类/受限:协程是不是第一类对象
  • 有栈/无栈:协程能在调用栈的任意处挂起,还是只能在协程的函数体内挂起

这里看第三个分类的区别:

此处「有栈」和「无栈」的含意不是指协程在运行时是否须要栈,对于大多数语言来说,一个函数调用另一个函数,总是存在调用栈的;

而是指协程是否能够在其任意嵌套函数中被挂起,此处的嵌套函数能够了解为子函数、匿名函数等。显然有栈协程是能够的,而无栈协程则不能够。

它们的次要区别是:有栈协程能在调用栈的任意处挂起,无栈协程只能在协程的函数体内挂起

有栈协程

很多中央又把协程称为 subroutine,subroutine 是什么,就是函数。上古期间的计算机科学家们早就给出了概念,coroutine 就是能够中断并复原执行的 subroutine,从这个角度来看协程领有调用栈并不是一个奇怪的事件。

yield/resume

也就是说把协程当做一个非凡的函数调用,有栈协程就是咱们现实中协程该有的模样。

既然把其当做一个非凡的函数调用,对咱们来说最严厉的挑战就是如何像切换函数一样去切换协程,难点在于除了像函数一样切换进来,还要在某种条件满足的时候切换回来,咱们的做法能够是 在协程外部存储本身的上下文,并在须要切换的时候把上下文切换就能够了。

Go 有栈协程

Go 语言的呈现提供了一种新的思路。

Go 语言的协程则相当于提供了一种很低成本的相似于多线程的执行体。

在Go 语言中,协程的实现与操作系统多线程十分类似。操作系统个别应用抢占的形式来调度零碎中的多线程(即主动调配调度),而 Go 语言中,依靠于操作系统的多线程,在运行时刻库中实现了一个合作式的调度器。

这里的调度真正实现了上下文的切换,简略地说,Go 零碎调用执行时,调度器可能会保留以后执行协程的上下文到堆栈中。而后将以后协程设置为睡眠,转而执行其余的协程。

这里须要留神,所谓的 Go 零碎调用并不是真正的操作系统的零碎调用,而是 Go 运行时刻库提供的对底层操作系统调用的一个封装。

举例说明:Socket recv。咱们晓得这是一个零碎调用,Go 的运行时刻库也提供了简直截然不同的调用形式,但这只是建设在 epoll 之上的模仿层,底层的 socket 是工作在非阻塞的形式,而模仿层提供给咱们了看上去是阻塞模式的 socket。读写这个模仿的 socket 会进入调度器,最终导致协程切换。

目前 Go 调度器实现在用户空间,实质上是一种合作式的调度器。这也是为什么如果写了一个死循环在协程里,则协程永远没有机会被换出,一个 Processor 相当于就被节约掉了。

有栈的协程和操作系统多线程是很类似的。思考以下伪代码:

Go
func routine() int
{
    var a = 5
    sleep(1000)
    a += 1
    return a
}

sleep 调用时,会产生上下文的切换,以后的执行体被挂起,直到约定的工夫再被唤醒。

局部变量 a 在切换时会被保留在栈中,切换回来后从栈中复原,从而得以持续运行。

所谓有栈就是指执行体自身的栈。每次创立一个协程,须要为它调配栈空间。到底调配多大的栈的空间是一个技术活。分的多了,节约,分的少了,可能会溢出。Go在这里实现了一个协程栈扩容的机制,绝对比拟优雅的解决了这个问题。

无栈协程

相比于有栈协程间接切换栈帧的思路,无栈协程在不扭转函数调用栈的状况下,采纳相似生成器(generator)的思路实现了上下文切换。

那么所谓的无栈协程是什么呢?其实无栈协程的实质就是一个状态机(state machine),能够看做是有状态的函数(generator),每次执行时会依据以后的状态和输出参数,失去(generate)输入,但不肯定为最终后果。

yieldnext()
Js
function* anotherGenerator(i) {
    yield i + 1;
    yield i + 2;
    yield i + 3;
}

Rust 的无栈协程7

无栈协程顾名思义就是不应用栈和上下文切换来执行异步代码逻辑的机制。这里异步代码尽管是异步的,但执行起来看起来是一个同步的过程。

从这一点上来看 Rust 协程与 Go 协程也没什么两样。举例说明:

Rust
async fn routine() 
{
    let mut a = 5;
    sleep(1000).await;
    a = a + 1;
    a
}

简直是一样的流程。Sleep 会导致睡眠,当工夫已到,从新返回执行,局部变量 a 内容应该还是5。

Go协程是有栈的,所以这个局部变量保留在栈中,而 Rust 是怎么实现的呢?答案就是 Generator 生成的状态机。

Generator 和闭包相似,可能捕捉变量 a,放入一个匿名的构造中,在代码中看起来是局部变量的数据 a,会被放入构造,保留在全局(线程)栈中。

sleep.awaita= a + 1async routine().await

依照个别的说法。无栈协程有很多益处。首先不必调配栈。因为到底给协程调配多大的栈是个大问题。特地是在32位的零碎下,地址空间是无限的。每个协程都须要专门的栈,很显著会影响到能够创立的协程总数。其次,没有上下文切换,性能也更好一些。

总结8

  1. 有栈协程:用rsp栈寄存器来索引局部变量,上下文是协程公有的栈。 拜访上下文数据也就是局部变量的时候,咱们无需显式的应用栈寄存器+偏移量来拜访,而是间接拜访变量名。
  2. 无栈协程:把栈指针寄存器换成this指针,将局部变量捕捉作为对象成员,整个上下文打包成一个generator构造体对象,能够称之为 面向对象的上下文

    用this来索引对象的成员变量,上下文就是对象本人。拜访上下文数据也就是成员变量的时候,咱们无需显式的应用this+成员偏移量(或者变量名)来拜访(通过语法糖),而是间接拜访变量名。


C++协程设计9

​ 因为目前C++规范对协程反对还比拟简陋(C++20中引入了协程,然而较为底层+没有好用的库,间隔可用还有较远距离,乐观来看在C++23中 Asio进入规范库后有很大施展空间),临时没有一个规范协程库,因而市面上诞生了很多不同类型协程库,设计指标不同,实现形式也有差别。上面就进行一个简短的总结。

协程设计的不同档次

首先,依据协程库的欠缺水平,总体上能够分为四个类型:

1.API 级:

这一级别实现协程上下文切换 api,或增加一些便于应用的封装; 特点:没有协程调度。

代表作:boost.context, boost.coroutine, ucontext(unix), fiber(windows)

这一档次的协程库,仅仅提供了一个底层 api,要想拿来做我的项目,还有十分十分边远的间隔;不过这些协程 api 能够为咱们实现本人的协程库提供一个良好的根底。

2.玩具级:

这一级别实现了协程调度,无需用户手动解决协程上下文切换;特点:没有 HOOK

代表作:libmill

这一档次的协程库,实现了协程调度(相似于操作系统有了过程调度机制);稍好一些的意识到了阻塞网络 io 与协程的不协调之处,本人实现了一套网络 io 相干函数;

然而这也意味着波及网络的第三方库全副不可用了。hiredis 不能用了,mysqlclient 不能用了。放弃整个 C/C++生态全副本人轮,这个玩笑开的有点大,所以只能称之为“玩具级”。

3.工业框架级:

这一级别以正确的形式 HOOK 了网络 io 相干的 syscall,能够不改代码的兼容大多数第三方库

代表作:libco、libgo

这一档次的协程库,可能模仿被 hook 的 syscall 的行为,可能兼容任何网络 io 行为的同步模型的第三方库。但因为 C++的灵活性,用户行为是不受限的,所以仍然存在几个边边角角的难点须要开发者留神:没有 gc(开发者要理解协程的调度机会和生命期),TLS 的问题,用户不按套路出牌、把逻辑代码 run 在协程之外,粗粒度的线程锁等等。

ps : libco的开源社区很不沉闷,我还提了个pr 10

4.语言级

代表作:golang

这一档次的协程库,开发者的所有行为都是受限行为,能够实现无死角的欠缺的协程。

C++协程实现计划

pthread_create
  1. 首次调用协程函数,会从堆中调配一个协程上下文,调用方的返回地址、入口函数、交出控制权等信息保留在协程上下文中

    ​ 实际上,这个协程上下文很值得一提。对于一个管制流程而言,所须要的环境不外乎 CPU上下文(寄存器信息)、栈、堆以及所要执行的代码 四个因素,代码执行地位信息由RIP寄存器决定(以x86-64为例),而且从堆中调配出的内存地址信息个别也会间接或间接的保留在运行栈中,所以,这四个因素能够简化为寄存器和栈两个因素(这也正是垃圾回收算法之一的可达性剖析算法的原理,以栈中和寄存器中对象作为垃圾回收的终点,遍历所有援用链,不可达的对象就视作垃圾予以回收。而另一种垃圾回收算法是咱们相熟的援用计数算法12)。

    上面是libco中(x86-64架构)协程上下文的设计,也正是咱们刚刚所探讨的情景。

    struct coctx_t
    {
        void *regs[ 14 ]; // 寄存器信息
        size_t ss_size;   // 协程栈的大小
        char *ss_sp;      // 协程栈低地址

    };
  1. 当协程中途交出控制权后,协程上下文不会删除(相似于线程切换时候保留上下文环境)。
  2. 当协程再次取得控制权后,会主动从协程上下文复原调用环境,而后从上一次交出控制权的下一条语句继续执行。
  3. 协程函数运行结束后,协程上下文将被删除。

联合协程函数的运行特点,依据C++中协程的不同实现形式,能够分为 有栈协程无栈协程

有栈协程:

​ 技术路线:一个线程能够创立大量协程,每个协程都会保留一个公有栈(实际上,不肯定每个协程都有一个独立栈,也可能是若干个协程共用一个栈以节俭内存),协程一旦执行到异步IO处,就会被动交出控制权。同一线程的所有协程以串行形式合作执行,没有数据争用的问题。

​ 一般来说,一个线程能够蕴含多个有栈协程,而且线程自身也不外乎一个控制流,也能够视作一个协程,线程栈即为协程栈,个别称作主协程。

{get,set,make,swap}contextucontext
协程栈的实现计划

协程栈的实现也有很多计划可供选择。

int main(){
    int *p = new int;
    p += 10;              //挪动指针
    p -= 10;
    *p = 10;            
}
  1. 共享栈(Shared Stack)

    申请一块大内存作为共享栈(比方:8MB),每次开始运行协程之前,先把协程栈的内存 copy 到共享栈中,运行完结后再计算协程栈真正应用的内存,copy 进去保存起来,这样每次只需保留真正应用到的栈内存量即可。

    这种计划极大水平上防止了内存的节约,做到了用多少占多少,等同内存条件下,能够启动的协程数量更多,libco中采纳的就是这种办法。不过毛病是memcpy保留复原协程栈须要额定的工夫。

协程调度计划

依据调度形式能够将调度计划分为对称调度和非对称调度。

非对称调度

非对称调度是指所有创立的协程都必须和调用者进行切换,有着明确的调用和返回关系,门路如下:

主协程->协程A->协程B->协程A->主协程->协程C.....

如果不进行协程嵌套的话,所有协程都必须和主协程进行切换,因而,主协程也被称作调度协程。

对称调度

对称调度是指所有协程之间能够自在切换,个别须要调度器的参加,让出CPU后由调度器决定下一个运行哪个协程,如golang中的goroutine调度。

这两种形式仅仅是单个线程上进行的最根本的协程调度形式,如果谋求多核扩展性还能够配合工作窃取(Work Stealing18)设计更简单的调度形式。

无栈协程

​ 技术路线:将异步IO封装到协程函数中,协程函数发动异步IO后,立刻保留执行环境,而后吧控制权交给调用方(Caller),调用方持续往下执行;异步IO实现后,负责解决IO实现事件的回调函数取得控制权,回调函数再把控制权转交给发动IO的协程,发动IO的协程首先复原执行环境,而后从上一次交出控制权的下一条语句持续往下执行

​ 无栈协程中每个协程的执行环境,仅须要蕴含调用栈(Call Stack)的顶层栈帧(Top Call Stack Frame),而非整个调用栈,空间开销极小,然而因而不能嵌套调用。另外,无栈协程须要语言规范和编译器的反对(编译器将协程翻译成状态机),C++20中引入的就是无栈协程。


HOOK模块

基本原理与实现形式

readwrite

​ 基本原理是应用LD_PRELOAD环境变量,影响程序运行时的链接。因为linux下的动静连接器存在规定:当一个符号退出全局符号表的时候,如果雷同的符号名曾经存在,则前面退出的符号被疏忽。 因为glic是最初链接的,那么通过定义在程序运行前优先加载的动态链接库,就能够达到用本人的.so函数替换原有的函数。

dlsym
RTLD_NEXTmallocLD_PRELOADsolibc.so.6next
#include<dlfcn.h>
void* (*g_sys_malloc)(size_t) = dlsym(RTLD_NEXT, "malloc");

void* malloc(size_t size)
{
    void *p = g_sys_malloc(size);
    printf("in malloc hook function ...\n");
    return p;
}

根本守则与具体实际

​ HOOK中有一条重要的根本守则:HOOK接口体现进去的行为与被HOOK的接口放弃100%统一。因为在协程中还是须要兼容应用第三方的同步网络库,接口行为不统一会导致第三方库莫名其妙的谬误。

​ HOOK的根本思维是但凡有可能引起阻塞的零碎调用或库函数都要被hook住,因为协程自身是不能被阻塞的,一旦阻塞运行协程的线程也会跟着阻塞,其余协程也就无奈失去运行。

read、recv、write、send、connectreadreadreadg_sys_read
ssize_t read(int fd, void *buf, size_t count){
  将以后协程注册到定时器上,注册回调函数用于未来解决 g_sys_read() 函数的读超时
      
  调用 epoll_ctl() 将本人(op = EPOLLIN, fd = current fd)注册到以后执行环境的 epoll 实例上,注册回调函数用于解决可读事件
      
  调用 co_yield_env 函数让出 CPU
      
  等到该协程被主协程从新“唤醒”后继续执行
}

此外,为了满足用户的不同需要,咱们还须要进行判断

enable hookg_sys_readenable hookNonBlockg_sys_readreadg_sys_read

补充5

纤程

boostboost::fiberboost::coroutine2

正确应用姿态

​ 协程实用于IO密集的场景,次要是一直的创立协程,让调度器调度运行,在协程运行过程对于一些会阻塞的条件,做一个非阻塞的检测中,发现条件不满足就让出cpu,这就是常见轮转+非阻塞同步。

无栈协程的实现参考

​ 无栈协程因为每个中断点都是确定,那其实只须要将函数的代码再进行细分,保留好局部变量,做好调用过程的状态变动, 上面就将一个协程函数fn进行切分后变成一个Struct,这样的实现绝对于有栈协程而言应用的内存更少。

​ 从上面代码就不难理解,为什么咱们说无栈协程实质上是一个状态机了,也可能了解,为什么无栈协程须要语言和编译器的反对(须要编译器将代码翻译成状态机)。

void fn(){
    int a, b, c;
    a = b + c;
    yield();
    b = c + a;
    yield();
    c = a + b;
}

----------------------------分割线---------------------------------
Struct fn{
    int a, b, c;
    int __state = 0;
    
    void resume(){
        switch(__state) {
        case 0:
             return fn1();
        case 1:
             return fn2();
        case 2:
             return fn3();
        }
    }
    
    void fn1(){
        a = b + c;
    }
    
    void fn2(){
        b = c + a;
    }
    
    void fn3(){
        c = a + b;
    }
};

libco实现剖析

以libco源码为例,作一个简短的剖析

libco采纳共享栈作为协程栈,栈的大小128KB,每次运行协程前将保留的协程栈拷贝到共享栈下面运行。

如下是共享栈的构造,能够看到,一个共享栈中实际上并非只有一个栈,而是能够指定数目。如果有多个共享栈,在调配的时候采纳round-robin算法抉择一个。

struct stShareStack_t
{
    unsigned int alloc_idx;  //以后下标,用于round-robin算法
    int stack_size;    //共享栈大小
    int count;    // 共享栈数量
    stStackMem_t** stack_array; //若干个共享栈
};

static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack)
{
    if (!share_stack)
    {
        return NULL;
    }
    int idx = share_stack->alloc_idx % share_stack->count;  //采纳round-robin算法调配共享栈
    share_stack->alloc_idx++;

    return share_stack->stack_array[idx];
}

接下来去看共享栈具体内存的构造,也没有什么非凡之处。

struct stStackMem_t
{
    stCoRoutine_t* occupy_co; // 以后运行的协程              stack_bp ---  高地址
    int stack_size;    //栈大小,默认128KB                                            |
    char* stack_bp; //stack_buffer + stack_size                    |
    char* stack_buffer; // 栈顶(低地址)                    stack_buffer --- 低地址
};

上面就是协程的主体构造,基本上都是最根本的信息,没有什么额定性能(例如信号掩码之类)。将必要信息加在了正文中不便了解,不再一一赘述。

struct stCoRoutine_t
{
    stCoRoutineEnv_t *env;         // 以后线程的协程运行环境
    pfn_co_routine_t pfn;          // 协程函数   ==  typedef void* (*pfn_co_routine_t)(void*);
    void *arg;                     // 函数参数
    coctx_t ctx;                // 协程上下文

    char cStart;                // 标记位,1为协程曾经启动,否则为0
    char cEnd;                    // 标记为,1为协程曾经完结,否则为0
    char cIsMain;                // 是否是主协程
    char cEnableSysHook;        // 是否启用hook
    char cIsShareStack;            // 是否应用的是共享栈

    void *pvEnv;                // 零碎环境变量无关

    //char sRunStack[ 1024 * 128 ];
    stStackMem_t* stack_mem;     // 以后线程应用的共享栈


    //save satck buffer while confilct on same stack_buffer;
    char* stack_sp;             // 保留协程栈
    unsigned int save_size;
    char* save_buffer;

    stCoSpec_t aSpec[1024];
};
struct stCoRoutineEnv_t
{
    stCoRoutine_t *pCallStack[ 128 ];             // 作为协程调用栈应用,反对协程嵌套调用。能够看到,最多反对128个协程嵌套
    int iCallStackSize;                            // 记录入栈的协程数量,便于resume()(总是resume()栈顶的协程)
    stCoEpoll_t *pEpoll;                        // 封装的epoll构造体,每个线程一个,阻塞时将以后协程监听的事件注册到该epoll下面,同时让出CPU

    //for copy stack log lastco and nextco
    stCoRoutine_t* pending_co;                    // 要执行的下个协程
    stCoRoutine_t* occupy_co;                    // 要执行的上个协程
};
struct coctx_t
{

    void *regs[ 14 ];                            // 保留寄存器组
    size_t ss_size;                                // 协程理论用到的栈的大小
    char *ss_sp;                                // 保留协程栈的首地址
    
};

上面以一个最简略的、什么都不做、只输入echo信息的demo来看libco的次要执行流程

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#include <string.h>
#include "../coctx.h"
#include "../co_routine.h"
#include "../co_routine_inner.h"


void* RoutineFunc(void* args){
    co_enable_hook_sys();
    int* routineid = (int*)args;
    while (true)
    {
        printf("echo from routine %d\n", *routineid + 1);
        co_yield_ct();                                                // 协程暂停执行
    }
    return NULL;
}


int main()
{
    stShareStack_t* share_stack= co_alloc_sharestack(1, 1024 * 128);  // 1. 创立共享栈
    stCoRoutineAttr_t attr;
    attr.stack_size = 0;
    attr.share_stack = share_stack;

    stCoRoutine_t* co[2];                                            
    int routineid[2];
    for (int i = 0; i < 2; i++)
    {
        routineid[i] = i;
        co_create(&co[i], &attr, RoutineFunc, routineid + i);          // 2. 创立协程
    }

    while(true){
        std::cout << "before resume1" << std::endl;
        co_resume(co[0]);                                              // 3. 执行协程0
        std::cout << "after resume1 and before resume2" << std::endl;
        co_resume(co[1]);                                              // 4. 执行协程1
        std::cout << "after resume2" << std::endl;
    }
    return 0;
}

输入为

before resume1
echo from routine 1
after resume1 and before resume2
echo from routine 2
after resume2
...

第一步是创立共享栈

stShareStack_t* co_alloc_sharestack(int count, int stack_size)
{
    stShareStack_t* share_stack = (stShareStack_t*)malloc(sizeof(stShareStack_t));             // 1. 创立共享栈构造体
    share_stack->alloc_idx = 0;
    share_stack->stack_size = stack_size;

    //alloc stack array
    share_stack->count = count;
    stStackMem_t** stack_array = (stStackMem_t**)calloc(count, sizeof(stStackMem_t*));        // 2. 创立共享栈数组
    for (int i = 0; i < count; i++)
    {
        stack_array[i] = co_alloc_stackmem(stack_size);                                        // 3. 创立共享栈理论空间
    }
    share_stack->stack_array = stack_array;
    return share_stack;
}

第二步就是创立协程了, 进入co_create函数

int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
    if( !co_get_curr_thread_env() ) 
    {
        co_init_curr_thread_env();                                                    // 如果是第一次创立协程,尚未初始化以后线程的协程运行环境,
    }                                                                                // 那么首先初始化协程运行环境
    stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );   // 创立协程
    *ppco = co;
    return 0;
}

首先判断是不是曾经筹备好了以后线程环境(第一个协程负责初始化线程环境)

stCoRoutineEnv_t
struct stCoRoutineEnv_t
{
    stCoRoutine_t *pCallStack[ 128 ];             // 作为协程调用栈应用,反对协程嵌套调用。能够看到,最多反对128个协程嵌套
    int iCallStackSize;                            // 记录入栈的协程数量,便于resume()(总是resume()栈顶的协程)
    stCoEpoll_t *pEpoll;                        // 封装的epoll构造体,每个线程一个,阻塞时将以后协程监听的事件注册到该epoll下面,同时让出CPU

    //for copy stack log lastco and nextco
    stCoRoutine_t* pending_co;                    // 要执行的下个协程
    stCoRoutine_t* occupy_co;                    // 要执行的上个协程
};

如果是创立第一个协程,那么首先初始化协程运行环境

void co_init_curr_thread_env()
{
    gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
    stCoRoutineEnv_t *env = gCoEnvPerThread;

    env->iCallStackSize = 0;
    struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );                // 创立主协程(即调度协程),也就是将线程自身视作协程
    self->cIsMain = 1;

    env->pending_co = NULL;
    env->occupy_co = NULL;

    coctx_init( &self->ctx );                                                        // 初始化协程构造体  ==>  memset(ctx, 0, sizeof(*ctx));

    env->pCallStack[ env->iCallStackSize++ ] = self;                                // 将主协程入栈,因而任何时候,无论如何出栈,最终都会返回主协程,而主协程永不退出(永不yield)

    stCoEpoll_t *ev = AllocEpoll();                                                    // 注册线程的epoll实例
    SetEpoll( env,ev );
}

接下来真正开始创立协程

struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
        pfn_co_routine_t pfn,void *arg )
{

    // 设置协程属性,次要是设置协程栈的大小
    stCoRoutineAttr_t at;
    if( attr )
    {
        memcpy( &at,attr,sizeof(at) );
    }
    if( at.stack_size <= 0 )
    {
        at.stack_size = 128 * 1024;                                            // 栈最小为128KB
    }
    else if( at.stack_size > 1024 * 1024 * 8 )
    {
        at.stack_size = 1024 * 1024 * 8;                                    // 栈最大为8MB
    }

    if( at.stack_size & 0xFFF )                                             // 栈大小4K对齐
    {
        at.stack_size &= ~0xFFF;
        at.stack_size += 0x1000;
    }

    stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
    
    memset( lp,0,(long)(sizeof(stCoRoutine_t))); 


    lp->env = env;                                                            // 设置协程环境
    lp->pfn = pfn;                                                            // 设置协程函数
    lp->arg = arg;                                                            // 设置协程参数

    stStackMem_t* stack_mem = NULL;
    if( at.share_stack )
    {
        stack_mem = co_get_stackmem( at.share_stack);                        // 应用round-robin算法调配一个共享栈
        at.stack_size = at.share_stack->stack_size;
    }
    else
    {
        stack_mem = co_alloc_stackmem(at.stack_size);                        // 或者应用独立栈
    }
    lp->stack_mem = stack_mem;

    lp->ctx.ss_sp = stack_mem->stack_buffer;                                // 给协程上下文注册栈地址以及栈大小
    lp->ctx.ss_size = at.stack_size;                                        

    lp->cStart = 0;
    lp->cEnd = 0;
    lp->cIsMain = 0;
    lp->cEnableSysHook = 0;
    lp->cIsShareStack = at.share_stack != NULL;

    lp->save_size = 0;
    lp->save_buffer = NULL;

    return lp;
}
co_resume
void co_resume( stCoRoutine_t *co )
{
    stCoRoutineEnv_t *env = co->env;
    stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];   // 获得以后运行的协程(如果是第一次运行协程,那么是主协程)
    if( !co->cStart )
    {
        coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );                     // 如果是第一次运行,那么初始化要运行的协程
        co->cStart = 1;
    }
    env->pCallStack[ env->iCallStackSize++ ] = co;                                 // 要运行的协程存入协程调用栈
    co_swap( lpCurrRoutine, co );                                                 // 将要运行的协程和以后协程进行切换
}

在咱们持续摸索之前,须要回顾一下函数调用和汇编的常识。同时看一下14个寄存器的规定20

push param // 压入函数参数
call // 压入调用函数的下一条指令的地址
push ebp  // 栈帧开始
mov ebp, esp // esp 指向栈顶元素的凑近栈的的地址
push ... // 保留寄存器 (分为调用者保留和被调用者保留两局部, 依据指令集的不同)
//创立局部变量
pop ... //弹出寄存器
mov esp, ebp
pop ebp
ret 
    
// low | regs[0]: r15 |
//    | regs[1]: r14 |
//    | regs[2]: r13 |
//    | regs[3]: r12 |
//    | regs[4]: r9  |
//    | regs[5]: r8  |
//    | regs[6]: rbp |
//    | regs[7]: rdi |
//    | regs[8]: rsi |
//    | regs[9]: ret |  //ret func addr
//    | regs[10]: rdx |
//    | regs[11]: rcx |
//    | regs[12]: rbx |
// hig | regs[13]: rsp |
 
    在64位下保留14个寄存器。咱们晓得X86架构下有8个通用寄存器,X64则有16个寄存器,那么为什么64位只应用保留14个寄存器呢?咱们能够在coctx_swap.S中看到64位下短少了对%r10, %r11寄存器的备份,
x86-64的16个64位寄存器别离是:%rax, %rbx, %rcx, %rdx, %esi, %edi, %rbp, %rsp, %r8-%r15。其中:

%rax 作为函数返回值应用
%rsp栈指针寄存器,指向栈顶
%rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,顺次对应第1参数,第2参数
%rbx,%rbp,%r12,%r13,%14,%15 用作数据存储,遵循被调用者爱护规定,简略说就是轻易用,调用子函数之前要备份它,以防被批改
%r10,%r11 用作数据存储,遵循调用者爱护规定,简略说就是应用之前要先保留原值
咱们来看看两个生疏的名词调用者爱护&被调用者爱护:

调用者爱护:示意这些寄存器上存储的值,须要调用者(父函数)本人想方法先备份好,否则过会子函数间接应用这些寄存器将有情的笼罩。如何备份?当然是实现压栈(pushl),等子函数调用实现,再通过栈复原(popl)
被调用者爱护:即示意须要由被调用者(子函数)想方法帮调用者(父函数)进行备份

这些就足够了!

int coctx_make(coctx_t* ctx, coctx_pfn_t pfn, const void* s, const void* s1) {
  char* sp = ctx->ss_sp + ctx->ss_size - sizeof(void*);
  sp = (char*)((unsigned long)sp & -16LL);                                            // 获得栈底地址(高地址,栈是从高向低成长)

  memset(ctx->regs, 0, sizeof(ctx->regs));
  void** ret_addr = (void**)(sp);                                                    
  *ret_addr = (void*)pfn;                                                            // 为什么coctx_make()中须要将返回函数的地址赋值给rsp?
                                                                                    // 前面会进行解答
  ctx->regs[kRSP] = sp;                                                                // 设置寄存器

  ctx->regs[kRETAddr] = (char*)pfn;                                                    // 设置返回地址,即协程函数地址

  ctx->regs[kRDI] = (char*)s;                                                        // 设置函数参数
  ctx->regs[kRSI] = (char*)s1;
  return 0;    
}

接下来就真正开始进行切换!

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
     stCoRoutineEnv_t* env = co_get_curr_thread_env();

    //get curr stack sp
    char c;                                                    // 这里是想取得以后协程的应用的栈的大小,为此,咱们须要取得栈顶的地址。
    curr->stack_sp= &c;                                        // 这里用了一个小trick,在栈上申明了一个char变量,其地址对齐后就是栈顶地址。
                                                            // 如果不了解,请看一下下面的函数调用栈帧图
    
    // get curr stack sp 以及对应的汇编
    // char c;                                                // lea 0x7(%rsp), %rdx    这里rsp + 7 是因为char变量无需对齐,而rsp须要8字节对齐
    // curr->stack_sp= &c;                                    // mov %rdx, 0xb0(%rbp)
    
    
    
    if (!pending_co->cIsShareStack)                            // 如果应用独立栈就没必要保留栈了
    {
        env->pending_co = NULL;
        env->occupy_co = NULL;
    }
    else 
    {
        env->pending_co = pending_co;                    
        //get last occupy co on the same stack mem        
        stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
        //set pending co to occupy thest stack mem;
        pending_co->stack_mem->occupy_co = pending_co;

        env->occupy_co = occupy_co;
        if (occupy_co && occupy_co != pending_co)
        {
            save_stack_buffer(occupy_co);                        // 保留要被笼罩的栈
        }
    }

    //swap context
    coctx_swap(&(curr->ctx),&(pending_co->ctx) );                // 进行寄存器上下文的切换

    //stack buffer may be overwrite, so get again;
    stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
    stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
    stCoRoutine_t* update_pending_co = curr_env->pending_co;
    
    if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
    {
        //resume stack buffer
        if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
        {
            memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);  // 复原要运行的协程的栈
        }
    }
}
void save_stack_buffer(stCoRoutine_t* occupy_co)
{
    ///copy out
    stStackMem_t* stack_mem = occupy_co->stack_mem;
    int len = stack_mem->stack_bp - occupy_co->stack_sp;              // 栈底(高地址) - 栈顶(方才通过定义char变量取得的低地址) 即为理论应用的大小

    if (occupy_co->save_buffer)
    {
        free(occupy_co->save_buffer), occupy_co->save_buffer = NULL;
    }

    occupy_co->save_buffer = (char*)malloc(len); //malloc buf;
    occupy_co->save_size = len;

    memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len);
}
.globl coctx_swap
#if !defined( __APPLE__ )
.type  coctx_swap, @function
#endif
coctx_swap:
    leaq (%rsp),%rax                    // 首先保留以后寄存器上下文到current->ctx中,current->ctx也就是第一个参数,依据传参规定第一个参数通过rdi传入 
    movq %rax, 104(%rdi)
    movq %rbx, 96(%rdi)
    movq %rcx, 88(%rdi)
    movq %rdx, 80(%rdi)
    movq 0(%rax), %rax
    movq %rax, 72(%rdi) 
    movq %rsi, 64(%rdi)
    movq %rdi, 56(%rdi)
    movq %rbp, 48(%rdi)
    movq %r8, 40(%rdi)
    movq %r9, 32(%rdi)
    movq %r12, 24(%rdi)
    movq %r13, 16(%rdi)
    movq %r14, 8(%rdi)
    movq %r15, (%rdi)
    xorq %rax, %rax

    movq 48(%rsi), %rbp                     // 从要运行的pending->ctx中复原寄存器的值,pending->ctx也就是第二个参数,依据传参规定通过rsi传入
    movq 104(%rsi), %rsp
    movq (%rsi), %r15
    movq 8(%rsi), %r14
    movq 16(%rsi), %r13
    movq 24(%rsi), %r12
    movq 32(%rsi), %r9
    movq 40(%rsi), %r8
    movq 56(%rsi), %rdi
    movq 80(%rsi), %rdx
    movq 88(%rsi), %rcx
    movq 96(%rsi), %rbx
    leaq 8(%rsp), %rsp
    pushq 72(%rsi)                            // regs[9] 中保留了协程函数运行地址,即压入栈,ret时即可胜利跳转

    movq 64(%rsi), %rsi
    ret

读到这里,你可能会有一个疑难,为什么保留寄存器上下文的时候将rsp保留到了regs[kRETAddr]?换言之,为什么rsp保留了返回地址? 同时,我在coctx_make()的正文中也留下了问题 “为什么coctx_make()中须要将返回函数的地址赋值给rsp?“ 这须要函数调用的常识来解答。

---------
ret addr
---------
old rbp          <--- RBP
---------
local var1 
---------
local var2 
---------
fcn param #2
---------
fcn param #1
---------
ret addr          <--- RSP
---------
old rbp
---------

如图是coctx_swap执行前的栈帧,能够看到,RBP依然指向上个栈帧的基地址,RSP指向函数返回地址。

如果作为失常的函数调用,那么第一件事件就是要开拓本人的栈帧,也就是

push rbp
mov rbp, rsp

这两个动作由编译器主动插入,而后才将rsp增长到栈底。然而如果没有认真思考的话,很容易误以为rsp默认就指向栈顶,从而产生为什么rsp指向的是函数地址的纳闷。

显然在coctx_swap中,咱们并没有开拓本人的栈帧,因而rsp指向的依然是函数的返回地址,所有都失去了解释!

在协程中让出cpu,操作就简略的多啦

void co_yield_env( stCoRoutineEnv_t *env )
{
    
    stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];    // 实际上是主协程
    stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];     // 以后正在运行的协程

    env->iCallStackSize--;

    co_swap( curr, last);
}

参考资料