Implement Queue.Receive.

Receive implements the behaviour of msgrcv(2) without the MSG_COPY flag.

Updates #135
This commit is contained in:
Zyad A. Ali 2021-06-16 21:02:16 +02:00
parent 527c369299
commit 930984a1aa
1 changed files with 130 additions and 0 deletions

View File

@ -294,6 +294,136 @@ func (q *Queue) append(ctx context.Context, m Message, creds *auth.Credentials,
return nil
}
// Receive removes a message from the queue and returns it. See msgrcv(2).
func (q *Queue) Receive(ctx context.Context, b Blocker, mType int64, maxSize int64, wait, truncate, except bool, pid int32) (msg *Message, err error) {
if maxSize < 0 || maxSize > maxMessageBytes {
return nil, linuxerr.EINVAL
}
max := uint64(maxSize)
// Try to perform a non-blocking receive using queue.pop. If EWOULDBLOCK
// is returned, start the blocking procedure. Otherwise, return normally.
creds := auth.CredentialsFromContext(ctx)
if msg, err := q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK {
return msg, err
}
if !wait {
return nil, linuxerr.ENOMSG
}
e, ch := waiter.NewChannelEntry(nil)
q.receivers.EventRegister(&e, waiter.EventIn)
for {
if msg, err = q.pop(ctx, creds, mType, max, truncate, except, pid); err != linuxerr.EWOULDBLOCK {
break
}
b.Block(ch)
}
q.receivers.EventUnregister(&e)
return msg, err
}
// pop pops the first message from the queue that matches the given type. It
// returns an error for all the cases specified in msgrcv(2). If the queue is
// empty or no message of the specified type is available, a EWOULDBLOCK error
// is returned, which can then be used as a signal to block the process or fail.
func (q *Queue) pop(ctx context.Context, creds *auth.Credentials, mType int64, maxSize uint64, truncate, except bool, pid int32) (msg *Message, _ error) {
q.mu.Lock()
defer q.mu.Unlock()
if !q.obj.CheckPermissions(creds, fs.PermMask{Read: true}) {
// The calling process does not have read permission on the message
// queue, and does not have the CAP_IPC_OWNER capability in the user
// namespace that governs its IPC namespace.
return nil, linuxerr.EACCES
}
// Queue was removed while the process was waiting.
if q.dead {
return nil, linuxerr.EIDRM
}
if q.messages.Empty() {
return nil, linuxerr.EWOULDBLOCK
}
// Get a message from the queue.
switch {
case mType == 0:
msg = q.messages.Front()
case mType > 0:
msg = q.msgOfType(mType, except)
case mType < 0:
msg = q.msgOfTypeLessThan(-1 * mType)
}
// If no message exists, return a blocking singal.
if msg == nil {
return nil, linuxerr.EWOULDBLOCK
}
// Check message's size is acceptable.
if maxSize < msg.Size {
if !truncate {
return nil, linuxerr.E2BIG
}
msg.Size = maxSize
msg.Text = msg.Text[:maxSize+1]
}
q.messages.Remove(msg)
q.byteCount -= msg.Size
q.messageCount--
q.receivePID = pid
q.receiveTime = ktime.NowFromContext(ctx)
// Notify senders about available space.
q.senders.Notify(waiter.EventOut)
return msg, nil
}
// msgOfType returns the first message with the specified type, nil if no
// message is found. If except is true, the first message of a type not equal
// to mType will be returned.
//
// Precondition: caller must hold q.mu.
func (q *Queue) msgOfType(mType int64, except bool) *Message {
if except {
for msg := q.messages.Front(); msg != nil; msg = msg.Next() {
if msg.Type != mType {
return msg
}
}
return nil
}
for msg := q.messages.Front(); msg != nil; msg = msg.Next() {
if msg.Type == mType {
return msg
}
}
return nil
}
// msgOfTypeLessThan return the the first message with the lowest type less
// than or equal to mType, nil if no such message exists.
//
// Precondition: caller must hold q.mu.
func (q *Queue) msgOfTypeLessThan(mType int64) (m *Message) {
min := mType
for msg := q.messages.Front(); msg != nil; msg = msg.Next() {
if msg.Type <= mType && msg.Type < min {
m = msg
min = msg.Type
}
}
return m
}
// Lock implements ipc.Mechanism.Lock.
func (q *Queue) Lock() {
q.mu.Lock()