diff --git a/pkg/sentry/fs/host/socket.go b/pkg/sentry/fs/host/socket.go index 2392787cb..107336a3e 100644 --- a/pkg/sentry/fs/host/socket.go +++ b/pkg/sentry/fs/host/socket.go @@ -385,3 +385,6 @@ func (c *ConnectedEndpoint) RecvMaxQueueSize() int64 { func (c *ConnectedEndpoint) Release() { c.ref.DecRefWithDestructor(c.close) } + +// CloseUnread implements transport.ConnectedEndpoint.CloseUnread. +func (c *ConnectedEndpoint) CloseUnread() {} diff --git a/pkg/sentry/socket/unix/transport/connectioned.go b/pkg/sentry/socket/unix/transport/connectioned.go index 4bd15808a..dea11e253 100644 --- a/pkg/sentry/socket/unix/transport/connectioned.go +++ b/pkg/sentry/socket/unix/transport/connectioned.go @@ -220,6 +220,11 @@ func (e *connectionedEndpoint) Close() { case e.Connected(): e.connected.CloseSend() e.receiver.CloseRecv() + // Still have unread data? If yes, we set this into the write + // end so that the peer can get ECONNRESET) when it does read. + if e.receiver.RecvQueuedSize() > 0 { + e.connected.CloseUnread() + } c = e.connected r = e.receiver e.connected = nil diff --git a/pkg/sentry/socket/unix/transport/queue.go b/pkg/sentry/socket/unix/transport/queue.go index 0415fae9a..1c71609e2 100644 --- a/pkg/sentry/socket/unix/transport/queue.go +++ b/pkg/sentry/socket/unix/transport/queue.go @@ -33,6 +33,7 @@ type queue struct { mu sync.Mutex `state:"nosave"` closed bool + unread bool used int64 limit int64 dataList messageList @@ -160,7 +161,9 @@ func (q *queue) Dequeue() (e *message, notify bool, err *syserr.Error) { if q.dataList.Front() == nil { err := syserr.ErrWouldBlock if q.closed { - err = syserr.ErrClosedForReceive + if err = syserr.ErrClosedForReceive; q.unread { + err = syserr.ErrConnectionReset + } } q.mu.Unlock() @@ -188,7 +191,9 @@ func (q *queue) Peek() (*message, *syserr.Error) { if q.dataList.Front() == nil { err := syserr.ErrWouldBlock if q.closed { - err = syserr.ErrClosedForReceive + if err = syserr.ErrClosedForReceive; q.unread { + err = syserr.ErrConnectionReset + } } return nil, err } @@ -208,3 +213,11 @@ func (q *queue) QueuedSize() int64 { func (q *queue) MaxQueueSize() int64 { return q.limit } + +// CloseUnread sets flag to indicate that the peer is closed (not shutdown) +// with unread data. So if read on this queue shall return ECONNRESET error. +func (q *queue) CloseUnread() { + q.mu.Lock() + defer q.mu.Unlock() + q.unread = true +} diff --git a/pkg/sentry/socket/unix/transport/unix.go b/pkg/sentry/socket/unix/transport/unix.go index 2b0ad6395..a103b1aab 100644 --- a/pkg/sentry/socket/unix/transport/unix.go +++ b/pkg/sentry/socket/unix/transport/unix.go @@ -604,6 +604,10 @@ type ConnectedEndpoint interface { // Release releases any resources owned by the ConnectedEndpoint. It should // be called before droping all references to a ConnectedEndpoint. Release() + + // CloseUnread sets the fact that this end is closed with unread data to + // the peer socket. + CloseUnread() } // +stateify savable @@ -707,6 +711,11 @@ func (e *connectedEndpoint) Release() { e.writeQueue.DecRef() } +// CloseUnread implements ConnectedEndpoint.CloseUnread. +func (e *connectedEndpoint) CloseUnread() { + e.writeQueue.CloseUnread() +} + // baseEndpoint is an embeddable unix endpoint base used in both the connected and connectionless // unix domain socket Endpoint implementations. // diff --git a/test/syscalls/linux/socket_unix_stream.cc b/test/syscalls/linux/socket_unix_stream.cc index c7e8860f0..be661c2b6 100644 --- a/test/syscalls/linux/socket_unix_stream.cc +++ b/test/syscalls/linux/socket_unix_stream.cc @@ -14,6 +14,7 @@ #include #include +#include #include "gtest/gtest.h" #include "gtest/gtest.h" #include "test/syscalls/linux/socket_test_util.h" @@ -70,6 +71,25 @@ TEST_P(StreamUnixSocketPairTest, RecvmsgOneSideClosed) { SyscallSucceedsWithValue(0)); } +TEST_P(StreamUnixSocketPairTest, ReadOneSideClosedWithUnreadData) { + auto sockets = ASSERT_NO_ERRNO_AND_VALUE(NewSocketPair()); + + char buf[10] = {}; + ASSERT_THAT(RetryEINTR(write)(sockets->second_fd(), buf, sizeof(buf)), + SyscallSucceedsWithValue(sizeof(buf))); + + ASSERT_THAT(shutdown(sockets->first_fd(), SHUT_RDWR), + SyscallSucceeds()); + + ASSERT_THAT(RetryEINTR(read)(sockets->second_fd(), buf, sizeof(buf)), + SyscallSucceedsWithValue(0)); + + ASSERT_THAT(close(sockets->release_first_fd()), SyscallSucceeds()); + + ASSERT_THAT(RetryEINTR(read)(sockets->second_fd(), buf, sizeof(buf)), + SyscallFailsWithErrno(ECONNRESET)); +} + INSTANTIATE_TEST_SUITE_P( AllUnixDomainSockets, StreamUnixSocketPairTest, ::testing::ValuesIn(IncludeReversals(VecCat(