// Copyright 2018 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package pipe // Tx is the transmit side of the shared memory ring buffer. type Tx struct { p pipe maxPayloadSize uint64 head uint64 tail uint64 next uint64 tailHeader uint64 } // Init initializes the transmit end of the pipe. In the initial state, the next // slot to be written is the very first one, and the transmitter has the whole // ring buffer available to it. func (t *Tx) Init(b []byte) { t.p.init(b) // maxPayloadSize excludes the header of the payload, and the header // of the wrapping message. t.maxPayloadSize = uint64(len(t.p.buffer)) - 2*sizeOfSlotHeader t.tail = 0xfffffffe * jump t.next = t.tail t.head = t.tail + jump t.p.write(t.tail, slotFree) } // Capacity determines how many records of the given size can be written to the // pipe before it fills up. func (t *Tx) Capacity(recordSize uint64) uint64 { available := uint64(len(t.p.buffer)) - sizeOfSlotHeader entryLen := payloadToSlotSize(recordSize) return available / entryLen } // Push reserves "payloadSize" bytes for transmission in the pipe. The caller // populates the returned slice with the data to be transferred and enventually // calls Flush() to make the data visible to the reader, or Abort() to make the // pipe forget all Push() calls since the last Flush(). // // The returned slice is available until Flush() or Abort() is next called. // After that, it must not be touched. func (t *Tx) Push(payloadSize uint64) []byte { // Fail request if we know we will never have enough room. if payloadSize > t.maxPayloadSize { return nil } totalLen := payloadToSlotSize(payloadSize) newNext := t.next + totalLen nextWrap := (t.next & revolutionMask) | uint64(len(t.p.buffer)) if int64(newNext-nextWrap) >= 0 { // The new buffer would overflow the pipe, so we push a wrapping // slot, then try to add the actual slot to the front of the // pipe. newNext = (newNext & revolutionMask) + jump wrappingPayloadSize := slotToPayloadSize(newNext - t.next) if !t.reclaim(newNext) { return nil } oldNext := t.next t.next = newNext if oldNext != t.tail { t.p.write(oldNext, wrappingPayloadSize) } else { t.tailHeader = wrappingPayloadSize t.Flush() } newNext += totalLen } // Check that we have enough room for the buffer. if !t.reclaim(newNext) { return nil } if t.next != t.tail { t.p.write(t.next, payloadSize) } else { t.tailHeader = payloadSize } // Grab the buffer before updating t.next. b := t.p.data(t.next, payloadSize) t.next = newNext return b } // reclaim attempts to advance the head until at least newNext. If the head is // already at or beyond newNext, nothing happens and true is returned; otherwise // it tries to reclaim slots that have already been consumed by the receive end // of the pipe (they will be marked as free) and returns a boolean indicating // whether it was successful in reclaiming enough slots. func (t *Tx) reclaim(newNext uint64) bool { for int64(newNext-t.head) > 0 { // Can't reclaim if slot is not free. header := t.p.readAtomic(t.head) if header&slotFree == 0 { return false } payloadSize := header & slotSizeMask newHead := t.head + payloadToSlotSize(payloadSize) // Check newHead is within bounds and valid. if int64(newHead-t.tail) > int64(jump) || newHead&offsetMask >= uint64(len(t.p.buffer)) { return false } t.head = newHead } return true } // Abort causes all Push() calls since the last Flush() to be forgotten and // therefore they will not be made visible to the receiver. func (t *Tx) Abort() { t.next = t.tail } // Flush causes all buffers pushed since the last Flush() [or Abort(), whichever // is the most recent] to be made visible to the receiver. func (t *Tx) Flush() { if t.next == t.tail { // Nothing to do if there are no pushed buffers. return } if t.next != t.head { // The receiver will spin in t.next, so we must make sure that // the slotFree bit is set. t.p.write(t.next, slotFree) } t.p.writeAtomic(t.tail, t.tailHeader) t.tail = t.next } // Bytes returns the byte slice on which the pipe operates. func (t *Tx) Bytes() []byte { return t.p.buffer }