parent
9149b2cefd
commit
2aeab259c4
|
@ -121,7 +121,7 @@ func (ep *Endpoint) ctrlWaitFirst() error {
|
||||||
return ep.futexWaitUntilActive()
|
return ep.futexWaitUntilActive()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ep *Endpoint) ctrlRoundTrip() error {
|
func (ep *Endpoint) ctrlRoundTrip(mayRetainP bool) error {
|
||||||
if err := ep.enterFutexWait(); err != nil {
|
if err := ep.enterFutexWait(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -133,6 +133,9 @@ func (ep *Endpoint) ctrlRoundTrip() error {
|
||||||
if err := ep.futexWakePeer(); err != nil {
|
if err := ep.futexWakePeer(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Since we don't know if the peer Endpoint is in the same process as this
|
||||||
|
// one (in which case it may need our P to run), we allow our P to be
|
||||||
|
// retaken regardless of mayRetainP.
|
||||||
return ep.futexWaitUntilActive()
|
return ep.futexWaitUntilActive()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -223,6 +223,23 @@ func (ep *Endpoint) RecvFirst() (uint32, error) {
|
||||||
// * If ep is a client Endpoint, ep.Connect() has previously been called and
|
// * If ep is a client Endpoint, ep.Connect() has previously been called and
|
||||||
// returned nil.
|
// returned nil.
|
||||||
func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
|
func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
|
||||||
|
return ep.sendRecv(dataLen, false /* mayRetainP */)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendRecvFast is equivalent to SendRecv, but may prevent the caller's runtime
|
||||||
|
// P from being released, in which case the calling goroutine continues to
|
||||||
|
// count against GOMAXPROCS while waiting for the peer Endpoint to return
|
||||||
|
// control to the caller.
|
||||||
|
//
|
||||||
|
// SendRecvFast is appropriate if the peer Endpoint is expected to consistently
|
||||||
|
// return control in a short amount of time (less than ~10ms).
|
||||||
|
//
|
||||||
|
// Preconditions: As for SendRecv.
|
||||||
|
func (ep *Endpoint) SendRecvFast(dataLen uint32) (uint32, error) {
|
||||||
|
return ep.sendRecv(dataLen, true /* mayRetainP */)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ep *Endpoint) sendRecv(dataLen uint32, mayRetainP bool) (uint32, error) {
|
||||||
if dataLen > ep.dataCap {
|
if dataLen > ep.dataCap {
|
||||||
panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap))
|
panic(fmt.Sprintf("attempting to send packet with datagram length %d (maximum %d)", dataLen, ep.dataCap))
|
||||||
}
|
}
|
||||||
|
@ -233,7 +250,7 @@ func (ep *Endpoint) SendRecv(dataLen uint32) (uint32, error) {
|
||||||
// they can only shoot themselves in the foot.
|
// they can only shoot themselves in the foot.
|
||||||
*ep.dataLen() = dataLen
|
*ep.dataLen() = dataLen
|
||||||
raceBecomeInactive()
|
raceBecomeInactive()
|
||||||
if err := ep.ctrlRoundTrip(); err != nil {
|
if err := ep.ctrlRoundTrip(mayRetainP); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
raceBecomeActive()
|
raceBecomeActive()
|
||||||
|
|
|
@ -528,7 +528,7 @@ func (c *Client) sendRecvChannel(t message, r message) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the request and receive the server's response.
|
// Send the request and receive the server's response.
|
||||||
rsz, err := ch.send(t)
|
rsz, err := ch.send(t, false /* isServer */)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// See above.
|
// See above.
|
||||||
c.channelsMu.Lock()
|
c.channelsMu.Lock()
|
||||||
|
|
|
@ -85,7 +85,7 @@ func (ch *channel) service(cs *connState) error {
|
||||||
}
|
}
|
||||||
r := cs.handle(m)
|
r := cs.handle(m)
|
||||||
msgRegistry.put(m)
|
msgRegistry.put(m)
|
||||||
rsz, err = ch.send(r)
|
rsz, err = ch.send(r, true /* isServer */)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ func (ch *channel) Close() error {
|
||||||
//
|
//
|
||||||
// The return value is the size of the received response. Not that in the
|
// The return value is the size of the received response. Not that in the
|
||||||
// server case, this is the size of the next request.
|
// server case, this is the size of the next request.
|
||||||
func (ch *channel) send(m message) (uint32, error) {
|
func (ch *channel) send(m message, isServer bool) (uint32, error) {
|
||||||
if log.IsLogging(log.Debug) {
|
if log.IsLogging(log.Debug) {
|
||||||
log.Debugf("send [channel @%p] %s", ch, m.String())
|
log.Debugf("send [channel @%p] %s", ch, m.String())
|
||||||
}
|
}
|
||||||
|
@ -162,8 +162,12 @@ func (ch *channel) send(m message) (uint32, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform the one-shot communication.
|
// Perform the one-shot communication.
|
||||||
|
if isServer {
|
||||||
return ch.data.SendRecv(ssz)
|
return ch.data.SendRecv(ssz)
|
||||||
}
|
}
|
||||||
|
// RPCs are expected to return quickly rather than block.
|
||||||
|
return ch.data.SendRecvFast(ssz)
|
||||||
|
}
|
||||||
|
|
||||||
// recv decodes a message that exists on the channel.
|
// recv decodes a message that exists on the channel.
|
||||||
//
|
//
|
||||||
|
|
Loading…
Reference in New Issue