diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 69af75c6b64551167d428baf30b459efd7290154..6e84cd0a21671486693e7f94d5fda8efdedf4bb4 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -327,7 +327,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	proc->databaseId = databaseid;
 	proc->roleId = owner;
 	proc->lwWaiting = false;
-	proc->lwExclusive = false;
+	proc->lwWaitMode = 0;
 	proc->lwWaitLink = NULL;
 	proc->waitLock = NULL;
 	proc->waitProcLock = NULL;
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4b273a8318f44324d911ed16a52907052fdeb56a..cce87a3cd30280a0b4f6a2cb68cb73a56a1e3d86 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2118,23 +2118,43 @@ XLogFlush(XLogRecPtr record)
 	/* initialize to given target; may increase below */
 	WriteRqstPtr = record;
 
-	/* read LogwrtResult and update local state */
+	/*
+	 * Now wait until we get the write lock, or someone else does the
+	 * flush for us.
+	 */
+	for (;;)
 	{
 		/* use volatile pointer to prevent code rearrangement */
 		volatile XLogCtlData *xlogctl = XLogCtl;
 
+		/* read LogwrtResult and update local state */
 		SpinLockAcquire(&xlogctl->info_lck);
 		if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
 			WriteRqstPtr = xlogctl->LogwrtRqst.Write;
 		LogwrtResult = xlogctl->LogwrtResult;
 		SpinLockRelease(&xlogctl->info_lck);
-	}
 
-	/* done already? */
-	if (!XLByteLE(record, LogwrtResult.Flush))
-	{
-		/* now wait for the write lock */
-		LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+		/* done already? */
+		if (XLByteLE(record, LogwrtResult.Flush))
+			break;
+
+		/*
+		 * Try to get the write lock. If we can't get it immediately, wait
+		 * until it's released, and recheck if we still need to do the flush
+		 * or if the backend that held the lock did it for us already. This
+		 * helps to maintain a good rate of group committing when the system
+		 * is bottlenecked by the speed of fsyncing.
+		 */
+		if (!LWLockWaitUntilFree(WALWriteLock, LW_EXCLUSIVE))
+		{
+			/*
+			 * The lock is now free, but we didn't acquire it yet. Before we
+			 * do, loop back to check if someone else flushed the record for
+			 * us already.
+			 */
+			continue;
+		}
+		/* Got the lock */
 		LogwrtResult = XLogCtl->Write.LogwrtResult;
 		if (!XLByteLE(record, LogwrtResult.Flush))
 		{
@@ -2163,6 +2183,8 @@ XLogFlush(XLogRecPtr record)
 			XLogWrite(WriteRqst, false, false);
 		}
 		LWLockRelease(WALWriteLock);
+		/* done */
+		break;
 	}
 
 	END_CRIT_SECTION();
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index cc4156826b59745545706dcaa660921486cbcdbc..bee35b8c1cf93c9a5c65ee2d76e05e2fa164e4eb 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -430,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 			elog(PANIC, "cannot wait without a PGPROC structure");
 
 		proc->lwWaiting = true;
-		proc->lwExclusive = (mode == LW_EXCLUSIVE);
+		proc->lwWaitMode = mode;
 		proc->lwWaitLink = NULL;
 		if (lock->head == NULL)
 			lock->head = proc;
@@ -564,6 +564,144 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	return !mustwait;
 }
 
