unix: return ECONNRESET if peer closed with data not read
For SOCK_STREAM type unix socket, we shall return ECONNRESET if peer is closed with data not read. We explictly set a flag when closing one end, to differentiate from just shutdown (where zero shall be returned). Fixes: #735 Signed-off-by: Jianfeng Tan <henry.tjf@antfin.com>
This commit is contained in:
parent
96f78e2466
commit
2c3e2ed2bf
|
@ -385,3 +385,6 @@ func (c *ConnectedEndpoint) RecvMaxQueueSize() int64 {
|
|||
func (c *ConnectedEndpoint) Release() {
|
||||
c.ref.DecRefWithDestructor(c.close)
|
||||
}
|
||||
|
||||
// CloseUnread implements transport.ConnectedEndpoint.CloseUnread.
|
||||
func (c *ConnectedEndpoint) CloseUnread() {}
|
||||
|
|
|
@ -220,6 +220,11 @@ func (e *connectionedEndpoint) Close() {
|
|||
case e.Connected():
|
||||
e.connected.CloseSend()
|
||||
e.receiver.CloseRecv()
|
||||
// Still have unread data? If yes, we set this into the write
|
||||
// end so that the peer can get ECONNRESET) when it does read.
|
||||
if e.receiver.RecvQueuedSize() > 0 {
|
||||
e.connected.CloseUnread()
|
||||
}
|
||||
c = e.connected
|
||||
r = e.receiver
|
||||
e.connected = nil
|
||||
|
|
|
@ -33,6 +33,7 @@ type queue struct {
|
|||
|
||||
mu sync.Mutex `state:"nosave"`
|
||||
closed bool
|
||||
unread bool
|
||||
used int64
|
||||
limit int64
|
||||
dataList messageList
|
||||
|
@ -160,7 +161,9 @@ func (q *queue) Dequeue() (e *message, notify bool, err *syserr.Error) {
|
|||
if q.dataList.Front() == nil {
|
||||
err := syserr.ErrWouldBlock
|
||||
if q.closed {
|
||||
err = syserr.ErrClosedForReceive
|
||||
if err = syserr.ErrClosedForReceive; q.unread {
|
||||
err = syserr.ErrConnectionReset
|
||||
}
|
||||
}
|
||||
q.mu.Unlock()
|
||||
|
||||
|
@ -188,7 +191,9 @@ func (q *queue) Peek() (*message, *syserr.Error) {
|
|||
if q.dataList.Front() == nil {
|
||||
err := syserr.ErrWouldBlock
|
||||
if q.closed {
|
||||
err = syserr.ErrClosedForReceive
|
||||
if err = syserr.ErrClosedForReceive; q.unread {
|
||||
err = syserr.ErrConnectionReset
|
||||
}
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
@ -208,3 +213,11 @@ func (q *queue) QueuedSize() int64 {
|
|||
func (q *queue) MaxQueueSize() int64 {
|
||||
return q.limit
|
||||
}
|
||||
|
||||
// CloseUnread sets flag to indicate that the peer is closed (not shutdown)
|
||||
// with unread data. So if read on this queue shall return ECONNRESET error.
|
||||
func (q *queue) CloseUnread() {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
q.unread = true
|
||||
}
|
||||
|
|
|
@ -604,6 +604,10 @@ type ConnectedEndpoint interface {
|
|||
// Release releases any resources owned by the ConnectedEndpoint. It should
|
||||
// be called before droping all references to a ConnectedEndpoint.
|
||||
Release()
|
||||
|
||||
// CloseUnread sets the fact that this end is closed with unread data to
|
||||
// the peer socket.
|
||||
CloseUnread()
|
||||
}
|
||||
|
||||
// +stateify savable
|
||||
|
@ -707,6 +711,11 @@ func (e *connectedEndpoint) Release() {
|
|||
e.writeQueue.DecRef()
|
||||
}
|
||||
|
||||
// CloseUnread implements ConnectedEndpoint.CloseUnread.
|
||||
func (e *connectedEndpoint) CloseUnread() {
|
||||
e.writeQueue.CloseUnread()
|
||||
}
|
||||
|
||||
// baseEndpoint is an embeddable unix endpoint base used in both the connected and connectionless
|
||||
// unix domain socket Endpoint implementations.
|
||||
//
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
#include <stdio.h>
|
||||
#include <sys/un.h>
|
||||
#include <poll.h>
|
||||
#include "gtest/gtest.h"
|
||||
#include "gtest/gtest.h"
|
||||
#include "test/syscalls/linux/socket_test_util.h"
|
||||
|
@ -70,6 +71,25 @@ TEST_P(StreamUnixSocketPairTest, RecvmsgOneSideClosed) {
|
|||
SyscallSucceedsWithValue(0));
|
||||
}
|
||||
|
||||
TEST_P(StreamUnixSocketPairTest, ReadOneSideClosedWithUnreadData) {
|
||||
auto sockets = ASSERT_NO_ERRNO_AND_VALUE(NewSocketPair());
|
||||
|
||||
char buf[10] = {};
|
||||
ASSERT_THAT(RetryEINTR(write)(sockets->second_fd(), buf, sizeof(buf)),
|
||||
SyscallSucceedsWithValue(sizeof(buf)));
|
||||
|
||||
ASSERT_THAT(shutdown(sockets->first_fd(), SHUT_RDWR),
|
||||
SyscallSucceeds());
|
||||
|
||||
ASSERT_THAT(RetryEINTR(read)(sockets->second_fd(), buf, sizeof(buf)),
|
||||
SyscallSucceedsWithValue(0));
|
||||
|
||||
ASSERT_THAT(close(sockets->release_first_fd()), SyscallSucceeds());
|
||||
|
||||
ASSERT_THAT(RetryEINTR(read)(sockets->second_fd(), buf, sizeof(buf)),
|
||||
SyscallFailsWithErrno(ECONNRESET));
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
AllUnixDomainSockets, StreamUnixSocketPairTest,
|
||||
::testing::ValuesIn(IncludeReversals(VecCat<SocketPairKind>(
|
||||
|
|
Loading…
Reference in New Issue