// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package kernel // CPU scheduling, real and fake. import ( "fmt" "math/rand" "sync/atomic" "time" "gvisor.googlesource.com/gvisor/pkg/abi/linux" "gvisor.googlesource.com/gvisor/pkg/sentry/hostcpu" "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/sched" ktime "gvisor.googlesource.com/gvisor/pkg/sentry/kernel/time" "gvisor.googlesource.com/gvisor/pkg/sentry/limits" "gvisor.googlesource.com/gvisor/pkg/sentry/usage" "gvisor.googlesource.com/gvisor/pkg/syserror" ) // TaskGoroutineState is a coarse representation of the current execution // status of a kernel.Task goroutine. type TaskGoroutineState int const ( // TaskGoroutineNonexistent indicates that the task goroutine has either // not yet been created by Task.Start() or has returned from Task.run(). // This must be the zero value for TaskGoroutineState. TaskGoroutineNonexistent TaskGoroutineState = iota // TaskGoroutineRunningSys indicates that the task goroutine is executing // sentry code. TaskGoroutineRunningSys // TaskGoroutineRunningApp indicates that the task goroutine is executing // application code. TaskGoroutineRunningApp // TaskGoroutineBlockedInterruptible indicates that the task goroutine is // blocked in Task.block(), and hence may be woken by Task.interrupt() // (e.g. due to signal delivery). TaskGoroutineBlockedInterruptible // TaskGoroutineBlockedUninterruptible indicates that the task goroutine is // stopped outside of Task.block() and Task.doStop(), and hence cannot be // woken by Task.interrupt(). TaskGoroutineBlockedUninterruptible // TaskGoroutineStopped indicates that the task goroutine is blocked in // Task.doStop(). TaskGoroutineStopped is similar to // TaskGoroutineBlockedUninterruptible, but is a separate state to make it // possible to determine when Task.stop is meaningful. TaskGoroutineStopped ) // TaskGoroutineSchedInfo contains task goroutine scheduling state which must // be read and updated atomically. // // +stateify savable type TaskGoroutineSchedInfo struct { // Timestamp was the value of Kernel.cpuClock when this // TaskGoroutineSchedInfo was last updated. Timestamp uint64 // State is the current state of the task goroutine. State TaskGoroutineState // UserTicks is the amount of time the task goroutine has spent executing // its associated Task's application code, in units of linux.ClockTick. UserTicks uint64 // SysTicks is the amount of time the task goroutine has spent executing in // the sentry, in units of linux.ClockTick. SysTicks uint64 } // userTicksAt returns the extrapolated value of ts.UserTicks after // Kernel.CPUClockNow() indicates a time of now. // // Preconditions: now <= Kernel.CPUClockNow(). (Since Kernel.cpuClock is // monotonic, this is satisfied if now is the result of a previous call to // Kernel.CPUClockNow().) This requirement exists because otherwise a racing // change to t.gosched can cause userTicksAt to adjust stats by too much, // making the observed stats non-monotonic. func (ts *TaskGoroutineSchedInfo) userTicksAt(now uint64) uint64 { if ts.Timestamp < now && ts.State == TaskGoroutineRunningApp { // Update stats to reflect execution since the last update. return ts.UserTicks + (now - ts.Timestamp) } return ts.UserTicks } // sysTicksAt returns the extrapolated value of ts.SysTicks after // Kernel.CPUClockNow() indicates a time of now. // // Preconditions: As for userTicksAt. func (ts *TaskGoroutineSchedInfo) sysTicksAt(now uint64) uint64 { if ts.Timestamp < now && ts.State == TaskGoroutineRunningSys { return ts.SysTicks + (now - ts.Timestamp) } return ts.SysTicks } // Preconditions: The caller must be running on the task goroutine. func (t *Task) accountTaskGoroutineEnter(state TaskGoroutineState) { now := t.k.CPUClockNow() if t.gosched.State != TaskGoroutineRunningSys { panic(fmt.Sprintf("Task goroutine switching from state %v (expected %v) to %v", t.gosched.State, TaskGoroutineRunningSys, state)) } t.goschedSeq.BeginWrite() // This function is very hot; avoid defer. t.gosched.SysTicks += now - t.gosched.Timestamp t.gosched.Timestamp = now t.gosched.State = state t.goschedSeq.EndWrite() } // Preconditions: The caller must be running on the task goroutine, and leaving // a state indicated by a previous call to // t.accountTaskGoroutineEnter(state). func (t *Task) accountTaskGoroutineLeave(state TaskGoroutineState) { now := t.k.CPUClockNow() if t.gosched.State != state { panic(fmt.Sprintf("Task goroutine switching from state %v (expected %v) to %v", t.gosched.State, state, TaskGoroutineRunningSys)) } t.goschedSeq.BeginWrite() // This function is very hot; avoid defer. if state == TaskGoroutineRunningApp { t.gosched.UserTicks += now - t.gosched.Timestamp } t.gosched.Timestamp = now t.gosched.State = TaskGoroutineRunningSys t.goschedSeq.EndWrite() } // TaskGoroutineSchedInfo returns a copy of t's task goroutine scheduling info. // Most clients should use t.CPUStats() instead. func (t *Task) TaskGoroutineSchedInfo() TaskGoroutineSchedInfo { return SeqAtomicLoadTaskGoroutineSchedInfo(&t.goschedSeq, &t.gosched) } // CPUStats returns the CPU usage statistics of t. func (t *Task) CPUStats() usage.CPUStats { return t.cpuStatsAt(t.k.CPUClockNow()) } // Preconditions: As for TaskGoroutineSchedInfo.userTicksAt. func (t *Task) cpuStatsAt(now uint64) usage.CPUStats { tsched := t.TaskGoroutineSchedInfo() return usage.CPUStats{ UserTime: time.Duration(tsched.userTicksAt(now) * uint64(linux.ClockTick)), SysTime: time.Duration(tsched.sysTicksAt(now) * uint64(linux.ClockTick)), VoluntarySwitches: atomic.LoadUint64(&t.yieldCount), } } // CPUStats returns the combined CPU usage statistics of all past and present // threads in tg. func (tg *ThreadGroup) CPUStats() usage.CPUStats { tg.pidns.owner.mu.RLock() defer tg.pidns.owner.mu.RUnlock() // Hack to get a pointer to the Kernel. if tg.leader == nil { // Per comment on tg.leader, this is only possible if nothing in the // ThreadGroup has ever executed anyway. return usage.CPUStats{} } return tg.cpuStatsAtLocked(tg.leader.k.CPUClockNow()) } // Preconditions: As for TaskGoroutineSchedInfo.userTicksAt. The TaskSet mutex // must be locked. func (tg *ThreadGroup) cpuStatsAtLocked(now uint64) usage.CPUStats { stats := tg.exitedCPUStats // Account for live tasks. for t := tg.tasks.Front(); t != nil; t = t.Next() { stats.Accumulate(t.cpuStatsAt(now)) } return stats } // JoinedChildCPUStats implements the semantics of RUSAGE_CHILDREN: "Return // resource usage statistics for all children of [tg] that have terminated and // been waited for. These statistics will include the resources used by // grandchildren, and further removed descendants, if all of the intervening // descendants waited on their terminated children." func (tg *ThreadGroup) JoinedChildCPUStats() usage.CPUStats { tg.pidns.owner.mu.RLock() defer tg.pidns.owner.mu.RUnlock() return tg.childCPUStats } // taskClock is a ktime.Clock that measures the time that a task has spent // executing. taskClock is primarily used to implement CLOCK_THREAD_CPUTIME_ID. // // +stateify savable type taskClock struct { t *Task // If includeSys is true, the taskClock includes both time spent executing // application code as well as time spent in the sentry. Otherwise, the // taskClock includes only time spent executing application code. includeSys bool // Implements waiter.Waitable. TimeUntil wouldn't change its estimation // based on either of the clock events, so there's no event to be // notified for. ktime.NoClockEvents `state:"nosave"` // Implements ktime.Clock.WallTimeUntil. // // As an upper bound, a task's clock cannot advance faster than CPU // time. It would have to execute at a rate of more than 1 task-second // per 1 CPU-second, which isn't possible. ktime.WallRateClock `state:"nosave"` } // UserCPUClock returns a clock measuring the CPU time the task has spent // executing application code. func (t *Task) UserCPUClock() ktime.Clock { return &taskClock{t: t, includeSys: false} } // CPUClock returns a clock measuring the CPU time the task has spent executing // application and "kernel" code. func (t *Task) CPUClock() ktime.Clock { return &taskClock{t: t, includeSys: true} } // Now implements ktime.Clock.Now. func (tc *taskClock) Now() ktime.Time { stats := tc.t.CPUStats() if tc.includeSys { return ktime.FromNanoseconds((stats.UserTime + stats.SysTime).Nanoseconds()) } return ktime.FromNanoseconds(stats.UserTime.Nanoseconds()) } // tgClock is a ktime.Clock that measures the time a thread group has spent // executing. tgClock is primarily used to implement CLOCK_PROCESS_CPUTIME_ID. // // +stateify savable type tgClock struct { tg *ThreadGroup // If includeSys is true, the tgClock includes both time spent executing // application code as well as time spent in the sentry. Otherwise, the // tgClock includes only time spent executing application code. includeSys bool // Implements waiter.Waitable. ktime.ClockEventsQueue `state:"nosave"` } // Now implements ktime.Clock.Now. func (tgc *tgClock) Now() ktime.Time { stats := tgc.tg.CPUStats() if tgc.includeSys { return ktime.FromNanoseconds((stats.UserTime + stats.SysTime).Nanoseconds()) } return ktime.FromNanoseconds(stats.UserTime.Nanoseconds()) } // WallTimeUntil implements ktime.Clock.WallTimeUntil. func (tgc *tgClock) WallTimeUntil(t, now ktime.Time) time.Duration { // Thread group CPU time should not exceed wall time * live tasks, since // task goroutines exit after the transition to TaskExitZombie in // runExitNotify. tgc.tg.pidns.owner.mu.RLock() n := tgc.tg.liveTasks tgc.tg.pidns.owner.mu.RUnlock() if n == 0 { if t.Before(now) { return 0 } // The timer tick raced with thread group exit, after which no more // tasks can enter the thread group. So tgc.Now() will never advance // again. Return a large delay; the timer should be stopped long before // it comes again anyway. return time.Hour } // This is a lower bound on the amount of time that can elapse before an // associated timer expires, so returning this value tends to result in a // sequence of closely-spaced ticks just before timer expiry. To avoid // this, round up to the nearest ClockTick; CPU usage measurements are // limited to this resolution anyway. remaining := time.Duration(t.Sub(now).Nanoseconds()/int64(n)) * time.Nanosecond return ((remaining + (linux.ClockTick - time.Nanosecond)) / linux.ClockTick) * linux.ClockTick } // UserCPUClock returns a ktime.Clock that measures the time that a thread // group has spent executing. func (tg *ThreadGroup) UserCPUClock() ktime.Clock { return &tgClock{tg: tg, includeSys: false} } // CPUClock returns a ktime.Clock that measures the time that a thread group // has spent executing, including sentry time. func (tg *ThreadGroup) CPUClock() ktime.Clock { return &tgClock{tg: tg, includeSys: true} } type kernelCPUClockTicker struct { k *Kernel // These are essentially kernelCPUClockTicker.Notify local variables that // are cached between calls to reduce allocations. rng *rand.Rand tgs []*ThreadGroup } func newKernelCPUClockTicker(k *Kernel) *kernelCPUClockTicker { return &kernelCPUClockTicker{ k: k, rng: rand.New(rand.NewSource(rand.Int63())), } } // Notify implements ktime.TimerListener.Notify. func (ticker *kernelCPUClockTicker) Notify(exp uint64) { // Only increment cpuClock by 1 regardless of the number of expirations. // This approximately compensates for cases where thread throttling or bad // Go runtime scheduling prevents the kernelCPUClockTicker goroutine, and // presumably task goroutines as well, from executing for a long period of // time. It's also necessary to prevent CPU clocks from seeing large // discontinuous jumps. now := atomic.AddUint64(&ticker.k.cpuClock, 1) // Check thread group CPU timers. tgs := ticker.k.tasks.Root.ThreadGroupsAppend(ticker.tgs) for _, tg := range tgs { if atomic.LoadUint32(&tg.cpuTimersEnabled) == 0 { continue } ticker.k.tasks.mu.RLock() if tg.leader == nil { // No tasks have ever run in this thread group. ticker.k.tasks.mu.RUnlock() continue } // Accumulate thread group CPU stats, and randomly select running tasks // using reservoir sampling to receive CPU timer signals. var virtReceiver *Task nrVirtCandidates := 0 var profReceiver *Task nrProfCandidates := 0 tgUserTime := tg.exitedCPUStats.UserTime tgSysTime := tg.exitedCPUStats.SysTime for t := tg.tasks.Front(); t != nil; t = t.Next() { tsched := t.TaskGoroutineSchedInfo() tgUserTime += time.Duration(tsched.userTicksAt(now) * uint64(linux.ClockTick)) tgSysTime += time.Duration(tsched.sysTicksAt(now) * uint64(linux.ClockTick)) switch tsched.State { case TaskGoroutineRunningApp: // Considered by ITIMER_VIRT, ITIMER_PROF, and RLIMIT_CPU // timers. nrVirtCandidates++ if int(randInt31n(ticker.rng, int32(nrVirtCandidates))) == 0 { virtReceiver = t } fallthrough case TaskGoroutineRunningSys: // Considered by ITIMER_PROF and RLIMIT_CPU timers. nrProfCandidates++ if int(randInt31n(ticker.rng, int32(nrProfCandidates))) == 0 { profReceiver = t } } } tgVirtNow := ktime.FromNanoseconds(tgUserTime.Nanoseconds()) tgProfNow := ktime.FromNanoseconds((tgUserTime + tgSysTime).Nanoseconds()) // All of the following are standard (not real-time) signals, which are // automatically deduplicated, so we ignore the number of expirations. tg.signalHandlers.mu.Lock() // It should only be possible for these timers to advance if we found // at least one running task. if virtReceiver != nil { // ITIMER_VIRTUAL newItimerVirtSetting, exp := tg.itimerVirtSetting.At(tgVirtNow) tg.itimerVirtSetting = newItimerVirtSetting if exp != 0 { virtReceiver.sendSignalLocked(sigPriv(linux.SIGVTALRM), true) } } if profReceiver != nil { // ITIMER_PROF newItimerProfSetting, exp := tg.itimerProfSetting.At(tgProfNow) tg.itimerProfSetting = newItimerProfSetting if exp != 0 { profReceiver.sendSignalLocked(sigPriv(linux.SIGPROF), true) } // RLIMIT_CPU soft limit newRlimitCPUSoftSetting, exp := tg.rlimitCPUSoftSetting.At(tgProfNow) tg.rlimitCPUSoftSetting = newRlimitCPUSoftSetting if exp != 0 { profReceiver.sendSignalLocked(sigPriv(linux.SIGXCPU), true) } // RLIMIT_CPU hard limit rlimitCPUMax := tg.limits.Get(limits.CPU).Max if rlimitCPUMax != limits.Infinity && !tgProfNow.Before(ktime.FromSeconds(int64(rlimitCPUMax))) { profReceiver.sendSignalLocked(sigPriv(linux.SIGKILL), true) } } tg.signalHandlers.mu.Unlock() ticker.k.tasks.mu.RUnlock() } // Retain tgs between calls to Notify to reduce allocations. for i := range tgs { tgs[i] = nil } ticker.tgs = tgs[:0] } // Destroy implements ktime.TimerListener.Destroy. func (ticker *kernelCPUClockTicker) Destroy() { } // randInt31n returns a random integer in [0, n). // // randInt31n is equivalent to math/rand.Rand.int31n(), which is unexported. // See that function for details. func randInt31n(rng *rand.Rand, n int32) int32 { v := rng.Uint32() prod := uint64(v) * uint64(n) low := uint32(prod) if low < uint32(n) { thresh := uint32(-n) % uint32(n) for low < thresh { v = rng.Uint32() prod = uint64(v) * uint64(n) low = uint32(prod) } } return int32(prod >> 32) } // NotifyRlimitCPUUpdated is called by setrlimit. // // Preconditions: The caller must be running on the task goroutine. func (t *Task) NotifyRlimitCPUUpdated() { t.k.cpuClockTicker.Atomically(func() { t.tg.pidns.owner.mu.RLock() defer t.tg.pidns.owner.mu.RUnlock() t.tg.signalHandlers.mu.Lock() defer t.tg.signalHandlers.mu.Unlock() rlimitCPU := t.tg.limits.Get(limits.CPU) t.tg.rlimitCPUSoftSetting = ktime.Setting{ Enabled: rlimitCPU.Cur != limits.Infinity, Next: ktime.FromNanoseconds((time.Duration(rlimitCPU.Cur) * time.Second).Nanoseconds()), Period: time.Second, } if rlimitCPU.Max != limits.Infinity { // Check if tg is already over the hard limit. tgcpu := t.tg.cpuStatsAtLocked(t.k.CPUClockNow()) tgProfNow := ktime.FromNanoseconds((tgcpu.UserTime + tgcpu.SysTime).Nanoseconds()) if !tgProfNow.Before(ktime.FromSeconds(int64(rlimitCPU.Max))) { t.sendSignalLocked(sigPriv(linux.SIGKILL), true) } } t.tg.updateCPUTimersEnabledLocked() }) } // Preconditions: The signal mutex must be locked. func (tg *ThreadGroup) updateCPUTimersEnabledLocked() { rlimitCPU := tg.limits.Get(limits.CPU) if tg.itimerVirtSetting.Enabled || tg.itimerProfSetting.Enabled || tg.rlimitCPUSoftSetting.Enabled || rlimitCPU.Max != limits.Infinity { atomic.StoreUint32(&tg.cpuTimersEnabled, 1) } else { atomic.StoreUint32(&tg.cpuTimersEnabled, 0) } } // StateStatus returns a string representation of the task's current state, // appropriate for /proc/[pid]/status. func (t *Task) StateStatus() string { switch s := t.TaskGoroutineSchedInfo().State; s { case TaskGoroutineNonexistent: t.tg.pidns.owner.mu.RLock() defer t.tg.pidns.owner.mu.RUnlock() switch t.exitState { case TaskExitZombie: return "Z (zombie)" case TaskExitDead: return "X (dead)" default: // The task goroutine can't exit before passing through // runExitNotify, so this indicates that the task has been created, // but the task goroutine hasn't yet started. The Linux equivalent // is struct task_struct::state == TASK_NEW // (kernel/fork.c:copy_process() => // kernel/sched/core.c:sched_fork()), but the TASK_NEW bit is // masked out by TASK_REPORT for /proc/[pid]/status, leaving only // TASK_RUNNING. return "R (running)" } case TaskGoroutineRunningSys, TaskGoroutineRunningApp: return "R (running)" case TaskGoroutineBlockedInterruptible: return "S (sleeping)" case TaskGoroutineStopped: t.tg.signalHandlers.mu.Lock() defer t.tg.signalHandlers.mu.Unlock() switch t.stop.(type) { case *groupStop: return "T (stopped)" case *ptraceStop: return "t (tracing stop)" } fallthrough case TaskGoroutineBlockedUninterruptible: // This is the name Linux uses for TASK_UNINTERRUPTIBLE and // TASK_KILLABLE (= TASK_UNINTERRUPTIBLE | TASK_WAKEKILL): // fs/proc/array.c:task_state_array. return "D (disk sleep)" default: panic(fmt.Sprintf("Invalid TaskGoroutineState: %v", s)) } } // CPUMask returns a copy of t's allowed CPU mask. func (t *Task) CPUMask() sched.CPUSet { t.mu.Lock() defer t.mu.Unlock() return t.allowedCPUMask.Copy() } // SetCPUMask sets t's allowed CPU mask based on mask. It takes ownership of // mask. // // Preconditions: mask.Size() == // sched.CPUSetSize(t.Kernel().ApplicationCores()). func (t *Task) SetCPUMask(mask sched.CPUSet) error { if want := sched.CPUSetSize(t.k.applicationCores); mask.Size() != want { panic(fmt.Sprintf("Invalid CPUSet %v (expected %d bytes)", mask, want)) } // Remove CPUs in mask above Kernel.applicationCores. mask.ClearAbove(t.k.applicationCores) // Ensure that at least 1 CPU is still allowed. if mask.NumCPUs() == 0 { return syserror.EINVAL } if t.k.useHostCores { // No-op; pretend the mask was immediately changed back. return nil } t.tg.pidns.owner.mu.RLock() rootTID := t.tg.pidns.owner.Root.tids[t] t.tg.pidns.owner.mu.RUnlock() t.mu.Lock() defer t.mu.Unlock() t.allowedCPUMask = mask atomic.StoreInt32(&t.cpu, assignCPU(mask, rootTID)) return nil } // CPU returns the cpu id for a given task. func (t *Task) CPU() int32 { if t.k.useHostCores { return int32(hostcpu.GetCPU()) } return atomic.LoadInt32(&t.cpu) } // assignCPU returns the virtualized CPU number for the task with global TID // tid and allowedCPUMask allowed. func assignCPU(allowed sched.CPUSet, tid ThreadID) (cpu int32) { // To pretend that threads are evenly distributed to allowed CPUs, choose n // to be less than the number of CPUs in allowed ... n := int(tid) % int(allowed.NumCPUs()) // ... then pick the nth CPU in allowed. allowed.ForEachCPU(func(c uint) { if n--; n == 0 { cpu = int32(c) } }) return cpu } // Niceness returns t's niceness. func (t *Task) Niceness() int { t.mu.Lock() defer t.mu.Unlock() return t.niceness } // Priority returns t's priority. func (t *Task) Priority() int { t.mu.Lock() defer t.mu.Unlock() return t.niceness + 20 } // SetNiceness sets t's niceness to n. func (t *Task) SetNiceness(n int) { t.mu.Lock() defer t.mu.Unlock() t.niceness = n } // NumaPolicy returns t's current numa policy. func (t *Task) NumaPolicy() (policy int32, nodeMask uint32) { t.mu.Lock() defer t.mu.Unlock() return t.numaPolicy, t.numaNodeMask } // SetNumaPolicy sets t's numa policy. func (t *Task) SetNumaPolicy(policy int32, nodeMask uint32) { t.mu.Lock() defer t.mu.Unlock() t.numaPolicy = policy t.numaNodeMask = nodeMask }