+/*
+ * LWLockWaitUntilFree - Wait until a lock is free
+ *
+ * The semantics of this function are a bit funky.  If the lock is currently
+ * free, it is acquired in the given mode, and the function returns true.  If
+ * the lock isn't immediately free, the function waits until it is released
+ * and returns false, but does not acquire the lock.
+ *
+ * This is currently used for WALWriteLock: when a backend flushes the WAL,
+ * holding WALWriteLock, it can flush the commit records of many other
+ * backends as a side-effect.  Those other backends need to wait until the
+ * flush finishes, but don't need to acquire the lock anymore.  They can just
+ * wake up, observe that their records have already been flushed, and return.
+ */
+bool
+LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode)
+{
+	volatile LWLock *lock = &(LWLockArray[lockid].lock);
+	PGPROC	   *proc = MyProc;
+	bool		mustwait;
+	int			extraWaits = 0;
+
+	PRINT_LWDEBUG("LWLockWaitUntilFree", lockid, lock);
+
+	/* Ensure we will have room to remember the lock */
+	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
+		elog(ERROR, "too many LWLocks taken");
+
+	/*
+	 * Lock out cancel/die interrupts until we exit the code section protected
+	 * by the LWLock.  This ensures that interrupts will not interfere with
+	 * manipulations of data structures in shared memory.
+	 */
+	HOLD_INTERRUPTS();
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	/* If I can get the lock, do so quickly. */
+	if (mode == LW_EXCLUSIVE)
+	{
+		if (lock->exclusive == 0 && lock->shared == 0)
+		{
+			lock->exclusive++;
+			mustwait = false;
+		}
+		else
+			mustwait = true;
+	}
+	else
+	{
+		if (lock->exclusive == 0)
+		{
+			lock->shared++;
+			mustwait = false;
+		}
+		else
+			mustwait = true;
+	}
+
+	if (mustwait)
+	{
+		/*
+		 * Add myself to wait queue as LW_WAIT_UNTIL_FREE, whatever "mode" is.
+		 *
+		 * If we don't have a PGPROC structure, there's no way to wait.  This
+		 * should never occur, since MyProc should only be null during shared
+		 * memory initialization.
+		 */
+		if (proc == NULL)
+			elog(PANIC, "cannot wait without a PGPROC structure");
+
+		proc->lwWaiting = true;
+		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+		proc->lwWaitLink = NULL;
+		if (lock->head == NULL)
+			lock->head = proc;
+		else
+			lock->tail->lwWaitLink = proc;
+		lock->tail = proc;
+
+		/* Can release the mutex now */
+		SpinLockRelease(&lock->mutex);
+
+		/*
+		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+		 * wakeups, because we share the semaphore with ProcWaitForSignal.
+		 */
+		LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "waiting");
+
+#ifdef LWLOCK_STATS
+		block_counts[lockid]++;
+#endif
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
+
+		for (;;)
+		{
+			/* "false" means cannot accept cancel/die interrupt here. */
+			PGSemaphoreLock(&proc->sem, false);
+			if (!proc->lwWaiting)
+				break;
+			extraWaits++;
+		}
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
+
+		LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "awakened");
+	}
+	else
+	{
+		/* We are done updating shared state of the lock itself. */
+		SpinLockRelease(&lock->mutex);
+	}
+
+	/*
+	 * Fix the process wait semaphore's count for any absorbed wakeups.
+	 */
+	while (extraWaits-- > 0)
+		PGSemaphoreUnlock(&proc->sem);
+
+	if (mustwait)
+	{
+		/* Failed to get lock, so release interrupt holdoff */
+		RESUME_INTERRUPTS();
+		LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "failed");
+		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode);
+	}
+	else
+	{
+		/* Add lock to list of locks held by this backend */
+		held_lwlocks[num_held_lwlocks++] = lockid;
+		TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode);
+	}
+
+	return !mustwait;
+}
+
 /*
  * LWLockRelease - release a previously acquired lock
  */
@@ -618,20 +756,36 @@ LWLockRelease(LWLockId lockid)
 			/*
 			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
 			 * waiter wants exclusive lock, awaken him only. Otherwise awaken
-			 * as many waiters as want shared access.
+			 * as many waiters as want shared access (or just want to be
+			 * woken up when the lock becomes free without acquiring it,
+			 * ie. LWLockWaitUntilFree).
 			 */
+			bool releaseOK = true;
+
 			proc = head;
