TTY: Fix data race where calls into tty.queue's waiter were not synchronized.
Now, there's a waiter for each end (master and slave) of the TTY, and each waiter.Entry is only enqueued in one of the waiters. PiperOrigin-RevId: 208734483 Change-Id: I06996148f123075f8dd48cde5a553e2be74c6dce
This commit is contained in:
parent
12a4912aed
commit
d4939f6dc2
|
@ -23,6 +23,7 @@ import (
|
|||
"gvisor.googlesource.com/gvisor/pkg/sentry/arch"
|
||||
"gvisor.googlesource.com/gvisor/pkg/sentry/context"
|
||||
"gvisor.googlesource.com/gvisor/pkg/sentry/usermem"
|
||||
"gvisor.googlesource.com/gvisor/pkg/syserror"
|
||||
"gvisor.googlesource.com/gvisor/pkg/waiter"
|
||||
)
|
||||
|
||||
|
@ -90,6 +91,12 @@ type lineDiscipline struct {
|
|||
// column is the location in a row of the cursor. This is important for
|
||||
// handling certain special characters like backspace.
|
||||
column int
|
||||
|
||||
// masterWaiter is used to wait on the master end of the TTY.
|
||||
masterWaiter waiter.Queue `state:"zerovalue"`
|
||||
|
||||
// slaveWaiter is used to wait on the slave end of the TTY.
|
||||
slaveWaiter waiter.Queue `state:"zerovalue"`
|
||||
}
|
||||
|
||||
func newLineDiscipline(termios linux.KernelTermios) *lineDiscipline {
|
||||
|
@ -127,7 +134,9 @@ func (l *lineDiscipline) setTermios(ctx context.Context, io usermem.IO, args arc
|
|||
// buffer to its read buffer. Anything already in the read buffer is
|
||||
// now readable.
|
||||
if oldCanonEnabled && !l.termios.LEnabled(linux.ICANON) {
|
||||
l.inQueue.pushWaitBuf(l)
|
||||
if n := l.inQueue.pushWaitBuf(l); n > 0 {
|
||||
l.slaveWaiter.Notify(waiter.EventIn)
|
||||
}
|
||||
}
|
||||
|
||||
return 0, err
|
||||
|
@ -152,13 +161,32 @@ func (l *lineDiscipline) inputQueueReadSize(ctx context.Context, io usermem.IO,
|
|||
func (l *lineDiscipline) inputQueueRead(ctx context.Context, dst usermem.IOSequence) (int64, error) {
|
||||
l.termiosMu.RLock()
|
||||
defer l.termiosMu.RUnlock()
|
||||
return l.inQueue.read(ctx, dst, l)
|
||||
n, pushed, err := l.inQueue.read(ctx, dst, l)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if n > 0 {
|
||||
l.masterWaiter.Notify(waiter.EventOut)
|
||||
if pushed {
|
||||
l.slaveWaiter.Notify(waiter.EventIn)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
|
||||
func (l *lineDiscipline) inputQueueWrite(ctx context.Context, src usermem.IOSequence) (int64, error) {
|
||||
l.termiosMu.RLock()
|
||||
defer l.termiosMu.RUnlock()
|
||||
return l.inQueue.write(ctx, src, l)
|
||||
n, err := l.inQueue.write(ctx, src, l)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if n > 0 {
|
||||
l.slaveWaiter.Notify(waiter.EventIn)
|
||||
return n, nil
|
||||
}
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
|
||||
func (l *lineDiscipline) outputQueueReadSize(ctx context.Context, io usermem.IO, args arch.SyscallArguments) error {
|
||||
|
@ -168,13 +196,32 @@ func (l *lineDiscipline) outputQueueReadSize(ctx context.Context, io usermem.IO,
|
|||
func (l *lineDiscipline) outputQueueRead(ctx context.Context, dst usermem.IOSequence) (int64, error) {
|
||||
l.termiosMu.RLock()
|
||||
defer l.termiosMu.RUnlock()
|
||||
return l.outQueue.read(ctx, dst, l)
|
||||
n, pushed, err := l.outQueue.read(ctx, dst, l)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if n > 0 {
|
||||
l.slaveWaiter.Notify(waiter.EventOut)
|
||||
if pushed {
|
||||
l.masterWaiter.Notify(waiter.EventIn)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
|
||||
func (l *lineDiscipline) outputQueueWrite(ctx context.Context, src usermem.IOSequence) (int64, error) {
|
||||
l.termiosMu.RLock()
|
||||
defer l.termiosMu.RUnlock()
|
||||
return l.outQueue.write(ctx, src, l)
|
||||
n, err := l.outQueue.write(ctx, src, l)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if n > 0 {
|
||||
l.masterWaiter.Notify(waiter.EventIn)
|
||||
return n, nil
|
||||
}
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
|
||||
// transformer is a helper interface to make it easier to stateify queue.
|
||||
|
@ -326,7 +373,9 @@ func (*inputQueueTransformer) transform(l *lineDiscipline, q *queue, buf []byte)
|
|||
q.readBuf.WriteRune(c)
|
||||
// Anything written to the readBuf will have to be echoed.
|
||||
if l.termios.LEnabled(linux.ECHO) {
|
||||
l.outQueue.writeBytes(cBytes, l)
|
||||
if l.outQueue.writeBytes(cBytes, l) > 0 {
|
||||
l.masterWaiter.Notify(waiter.EventIn)
|
||||
}
|
||||
}
|
||||
|
||||
// If we finish a line, make it available for reading.
|
||||
|
|
|
@ -124,14 +124,12 @@ func (mf *masterFileOperations) Release() {
|
|||
|
||||
// EventRegister implements waiter.Waitable.EventRegister.
|
||||
func (mf *masterFileOperations) EventRegister(e *waiter.Entry, mask waiter.EventMask) {
|
||||
mf.t.ld.inQueue.EventRegister(e, mask)
|
||||
mf.t.ld.outQueue.EventRegister(e, mask)
|
||||
mf.t.ld.masterWaiter.EventRegister(e, mask)
|
||||
}
|
||||
|
||||
// EventUnregister implements waiter.Waitable.EventUnregister.
|
||||
func (mf *masterFileOperations) EventUnregister(e *waiter.Entry) {
|
||||
mf.t.ld.inQueue.EventUnregister(e)
|
||||
mf.t.ld.outQueue.EventUnregister(e)
|
||||
mf.t.ld.masterWaiter.EventUnregister(e)
|
||||
}
|
||||
|
||||
// Readiness implements waiter.Waitable.Readiness.
|
||||
|
|
|
@ -38,8 +38,6 @@ type queue struct {
|
|||
// mu protects everything in queue.
|
||||
mu sync.Mutex `state:"nosave"`
|
||||
|
||||
waiter.Queue `state:"zerovalue"`
|
||||
|
||||
// readBuf is buffer of data ready to be read when readable is true.
|
||||
// This data has been processed.
|
||||
readBuf bytes.Buffer `state:".([]byte)"`
|
||||
|
@ -112,15 +110,17 @@ func (q *queue) readableSize(ctx context.Context, io usermem.IO, args arch.Sysca
|
|||
|
||||
}
|
||||
|
||||
// read reads from q to userspace.
|
||||
// read reads from q to userspace. It returns the number of bytes read as well
|
||||
// as whether the read caused more readable data to become available (whether
|
||||
// data was pushed from the wait buffer to the read buffer).
|
||||
//
|
||||
// Preconditions:
|
||||
// * l.termiosMu must be held for reading.
|
||||
func (q *queue) read(ctx context.Context, dst usermem.IOSequence, l *lineDiscipline) (int64, error) {
|
||||
func (q *queue) read(ctx context.Context, dst usermem.IOSequence, l *lineDiscipline) (int64, bool, error) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if !q.readable {
|
||||
return 0, syserror.ErrWouldBlock
|
||||
return 0, false, syserror.ErrWouldBlock
|
||||
}
|
||||
|
||||
// Read out from the read buffer.
|
||||
|
@ -133,7 +133,7 @@ func (q *queue) read(ctx context.Context, dst usermem.IOSequence, l *lineDiscipl
|
|||
}
|
||||
n, err := dst.Writer(ctx).Write(q.readBuf.Bytes()[:n])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
return 0, false, err
|
||||
}
|
||||
// Discard bytes read out.
|
||||
q.readBuf.Next(n)
|
||||
|
@ -144,16 +144,9 @@ func (q *queue) read(ctx context.Context, dst usermem.IOSequence, l *lineDiscipl
|
|||
}
|
||||
|
||||
// Move data from the queue's wait buffer to its read buffer.
|
||||
q.pushWaitBufLocked(l)
|
||||
nPushed := q.pushWaitBufLocked(l)
|
||||
|
||||
// If state changed, notify any waiters. If nothing was available to
|
||||
// read, let the caller know we could block.
|
||||
if n > 0 {
|
||||
q.Notify(waiter.EventOut)
|
||||
} else {
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
return int64(n), nil
|
||||
return int64(n), nPushed > 0, nil
|
||||
}
|
||||
|
||||
// write writes to q from userspace.
|
||||
|
@ -169,14 +162,20 @@ func (q *queue) write(ctx context.Context, src usermem.IOSequence, l *lineDiscip
|
|||
return 0, err
|
||||
}
|
||||
b = b[:n]
|
||||
return q.writeBytes(b, l)
|
||||
|
||||
// If state changed, notify any waiters. If we were unable to write
|
||||
// anything, let the caller know we could block.
|
||||
if c := q.writeBytes(b, l); c > 0 {
|
||||
return c, nil
|
||||
}
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
|
||||
// writeBytes writes to q from b.
|
||||
//
|
||||
// Preconditions:
|
||||
// * l.termiosMu must be held for reading.
|
||||
func (q *queue) writeBytes(b []byte, l *lineDiscipline) (int64, error) {
|
||||
func (q *queue) writeBytes(b []byte, l *lineDiscipline) int64 {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
// Write as much as possible to the read buffer.
|
||||
|
@ -185,36 +184,26 @@ func (q *queue) writeBytes(b []byte, l *lineDiscipline) (int64, error) {
|
|||
// Write remaining data to the wait buffer.
|
||||
nWaiting, _ := q.waitBuf.Write(b[n:])
|
||||
|
||||
// If state changed, notify any waiters. If we were unable to write
|
||||
// anything, let the caller know we could block.
|
||||
if n > 0 {
|
||||
q.Notify(waiter.EventIn)
|
||||
} else if nWaiting == 0 {
|
||||
return 0, syserror.ErrWouldBlock
|
||||
}
|
||||
return int64(n + nWaiting), nil
|
||||
return int64(n + nWaiting)
|
||||
}
|
||||
|
||||
// pushWaitBuf fills the queue's read buffer with data from the wait buffer.
|
||||
//
|
||||
// Preconditions:
|
||||
// * l.termiosMu must be held for reading.
|
||||
func (q *queue) pushWaitBuf(l *lineDiscipline) {
|
||||
func (q *queue) pushWaitBuf(l *lineDiscipline) int {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
q.pushWaitBufLocked(l)
|
||||
return q.pushWaitBufLocked(l)
|
||||
}
|
||||
|
||||
// Preconditions:
|
||||
// * l.termiosMu must be held for reading.
|
||||
// * q.mu must be locked.
|
||||
func (q *queue) pushWaitBufLocked(l *lineDiscipline) {
|
||||
func (q *queue) pushWaitBufLocked(l *lineDiscipline) int {
|
||||
// Remove bytes from the wait buffer and move them to the read buffer.
|
||||
n := q.transform(l, q, q.waitBuf.Bytes())
|
||||
q.waitBuf.Next(n)
|
||||
|
||||
// If state changed, notify any waiters.
|
||||
if n > 0 {
|
||||
q.Notify(waiter.EventIn)
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
|
|
@ -109,14 +109,12 @@ func (sf *slaveFileOperations) Release() {
|
|||
|
||||
// EventRegister implements waiter.Waitable.EventRegister.
|
||||
func (sf *slaveFileOperations) EventRegister(e *waiter.Entry, mask waiter.EventMask) {
|
||||
sf.si.t.ld.outQueue.EventRegister(e, mask)
|
||||
sf.si.t.ld.inQueue.EventRegister(e, mask)
|
||||
sf.si.t.ld.slaveWaiter.EventRegister(e, mask)
|
||||
}
|
||||
|
||||
// EventUnregister implements waiter.Waitable.EventUnregister.
|
||||
func (sf *slaveFileOperations) EventUnregister(e *waiter.Entry) {
|
||||
sf.si.t.ld.outQueue.EventUnregister(e)
|
||||
sf.si.t.ld.inQueue.EventUnregister(e)
|
||||
sf.si.t.ld.slaveWaiter.EventUnregister(e)
|
||||
}
|
||||
|
||||
// Readiness implements waiter.Waitable.Readiness.
|
||||
|
|
Loading…
Reference in New Issue