Remove TODOs from Async IO
Block and drain requests in io_destroy(2). Note the reason to create read-only mapping. PiperOrigin-RevId: 305786312
This commit is contained in:
parent
ace90f823c
commit
9f87502b46
|
@ -59,25 +59,27 @@ func (a *aioManager) newAIOContext(events uint32, id uint64) bool {
|
|||
}
|
||||
|
||||
a.contexts[id] = &AIOContext{
|
||||
done: make(chan struct{}, 1),
|
||||
requestReady: make(chan struct{}, 1),
|
||||
maxOutstanding: events,
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// destroyAIOContext destroys an asynchronous I/O context.
|
||||
// destroyAIOContext destroys an asynchronous I/O context. It doesn't wait for
|
||||
// for pending requests to complete. Returns the destroyed AIOContext so it can
|
||||
// be drained.
|
||||
//
|
||||
// False is returned if the context does not exist.
|
||||
func (a *aioManager) destroyAIOContext(id uint64) bool {
|
||||
// Nil is returned if the context does not exist.
|
||||
func (a *aioManager) destroyAIOContext(id uint64) *AIOContext {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
ctx, ok := a.contexts[id]
|
||||
if !ok {
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
delete(a.contexts, id)
|
||||
ctx.destroy()
|
||||
return true
|
||||
return ctx
|
||||
}
|
||||
|
||||
// lookupAIOContext looks up the given context.
|
||||
|
@ -102,8 +104,8 @@ type ioResult struct {
|
|||
//
|
||||
// +stateify savable
|
||||
type AIOContext struct {
|
||||
// done is the notification channel used for all requests.
|
||||
done chan struct{} `state:"nosave"`
|
||||
// requestReady is the notification channel used for all requests.
|
||||
requestReady chan struct{} `state:"nosave"`
|
||||
|
||||
// mu protects below.
|
||||
mu sync.Mutex `state:"nosave"`
|
||||
|
@ -129,8 +131,14 @@ func (ctx *AIOContext) destroy() {
|
|||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
ctx.dead = true
|
||||
if ctx.outstanding == 0 {
|
||||
close(ctx.done)
|
||||
ctx.checkForDone()
|
||||
}
|
||||
|
||||
// Preconditions: ctx.mu must be held by caller.
|
||||
func (ctx *AIOContext) checkForDone() {
|
||||
if ctx.dead && ctx.outstanding == 0 {
|
||||
close(ctx.requestReady)
|
||||
ctx.requestReady = nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -154,11 +162,12 @@ func (ctx *AIOContext) PopRequest() (interface{}, bool) {
|
|||
|
||||
// Is there anything ready?
|
||||
if e := ctx.results.Front(); e != nil {
|
||||
ctx.results.Remove(e)
|
||||
ctx.outstanding--
|
||||
if ctx.outstanding == 0 && ctx.dead {
|
||||
close(ctx.done)
|
||||
if ctx.outstanding == 0 {
|
||||
panic("AIOContext outstanding is going negative")
|
||||
}
|
||||
ctx.outstanding--
|
||||
ctx.results.Remove(e)
|
||||
ctx.checkForDone()
|
||||
return e.data, true
|
||||
}
|
||||
return nil, false
|
||||
|
@ -172,26 +181,58 @@ func (ctx *AIOContext) FinishRequest(data interface{}) {
|
|||
|
||||
// Push to the list and notify opportunistically. The channel notify
|
||||
// here is guaranteed to be safe because outstanding must be non-zero.
|
||||
// The done channel is only closed when outstanding reaches zero.
|
||||
// The requestReady channel is only closed when outstanding reaches zero.
|
||||
ctx.results.PushBack(&ioResult{data: data})
|
||||
|
||||
select {
|
||||
case ctx.done <- struct{}{}:
|
||||
case ctx.requestReady <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// WaitChannel returns a channel that is notified when an AIO request is
|
||||
// completed.
|
||||
//
|
||||
// The boolean return value indicates whether or not the context is active.
|
||||
func (ctx *AIOContext) WaitChannel() (chan struct{}, bool) {
|
||||
// completed. Returns nil if the context is destroyed and there are no more
|
||||
// outstanding requests.
|
||||
func (ctx *AIOContext) WaitChannel() chan struct{} {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
if ctx.outstanding == 0 && ctx.dead {
|
||||
return nil, false
|
||||
return ctx.requestReady
|
||||
}
|
||||
|
||||
// Dead returns true if the context has been destroyed.
|
||||
func (ctx *AIOContext) Dead() bool {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
return ctx.dead
|
||||
}
|
||||
|
||||
// CancelPendingRequest forgets about a request that hasn't yet completed.
|
||||
func (ctx *AIOContext) CancelPendingRequest() {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
|
||||
if ctx.outstanding == 0 {
|
||||
panic("AIOContext outstanding is going negative")
|
||||
}
|
||||
return ctx.done, true
|
||||
ctx.outstanding--
|
||||
ctx.checkForDone()
|
||||
}
|
||||
|
||||
// Drain drops all completed requests. Pending requests remain untouched.
|
||||
func (ctx *AIOContext) Drain() {
|
||||
ctx.mu.Lock()
|
||||
defer ctx.mu.Unlock()
|
||||
|
||||
if ctx.outstanding == 0 {
|
||||
return
|
||||
}
|
||||
size := uint32(ctx.results.Len())
|
||||
if ctx.outstanding < size {
|
||||
panic("AIOContext outstanding is going negative")
|
||||
}
|
||||
ctx.outstanding -= size
|
||||
ctx.results.Reset()
|
||||
ctx.checkForDone()
|
||||
}
|
||||
|
||||
// aioMappable implements memmap.MappingIdentity and memmap.Mappable for AIO
|
||||
|
@ -332,9 +373,9 @@ func (mm *MemoryManager) NewAIOContext(ctx context.Context, events uint32) (uint
|
|||
Length: aioRingBufferSize,
|
||||
MappingIdentity: m,
|
||||
Mappable: m,
|
||||
// TODO(fvoznika): Linux does "do_mmap_pgoff(..., PROT_READ |
|
||||
// PROT_WRITE, ...)" in fs/aio.c:aio_setup_ring(); why do we make this
|
||||
// mapping read-only?
|
||||
// Linux uses "do_mmap_pgoff(..., PROT_READ | PROT_WRITE, ...)" in
|
||||
// fs/aio.c:aio_setup_ring(). Since we don't implement AIO_RING_MAGIC,
|
||||
// user mode should not write to this page.
|
||||
Perms: usermem.Read,
|
||||
MaxPerms: usermem.Read,
|
||||
})
|
||||
|
@ -349,11 +390,11 @@ func (mm *MemoryManager) NewAIOContext(ctx context.Context, events uint32) (uint
|
|||
return id, nil
|
||||
}
|
||||
|
||||
// DestroyAIOContext destroys an asynchronous I/O context. It returns false if
|
||||
// the context does not exist.
|
||||
func (mm *MemoryManager) DestroyAIOContext(ctx context.Context, id uint64) bool {
|
||||
// DestroyAIOContext destroys an asynchronous I/O context. It returns the
|
||||
// destroyed context. nil if the context does not exist.
|
||||
func (mm *MemoryManager) DestroyAIOContext(ctx context.Context, id uint64) *AIOContext {
|
||||
if _, ok := mm.LookupAIOContext(ctx, id); !ok {
|
||||
return false
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only unmaps after it assured that the address is a valid aio context to
|
||||
|
|
|
@ -16,5 +16,5 @@ package mm
|
|||
|
||||
// afterLoad is invoked by stateify.
|
||||
func (a *AIOContext) afterLoad() {
|
||||
a.done = make(chan struct{}, 1)
|
||||
a.requestReady = make(chan struct{}, 1)
|
||||
}
|
||||
|
|
|
@ -114,14 +114,28 @@ func IoSetup(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.Sysca
|
|||
func IoDestroy(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.SyscallControl, error) {
|
||||
id := args[0].Uint64()
|
||||
|
||||
// Destroy the given context.
|
||||
if !t.MemoryManager().DestroyAIOContext(t, id) {
|
||||
ctx := t.MemoryManager().DestroyAIOContext(t, id)
|
||||
if ctx == nil {
|
||||
// Does not exist.
|
||||
return 0, nil, syserror.EINVAL
|
||||
}
|
||||
// FIXME(fvoznika): Linux blocks until all AIO to the destroyed context is
|
||||
// done.
|
||||
return 0, nil, nil
|
||||
|
||||
// Drain completed requests amd wait for pending requests until there are no
|
||||
// more.
|
||||
for {
|
||||
ctx.Drain()
|
||||
|
||||
ch := ctx.WaitChannel()
|
||||
if ch == nil {
|
||||
// No more requests, we're done.
|
||||
return 0, nil, nil
|
||||
}
|
||||
// The task cannot be interrupted during the wait. Equivalent to
|
||||
// TASK_UNINTERRUPTIBLE in Linux.
|
||||
t.UninterruptibleSleepStart(true /* deactivate */)
|
||||
<-ch
|
||||
t.UninterruptibleSleepFinish(true /* activate */)
|
||||
}
|
||||
}
|
||||
|
||||
// IoGetevents implements linux syscall io_getevents(2).
|
||||
|
@ -200,13 +214,13 @@ func IoGetevents(t *kernel.Task, args arch.SyscallArguments) (uintptr, *kernel.S
|
|||
func waitForRequest(ctx *mm.AIOContext, t *kernel.Task, haveDeadline bool, deadline ktime.Time) (interface{}, error) {
|
||||
for {
|
||||
if v, ok := ctx.PopRequest(); ok {
|
||||
// Request was readly available. Just return it.
|
||||
// Request was readily available. Just return it.
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// Need to wait for request completion.
|
||||
done, active := ctx.WaitChannel()
|
||||
if !active {
|
||||
done := ctx.WaitChannel()
|
||||
if done == nil {
|
||||
// Context has been destroyed.
|
||||
return nil, syserror.EINVAL
|
||||
}
|
||||
|
@ -248,6 +262,10 @@ func memoryFor(t *kernel.Task, cb *ioCallback) (usermem.IOSequence, error) {
|
|||
}
|
||||
|
||||
func performCallback(t *kernel.Task, file *fs.File, cbAddr usermem.Addr, cb *ioCallback, ioseq usermem.IOSequence, ctx *mm.AIOContext, eventFile *fs.File) {
|
||||
if ctx.Dead() {
|
||||
ctx.CancelPendingRequest()
|
||||
return
|
||||
}
|
||||
ev := &ioEvent{
|
||||
Data: cb.Data,
|
||||
Obj: uint64(cbAddr),
|
||||
|
|
|
@ -89,6 +89,7 @@ class AIOTest : public FileTest {
|
|||
FileTest::TearDown();
|
||||
if (ctx_ != 0) {
|
||||
ASSERT_THAT(DestroyContext(), SyscallSucceeds());
|
||||
ctx_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -188,14 +189,19 @@ TEST_F(AIOTest, BadWrite) {
|
|||
}
|
||||
|
||||
TEST_F(AIOTest, ExitWithPendingIo) {
|
||||
// Setup a context that is 5 entries deep.
|
||||
ASSERT_THAT(SetupContext(5), SyscallSucceeds());
|
||||
// Setup a context that is 100 entries deep.
|
||||
ASSERT_THAT(SetupContext(100), SyscallSucceeds());
|
||||
|
||||
struct iocb cb = CreateCallback();
|
||||
struct iocb* cbs[] = {&cb};
|
||||
|
||||
// Submit a request but don't complete it to make it pending.
|
||||
EXPECT_THAT(Submit(1, cbs), SyscallSucceeds());
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
EXPECT_THAT(Submit(1, cbs), SyscallSucceeds());
|
||||
}
|
||||
|
||||
ASSERT_THAT(DestroyContext(), SyscallSucceeds());
|
||||
ctx_ = 0;
|
||||
}
|
||||
|
||||
int Submitter(void* arg) {
|
||||
|
|
Loading…
Reference in New Issue