// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package queue provides the implementation of transmit and receive queues // based on shared memory ring buffers. package queue import ( "encoding/binary" "sync/atomic" "gvisor.googlesource.com/gvisor/pkg/log" "gvisor.googlesource.com/gvisor/pkg/tcpip/link/sharedmem/pipe" ) const ( // Offsets within a posted buffer. postedOffset = 0 postedSize = 8 postedRemainingInGroup = 12 postedUserData = 16 postedID = 24 sizeOfPostedBuffer = 32 // Offsets within a received packet header. consumedPacketSize = 0 consumedPacketReserved = 4 sizeOfConsumedPacketHeader = 8 // Offsets within a consumed buffer. consumedOffset = 0 consumedSize = 8 consumedUserData = 12 consumedID = 20 sizeOfConsumedBuffer = 28 // The following are the allowed states of the shared data area. eventFDUninitialized = 0 eventFDDisabled = 1 eventFDEnabled = 2 ) // RxBuffer is the descriptor of a receive buffer. type RxBuffer struct { Offset uint64 Size uint32 ID uint64 UserData uint64 } // Rx is a receive queue. It is implemented with one tx and one rx pipe: the tx // pipe is used to "post" buffers, while the rx pipe is used to receive packets // whose contents have been written to previously posted buffers. // // This struct is thread-compatible. type Rx struct { tx pipe.Tx rx pipe.Rx sharedEventFDState *uint32 } // Init initializes the receive queue with the given pipes, and shared state // pointer -- the latter is used to enable/disable eventfd notifications. func (r *Rx) Init(tx, rx []byte, sharedEventFDState *uint32) { r.sharedEventFDState = sharedEventFDState r.tx.Init(tx) r.rx.Init(rx) } // EnableNotification updates the shared state such that the peer will notify // the eventfd when there are packets to be dequeued. func (r *Rx) EnableNotification() { atomic.StoreUint32(r.sharedEventFDState, eventFDEnabled) } // DisableNotification updates the shared state such that the peer will not // notify the eventfd. func (r *Rx) DisableNotification() { atomic.StoreUint32(r.sharedEventFDState, eventFDDisabled) } // PostedBuffersLimit returns the maximum number of buffers that can be posted // before the tx queue fills up. func (r *Rx) PostedBuffersLimit() uint64 { return r.tx.Capacity(sizeOfPostedBuffer) } // PostBuffers makes the given buffers available for receiving data from the // peer. Once they are posted, the peer is free to write to them and will // eventually post them back for consumption. func (r *Rx) PostBuffers(buffers []RxBuffer) bool { for i := range buffers { b := r.tx.Push(sizeOfPostedBuffer) if b == nil { r.tx.Abort() return false } pb := &buffers[i] binary.LittleEndian.PutUint64(b[postedOffset:], pb.Offset) binary.LittleEndian.PutUint32(b[postedSize:], pb.Size) binary.LittleEndian.PutUint32(b[postedRemainingInGroup:], 0) binary.LittleEndian.PutUint64(b[postedUserData:], pb.UserData) binary.LittleEndian.PutUint64(b[postedID:], pb.ID) } r.tx.Flush() return true } // Dequeue receives buffers that have been previously posted by PostBuffers() // and that have been filled by the peer and posted back. // // This is similar to append() in that new buffers are appended to "bufs", with // reallocation only if "bufs" doesn't have enough capacity. func (r *Rx) Dequeue(bufs []RxBuffer) ([]RxBuffer, uint32) { for { outBufs := bufs // Pull the next descriptor from the rx pipe. b := r.rx.Pull() if b == nil { return bufs, 0 } if len(b) < sizeOfConsumedPacketHeader { log.Warningf("Ignoring packet header: size (%v) is less than header size (%v)", len(b), sizeOfConsumedPacketHeader) r.rx.Flush() continue } totalDataSize := binary.LittleEndian.Uint32(b[consumedPacketSize:]) // Calculate the number of buffer descriptors and copy them // over to the output. count := (len(b) - sizeOfConsumedPacketHeader) / sizeOfConsumedBuffer offset := sizeOfConsumedPacketHeader buffersSize := uint32(0) for i := count; i > 0; i-- { s := binary.LittleEndian.Uint32(b[offset+consumedSize:]) buffersSize += s if buffersSize < s { // The buffer size overflows an unsigned 32-bit // integer, so break out and force it to be // ignored. totalDataSize = 1 buffersSize = 0 break } outBufs = append(outBufs, RxBuffer{ Offset: binary.LittleEndian.Uint64(b[offset+consumedOffset:]), Size: s, ID: binary.LittleEndian.Uint64(b[offset+consumedID:]), }) offset += sizeOfConsumedBuffer } r.rx.Flush() if buffersSize < totalDataSize { // The descriptor is corrupted, ignore it. log.Warningf("Ignoring packet: actual data size (%v) less than expected size (%v)", buffersSize, totalDataSize) continue } return outBufs, totalDataSize } } // Bytes returns the byte slices on which the queue operates. func (r *Rx) Bytes() (tx, rx []byte) { return r.tx.Bytes(), r.rx.Bytes() } // DecodeRxBufferHeader decodes the header of a buffer posted on an rx queue. func DecodeRxBufferHeader(b []byte) RxBuffer { return RxBuffer{ Offset: binary.LittleEndian.Uint64(b[postedOffset:]), Size: binary.LittleEndian.Uint32(b[postedSize:]), ID: binary.LittleEndian.Uint64(b[postedID:]), UserData: binary.LittleEndian.Uint64(b[postedUserData:]), } } // RxCompletionSize returns the number of bytes needed to encode an rx // completion containing "count" buffers. func RxCompletionSize(count int) uint64 { return sizeOfConsumedPacketHeader + uint64(count)*sizeOfConsumedBuffer } // EncodeRxCompletion encodes an rx completion header. func EncodeRxCompletion(b []byte, size, reserved uint32) { binary.LittleEndian.PutUint32(b[consumedPacketSize:], size) binary.LittleEndian.PutUint32(b[consumedPacketReserved:], reserved) } // EncodeRxCompletionBuffer encodes the i-th rx completion buffer header. func EncodeRxCompletionBuffer(b []byte, i int, rxb RxBuffer) { b = b[RxCompletionSize(i):] binary.LittleEndian.PutUint64(b[consumedOffset:], rxb.Offset) binary.LittleEndian.PutUint32(b[consumedSize:], rxb.Size) binary.LittleEndian.PutUint64(b[consumedUserData:], rxb.UserData) binary.LittleEndian.PutUint64(b[consumedID:], rxb.ID) }