-			if (!proc->lwExclusive)
+			if (proc->lwWaitMode != LW_EXCLUSIVE)
 			{
 				while (proc->lwWaitLink != NULL &&
-					   !proc->lwWaitLink->lwExclusive)
+					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
+				{
+					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+						releaseOK = false;
 					proc = proc->lwWaitLink;
+				}
 			}
 			/* proc is now the last PGPROC to be released */
 			lock->head = proc->lwWaitLink;
 			proc->lwWaitLink = NULL;
-			/* prevent additional wakeups until retryer gets to run */
-			lock->releaseOK = false;
+			/*
+			 * Prevent additional wakeups until retryer gets to run.  The loop
+			 * above tested every awakened waiter but the last; test that one
+			 * here.  Backends that only wait for the lock to become free don't
+			 * prevent wakeups, since they might not want the lock after all.
+			 */
+			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
+				releaseOK = false;
+
+			lock->releaseOK = releaseOK;
 		}
 		else
 		{
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 4c2b6d480240777048d928ea500b5aa75693db4f..2196f514d8f7ba01a5c4017ba12f3b15426115f1 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -362,7 +362,7 @@ InitProcess(void)
 	if (IsAutoVacuumWorkerProcess())
 		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
 	MyProc->lwWaiting = false;
-	MyProc->lwExclusive = false;
+	MyProc->lwWaitMode = 0;
 	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
@@ -517,7 +517,7 @@ InitAuxiliaryProcess(void)
 	MyPgXact->inCommit = false;
 	MyPgXact->vacuumFlags = 0;
 	MyProc->lwWaiting = false;
-	MyProc->lwExclusive = false;
+	MyProc->lwWaitMode = 0;
 	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d
index f6030973141fb2e473e5e339aeb7ccc04d343128..8ae8877d993b6a1faa52d6ee93012cbf97e10a54 100644
--- a/src/backend/utils/probes.d
+++ b/src/backend/utils/probes.d
@@ -35,6 +35,8 @@ provider postgresql {
 	probe lwlock__wait__done(LWLockId, LWLockMode);
 	probe lwlock__condacquire(LWLockId, LWLockMode);
 	probe lwlock__condacquire__fail(LWLockId, LWLockMode);
+	probe lwlock__wait__until__free(LWLockId, LWLockMode);
+	probe lwlock__wait__until__free__fail(LWLockId, LWLockMode);
 
 	probe lock__wait__start(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
 	probe lock__wait__done(unsigned int, unsigned int, unsigned int, unsigned int, unsigned int, LOCKMODE);
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index df3df298ae9cbc4927169856f01a13c4a87ccd45..c684964a34fe25cd07646b6d48c4ac87d37cd544 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -94,7 +94,10 @@ typedef enum LWLockId
 typedef enum LWLockMode
 {
 	LW_EXCLUSIVE,
-	LW_SHARED
+	LW_SHARED,
+	LW_WAIT_UNTIL_FREE	/* A special mode used in PGPROC->lwWaitMode, when
+						 * waiting for lock to become free. Not to be used
+						 * as LWLockAcquire argument */
 } LWLockMode;
 
 
@@ -105,6 +108,7 @@ extern bool Trace_lwlocks;
 extern LWLockId LWLockAssign(void);
 extern void LWLockAcquire(LWLockId lockid, LWLockMode mode);
 extern bool LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode);
+extern bool LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode);
 extern void LWLockRelease(LWLockId lockid);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLockId lockid);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 9a6696a4ef04845da382e35591a3039002dc7323..415c0935ad611f4d4badcd09bac70554075731f4 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -101,7 +101,7 @@ struct PGPROC
 
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
-	bool		lwExclusive;	/* true if waiting for exclusive access */
+	uint8		lwWaitMode;		/* lwlock mode being waited for */
 	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
 
 	/* Info about lock the process is currently waiting for, if any. */