diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 76f77cb0fcf2f7e63d0d0d3419690db79d71178e..91fa4ca3b7c948d1014d77993639074fb71ac28a 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -14805,6 +14805,12 @@ SELECT * FROM pg_ls_dir('.') WITH ORDINALITY AS t(ls,n); <entry>channel names that the session is currently listening on</entry> </row> + <row> + <entry><literal><function>pg_notification_queue_usage()</function></literal></entry> + <entry><type>double</type></entry> + <entry>fraction of the asynchronous notification queue currently occupied (0-1)</entry> + </row> + <row> <entry><literal><function>pg_my_temp_schema()</function></literal></entry> <entry><type>oid</type></entry> @@ -14945,10 +14951,19 @@ SET search_path TO <replaceable>schema</> <optional>, <replaceable>schema</>, .. <primary>pg_listening_channels</primary> </indexterm> + <indexterm> + <primary>pg_notification_queue_usage</primary> + </indexterm> + <para> <function>pg_listening_channels</function> returns a set of names of - channels that the current session is listening to. See <xref - linkend="sql-listen"> for more information. + asynchronous notification channels that the current session is listening + to. <function>pg_notification_queue_usage</function> returns the + fraction of the total available space for notifications currently + occupied by notifications that are waiting to be processed, as a + <type>double</type> in the range 0-1. + See <xref linkend="sql-listen"> and <xref linkend="sql-notify"> + for more information. </para> <indexterm> diff --git a/doc/src/sgml/ref/notify.sgml b/doc/src/sgml/ref/notify.sgml index ad574e9ea03228f74ef7bc25bc3ccf1846d60c0f..4dd560838b19111bdc48975fa0665f7941b63ee3 100644 --- a/doc/src/sgml/ref/notify.sgml +++ b/doc/src/sgml/ref/notify.sgml @@ -165,6 +165,11 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla cleanup. In this case you should make sure that this session ends its current transaction so that cleanup can proceed. </para> + <para> + The function <function>pg_notification_queue_usage</function> returns the + fraction of the queue that is currently occupied by pending notifications. + See <xref linkend="functions-info"> for more information. + </para> <para> A transaction that has executed <command>NOTIFY</command> cannot be prepared for two-phase commit. diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 2826b7e43c446c13497490a7b629735758a84dcf..3b71174b82640a9118ced2d771a083e59f9603f6 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -371,6 +371,7 @@ static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); static ListCell *asyncQueueAddEntries(ListCell *nextNotify); +static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); static bool SignalBackends(void); static void asyncQueueReadAllNotifications(void); @@ -1362,26 +1363,37 @@ asyncQueueAddEntries(ListCell *nextNotify) } /* - * Check whether the queue is at least half full, and emit a warning if so. - * - * This is unlikely given the size of the queue, but possible. - * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. + * SQL function to return the fraction of the notification queue currently + * occupied. + */ +Datum +pg_notification_queue_usage(PG_FUNCTION_ARGS) +{ + double usage; + + LWLockAcquire(AsyncQueueLock, LW_SHARED); + usage = asyncQueueUsage(); + LWLockRelease(AsyncQueueLock); + + PG_RETURN_FLOAT8(usage); +} + +/* + * Return the fraction of the queue that is currently occupied. * - * Caller must hold exclusive AsyncQueueLock. + * The caller must hold AysncQueueLock in (at least) shared mode. */ -static void -asyncQueueFillWarning(void) +static double +asyncQueueUsage(void) { int headPage = QUEUE_POS_PAGE(QUEUE_HEAD); int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); int occupied; - double fillDegree; - TimestampTz t; occupied = headPage - tailPage; if (occupied == 0) - return; /* fast exit for common case */ + return (double) 0; /* fast exit for common case */ if (occupied < 0) { @@ -1389,8 +1401,24 @@ asyncQueueFillWarning(void) occupied += QUEUE_MAX_PAGE + 1; } - fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2); + return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2); +} + +/* + * Check whether the queue is at least half full, and emit a warning if so. + * + * This is unlikely given the size of the queue, but possible. + * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. + * + * Caller must hold exclusive AsyncQueueLock. + */ +static void +asyncQueueFillWarning(void) +{ + double fillDegree; + TimestampTz t; + fillDegree = asyncQueueUsage(); if (fillDegree < 0.5) return; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index 44ce2b3a6200689085a69e1bd19972f0530f0c6d..8f6685fd0cce89d8060dd8468b7575bd85bb412d 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201507021 +#define CATALOG_VERSION_NO 201507171 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 6fd1278d1b2448a3ab120f25a81f801a61f5a962..1d68ad7209e1c65333491a4a646945dfc525f2ab 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -4046,10 +4046,14 @@ DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 0 f f f f t DESCR("get the available time zone names"); DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ )); DESCR("trigger description with pretty-print option"); + +/* asynchronous notifications */ DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ )); DESCR("get the channels that the current backend listens to"); DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ )); DESCR("send a notification event"); +DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ )); +DESCR("get the fraction of the asynchronous notification queue currently in use"); /* non-persistent series generator */ DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ )); diff --git a/src/include/commands/async.h b/src/include/commands/async.h index 8491f4736f701004666e684d4f1966bf545ebaa8..677dcaa3d0332e36fd989e5f783d6e164e82d7a7 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -37,6 +37,7 @@ extern void Async_UnlistenAll(void); /* notify-related SQL functions */ extern Datum pg_listening_channels(PG_FUNCTION_ARGS); extern Datum pg_notify(PG_FUNCTION_ARGS); +extern Datum pg_notification_queue_usage(PG_FUNCTION_ARGS); /* perform (or cancel) outbound notify processing at transaction commit */ extern void PreCommit_Notify(void); diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out new file mode 100644 index 0000000000000000000000000000000000000000..92d281a7d1f3cd6192120c15dd777af39610b7a1 --- /dev/null +++ b/src/test/isolation/expected/async-notify.out @@ -0,0 +1,17 @@ +Parsed test spec with 2 sessions + +starting permutation: listen begin check notify check +step listen: LISTEN a; +step begin: BEGIN; +step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +nonzero + +f +step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; +count + +1000 +step check: SELECT pg_notification_queue_usage() > 0 AS nonzero; +nonzero + +t diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec new file mode 100644 index 0000000000000000000000000000000000000000..7f451b18a1579e808abb36ba6ed127c3571a12f5 --- /dev/null +++ b/src/test/isolation/specs/async-notify.spec @@ -0,0 +1,14 @@ +# Verify that pg_notification_queue_usage correctly reports a non-zero result, +# after submitting notifications while another connection is listening for +# those notifications and waiting inside an active transaction. + +session "listener" +step "listen" { LISTEN a; } +step "begin" { BEGIN; } +teardown { ROLLBACK; } + +session "notifier" +step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; } +step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; } + +permutation "listen" "begin" "check" "notify" "check" diff --git a/src/test/regress/expected/async.out b/src/test/regress/expected/async.out index ae0d5df3b7825f3cd8c1c8d0fda5a95a729b5a75..19cbe38e636d092b0677c3fba9f1ff273edab81f 100644 --- a/src/test/regress/expected/async.out +++ b/src/test/regress/expected/async.out @@ -32,3 +32,11 @@ NOTIFY notify_async2; LISTEN notify_async2; UNLISTEN notify_async2; UNLISTEN *; +-- Should return zero while there are no pending notifications. +-- src/test/isolation/specs/async-notify.spec tests for actual usage. +SELECT pg_notification_queue_usage(); + pg_notification_queue_usage +----------------------------- + 0 +(1 row) + diff --git a/src/test/regress/sql/async.sql b/src/test/regress/sql/async.sql index af3a904e0f56f29267c95cbf5e7eada8e4b0df31..40f6e015387b5254cfbddcd55b0c67e964d65e3b 100644 --- a/src/test/regress/sql/async.sql +++ b/src/test/regress/sql/async.sql @@ -17,3 +17,7 @@ NOTIFY notify_async2; LISTEN notify_async2; UNLISTEN notify_async2; UNLISTEN *; + +-- Should return zero while there are no pending notifications. +-- src/test/isolation/specs/async-notify.spec tests for actual usage. +SELECT pg_notification_queue_usage();