diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index fcb087ed15dfbb4d567ee0c742e1db8ec1353a2d..6fcd9093268e7bc5aaf6102fa9d6adcdcced1d6d 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -1285,6 +1285,7 @@ static ListCell * asyncQueueAddEntries(ListCell *nextNotify) { AsyncQueueEntry qe; + QueuePosition queue_head; int pageno; int offset; int slotno; @@ -1292,8 +1293,21 @@ asyncQueueAddEntries(ListCell *nextNotify) /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */ LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE); + /* + * We work with a local copy of QUEUE_HEAD, which we write back to shared + * memory upon exiting. The reason for this is that if we have to advance + * to a new page, SimpleLruZeroPage might fail (out of disk space, for + * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, + * subsequent insertions would try to put entries into a page that slru.c + * thinks doesn't exist yet.) So, use a local position variable. Note + * that if we do fail, any already-inserted queue entries are forgotten; + * this is okay, since they'd be useless anyway after our transaction + * rolls back. + */ + queue_head = QUEUE_HEAD; + /* Fetch the current page */ - pageno = QUEUE_POS_PAGE(QUEUE_HEAD); + pageno = QUEUE_POS_PAGE(queue_head); slotno = SimpleLruReadPage(AsyncCtl, pageno, true, InvalidTransactionId); /* Note we mark the page dirty before writing in it */ AsyncCtl->shared->page_dirty[slotno] = true; @@ -1305,7 +1319,7 @@ asyncQueueAddEntries(ListCell *nextNotify) /* Construct a valid queue entry in local variable qe */ asyncQueueNotificationToEntry(n, &qe); - offset = QUEUE_POS_OFFSET(QUEUE_HEAD); + offset = QUEUE_POS_OFFSET(queue_head); /* Check whether the entry really fits on the current page */ if (offset + qe.length <= QUEUE_PAGESIZE) @@ -1331,8 +1345,8 @@ asyncQueueAddEntries(ListCell *nextNotify) &qe, qe.length); - /* Advance QUEUE_HEAD appropriately, and note if page is full */ - if (asyncQueueAdvance(&(QUEUE_HEAD), qe.length)) + /* Advance queue_head appropriately, and detect if page is full */ + if (asyncQueueAdvance(&(queue_head), qe.length)) { /* * Page is full, so we're done here, but first fill the next page @@ -1342,12 +1356,15 @@ asyncQueueAddEntries(ListCell *nextNotify) * asyncQueueIsFull() ensured that there is room to create this * page without overrunning the queue. */ - slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(QUEUE_HEAD)); + slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head)); /* And exit the loop */ break; } } + /* Success, so update the global QUEUE_HEAD */ + QUEUE_HEAD = queue_head; + LWLockRelease(AsyncCtlLock); return nextNotify;