// Copyright 2018 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package futex provides an implementation of the futex interface as found in
// the Linux kernel. It allows one to easily transform Wait() calls into waits
// on a channel, which is useful in a Go-based kernel, for example.
package futex

import (
	"sync"
	"sync/atomic"

	"gvisor.googlesource.com/gvisor/pkg/syserror"
)

// Checker abstracts memory accesses. This is useful because the "addresses"
// used in this package may not be real addresses (they could be indices of an
// array, for example), or they could be mapped via some special mechanism.
// An illustrative implementation sketch appears below the bucket type.
//
// TODO: Replace this with usermem.IO.
type Checker interface {
	// Check should validate that the given address contains the given value.
	// If it does not contain the value, syserror.EAGAIN must be returned.
	// Any other error may be returned, and will be propagated.
	Check(addr uintptr, val uint32) error

	// Op should atomically perform the operation encoded in op on the data
	// pointed to by addr, then apply the comparison encoded in op to the
	// original value at addr, returning the result.
	//
	// Note that op is an opaque operation whose behavior is defined outside
	// of the futex manager.
	Op(addr uintptr, op uint32) (bool, error)
}

// Waiter is the struct which gets enqueued into buckets for wake up routines
// and requeue routines to scan and notify. Once a Waiter has been enqueued by
// WaitPrepare(), callers may listen on C for wake up events.
type Waiter struct {
	// Synchronization:
	//
	// - A Waiter that is not enqueued in a bucket is exclusively owned (no
	// synchronization applies).
	//
	// - A Waiter is enqueued in a bucket by calling WaitPrepare(). After
	// this, waiterEntry, complete, and addr are protected by the bucket.mu
	// ("bucket lock") of the containing bucket, and bitmask is immutable.
	// complete and addr are additionally mutated using atomic memory
	// operations, ensuring that they can be read using atomic memory
	// operations without holding the bucket lock.
	//
	// - A Waiter is only guaranteed to be no longer queued after calling
	// WaitComplete().

	// waiterEntry links Waiter into bucket.waiters.
	waiterEntry

	// complete is 1 if the Waiter was removed from its bucket by a wakeup
	// and 0 otherwise.
	complete int32

	// C is sent to when the Waiter is woken.
	C chan struct{}

	// addr is the address being waited on.
	addr uintptr

	// bitmask is the bitmask we're waiting on; it is matched against the
	// bitmask passed to a FUTEX_WAKE_BITSET-style wakeup.
	bitmask uint32
}

// NewWaiter returns a new unqueued Waiter.
func NewWaiter() *Waiter {
	return &Waiter{
		C: make(chan struct{}, 1),
	}
}

// bucket holds a list of waiters for a given address hash.
type bucket struct {
	// mu protects waiters and contained Waiter state. See comment in Waiter.
	mu sync.Mutex `state:"nosave"`

	waiters waiterList `state:"zerovalue"`
}
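// testChecker is an illustrative, hypothetical Checker (not part of this
// package's real API surface) backed by a slice of words, where "addresses"
// are byte offsets into the slice. It is only a sketch of how the interface
// is meant to be implemented; a real client (e.g. a kernel task's memory
// context) would map addresses through its own mechanism and would use the
// Linux FUTEX_OP encoding for Op.
type testChecker struct {
	mem []uint32
}

// Check atomically loads the word at addr and compares it to val, returning
// syserror.EAGAIN on mismatch per the interface contract. addr is assumed to
// be 4-byte aligned and in range.
func (c *testChecker) Check(addr uintptr, val uint32) error {
	if atomic.LoadUint32(&c.mem[addr/4]) != val {
		return syserror.EAGAIN
	}
	return nil
}

// Op applies a deliberately simplified (non-Linux) encoding: it atomically
// adds the low 16 bits of op to the word at addr, then reports whether the
// original value was less than the high 16 bits. The futex manager treats op
// as opaque, so any encoding agreed upon with the caller works.
func (c *testChecker) Op(addr uintptr, op uint32) (bool, error) {
	arg := op & 0xffff
	old := atomic.AddUint32(&c.mem[addr/4], arg) - arg
	return old < op>>16, nil
}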
// wakeLocked wakes up to n waiters matching the bitmask at the addr for this
// bucket and returns the number of waiters woken.
//
// Preconditions: b.mu must be locked.
func (b *bucket) wakeLocked(addr uintptr, bitmask uint32, n int) int {
	done := 0
	for w := b.waiters.Front(); done < n && w != nil; {
		if w.addr != addr || w.bitmask&bitmask == 0 {
			// Not matching.
			w = w.Next()
			continue
		}

		// Remove from the bucket and wake the waiter.
		woke := w
		w = w.Next() // Next iteration.
		b.waiters.Remove(woke)
		woke.C <- struct{}{}

		// NOTE: The above channel write establishes a write barrier
		// according to the memory model, so nothing may be ordered around
		// it. Since we've dequeued woke and will never touch it again, we
		// can safely store 1 to woke.complete here and allow WaitComplete()
		// to short-circuit grabbing the bucket lock. If WaitComplete()
		// somehow misses the store to woke.complete, we are still holding
		// the bucket lock, so it cannot racily conclude that woke has been
		// dequeued and reuse it before the store below completes.
		atomic.StoreInt32(&woke.complete, 1)
		done++
	}
	return done
}

// requeueLocked moves up to n waiters waiting on addr in this bucket to
// address naddr on the bucket "to", and returns the number of waiters
// requeued.
//
// Preconditions: b and to must be locked.
func (b *bucket) requeueLocked(to *bucket, addr, naddr uintptr, n int) int {
	done := 0
	for w := b.waiters.Front(); done < n && w != nil; {
		if w.addr != addr {
			// Not matching.
			w = w.Next()
			continue
		}

		requeued := w
		w = w.Next() // Next iteration.
		b.waiters.Remove(requeued)
		atomic.StoreUintptr(&requeued.addr, naddr)
		to.waiters.PushBack(requeued)
		done++
	}
	return done
}

const (
	// bucketCount is the number of buckets per Manager. By having many of
	// these we reduce contention when concurrent yet unrelated calls are
	// made.
	bucketCount     = 1 << bucketCountBits
	bucketCountBits = 10
)

// checkAddr verifies that addr is suitably aligned for a futex word.
func checkAddr(addr uintptr) error {
	// Ensure the address is aligned. It must be a DWORD (4-byte) boundary.
	if addr&0x3 != 0 {
		return syserror.EINVAL
	}
	return nil
}

// bucketIndexForAddr returns the index into Manager.buckets for addr.
func bucketIndexForAddr(addr uintptr) uintptr {
	// - The bottom 2 bits of addr must be 0, per checkAddr.
	//
	// - On amd64, the top 16 bits of addr (bits 48-63) must be equal to bit
	// 47 for a canonical address, and (on all existing platforms) bit 47
	// must be 0 for an application address.
	//
	// Thus 19 bits of addr are "useless" for hashing, leaving only 45
	// "useful" bits. We choose one of the simplest possible hash functions
	// that at least uses all 45 useful bits in the output, given that
	// bucketCountBits == 10. This hash function also has the property that
	// it will usually map adjacent addresses to adjacent buckets, slightly
	// improving memory locality when an application synchronization
	// structure uses multiple nearby futexes.
	//
	// Note that despite the large number of arithmetic operations in the
	// function, many components can be computed in parallel, such that the
	// critical path is 1 bit shift + 3 additions (2 in h1, then h1 + h2).
	// This is also why h1 and h2 are grouped separately; for "(addr >> 2) +
	// ... + (addr >> 42)" without any additional grouping, the compiler puts
	// all 4 additions in the critical path.
	h1 := (addr >> 2) + (addr >> 12) + (addr >> 22)
	h2 := (addr >> 32) + (addr >> 42)
	return (h1 + h2) % bucketCount
}

// Manager holds futex state for a single virtual address space.
//
// +stateify savable
type Manager struct {
	buckets [bucketCount]bucket `state:"zerovalue"`
}
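// bucketLocalityExample is a hypothetical helper (not used by the package)
// that illustrates the locality property claimed in bucketIndexForAddr: for a
// typical aligned base address, two futex words 4 bytes apart hash to
// adjacent buckets, because only the (addr >> 2) term of h1 differs between
// them.
func bucketLocalityExample() (uintptr, uintptr) {
	const base = uintptr(0x1000)
	// For this base, the two indices differ by exactly 1 (modulo
	// bucketCount).
	return bucketIndexForAddr(base), bucketIndexForAddr(base + 4)
}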
// NewManager returns an initialized futex manager.
//
// N.B. we use the virtual address to tag futexes, so this only works for
// private (within a single process) futexes.
func NewManager() *Manager {
	return &Manager{}
}

// lockBucket returns a locked bucket for the given addr.
//
// Preconditions: checkAddr(addr) == nil.
func (m *Manager) lockBucket(addr uintptr) *bucket {
	b := &m.buckets[bucketIndexForAddr(addr)]
	b.mu.Lock()
	return b
}

// lockBuckets returns locked buckets for the given addrs.
//
// Preconditions: checkAddr(addr1) == checkAddr(addr2) == nil.
func (m *Manager) lockBuckets(addr1 uintptr, addr2 uintptr) (*bucket, *bucket) {
	i1 := bucketIndexForAddr(addr1)
	i2 := bucketIndexForAddr(addr2)
	b1 := &m.buckets[i1]
	b2 := &m.buckets[i2]

	// Ensure that buckets are locked in a consistent order (lowest index
	// first) to avoid circular locking.
	switch {
	case i1 < i2:
		b1.mu.Lock()
		b2.mu.Lock()
	case i2 < i1:
		b2.mu.Lock()
		b1.mu.Lock()
	default:
		b1.mu.Lock()
	}

	return b1, b2
}

// Wake wakes up to n waiters matching the bitmask on the given addr.
// The number of waiters woken is returned.
func (m *Manager) Wake(addr uintptr, bitmask uint32, n int) (int, error) {
	if err := checkAddr(addr); err != nil {
		return 0, err
	}

	b := m.lockBucket(addr)
	// This function is very hot; avoid defer.
	r := b.wakeLocked(addr, bitmask, n)
	b.mu.Unlock()
	return r, nil
}

// doRequeue implements Requeue and RequeueCmp; a nil Checker skips the value
// check.
func (m *Manager) doRequeue(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
	if err := checkAddr(addr); err != nil {
		return 0, err
	}
	if err := checkAddr(naddr); err != nil {
		return 0, err
	}

	b1, b2 := m.lockBuckets(addr, naddr)
	defer b1.mu.Unlock()
	if b2 != b1 {
		defer b2.mu.Unlock()
	}

	// Check our value. This only applies to RequeueCmp(), which passes a
	// non-nil Checker.
	if c != nil {
		if err := c.Check(addr, val); err != nil {
			return 0, err
		}
	}

	// Wake the number required.
	done := b1.wakeLocked(addr, ^uint32(0), nwake)

	// Requeue the number required.
	b1.requeueLocked(b2, addr, naddr, nreq)

	return done, nil
}

// Requeue wakes up to nwake waiters on the given addr, and unconditionally
// requeues up to nreq waiters on naddr.
func (m *Manager) Requeue(addr uintptr, naddr uintptr, nwake int, nreq int) (int, error) {
	return m.doRequeue(nil, addr, 0, naddr, nwake, nreq)
}

// RequeueCmp atomically checks that addr contains val (via the Checker),
// wakes up to nwake waiters on addr, and then unconditionally requeues up to
// nreq waiters on naddr.
func (m *Manager) RequeueCmp(c Checker, addr uintptr, val uint32, naddr uintptr, nwake int, nreq int) (int, error) {
	return m.doRequeue(c, addr, val, naddr, nwake, nreq)
}

// WakeOp atomically applies op to the memory at addr2, wakes up to nwake1
// waiters unconditionally from addr1, and, based on the original value at
// addr2 and the comparison encoded in op, wakes up to nwake2 waiters from
// addr2. It returns the total number of waiters woken.
func (m *Manager) WakeOp(c Checker, addr1 uintptr, addr2 uintptr, nwake1 int, nwake2 int, op uint32) (int, error) {
	if err := checkAddr(addr1); err != nil {
		return 0, err
	}
	if err := checkAddr(addr2); err != nil {
		return 0, err
	}

	b1, b2 := m.lockBuckets(addr1, addr2)

	done := 0
	cond, err := c.Op(addr2, op)
	if err == nil {
		// Wake up to nwake1 entries from the first bucket.
		done = b1.wakeLocked(addr1, ^uint32(0), nwake1)

		// Wake up to nwake2 entries from the second bucket if the operation
		// yielded true.
		if cond {
			done += b2.wakeLocked(addr2, ^uint32(0), nwake2)
		}
	}

	b1.mu.Unlock()
	if b2 != b1 {
		b2.mu.Unlock()
	}
	return done, err
}
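// exampleBroadcast is a hypothetical sketch (not part of the package API)
// showing the FUTEX_CMP_REQUEUE-style pattern a condition-variable broadcast
// would use: if the condvar word at addr still holds expected, wake one
// waiter and move the rest onto the mutex word at naddr, so they wake one at
// a time as the mutex is released instead of stampeding.
func exampleBroadcast(m *Manager, c Checker, addr, naddr uintptr, expected uint32) (int, error) {
	const all = int(^uint(0) >> 1) // Largest int: requeue every remaining waiter.
	return m.RequeueCmp(c, addr, expected, naddr, 1, all)
}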
// WaitPrepare atomically checks that addr contains val (via the Checker),
// then enqueues w to be woken by a send to w.C. If WaitPrepare returns nil,
// the Waiter must be subsequently removed by calling WaitComplete, whether
// or not a wakeup is received on w.C.
func (m *Manager) WaitPrepare(w *Waiter, c Checker, addr uintptr, val uint32, bitmask uint32) error {
	if err := checkAddr(addr); err != nil {
		return err
	}

	// Prepare the Waiter before taking the bucket lock.
	w.complete = 0
	// Drain any stale wakeup left over from a previous wait.
	select {
	case <-w.C:
	default:
	}
	w.addr = addr
	w.bitmask = bitmask

	b := m.lockBucket(addr)
	// This function is very hot; avoid defer.

	// Perform our atomic check.
	if err := c.Check(addr, val); err != nil {
		b.mu.Unlock()
		return err
	}

	// Add the waiter to the bucket.
	b.waiters.PushBack(w)

	b.mu.Unlock()
	return nil
}

// WaitComplete must be called when a Waiter previously added by WaitPrepare
// is no longer eligible to be woken.
func (m *Manager) WaitComplete(w *Waiter) {
	// Can we short-circuit acquiring the lock? This is the happy path where
	// a notification was received and we don't need to dequeue this waiter
	// from any list (or take any locks).
	if atomic.LoadInt32(&w.complete) != 0 {
		return
	}

	// Take the bucket lock. Note that without holding the bucket lock, the
	// waiter is not guaranteed to stay in that bucket, so after we take the
	// bucket lock, we must ensure that the bucket hasn't changed: if it
	// happens to have changed, we release the old bucket lock and try again
	// with the new bucket; if it hasn't changed, we know it won't change now
	// because we hold the lock.
	var b *bucket
	for {
		addr := atomic.LoadUintptr(&w.addr)
		b = m.lockBucket(addr)
		// We still have to use an atomic load here, because if w was racily
		// requeued then w.addr is not protected by b.mu.
		if addr == atomic.LoadUintptr(&w.addr) {
			break
		}
		b.mu.Unlock()
	}

	// Remove waiter from the bucket. w.complete can only be stored with
	// b.mu locked, so this load doesn't need to use sync/atomic.
	if w.complete == 0 {
		b.waiters.Remove(w)
	}
	b.mu.Unlock()
}
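// exampleWaitWake is an illustrative sketch (hypothetical, not part of the
// package API) of the intended WaitPrepare/WaitComplete protocol: enqueue,
// block on the channel, then always call WaitComplete.
func exampleWaitWake(m *Manager, c Checker, addr uintptr, val uint32) error {
	w := NewWaiter()
	if err := m.WaitPrepare(w, c, addr, val, ^uint32(0)); err != nil {
		// Typically syserror.EAGAIN: *addr no longer contains val.
		return err
	}

	// Block until woken. A real caller would usually also select on a
	// timeout or interrupt channel here.
	<-w.C

	// WaitComplete must always be called after a successful WaitPrepare,
	// whether or not a wakeup was received on w.C.
	m.WaitComplete(w)
	return nil
}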