// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package pipe implements a shared memory ring buffer on which a single reader // and a single writer can operate (read/write) concurrently. The ring buffer // allows for data of different sizes to be written, and preserves the boundary // of the written data. // // Example usage is as follows: // // wb := t.Push(20) // // Write data to wb. // t.Flush() // // rb := r.Pull() // // Do something with data in rb. // t.Flush() package pipe import ( "math" ) const ( jump uint64 = math.MaxUint32 + 1 offsetMask uint64 = math.MaxUint32 revolutionMask uint64 = ^offsetMask sizeOfSlotHeader = 8 // sizeof(uint64) slotFree uint64 = 1 << 63 slotSizeMask uint64 = math.MaxUint32 ) // payloadToSlotSize calculates the total size of a slot based on its payload // size. The total size is the header size, plus the payload size, plus padding // if necessary to make the total size a multiple of sizeOfSlotHeader. func payloadToSlotSize(payloadSize uint64) uint64 { s := sizeOfSlotHeader + payloadSize return (s + sizeOfSlotHeader - 1) &^ (sizeOfSlotHeader - 1) } // slotToPayloadSize calculates the payload size of a slot based on the total // size of the slot. This is only meant to be used when creating slots that // don't carry information (e.g., free slots or wrap slots). func slotToPayloadSize(offset uint64) uint64 { return offset - sizeOfSlotHeader } // pipe is a basic data structure used by both (transmit & receive) ends of a // pipe. Indices into this pipe are split into two fields: offset, which counts // the number of bytes from the beginning of the buffer, and revolution, which // counts the number of times the index has wrapped around. type pipe struct { buffer []byte } // init initializes the pipe buffer such that its size is a multiple of the size // of the slot header. func (p *pipe) init(b []byte) { p.buffer = b[:len(b)&^(sizeOfSlotHeader-1)] } // data returns a section of the buffer starting at the given index (which may // include revolution information) and with the given size. func (p *pipe) data(idx uint64, size uint64) []byte { return p.buffer[(idx&offsetMask)+sizeOfSlotHeader:][:size] }