// timer is the Go 1.8 runtime timer. All timers lived in one global
// heap; i is this timer's index into that heap.
type timer struct {
	i int // heap index

	// Timer wakes up at when, and then at when+period, ... (period > 0 only),
	// each time calling f(arg, seq) in the timer goroutine.
	when   int64
	period int64
	f      func(interface{}, uintptr)
	arg    interface{}
	seq    uintptr
}
Go 1.13 版本
type timer struct { tb *timersBucket // the bucket the timer lives in i int// heap index when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr }
// timersLen is the length of timers array.
//
// Ideally, this would be set to GOMAXPROCS, but that would require
// dynamic reallocation.
//
// The current value is a compromise between memory usage and
// performance that should cover the majority of GOMAXPROCS values
// used in the wild.
const timersLen = 64
// timersLen is the length of timers array. // // Ideally, this would be set to GOMAXPROCS, but that would require // dynamic reallocation // // The current value is a compromise between memory usage and performance // that should cover the majority of GOMAXPROCS values used in the wild. var timers [timersLen]struct { timersBucket
// The padding should eliminate false sharing // between timersBucket values. pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte }
//go:notinheap type timersBucket struct { lock mutex gp *g created bool sleeping bool rescheduling bool sleepUntil int64 waitnote note t []*timer }
在添加 timer 对象时,逻辑变成了:根据当前 p 的 id 对 timersLen 取模,得到 p 对应的 timersBucket:id := uint8(getg().m.p.ptr().id) % timersLen。从这个优化的方法来看,以前是每个 p 去抢同一把全局锁,现在变成每个 p 只会操作自己对应的 timersBucket(大多数情况下)。
在 p 的数量超过 64 的时候,就会出现多个 p 取模到同一个 bucket 的情况,这在 CPU 核数大于 64 的机器上是没办法避免的。
// 咱们只把重点放在与以前不同的地方上 func(tb *timersBucket) addtimerLocked(t *timer) bool { if t.when < 0 { t.when = 1<<63 - 1 } t.i = len(tb.t) tb.t = append(tb.t, t) // 添加到p对应的timersBucket中,而不是全局的 timers 中了 if !siftupTimer(tb.t, t.i) { returnfalse } if t.i == 0 { // siftup moved to top: new earliest deadline. if tb.sleeping && tb.sleepUntil > t.when { tb.sleeping = false notewakeup(&tb.waitnote) } if tb.rescheduling { tb.rescheduling = false goready(tb.gp, 0) } if !tb.created { // 判断属于这个 tb 的 timerproc 是否启动了, // 区别于1.8版本是一个全局变量控制的,只有一个消费者,这里是每一个 tb 都有一个消费者 tb.created = true go timerproc(tb) } } returntrue }
timer 的“消费者”
// 主要的逻辑还是同 1.8 版本中一致的,不同的地方就是针对每个tb进行的操作,不是全局的 timers functimerproc(tb *timersBucket) { tb.gp = getg() for { lock(&tb.lock) tb.sleeping = false now := nanotime() delta := int64(-1) for { iflen(tb.t) == 0 { delta = -1 break } t := tb.t[0] delta = t.when - now if delta > 0 { break } ok := true if t.period > 0 { // leave in heap but adjust next time to fire t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(tb.t, 0) { ok = false } } else { // remove from heap last := len(tb.t) - 1 if last > 0 { tb.t[0] = tb.t[last] tb.t[0].i = 0 } tb.t[last] = nil tb.t = tb.t[:last] if last > 0 { if !siftdownTimer(tb.t, 0) { ok = false } } t.i = -1// mark as removed } f := t.f arg := t.arg seq := t.seq unlock(&tb.lock) if !ok { badTimer() } if raceenabled { raceacquire(unsafe.Pointer(t)) } f(arg, seq) lock(&tb.lock) } if delta < 0 || faketime > 0 { // No timers left - put goroutine to sleep. tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } // At least one timer pending. Sleep until then. tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) unlock(&tb.lock) notetsleepg(&tb.waitnote, delta) } }
相比于 1.8,1.13 版本中还添加了一个 modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr)。modtimer 函数主要做的是:将 t 从 tb 中删除,然后再用新参数重新把它加入进去。
funcmodtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) { tb := t.tb
lock(&tb.lock) _, ok := tb.deltimerLocked(t) if ok { t.when = when t.period = period t.f = f t.arg = arg t.seq = seq ok = tb.addtimerLocked(t) } unlock(&tb.lock) if !ok { badTimer() } }
// Package time knows the layout of this structure. // If this struct changes, adjust ../time/sleep.go:/runtimeTimer. type timer struct { // If this timer is on a heap, which P's heap it is on. // puintptr rather than *p to match uintptr in the versions // of this struct defined in other packages. pp puintptr
// Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr
// What to set the when field to in timerModifiedXX status. nextwhen int64
// The status field holds one of the values below. status uint32 }
type p struct { id int32 (...) // Lock for timers. We normally access the timers while running // on this P, but the scheduler can also do it from a different P. // 讲道理,你p处理本地的 timer 用锁干什么? // 1.14 是可以偷 timer 的,这时候就变成了共享资源,访问的时候是一定要加锁的。 // 上边注释(英文)说的也很清楚,这个是官方的解释 timersLock mutex
// Actions to take at some time. This is used to implement the // standard library's time package. // Must hold timersLock to access. timers []*timer
// Number of timers in P's heap. // Modified using atomic instructions. // 记录当前 p 中 timer的总数量 numTimers uint32
// Number of timerModifiedEarlier timers on P's heap. // This should only be modified while holding timersLock, // or while the timer status is in a transient state // such as timerModifying. // P 中 调整 when 的时间提前了的 timer 数量 adjustTimers uint32
// Number of timerDeleted timers in P's heap. // Modified using atomic instructions. // 记录 p 中被删除的 timer 数量 deletedTimers uint32
// Race context used while executing timer functions. timerRaceCtx uintptr (...) }
// Excerpt from runtime.schedule(): in Go 1.14, running timers moved
// from a dedicated goroutine into the scheduling path (checkTimers is
// called from inside findrunnable and friends).
(...)
	if gp == nil {
		gp, inheritTime = findrunnable() // blocks until work is available
	}
(...)
	execute(gp, inheritTime)
}
可以得出结论,新版本的“消费者” 从 goroutine 级别 转变到 函数级别。
checkTimers()
// checkTimers runs any timers for the P that are ready. // If now is not 0 it is the current time. // It returns the current time or 0 if it is not known, // and the time when the next timer should run or 0 if there is no next timer, // and reports whether it ran any timers. // If the time when the next timer should run is not 0, // it is always larger than the returned time. // We pass now in and out to avoid extra calls of nanotime. //go:yeswritebarrierrec // rnow // pollUntil 0 表示没有下一个 timer,非 0 表示下一个timer的等待时间 // ran 表示是否执行了 timer funccheckTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { // If there are no timers to adjust, and the first timer on // the heap is not yet ready to run, then there is nothing to do. if atomic.Load(&pp.adjustTimers) == 0 { next := int64(atomic.Load64(&pp.timer0When)) if next == 0 { return now, 0, false } if now == 0 { now = nanotime() } if now < next { // Next timer is not ready to run. // But keep going if we would clear deleted timers. // This corresponds to the condition below where // we decide whether to call clearDeletedTimers. // 尽可能找机会清理 timer if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } } }
lock(&pp.timersLock)
adjusttimers(pp)
rnow = now iflen(pp.timers) > 0 { if rnow == 0 { rnow = nanotime() } forlen(pp.timers) > 0 { // Note that runtimer may temporarily unlock // pp.timersLock. if tw := runtimer(pp, rnow); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } }
// If this is the local P, and there are a lot of deleted timers, // clear them out. We only do this for the local P to reduce // lock contention on timersLock. if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) }
unlock(&pp.timersLock)
return rnow, pollUntil, ran }
在上述 checkTimers 中,通过 adjusttimers 调整当前 p 的 timers 数组,我们看一下它的实现
// adjusttimers looks through the timers in the current P's heap for // any timers that have been modified to run earlier, and puts them in // the correct place in the heap. While looking for those timers, // it also moves timers that have been modified to run later, // and removes deleted timers. The caller must have locked the timers for pp. funcadjusttimers(pp *p) { // 判断当前 p 是否有 timer iflen(pp.timers) == 0 { return } if atomic.Load(&pp.adjustTimers) == 0 { if verifyTimers { verifyTimerHeap(pp) } return } // 存放需要移动的 timer var moved []*timer loop: // 遍历当前 p 的 timers for i := 0; i < len(pp.timers); i++ { t := pp.timers[i] if t.pp.ptr() != pp { throw("adjusttimers: bad p") } // 判断当前 timer 的状态 switch s := atomic.Load(&t.status); s { // 表示 timer 需要删除,但是还没有删除呢 case timerDeleted: // 修改 timer 的状态为,正在删除中 timerRemoving if atomic.Cas(&t.status, s, timerRemoving) { // 执行删除操作 dodeltimer(pp, i) // 修改 timer 的状态为,已删除 timerRemoved if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } // 修改待删除 timer 的数量 pp.deletedTimers - 1 atomic.Xadd(&pp.deletedTimers, -1) // Look at this heap position again. // 思考一下就可以知道,为什么需要再次检查当前这个位置的 timer // 通过 dodeltimer 将索引为 i 的 timer 删除后,我们知道的是 // 假设总数量为 n, [0, i) 之前的元素不需要改变,删掉第 I 个后 // 需要在 [i,n-1) 里边中选一个填补 i 的位置,所以需要重新检查一次 i-- } // 表示 timer 的等待时间被调整了 // timerModifiedEarlier 向前调整 // timerModifiedLater 向后调整 case timerModifiedEarlier, timerModifiedLater: // 因为调整了 timer 的时间点,所以需要重新调整该 timer 在堆中的位置 // 修改 timer 状态为,移动中 timerMoving if atomic.Cas(&t.status, s, timerMoving) { // Now we can change the when field. t.when = t.nextwhen // Take t off the heap, and hold onto it. // We don't add it back yet because the // heap manipulation could cause our // loop to skip some other timer. dodeltimer(pp, i) // 将这个 timer 加入到需要移动的 timer 当中 moved = append(moved, t) if s == timerModifiedEarlier { if n := atomic.Xadd(&pp.adjustTimers, -1); int32(n) <= 0 { break loop } } // Look at this heap position again. 
i-- } case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving: badTimer() case timerWaiting: // OK, nothing to do. case timerModifying: // Check again after modification is complete. osyield() i-- default: badTimer() } }
然后我们再看一下 checkTimers 函数末尾的位置,就是要真正执行 timer 的时候了,通过 runtimer 来执行 p 中的 timer
// runtimer examines the first timer in timers. If it is ready based on now, // it runs the timer and removes or updates it. // Returns 0 if it ran a timer, -1 if there are no more timers, or the time // when the first timer should run. // The caller must have locked the timers for pp. // If a timer is run, this will temporarily unlock the timers.
//go:systemstack funcruntimer(pp *p, now int64)int64 { for { t := pp.timers[0] if t.pp.ptr() != pp { throw("runtimer: bad p") } switch s := atomic.Load(&t.status); s { case timerWaiting: if t.when > now { // Not ready to run. return t.when }
if !atomic.Cas(&t.status, s, timerRunning) { continue } // Note that runOneTimer may temporarily unlock // pp.timersLock. runOneTimer(pp, t, now) return0
case timerDeleted: if !atomic.Cas(&t.status, s, timerRemoving) { continue } dodeltimer0(pp) if !atomic.Cas(&t.status, timerRemoving, timerRemoved) { badTimer() } atomic.Xadd(&pp.deletedTimers, -1) iflen(pp.timers) == 0 { return-1 }
case timerModifiedEarlier, timerModifiedLater: if !atomic.Cas(&t.status, s, timerMoving) { continue } t.when = t.nextwhen dodeltimer0(pp) doaddtimer(pp, t) if s == timerModifiedEarlier { atomic.Xadd(&pp.adjustTimers, -1) } if !atomic.Cas(&t.status, timerMoving, timerWaiting) { badTimer() }
case timerModifying: // Wait for modification to complete. osyield()
case timerNoStatus, timerRemoved: // Should not see a new or inactive timer on the heap. badTimer() case timerRunning, timerRemoving, timerMoving: // These should only be set when timers are locked, // and we didn't do it. badTimer() default: badTimer() } } }
截止到目前为止,我们已经把 checkTimers 给分析完了。
偷 timer
这里的偷 timer 不是说把另一个 p 的 timer 偷到本地后再执行,而是在当前这个 p 上,直接执行其他 p 的 timer。
// 截取部分 findrunnable 代码 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { (...) if i > 2 && shouldStealTimers(p2) { tnow, w, ran := checkTimers(p2, now) (...) } } }