go语言调度器

调度器就是将goroutine分配到工作线程中运行
涉及3种类型的对象:
G - goroutine
M - 工作线程即os线程
P - 处理器,一种用来运行go代码的抽象资源,最大数目不能超过GOMAXPROCS,在运行go代码时需要关联一个M

全局的运行队列:

G *runtime·sched.runqhead;
G *runtime·sched.runqtail;
int runtime·sched.runqsize;

P结构的主要成员:包含一个本地运行队列

struct P
{
    uint32	status;		// one of Pidle/Prunning/...
    uint32	schedtick;	// incremented on every scheduler call
 
    M*	m;		// back-link to associated M (nil if idle)
 
    // Queue of runnable goroutines.
    uint32	runqhead;
    uint32	runqtail;
    G*	runq[256];
};

schedule函数主要部分代码

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
static void
schedule(void)
{
    G *gp;
    uint32 tick;
 
    ......
top:
    ......
    
    gp = nil;
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
    tick = g->m->p->schedtick;
    // This is a fancy way to say tick%61==0,
    // it uses 2 MUL instructions instead of a single DIV and so is faster on modern processors.
    if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
    	runtime·lock(&runtime·sched.lock);
    	gp = globrunqget(g->m->p, 1);
    	runtime·unlock(&runtime·sched.lock);
    	if(gp)
    		resetspinning();
    }
    if(gp == nil) {
    	gp = runqget(g->m->p);
    	if(gp && g->m->spinning)
    		runtime·throw("schedule: spinning with local work");
    }
    if(gp == nil) {
    	gp = findrunnable();  // blocks until work is available
    	resetspinning();
    }
 
    execute(gp);
}

调度器在超过一定间隔时间的情况下,为了公平原则,首先会从全局的运行队列获取G
从本地的运行队列中获取G
等待新的G进入运行队列

globrunqget从全局运行队列获取G,同时它还会将一定数量的G转移到P的本地运行队列中.

runqget从本地运行队列获取G,本地运行队列的实现是无锁的:

// Get g from local runnable queue.
// Executed only by the owner P.
static G*
runqget(P *p)
{
    G *gp;
    uint32 t, h;
 
    for(;;) {
    	h = runtime·atomicload(&p->runqhead);  // load-acquire, synchronize with other consumers
    	t = p->runqtail;
    	if(t == h)
    		return nil;
    	gp = p->runq[h%nelem(p->runq)];
    	if(runtime·cas(&p->runqhead, h, h+1))  // cas-release, commits consume
    		return gp;
    }
}

findrunnable阻塞等待可运行的G

  • 检查本地运行队列
  • 检查全局运行队列
  • 以non-blocking的模式poll network
  • 检查其它P的本地运行队列
  • 如果最后依旧无法在系统内获取到G,那么就以blocking的模式poll network

    // Finds a runnable goroutine to execute.
    // Tries to steal from other P's, get g from global queue, poll network.
    static G
    findrunnable(void)
    {
    G
    gp;
    P *p;
    int32 i;

    top:
    if(runtime·sched.gcwaiting) {
    gcstopm();
    goto top;
    }
    if(runtime·fingwait && runtime·fingwake && (gp = runtime·wakefing()) != nil)
    runtime·ready(gp);
    // local runq
    gp = runqget(g->m->p);
    if(gp)
    return gp;
    // global runq
    if(runtime·sched.runqsize) {
    runtime·lock(&runtime·sched.lock);
    gp = globrunqget(g->m->p, 0);
    runtime·unlock(&runtime·sched.lock);
    if(gp)
    return gp;
    }
    // poll network
    gp = runtime·netpoll(false); // non-blocking
    if(gp) {
    injectglist(gp->schedlink);
    runtime·casgstatus(gp, Gwaiting, Grunnable);
    return gp;
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    if(!g->m->spinning && 2 * runtime·atomicload(&runtime·sched.nmspinning) >= runtime·gomaxprocs - runtime·atomicload(&runtime·sched.npidle)) // TODO: fast atomic
    goto stop;
    if(!g->m->spinning) {
    g->m->spinning = true;
    runtime·xadd(&runtime·sched.nmspinning, 1);
    }
    // random steal from other P's
    for(i = 0; i < 2*runtime·gomaxprocs; i++) {
    if(runtime·sched.gcwaiting)
    goto top;
    p = runtime·allp[runtime·fastrand1()%runtime·gomaxprocs];
    if(p == g->m->p)
    gp = runqget(p);
    else
    gp = runqsteal(g->m->p, p);
    if(gp)
    return gp;
    }
    stop:
    // return P and block
    runtime·lock(&runtime·sched.lock);
    if(runtime·sched.gcwaiting) {
    runtime·unlock(&runtime·sched.lock);
    goto top;
    }
    if(runtime·sched.runqsize) {
    gp = globrunqget(g->m->p, 0);
    runtime·unlock(&runtime·sched.lock);
    return gp;
    }
    p = releasep();
    pidleput(p);
    runtime·unlock(&runtime·sched.lock);
    if(g->m->spinning) {
    g->m->spinning = false;
    runtime·xadd(&runtime·sched.nmspinning, -1);
    }
    // check all runqueues once again
    for(i = 0; i < runtime·gomaxprocs; i++) {
    p = runtime·allp[i];
    if(p && p->runqhead != p->runqtail) {
    runtime·lock(&runtime·sched.lock);
    p = pidleget();
    runtime·unlock(&runtime·sched.lock);
    if(p) {
    acquirep(p);
    goto top;
    }
    break;
    }
    }
    // poll network
    if(runtime·xchg64(&runtime·sched.lastpoll, 0) != 0) {
    if(g->m->p)
    runtime·throw("findrunnable: netpoll with p");
    if(g->m->spinning)
    runtime·throw("findrunnable: netpoll with spinning");
    gp = runtime·netpoll(true); // block until new work is available
    runtime·atomicstore64(&runtime·sched.lastpoll, runtime·nanotime());
    if(gp) {
    runtime·lock(&runtime·sched.lock);
    p = pidleget();
    runtime·unlock(&runtime·sched.lock);
    if(p) {
    acquirep(p);
    injectglist(gp->schedlink);
    runtime·casgstatus(gp, Gwaiting, Grunnable);
    return gp;
    }
    injectglist(gp);
    }
    }
    stopm();
    goto top;
    }

本文来自:博客园

感谢作者:richmonkey

查看原文:go语言调度器

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。