2019-04-29 21:25:05 +00:00
|
|
|
// Copyright 2018 The gVisor Authors.
|
2018-07-09 21:03:03 +00:00
|
|
|
//
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
2018-04-27 17:37:02 +00:00
|
|
|
|
|
|
|
package pipe
|
|
|
|
|
|
|
|
// Rx is the receive side of the shared memory ring buffer.
|
|
|
|
type Rx struct {
|
|
|
|
p pipe
|
|
|
|
|
|
|
|
tail uint64
|
|
|
|
head uint64
|
|
|
|
}
|
|
|
|
|
|
|
|
// Init initializes the receive end of the pipe. In the initial state, the next
|
|
|
|
// slot to be inspected is the very first one.
|
|
|
|
func (r *Rx) Init(b []byte) {
|
|
|
|
r.p.init(b)
|
|
|
|
r.tail = 0xfffffffe * jump
|
|
|
|
r.head = r.tail
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pull reads the next buffer from the pipe, returning nil if there isn't one
|
|
|
|
// currently available.
|
|
|
|
//
|
|
|
|
// The returned slice is available until Flush() is next called. After that, it
|
|
|
|
// must not be touched.
|
|
|
|
func (r *Rx) Pull() []byte {
|
|
|
|
if r.head == r.tail+jump {
|
|
|
|
// We've already pulled the whole pipe.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
header := r.p.readAtomic(r.head)
|
|
|
|
if header&slotFree != 0 {
|
|
|
|
// The next slot is free, we can't pull it yet.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
payloadSize := header & slotSizeMask
|
|
|
|
newHead := r.head + payloadToSlotSize(payloadSize)
|
|
|
|
headWrap := (r.head & revolutionMask) | uint64(len(r.p.buffer))
|
|
|
|
|
|
|
|
// Check if this is a wrapping slot. If that's the case, it carries no
|
|
|
|
// data, so we just skip it and try again from the first slot.
|
|
|
|
if int64(newHead-headWrap) >= 0 {
|
|
|
|
if int64(newHead-headWrap) > int64(jump) || newHead&offsetMask != 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
if r.tail == r.head {
|
|
|
|
// If this is the first pull since the last Flush()
|
|
|
|
// call, we flush the state so that the sender can use
|
|
|
|
// this space if it needs to.
|
|
|
|
r.p.writeAtomic(r.head, slotFree|slotToPayloadSize(newHead-r.head))
|
|
|
|
r.tail = newHead
|
|
|
|
}
|
|
|
|
|
|
|
|
r.head = newHead
|
|
|
|
return r.Pull()
|
|
|
|
}
|
|
|
|
|
|
|
|
// Grab the buffer before updating r.head.
|
|
|
|
b := r.p.data(r.head, payloadSize)
|
|
|
|
r.head = newHead
|
|
|
|
return b
|
|
|
|
}
|
|
|
|
|
|
|
|
// Flush tells the transmitter that all buffers pulled since the last Flush()
|
|
|
|
// have been used, so the transmitter is free to used their slots for further
|
|
|
|
// transmission.
|
|
|
|
func (r *Rx) Flush() {
|
|
|
|
if r.head == r.tail {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
r.p.writeAtomic(r.tail, slotFree|slotToPayloadSize(r.head-r.tail))
|
|
|
|
r.tail = r.head
|
|
|
|
}
|
|
|
|
|
|
|
|
// Bytes returns the byte slice on which the pipe operates.
|
|
|
|
func (r *Rx) Bytes() []byte {
|
|
|
|
return r.p.buffer
|
|
|
|
}
|