Skip to content
Snippets Groups Projects
Commit 6e3323d4 authored by Robert Haas's avatar Robert Haas
Browse files

Triggered change notifications.

Kevin Grittner, reviewed (in earlier versions) by Álvaro Herrera
parent c8397bd6
No related branches found
Tags REL8_3_RC1
No related merge requests found
......@@ -45,6 +45,7 @@ SUBDIRS = \
seg \
spi \
tablefunc \
tcn \
test_parser \
tsearch2 \
unaccent \
......
# contrib/tcn/Makefile
MODULES = tcn
EXTENSION = tcn
DATA = tcn--1.0.sql
ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/tcn
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif
/* contrib/tcn/tcn--1.0.sql */
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION tcn" to load this file. \quit
CREATE FUNCTION triggered_change_notification()
RETURNS pg_catalog.trigger
AS 'MODULE_PATHNAME'
LANGUAGE C;
/*-------------------------------------------------------------------------
*
* tcn.c
* triggered change notification support for PostgreSQL
*
* Portions Copyright (c) 2011-2012, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* contrib/tcn/tcn.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "executor/spi.h"
#include "commands/async.h"
#include "commands/trigger.h"
#include "lib/stringinfo.h"
#include "utils/rel.h"
#include "utils/syscache.h"
PG_MODULE_MAGIC;
/* forward declarations */
Datum triggered_change_notification(PG_FUNCTION_ARGS);
/*
* Copy from s (for source) to r (for result), wrapping with q (quote)
* characters and doubling any quote characters found.
*/
static void
strcpy_quoted(StringInfo r, const char *s, const char q)
{
appendStringInfoCharMacro(r, q);
while (*s)
{
if (*s == q)
appendStringInfoCharMacro(r, q);
appendStringInfoCharMacro(r, *s);
s++;
}
appendStringInfoCharMacro(r, q);
}
/*
* triggered_change_notification
*
* This trigger function will send a notification of data modification with
* primary key values. The channel will be "tcn" unless the trigger is
* created with a parameter, in which case that parameter will be used.
*/
PG_FUNCTION_INFO_V1(triggered_change_notification);
Datum
triggered_change_notification(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
Trigger *trigger;
int nargs;
HeapTuple trigtuple;
Relation rel;
TupleDesc tupdesc;
char *channel;
char operation;
StringInfo payload = makeStringInfo();
bool foundPK;
List *indexoidlist;
ListCell *indexoidscan;
/* make sure it's called as a trigger */
if (!CALLED_AS_TRIGGER(fcinfo))
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("triggered_change_notification: must be called as trigger")));
/* and that it's called after the change */
if (!TRIGGER_FIRED_AFTER(trigdata->tg_event))
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("triggered_change_notification: must be called after the change")));
/* and that it's called for each row */
if (!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("triggered_change_notification: must be called for each row")));
if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
operation = 'I';
else if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
operation = 'U';
else if (TRIGGER_FIRED_BY_DELETE(trigdata->tg_event))
operation = 'D';
else
{
elog(ERROR, "triggered_change_notification: trigger fired by unrecognized operation");
operation = 'X'; /* silence compiler warning */
}
trigger = trigdata->tg_trigger;
nargs = trigger->tgnargs;
if (nargs > 1)
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("triggered_change_notification: must not be called with more than one parameter")));
if (nargs == 0)
channel = "tcn";
else
channel = trigger->tgargs[0];
/* get tuple data */
trigtuple = trigdata->tg_trigtuple;
rel = trigdata->tg_relation;
tupdesc = rel->rd_att;
foundPK = false;
/*
* Get the list of index OIDs for the table from the relcache, and look up
* each one in the pg_index syscache until we find one marked primary key
* (hopefully there isn't more than one such).
*/
indexoidlist = RelationGetIndexList(rel);
foreach(indexoidscan, indexoidlist)
{
Oid indexoid = lfirst_oid(indexoidscan);
HeapTuple indexTuple;
Form_pg_index index;
indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
if (!HeapTupleIsValid(indexTuple)) /* should not happen */
elog(ERROR, "cache lookup failed for index %u", indexoid);
index = (Form_pg_index) GETSTRUCT(indexTuple);
/* we're only interested if it is the primary key */
if (index->indisprimary)
{
int numatts = index->indnatts;
if (numatts > 0)
{
int i;
foundPK = true;
strcpy_quoted(payload, RelationGetRelationName(rel), '"');
appendStringInfoCharMacro(payload, ',');
appendStringInfoCharMacro(payload, operation);
for (i = 0; i < numatts; i++)
{
int colno = index->indkey.values[i];
appendStringInfoCharMacro(payload, ',');
strcpy_quoted(payload, NameStr((tupdesc->attrs[colno - 1])->attname), '"');
appendStringInfoCharMacro(payload, '=');
strcpy_quoted(payload, SPI_getvalue(trigtuple, tupdesc, colno), '\'');
}
Async_Notify(channel, payload->data);
}
ReleaseSysCache(indexTuple);
break;
}
ReleaseSysCache(indexTuple);
}
list_free(indexoidlist);
if (!foundPK)
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_TRIGGER_PROTOCOL_VIOLATED),
errmsg("triggered_change_notification: must be called on a table with a primary key")));
return PointerGetDatum(NULL); /* after trigger; value doesn't matter */
}
# tcn extension
comment = 'Triggered change notifications'
default_version = '1.0'
module_pathname = '$libdir/tcn'
relocatable = true
......@@ -128,6 +128,7 @@ CREATE EXTENSION <replaceable>module_name</> FROM unpackaged;
&contrib-spi;
&sslinfo;
&tablefunc;
&tcn;
&test-parser;
&tsearch2;
&unaccent;
......
......@@ -136,6 +136,7 @@
<!ENTITY sepgsql SYSTEM "sepgsql.sgml">
<!ENTITY sslinfo SYSTEM "sslinfo.sgml">
<!ENTITY tablefunc SYSTEM "tablefunc.sgml">
<!ENTITY tcn SYSTEM "tcn.sgml">
<!ENTITY test-parser SYSTEM "test-parser.sgml">
<!ENTITY tsearch2 SYSTEM "tsearch2.sgml">
<!ENTITY unaccent SYSTEM "unaccent.sgml">
......
<!-- doc/src/sgml/tcn.sgml -->
<sect1 id="tcn" xreflabel="tcn">
<title>tcn</title>
<indexterm zone="tcn">
<primary>tcn</primary>
</indexterm>
<indexterm zone="tcn">
<primary>triggered_change_notification</primary>
</indexterm>
<para>
The <filename>tcn</> module provides a trigger function that notifies
listeners of changes to any table on which it is attached. It must be
used as an <literal>AFTER</> trigger <literal>FOR EACH ROW</>.
</para>
<para>
Only one parameter may be suupplied to the function in a
<literal>CREATE TRIGGER</> statement, and that is optional. If supplied
it will be used for the channel name for the notifications. If omitted
<literal>tcn</> will be used for the channel name.
</para>
<para>
The payload of the notifications consists of the table name, a letter to
indicate which type of operation was performed, and column name/value pairs
for primary key columns. Each part is separated from the next by a comma.
For ease of parsing using regular expressions, table and column names are
always wrapped in double quotes, and data values are always wrapped in
single quotes. Embeded quotes are doubled.
</para>
<para>
A brief example of using the extension follows.
<programlisting>
test=# create table tcndata
test-# (
test(# a int not null,
test(# b date not null,
test(# c text,
test(# primary key (a, b)
test(# );
NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "tcndata_pkey" for table "tcndata"
CREATE TABLE
test=# create trigger tcndata_tcn_trigger
test-# after insert or update or delete on tcndata
test-# for each row execute procedure triggered_change_notification();
CREATE TRIGGER
test=# listen tcn;
LISTEN
test=# insert into tcndata values (1, date '2012-12-22', 'one'),
test-# (1, date '2012-12-23', 'another'),
test-# (2, date '2012-12-23', 'two');
INSERT 0 3
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",I,"a"='2',"b"='2012-12-23'" received from server process with PID 22770.
test=# update tcndata set c = 'uno' where a = 1;
UPDATE 2
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
Asynchronous notification "tcn" with payload ""tcndata",U,"a"='1',"b"='2012-12-23'" received from server process with PID 22770.
test=# delete from tcndata where a = 1 and b = date '2012-12-22';
DELETE 1
Asynchronous notification "tcn" with payload ""tcndata",D,"a"='1',"b"='2012-12-22'" received from server process with PID 22770.
</programlisting>
</para>
</sect1>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment