From 54f2b601ef6e55de0e70d2ec95f4cb3250ffbd23 Mon Sep 17 00:00:00 2001 From: "Thomas G. Lockhart" <lockhart@fourpalms.org> Date: Wed, 20 Dec 2000 17:22:35 +0000 Subject: [PATCH] rserv replication toolkit from Vadim Mikheev. --- contrib/rserv/ApplySnapshot.in | 52 +++ contrib/rserv/CleanLog.in | 47 ++ contrib/rserv/GetSyncID.in | 55 +++ contrib/rserv/InitRservTest.in | 259 +++++++++++ contrib/rserv/Makefile | 59 +++ contrib/rserv/MasterAddTable.in | 61 +++ contrib/rserv/MasterInit.in | 107 +++++ contrib/rserv/PrepareSnapshot.in | 56 +++ contrib/rserv/README.rserv | 128 ++++++ contrib/rserv/RServ.pm | 761 +++++++++++++++++++++++++++++++ contrib/rserv/Replicate.in | 100 ++++ contrib/rserv/RservTest.in | 313 +++++++++++++ contrib/rserv/SlaveAddTable.in | 59 +++ contrib/rserv/SlaveInit.in | 65 +++ contrib/rserv/SyncSyncID.in | 48 ++ contrib/rserv/master.sql.in | 101 ++++ contrib/rserv/regress.sh | 32 ++ contrib/rserv/rserv.c | 319 +++++++++++++ contrib/rserv/slave.sql.in | 22 + 19 files changed, 2644 insertions(+) create mode 100644 contrib/rserv/ApplySnapshot.in create mode 100644 contrib/rserv/CleanLog.in create mode 100644 contrib/rserv/GetSyncID.in create mode 100644 contrib/rserv/InitRservTest.in create mode 100644 contrib/rserv/Makefile create mode 100644 contrib/rserv/MasterAddTable.in create mode 100644 contrib/rserv/MasterInit.in create mode 100644 contrib/rserv/PrepareSnapshot.in create mode 100644 contrib/rserv/README.rserv create mode 100644 contrib/rserv/RServ.pm create mode 100644 contrib/rserv/Replicate.in create mode 100644 contrib/rserv/RservTest.in create mode 100644 contrib/rserv/SlaveAddTable.in create mode 100644 contrib/rserv/SlaveInit.in create mode 100644 contrib/rserv/SyncSyncID.in create mode 100644 contrib/rserv/master.sql.in create mode 100755 contrib/rserv/regress.sh create mode 100644 contrib/rserv/rserv.c create mode 100644 contrib/rserv/slave.sql.in diff --git a/contrib/rserv/ApplySnapshot.in b/contrib/rserv/ApplySnapshot.in new file mode 100644 index 00000000000..a3b79e19eea --- /dev/null +++ b/contrib/rserv/ApplySnapshot.in @@ -0,0 +1,52 @@ +# -*- perl -*- +# ApplySnapshot +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use lib "@LIBDIR@"; +use IO::File; +use RServ; +use Getopt::Long; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; +my $snapshot = $opt_snapshot || "__Snapshot"; + +if (defined($opt_help) || (scalar(@ARGV) < 1)) { + print "Usage: $0 --host=name --user=name --password=string slavedb\n"; + exit ((scalar(@ARGV) < 1)? 1:0); +} + +my $slave = $ARGV[0] || "slave"; + +my $sinfo = "dbname=$slave"; +$sinfo = "$sinfo host=$opt_host" if (defined($opt_host)); +$sinfo = "$sinfo user=$opt_user" if (defined($opt_user)); +$sinfo = "$sinfo password=$opt_password" if (defined($opt_password)); + +my $conn = Pg::connectdb(sinfo); + +my $inpf = new IO::File; +$inpf = STDIN; + +$res = ApplySnapshot ($conn, $inpf); + +if ($res > 0) +{ + printf STDERR "Snapshot applied\n"; +} +elsif ($res != 0) +{ + printf STDERR "ERROR\n"; + exit(1); +} + +exit(0); diff --git a/contrib/rserv/CleanLog.in b/contrib/rserv/CleanLog.in new file mode 100644 index 00000000000..6fa1253de2d --- /dev/null +++ b/contrib/rserv/CleanLog.in @@ -0,0 +1,47 @@ +# -*- perl -*- +# CleanLog +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use lib "@LIBDIR@"; + +use Getopt::Long; +use RServ; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", "snapshot=s", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; +my $snapshot = $opt_snapshot || "__Snapshot"; + +if (defined($opt_help) || (scalar(@ARGV) < 2) || ($ARGV[1] !~ /^\d+$/)) { + print "Usage: $PROGRAM_NAME --host=name --user=name --password=string masterdb syncid\n"; + exit ((scalar(@ARGV) < 2)? 1: 0); +} + +my $dbname = $ARGV[0]; +my $howold = $ARGV[1]; + +my $minfo = "dbname=$dbname"; +$minfo = "$minfo host=$opt_host" if (defined($opt_host)); +$minfo = "$minfo user=$opt_user" if (defined($opt_user)); +$minfo = "$minfo password=$opt_password" if (defined($opt_password)); + +print "Master connection is $minfo\n" if ($debug); +print "Slave connection is $sinfo\n" if ($debug); + +my $conn = Pg::connectdb($minfo); + +$res = CleanLog($conn, $howold); + +exit(1) if $res < 0; + +printf STDERR "Deleted %d log records\n", $res if $res > 0; + +exit(0); diff --git a/contrib/rserv/GetSyncID.in b/contrib/rserv/GetSyncID.in new file mode 100644 index 00000000000..2ffe7d3c547 --- /dev/null +++ b/contrib/rserv/GetSyncID.in @@ -0,0 +1,55 @@ +# -*- perl -*- +# GetSyncID +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use lib "@LIBDIR@"; + +use Pg; +use Getopt::Long; +use RServ; + +$| = 1; + +my $verbose = 1; + +$result = GetOptions("debug!", "verbose!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose if (defined($opt_verbose)); + +if (defined($opt_help) || (scalar(@ARGV) < 1)) { + print "Usage: $0 --host=name --user=name --password=string slavedb\n"; + exit ((scalar(@ARGV) < 1)? 1: 0); +} + +my $dbname = $ARGV[0]; + +my $sinfo = "dbname=$dbname"; +$sinfo = "$sinfo host=$opt_host" if (defined($opt_host)); +$sinfo = "$sinfo user=$opt_user" if (defined($opt_user)); +$sinfo = "$sinfo password=$opt_password" if (defined($opt_password)); + +if ($verbose) { print "Connecting to '$sinfo'\n" }; +my $conn = Pg::connectdb($sinfo); + +$res = GetSyncID($conn); + +die "ERROR\n" if $res < 0; + +if (! defined $res) +{ + printf STDERR "No SyncID found\n"; +} +else +{ + print("Last SyncID applied: ") if ($verbose); + printf "%d", $res; + print("\n") if ($verbose); +} + +exit(0); diff --git a/contrib/rserv/InitRservTest.in b/contrib/rserv/InitRservTest.in new file mode 100644 index 00000000000..414744c6c01 --- /dev/null +++ b/contrib/rserv/InitRservTest.in @@ -0,0 +1,259 @@ +#!/bin/sh +# InitRservTest +# erServer demonstration implementation +# (c) 2000 Vadim Mikheev, PostgreSQL Inc. + +[ -n "$RSERV_PERL" ] || RSERV_PERL=@LIBDIR@ +[ -n "$RSERV_SQL" ] || RSERV_SQL=@SQLDIR@ +[ -n "$RSERV_BIN" ] || RSERV_BIN=@BINDIR@ +export RSERV_PERL +export RSERV_SQL +export RSERV_BIN + +pargs= + +while [[ $1 == -* ]]; do + case "$1" in + --user) + shift + pargs="$pargs -U $1" + ;; + --host) + shift + pargs="$pargs -h $1" + ;; + *) + echo "Usage: $0 --user name --host name masterdb slavedb" + exit 1 + ;; + esac + shift +done + +masterdb=$1 +slavedb=$2 + +[ "${masterdb}" != "" ] || masterdb=master +[ "${slavedb}" != "" ] || slavedb=slave + +echo "Master -> $masterdb" +echo "Slave -> $slavedb" + +############################################################################ + +fill() +{ + table="create table test (i text, k int, l int); +copy test from stdin; +Line: 1 1 1 +Line: 2 2 2 +Line: 3 3 3 +Line: 4 4 4 +Line: 5 5 5 +Line: 6 6 6 +Line: 7 7 7 +Line: 8 8 8 +Line: 9 9 9 +Line: 10 10 10 +Line: 11 11 11 +Line: 12 12 12 +Line: 13 13 13 +Line: 14 14 14 +Line: 15 15 15 +Line: 16 16 16 +Line: 17 17 17 +Line: 18 18 18 +Line: 19 19 19 +Line: 20 20 20 +Line: 21 21 21 +Line: 22 22 22 +Line: 23 23 23 +Line: 24 24 24 +Line: 25 25 25 +Line: 26 26 26 +Line: 27 27 27 +Line: 28 28 28 +Line: 29 29 29 +Line: 30 30 30 +Line: 31 31 31 +Line: 32 32 32 +Line: 33 33 33 +Line: 34 34 34 +Line: 35 35 35 +Line: 36 36 36 +Line: 37 37 37 +Line: 38 38 38 +Line: 39 39 39 +Line: 40 40 40 +Line: 41 41 41 +Line: 42 42 42 +Line: 43 43 43 +Line: 44 44 44 +Line: 45 45 45 +Line: 46 46 46 +Line: 47 47 47 +Line: 48 48 48 +Line: 49 49 49 +Line: 50 50 50 +Line: 51 51 51 +Line: 52 52 52 +Line: 53 53 53 +Line: 54 54 54 +Line: 55 55 55 +Line: 56 56 56 +Line: 57 57 57 +Line: 58 58 58 +Line: 59 59 59 +Line: 60 60 60 +Line: 61 61 61 +Line: 62 62 62 +Line: 63 63 63 +Line: 64 64 64 +Line: 65 65 65 +Line: 66 66 66 +Line: 67 67 67 +Line: 68 68 68 +Line: 69 69 69 +Line: 70 70 70 +Line: 71 71 71 +Line: 72 72 72 +Line: 73 73 73 +Line: 74 74 74 +Line: 75 75 75 +Line: 76 76 76 +Line: 77 77 77 +Line: 78 78 78 +Line: 79 79 79 +Line: 80 80 80 +Line: 81 81 81 +Line: 82 82 82 +Line: 83 83 83 +Line: 84 84 84 +Line: 85 85 85 +Line: 86 86 86 +Line: 87 87 87 +Line: 88 88 88 +Line: 89 89 89 +Line: 90 90 90 +Line: 91 91 91 +Line: 92 92 92 +Line: 93 93 93 +Line: 94 94 94 +Line: 95 95 95 +Line: 96 96 96 +Line: 97 97 97 +Line: 98 98 98 +Line: 99 99 99 +Line: 100 100 100 +\\."; + echo "$table" | psql $pargs $1 || exit + if [ "$1" = "$masterdb" ] + then + rm -rf __tmpf__ + psql $pargs -c "select * into table testoid from test" $1 || exit + psql $pargs -c "copy testoid with oids to '`pwd`/__tmpf__'" $1 || exit + psql $pargs -c "select * into table teststr from test" $1 || exit + else + psql $pargs -c "select * into table testoid from test where k < 0" $1 || exit + psql $pargs -c "copy testoid with oids from '`pwd`/__tmpf__'" $1 || exit + psql $pargs -c "select * into table teststr from test" $1 || exit + rm -rf __tmpf__ + fi + psql $pargs -c "create unique index i_test on test (k)" $1 || exit + psql $pargs -c "create unique index i_testoid on testoid (oid)" $1 || exit + psql $pargs -c "create unique index i_teststr on teststr (i)" $1 || exit + psql $pargs -c vacuum $1 || exit +} + +############################################################################ + +echo +echo +echo ' ATTENTION' +echo +echo This script will destroy databases with names MASTER and SLAVE +echo +echo -n "Are you going to continue ? [Y/N] " + +read answ + +case $answ in + Y*|y*) + ;; + *) + exit + ;; +esac + +echo +echo + +sql="drop database $masterdb" +echo $sql +psql $pargs -c "$sql" template1 +sql="create database $masterdb" +echo $sql +psql $pargs -c "$sql" template1 || exit + +echo Setup master system +psql $pargs $masterdb < $RSERV_SQL/master.sql || exit + +echo Wait for template1 to become available... +sleep 1 + +sql="drop database $slavedb" +echo $sql +psql $pargs -c "$sql" template1 +sql="create database $slavedb" +echo $sql +psql $pargs -c "$sql" template1 || exit + +echo Setup slave system +psql $pargs $slavedb < $RSERV_SQL/slave.sql || exit + +echo Create and fill test, testoid and teststr tables in master db +fill $masterdb +echo +echo Register test, testoid and teststr tables for replication on master +echo +$RSERV_BIN/MasterAddTable $masterdb test k +$RSERV_BIN/MasterAddTable $masterdb testoid oid +$RSERV_BIN/MasterAddTable $masterdb teststr i + +echo Create and fill test, testoid and teststr tables in slave db +fill $slavedb +echo +echo Register test, testoid and teststr tables for replication on slave +echo +$RSERV_BIN/SlaveAddTable $slavedb test k +$RSERV_BIN/SlaveAddTable $slavedb testoid oid +$RSERV_BIN/SlaveAddTable $slavedb teststr i + +echo +echo +echo +echo +echo +echo +echo +echo +echo " Now make changes in $masterdb db and run" +echo +echo " Replicate $masterdb $slavedb" +echo +echo " to replicate the master on the slave." +echo +echo " You may also use the RservTest tcl utility" +echo " to demonstrate this functionality." +echo +echo +echo +echo +echo +echo +echo +echo + +exit + +############################################################################ diff --git a/contrib/rserv/Makefile b/contrib/rserv/Makefile new file mode 100644 index 00000000000..8d933f006e0 --- /dev/null +++ b/contrib/rserv/Makefile @@ -0,0 +1,59 @@ +# Makefile for erServer demonstration implementation +# (c) 2000 Vadim Mikheev, PostgreSQL Inc. + +#vpath %.pl perl +#vpath %.pm perl + +subdir = contrib/rserv +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global + +NAME = rserv +OBJS = $(NAME).o +DOCS = README.$(NAME) +SQLS = master.sql slave.sql +TCLS = RservTest +PERLS = MasterInit SlaveInit MasterAddTable SlaveAddTable Replicate CleanLog +PERLS += PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID +LIBS = RServ.pm +SCRIPTS = InitRservTest +MODS = $(OBJS:.o=$(DLSUFFIX)) + +override CPPFLAGS += -I$(srcdir) +override CFLAGS += $(CFLAGS_SL) + +INPUTFILES = $(wildcard *.in) +CLEANFILES = $(INPUTFILES:.in=) +CLEANFILES += $(OBJS) $(MODS) + +.PHONY: all install installdirs tarball + +all: $(SQLS) $(TCLS) $(PERLS) $(SCRIPTS) $(MODS) + +install: all installdirs + $(INSTALL_DATA) $(SQLS) $(libdir)/contrib + $(INSTALL_SCRIPT) $(TCLS) $(PERLS) $(SCRIPTS) $(bindir) + $(INSTALL_SCRIPT) $(LIBS) $(libdir)/contrib + $(INSTALL_SHLIB) $(MODS) $(libdir)/contrib + $(INSTALL_DATA) $(DOCS) $(docdir)/contrib/$(NAME) + +installdirs: + $(mkinstalldirs) $(datadir)/contrib $(libdir)/contrib $(docdir)/contrib/$(NAME) + +%.sql: %.sql.in + rm -f $@; \ + C=`pwd`; \ + sed -e "s:_OBJWD_:$(libdir)/contrib:g" \ + -e "s:_DLSUFFIX_:$(DLSUFFIX):g" < $< > $@ + +%: %.in + sed -e "s:_OBJWD_:$(libdir)/contrib:g" \ + -e "s:_DLSUFFIX_:$(DLSUFFIX):g" \ + -e "s:@SQLDIR@:$(libdir)/contrib:g" \ + -e "s:@BINDIR@:$(bindir):g" \ + -e "s:@LIBDIR@:$(libdir)/contrib:g" < $< > $@ + chmod 775 $@ + +clean: +# @echo "Removing $(CLEANFILES)" + rm -f $(CLEANFILES) diff --git a/contrib/rserv/MasterAddTable.in b/contrib/rserv/MasterAddTable.in new file mode 100644 index 00000000000..3d92b47cbd4 --- /dev/null +++ b/contrib/rserv/MasterAddTable.in @@ -0,0 +1,61 @@ +# -*- perl -*- +# MasterAddTable +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Pg; +use Getopt::Long; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; + +if (defined($opt_help) || (scalar(@ARGV) < 3)) { + print "Usage: $0 --host=name --user=name --password=string masterdb table column\n"; + exit ((scalar(@ARGV) < 3)? 1: 0); +} + +my $dbname = $ARGV[0]; +my $table = $ARGV[1]; +my $keyname = $ARGV[2]; + +my $minfo = "dbname=$dbname"; +$minfo = "$minfo host=$opt_host" if (defined($opt_host)); +$minfo = "$minfo user=$opt_user" if (defined($opt_user)); +$minfo = "$minfo password=$opt_password" if (defined($opt_password)); + +my $conn = Pg::connectdb($minfo); + +my $result = $conn->exec("BEGIN"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +$result = $conn->exec("select pgc.oid, pga.attnum from pg_class pgc" . + ", pg_attribute pga where pgc.relname = '$table'" . + " and pgc.oid = pga.attrelid" . + " and pga.attname = '$keyname'"); +die $conn->errorMessage if $result->resultStatus ne PGRES_TUPLES_OK; + +my @row = $result->fetchrow; +die "Can't find table/key\n" if ! defined $row[0] || ! defined $row[1]; + +$result = $conn->exec("create trigger _RSERV_TRIGGER_T_ after" . + " insert or update or delete on $table" . + " for each row execute procedure" . + " _rserv_log_('$row[1]')"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +$result = $conn->exec("insert into _RSERV_TABLES_ (tname, cname, reloid, key)" . + " values ('$table', '$keyname', $row[0], $row[1])"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +$result = $conn->exec("COMMIT"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +exit(0); diff --git a/contrib/rserv/MasterInit.in b/contrib/rserv/MasterInit.in new file mode 100644 index 00000000000..9464424fa2d --- /dev/null +++ b/contrib/rserv/MasterInit.in @@ -0,0 +1,107 @@ +# -*- perl -*- +# MasterInit +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Pg; +use Getopt::Long; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; + +if (defined($opt_help) || (scalar(@ARGV) < 1)) { + print "Usage: $0 --host=name --user=name --password=string masterdb\n"; + exit ((scalar(@ARGV) < 1)? 1:0); +} + +my $master = $ARGV[0] || "master"; + +my $minfo = "dbname=$master"; +$minfo = "$minfo host=$opt_host" if (defined($opt_host)); +$minfo = "$minfo user=$opt_user" if (defined($opt_user)); +$minfo = "$minfo password=$opt_password" if (defined($opt_password)); + +sub RollbackAndQuit { + $conn = shift @_; + + print STDERR "Error in query: ", $conn->errorMessage; + $conn->exec("ROLLBACK"); + exit (-1); +} + +my $conn = Pg::connectdb($minfo); +if ($conn->status != PGRES_CONNECTION_OK) { + print STDERR "Failed opening $minfo\n"; + exit 1; +} + +my $result = $conn->exec("BEGIN"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("set transaction isolation level serializable"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# List of slave servers +$result = $conn->exec("create table _RSERV_SERVERS_" . + " (server serial, host text, post int4, dbase text)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# List of replicated tables +$result = $conn->exec("create table _RSERV_TABLES_" . + " (tname name, cname name, reloid oid, key int4)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# Bookkeeping log for row replication +$result = $conn->exec("create table _RSERV_LOG_" . + " (reloid oid, logid int4, logtime timestamp, deleted int4, key text)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# This is to speedup lookup of deleted tuples +$result = $conn->exec("create index _RSERV_LOG_INDX_DLT_ID_ on _RSERV_LOG_ (deleted, logid)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# This is to speedup cleanup +$result = $conn->exec("create index _RSERV_LOG_INDX_TM_ID_ on _RSERV_LOG_ (logtime, logid)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# This is to speedup trigger and lookup of updated tuples +$result = $conn->exec("create index _RSERV_LOG_INDX_REL_KEY_ on _RSERV_LOG_ (reloid, key)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# Sync point for each slave server +$result = $conn->exec("create table _RSERV_SYNC_" . + " (server int4, syncid int4, synctime timestamp" . + ", status int4, minid int4, maxid int4, active text)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("create index _RSERV_SYNC_INDX_SRV_ID_ on _RSERV_SYNC_ (server, syncid)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +# Sync point reference numbers +$result = $conn->exec("create sequence _rserv_sync_seq_"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("CREATE FUNCTION _rserv_log_() RETURNS opaque" . + " AS '_OBJWD_/rserv_DLSUFFIX_' LANGUAGE 'c'"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("CREATE FUNCTION _rserv_sync_(int4) RETURNS int4" . + " AS '_OBJWD_/rserv_DLSUFFIX_' LANGUAGE 'c'"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("CREATE FUNCTION _rserv_debug_(int4) RETURNS int4" . + " AS '_OBJWD_/rserv_DLSUFFIX_' LANGUAGE 'c'"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("COMMIT"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +exit (0); diff --git a/contrib/rserv/PrepareSnapshot.in b/contrib/rserv/PrepareSnapshot.in new file mode 100644 index 00000000000..fe57ab949cc --- /dev/null +++ b/contrib/rserv/PrepareSnapshot.in @@ -0,0 +1,56 @@ +# -*- perl -*- +# PrepareSnapshot +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use lib "@LIBDIR@"; + +use IO::File; +use Getopt::Long; +use RServ; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", "snapshot=s", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; +my $snapshot = $opt_snapshot || "__Snapshot"; + +if (defined($opt_help) || (scalar(@ARGV) < 1)) { + print "Usage: $0 --snapshot=file --host=name --user=name --password=string masterdb\n"; + exit ((scalar(@ARGV) < 1)? 1:0); +} + +my $master = $ARGV[0] || "master"; +my $server = 0; + +my $minfo = "dbname=$master"; +$minfo = "$minfo host=$opt_host" if (defined($opt_host)); +$minfo = "$minfo user=$opt_user" if (defined($opt_user)); +$minfo = "$minfo password=$opt_password" if (defined($opt_password)); + +my $conn = Pg::connectdb($minfo); + +my $outf = new IO::File; +$outf = STDOUT; + +$res = PrepareSnapshot ($conn, $outf, $server); + +if ($res == 0) +{ + printf STDERR "Sync-ed\n"; + exit(0); +} +if ($res > 0) +{ + printf STDERR "Snapshot dumped to STDOUT\n"; + exit(0); +} + +printf STDERR "ERROR\n"; +exit(1); diff --git a/contrib/rserv/README.rserv b/contrib/rserv/README.rserv new file mode 100644 index 00000000000..5ccd87b3459 --- /dev/null +++ b/contrib/rserv/README.rserv @@ -0,0 +1,128 @@ +erServer demonstration implementation +(c) 2000, Vadim Mikheev and Thomas Lockhart, PostgreSQL Inc. + +Version 0.1: + Replicates a master database to a single slave database. + Tested under Linux (Mandrake 7.2). + +Requirements: + +- PostgreSQL >= 7.0.X + A separate Makefile is required for PostgreSQL 7.0.x and earlier +- Perl5 and the PostgreSQL perl interface +- TCL and the PostgreSQL tcl interface (for demo only) + + +How to compile: + +- make all +- make install + +Scripts and libraries are installed in places which are consistant +with the way other contrib/ code is installed; underneath the core +items in a contrib/ directory. + + +The toolset: + +MasterInit dbname + sets up structures and user-defined functions for a master + database. + +SlaveInit dbname + sets up structures for a slave database. Does not include triggers, + but only bookkeeping tables. + +MasterAddTable dbname table column + sets up triggers for the specified table and column. Note that this + column must be updated for replication to occur. + +SlaveAddTable dbname table column + sets up bookkeeping for the specified table and column. + +Replicate masterdb slavedb + actually replicate changes from a master to single slave. Note that + this must be repeated to replicate to multiple slaves, but the + bookkeeping for each slave is handled separately so each can be + updated at different times and with different sets of changes. + +GetSyncID [--noverbose] slavedb + returns the last syncid the specified slave has seen. May be used + in conjunction with CleanLog using the --noverbose option. + +CleanLog masterdb syncid + removes obsolete entries in the master database replication log + table up to the specified replication sequence number. + +Other utilities: + +PrepareSnapshot + build a file of replication information from the specified masterdb. + +ApplySnapshot + use a file of replication information to apply to the specified + slavedb. + + +How to run a demo: + +Run the InitRservTest script. It will create two local databases +'master' & 'slave' with table 'test' in them. It accepts the following +arguments: + --help + Print a usage message and quit. + --user name + Access the database with another username. + --host name + Access a remote database. Note that the shared library *must* be + visible in the same path as installed on the build machine. + masterdb + slavedb + Names of test databases. Defaults to 'master' and 'slave', + respectively. + +Once the test databases are set up, simply updating the master table +is sufficient to log a replication event. You can update the table +test then run "Replicate master slave", or you can use the demo +program RservTest. + +Run the tcl/tk GUI demo program, RservTest. It has a single window, +which has four buttons and three data entry boxes. + + -------------------------------------------------- + | PostgreSQL Asynchronous Replication | + | Master Slave | + | < master > < slave > | + | | + | [ Update ] < > | + | [ Replicate ] | + | [ Show ] ____________ | + | | + | [ Quit ] | + | | + -------------------------------------------------- + +The demo has the following behaviors: + +If you enter a string into the data entry field to the right of +[Update], then that string will be used to either update the master +database or to query the slave database. + +If you click [Update], then the string in the data entry box will be +entered into the master database. + +If you click [Replicate], then all changes since the last replication +will be propagated to the slave database. + +If you click [Show], then the slave database will be queried to find +the string in the data entry box to the right of the [Update] +button. If the string does not (yet) exist in the slave database, then +"n/a" will appear to the right of the [Show] button. If the string +does exist in the slave database, then it will be printed to the right +of the [Show] button. + + +Todo: +1. Support for multiple slave servers. +2. Explicit support for master/slave failover. +3. More docs. diff --git a/contrib/rserv/RServ.pm b/contrib/rserv/RServ.pm new file mode 100644 index 00000000000..de0f037cbe0 --- /dev/null +++ b/contrib/rserv/RServ.pm @@ -0,0 +1,761 @@ +# -*- perl -*- +# RServ.pm +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +package RServ; + +require Exporter; +@ISA = qw(Exporter); +@EXPORT = qw(PrepareSnapshot ApplySnapshot GetSyncID SyncSyncID CleanLog); +@EXPORT_OK = qw(); + +use Pg; + +$debug = 0; +$quiet = 1; + +my %Mtables = (); +my %Stables = (); + +sub PrepareSnapshot +{ + my ($conn, $outf, $server) = @_; # (@_[0], @_[1], @_[2]); + + my $result = $conn->exec("BEGIN"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + $result = $conn->exec("set transaction isolation level serializable"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + # MAP oid --> tabname, keyname + $result = $conn->exec("select pgc.oid, pgc.relname, pga.attname" . + " from _RSERV_TABLES_ rt, pg_class pgc, pg_attribute pga" . + " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" . + " and pga.attnum = rt.key"); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + my @row; + while (@row = $result->fetchrow) + { + # printf "$row[0], $row[1], $row[2]\n"; + push @{$Mtables{$row[0]}}, $row[1], $row[2]; + } + + # Read last succeeded sync + $sql = "select syncid, synctime, minid, maxid, active from _RSERV_SYNC_" . + " where server = $server and syncid = (select max(syncid) from" . + " _RSERV_SYNC_ where server = $server and status > 0)"; + + printf "$sql\n" if $debug; + + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + my @lastsync = $result->fetchrow; + + my $sinfo = ""; + if ($lastsync[3] ne '') # sync info + { + $sinfo = "and (l.logid >= $lastsync[3]"; + $sinfo .= " or l.logid in ($lastsync[4])" if $lastsync[4] ne ''; + $sinfo .= ")"; + } + + my $havedeal = 0; + + # DELETED rows + $sql = "select l.reloid, l.key from _RSERV_LOG_ l" . + " where l.deleted = 1 $sinfo order by l.reloid"; + + printf "$sql\n" if $debug; + + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + $lastoid = ''; + while (@row = $result->fetchrow) + { + next unless exists $Mtables{$row[0]}; + if ($lastoid != $row[0]) + { + if ($lastoid eq '') + { + my $syncid = GetSYNCID($conn, $outf); + return($syncid) if $syncid < 0; + $havedeal = 1; + } + else + { + printf $outf "\\.\n"; + } + printf $outf "-- DELETE $Mtables{$row[0]}[0]\n"; + $lastoid = $row[0]; + } + if (! defined $row[1]) + { + print STDERR "NULL key\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + printf $outf "%s\n", OutputValue($row[1]); + } + printf $outf "\\.\n" if $lastoid ne ''; + + # UPDATED rows + + my ($taboid, $tabname, $tabkey); + foreach $taboid (keys %Mtables) + { + ($tabname, $tabkey) = @{$Mtables{$taboid}}; + my $oidkey = ($tabkey eq 'oid') ? "_$tabname.oid," : ''; + $sql = sprintf "select $oidkey _$tabname.* from _RSERV_LOG_ l," . + " $tabname _$tabname where l.reloid = $taboid and l.deleted = 0 $sinfo" . + " and l.key = _$tabname.${tabkey}::text"; + + printf "$sql\n" if $debug; + + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + printf $outf "-- ERROR\n" if $havedeal; + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + next if $result->ntuples <= 0; + if (! $havedeal) + { + my $syncid = GetSYNCID($conn, $outf); + return($syncid) if $syncid < 0; + $havedeal = 1; + } + printf $outf "-- UPDATE $tabname\n"; + while (@row = $result->fetchrow) + { + for ($i = 0; $i <= $#row; $i++) + { + printf $outf " " if $i; + printf $outf "%s", OutputValue($row[$i]); + } + printf $outf "\n"; + } + printf $outf "\\.\n"; + } + + unless ($havedeal) + { + $conn->exec("ROLLBACK"); + return(0); + } + + # Remember this snapshot info + $result = $conn->exec("select _rserv_sync_($server)"); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + printf $outf "-- ERROR\n"; + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + $result = $conn->exec("COMMIT"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + printf $outf "-- ERROR\n"; + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + printf $outf "-- OK\n"; + + return(1); + +} + +sub OutputValue +{ + my ($val) = @_; # @_[0]; + + return("\\N") unless defined $val; + + $val =~ s/\\/\\\\/g; + $val =~ s/ /\\011/g; + $val =~ s/\n/\\012/g; + $val =~ s/\'/\\047/g; + + return($val); +} + +# Get syncid for new snapshot +sub GetSYNCID +{ + my ($conn, $outf) = @_; # (@_[0], @_[1]); + + my $result = $conn->exec("select nextval('_rserv_sync_seq_')"); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + my @row = $result->fetchrow; + + printf $outf "-- SYNCID $row[0]\n"; + return($row[0]); +} + + +sub CleanLog +{ + my ($conn, $howold) = @_; # (@_[0], @_[1]); + + my $result = $conn->exec("BEGIN"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + my $sql = "select rs.maxid, rs.active from _RSERV_SYNC_ rs" . + " where rs.syncid = (select max(rs2.syncid) from _RSERV_SYNC_ rs2" . + " where rs2.server = rs.server and rs2.status > 0) order by rs.maxid"; + + printf "$sql\n" if $debug; + + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + my $maxid = ''; + my %active = (); + while (my @row = $result->fetchrow) + { + $maxid = $row[0] if $maxid eq ''; + last if $row[0] > $maxid; + my @ids = split(/[ ]+,[ ]+/, $row[1]); + foreach $aid (@ids) + { + $active{$aid} = 1 unless exists $active{$aid}; + } + } + if ($maxid eq '') + { + print STDERR "No Sync IDs\n" unless ($quiet); + return(0); + } + my $alist = join(',', keys %active); + my $sinfo = "logid < $maxid"; + $sinfo .= " and logid not in ($alist)" if $alist ne ''; + + $sql = "delete from _RSERV_LOG_ where " . + "logtime < now() - '$howold second'::interval and $sinfo"; + + printf "$sql\n" if $debug; + + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + $maxid = $result->cmdTuples; + + $result = $conn->exec("COMMIT"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + return($maxid); +} + +sub ApplySnapshot +{ + my ($conn, $inpf) = @_; # (@_[0], @_[1]); + + my $result = $conn->exec("BEGIN"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + $result = $conn->exec("SET CONSTRAINTS ALL DEFERRED"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + # MAP name --> oid, keyname, keynum + my $sql = "select pgc.oid, pgc.relname, pga.attname, rt.key" . + " from _RSERV_SLAVE_TABLES_ rt, pg_class pgc, pg_attribute pga" . + " where pgc.oid = rt.reloid and pga.attrelid = rt.reloid" . + " and pga.attnum = rt.key"; + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + while (@row = $result->fetchrow) + { + # printf " %s %s\n", $row[1], $row[0]; + push @{$Stables{$row[1]}}, $row[0], $row[2], $row[3]; + } + + my $ok = 0; + my $syncid = ''; + while(<$inpf>) + { + $_ =~ s/\n//; + my ($cmt, $cmd, $prm) = split (/[ ]+/, $_, 3); + if ($cmt ne '--') + { + printf STDERR "Invalid format\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + if ($cmd eq 'DELETE') + { + if ($syncid eq '') + { + printf STDERR "Sync ID unspecified\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + $result = DoDelete($conn, $inpf, $prm); + if ($result) + { + $conn->exec("ROLLBACK"); + return($result); + } + } + elsif ($cmd eq 'UPDATE') + { + if ($syncid eq '') + { + printf STDERR "Sync ID unspecified\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + $result = DoUpdate($conn, $inpf, $prm); + if ($result) + { + $conn->exec("ROLLBACK"); + return($result); + } + } + elsif ($cmd eq 'SYNCID') + { + if ($syncid ne '') + { + printf STDERR "Second Sync ID ?!\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + if ($prm !~ /^\d+$/) + { + printf STDERR "Invalid Sync ID $prm\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + $syncid = $prm; + + printf STDERR "Sync ID $syncid\n" unless ($quiet); + + $result = $conn->exec("select syncid, synctime from " . + "_RSERV_SLAVE_SYNC_ where syncid = " . + "(select max(syncid) from _RSERV_SLAVE_SYNC_)"); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + my @row = $result->fetchrow; + if (! defined $row[0]) + { + $result = $conn->exec("insert into" . + " _RSERV_SLAVE_SYNC_(syncid, synctime) values ($syncid, now())"); + } + elsif ($row[0] >= $prm) + { + printf STDERR "Sync-ed to ID $row[0] ($row[1])\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(0); + } + else + { + $result = $conn->exec("update _RSERV_SLAVE_SYNC_" . + " set syncid = $syncid, synctime = now()"); + } + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + } + elsif ($cmd eq 'OK') + { + $ok = 1; + last; + } + elsif ($cmd eq 'ERROR') + { + printf STDERR "ERROR signaled\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + else + { + printf STDERR "Unknown command $cmd\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + } + + if (! $ok) + { + printf STDERR "No OK flag in input\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(-2); + } + + $result = $conn->exec("COMMIT"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + return(1); +} + +sub DoDelete +{ + my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); + + my $ok = 0; + while(<$inpf>) + { + if ($_ !~ /\n$/) + { + printf STDERR "Invalid format\n" unless ($quiet); + return(-2); + } + my $key = $_; + $key =~ s/\n//; + if ($key eq '\.') + { + $ok = 1; + last; + } + + my $sql = "delete from $tabname where $Stables{$tabname}->[1] = '$key'"; + + printf "$sql\n" if $debug; + + my $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + } + + if (! $ok) + { + printf STDERR "No end of input in DELETE section\n" unless ($quiet); + return(-2); + } + + return(0); +} + + +sub DoUpdate +{ + my ($conn, $inpf, $tabname) = @_; # (@_[0], @_[1], @_[2]); + my $oidkey = ($Stables{$tabname}->[2] < 0) ? 1 : 0; + + my @CopyBuf = (); + my $CBufLen = 0; + my $CBufMax = 16 * 1024 * 1024; # max size of buf for copy + + my $sql = "select attnum, attname from pg_attribute" . + " where attrelid = $Stables{$tabname}->[0] and attnum > 0"; + + my $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + + my @anames = (); + while (@row = $result->fetchrow) + { + $anames[$row[0]] = $row[1]; + } + + my $istring; + my $ok = 0; + while(<$inpf>) + { + if ($_ !~ /\n$/) + { + printf STDERR "Invalid format\n" unless ($quiet); + return(-2); + } + $istring = $_; + $istring =~ s/\n//; + if ($istring eq '\.') + { + $ok = 1; + last; + } + my @vals = split(/ /, $istring); + if ($oidkey) + { + if ($vals[0] !~ /^\d+$/ || $vals[0] <= 0) + { + printf STDERR "Invalid OID\n" unless ($quiet); + return(-2); + } + $oidkey = $vals[0]; + } + else + { + unshift @vals, ''; + } + + $sql = "update $tabname set "; + my $ocnt = 0; + for (my $i = 1; $i <= $#anames; $i++) + { + if ($vals[$i] eq '\N') + { + if ($i == $Stables{$tabname}->[2]) + { + printf STDERR "NULL key\n" unless ($quiet); + return(-2); + } + $vals[$i] = 'null'; + } + else + { + $vals[$i] = "'" . $vals[$i] . "'"; + next if $i == $Stables{$tabname}->[2]; + } + $ocnt++; + $sql .= ', ' if $ocnt > 1; + $sql .= "$anames[$i] = $vals[$i]"; + } + if ($oidkey) + { + $sql .= " where $Stables{$tabname}->[1] = $oidkey"; + } + else + { + $sql .= " where $Stables{$tabname}->[1] = $vals[$Stables{$tabname}->[2]]"; + } + + printf "$sql\n" if $debug; + + $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + next if $result->cmdTuples == 1; # updated + + if ($result->cmdTuples > 1) + { + printf STDERR "Duplicate keys\n" unless ($quiet); + return(-2); + } + + # no key - copy + push @CopyBuf, "$istring\n"; + $CBufLen += length($istring); + + if ($CBufLen >= $CBufMax) + { + $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); + return($result) if $result; + @CopyBuf = (); + $CBufLen = 0; + } + } + + if (! $ok) + { + printf STDERR "No end of input in UPDATE section\n" unless ($quiet); + return(-2); + } + + if ($CBufLen) + { + $result = DoCopy($conn, $tabname, $oidkey, \@CopyBuf); + return($result) if $result; + } + + return(0); +} + + +sub DoCopy +{ + my ($conn, $tabname, $withoids, $CBuf) = @_; # (@_[0], @_[1], @_[2], @_[3]); + + my $sql = "COPY $tabname " . (($withoids) ? "WITH OIDS " : '') . +"FROM STDIN"; + my $result = $conn->exec($sql); + if ($result->resultStatus ne PGRES_COPY_IN) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + + foreach $str (@{$CBuf}) + { + $conn->putline($str); + } + + $conn->putline("\\.\n"); + + if ($conn->endcopy) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + + return(0); + +} + + +# +# Returns last SyncID applied on Slave +# +sub GetSyncID +{ + my ($conn) = @_; # (@_[0]); + + my $result = $conn->exec("select max(syncid) from _RSERV_SLAVE_SYNC_"); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + return(-1); + } + my @row = $result->fetchrow; + return(undef) unless defined $row[0]; # null + return($row[0]); +} + +# +# Updates _RSERV_SYNC_ on Master with Slave SyncID +# +sub SyncSyncID +{ + my ($conn, $server, $syncid) = @_; # (@_[0], @_[1], @_[2]); + + my $result = $conn->exec("BEGIN"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + $result = $conn->exec("select synctime, status from _RSERV_SYNC_" . + " where server = $server and syncid = $syncid" . + " for update"); + if ($result->resultStatus ne PGRES_TUPLES_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + my @row = $result->fetchrow; + if (! defined $row[0]) + { + printf STDERR "No SyncID $syncid found for server $server\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(0); + } + if ($row[1] > 0) + { + printf STDERR "SyncID $syncid for server $server already updated\n" unless ($quiet); + $conn->exec("ROLLBACK"); + return(0); + } + $result = $conn->exec("update _RSERV_SYNC_" . + " set synctime = now(), status = 1" . + " where server = $server and syncid = $syncid"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + $result = $conn->exec("delete from _RSERV_SYNC_" . + " where server = $server and syncid < $syncid"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + $result = $conn->exec("COMMIT"); + if ($result->resultStatus ne PGRES_COMMAND_OK) + { + print STDERR $conn->errorMessage unless ($quiet); + $conn->exec("ROLLBACK"); + return(-1); + } + + return(1); +} + +1; diff --git a/contrib/rserv/Replicate.in b/contrib/rserv/Replicate.in new file mode 100644 index 00000000000..ccb9749b930 --- /dev/null +++ b/contrib/rserv/Replicate.in @@ -0,0 +1,100 @@ +# -*- perl -*- +# Replicate +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use lib "@LIBDIR@"; + +use IO::File; +use Getopt::Long; +use RServ; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", "snapshot=s", + "masterhost=s", "slavehost=s", "host=s", + "masteruser=s", "slaveuser=s", "user=s", + "masterpassword=s", "slavepassword=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; +my $snapshot = $opt_snapshot || "__Snapshot"; + +if (defined($opt_help) || (scalar(@ARGV) < 2)) { + print "Usage: $0 --snapshot=file --host=name --user=name --password=string masterdb slavedb\n"; + print "\t--masterhost=name --masteruser=name --masterpassword=string\n"; + print "\t--slavehost=name --slaveuser=name --slavepassword=string\n"; + exit ((scalar(@ARGV) < 2)? 1:0); +} + +my $master = $ARGV[0] || "master"; +my $slave = $ARGV[1] || "slave"; +my $server = 0; + +my $minfo = "dbname=$master"; +$minfo = "$minfo host=$opt_masterhost" if (defined($opt_masterhost)); +$minfo = "$minfo user=$opt_masteruser" if (defined($opt_masteruser)); +$minfo = "$minfo password=$opt_masterpassword" if (defined($opt_masterpassword)); +my $sinfo = "dbname=$slave"; +$sinfo = "$sinfo host=$opt_slavehost" if (defined($opt_slavehost)); +$sinfo = "$sinfo user=$opt_slaveuser" if (defined($opt_slaveuser)); +$sinfo = "$sinfo password=$opt_slavepassword" if (defined($opt_slavepassword)); + +print "Master connection is $minfo\n" if ($debug); +print "Slave connection is $sinfo\n" if ($debug); + +my $mconn = Pg::connectdb($minfo); +my $sconn = Pg::connectdb($sinfo); + +SyncSync($mconn, $sconn); + +my $outf = new IO::File; +open $outf, ">$snapshot"; +print "\n>>>>>>>>>>>>> Prepare Snapshot\n\n" if ($verbose); +$res = PrepareSnapshot($mconn, $outf, $server); +close $outf; +die "\n>>>>>>>>>>>>> ERROR\n" if $res < 0; +if ($res == 0) +{ + print "\n>>>>>>>>>>>>> DBases are sync-ed\n" if ($verbose); + exit(0); +} + +my $inpf = new IO::File; +open $inpf, "<$snapshot"; +print "\n>>>>>>>>>>>>> Apply Snapshot\n\n" if ($verbose); +$res = ApplySnapshot($sconn, $inpf); +close $inpf; +die "\n>>>>>>>>>>>>> ERROR\n" if $res < 0; + +if ($res > 0) +{ + print "Snapshot applied\n" if ($verbose); + unlink $snapshot unless ($debug); + SyncSync($mconn, $sconn); +} + +exit(0); + +########################################################################### + +sub SyncSync +{ + ($mconn, $sconn) = @_; + + print "\n>>>>>>>>>>>>> Sync SyncID\n\n" if ($verbose); + print "Get last SyncID from Slave DB\n" if ($verbose); + $syncid = GetSyncID($sconn); + if ($syncid > 0) + { + print "Last SyncID applied: $syncid\n" if ($verbose); + print "Sync SyncID\n" if ($verbose); + + $res = SyncSyncID($mconn, $server, $syncid); + + print "Succeeded\n" if (($res > 0) && ($verbose)); + } +} diff --git a/contrib/rserv/RservTest.in b/contrib/rserv/RservTest.in new file mode 100644 index 00000000000..252c3226e1c --- /dev/null +++ b/contrib/rserv/RservTest.in @@ -0,0 +1,313 @@ +#!/bin/sh +# pgserv.tcl +# (c) 2000 Thomas Lockhart, PostgreSQL Inc. +# The next line will reinvoke as wish *DO NOT REMOVE OR ALTER* \ +exec wish "$0" "$@" + +puts "Starting Replication Server demo" + +set RSERV_BIN "@BINDIR@" + +# Bring in the interfaces we will be using... + +#package require Pgtcl +load libpgtcl[info sharedlibextension] + +# elog +# Information or error log and exit handler +proc {elog} {level message} { + global show + switch -exact -- $level { + DEBUG { + if {$show(debug)} { + puts "DEBUG $message" + } + } + ERROR { + if {$show(error)} { + puts "ERROR $message" + } + FATAL { + if ($show(error)} { + puts "FATAL $message" + } + exit 1 + } + default { + puts "INFO $message" + } + } +} + +proc {ShowUsage} {} { + global argv0 + puts "Usage: $argv0 --host name --user name --password string masterdb slavedb" + puts "\t--masterhost name --masteruser name --masterpassword string" + puts "\t--slavehost name --slaveuser name --slavepassword string" +} + +# Initial values for database access +# master, slave variables are tied to text input boxes, +# and will be updated on user input +proc {SetDbInfo} {db name {host ""} {user ""} {pass ""}} { + global dbinfo + set dbinfo($db,name) $name + set dbinfo($db,host) $host + set dbinfo($db,user) $user + set dbinfo($db,pass) $pass +} + +# ConnInfo +# Connection information for pgtcl library +proc {ConnInfo} {{db master}} { + global dbinfo + set ci "dbname=$dbinfo($db,name)" + if {[string length $dbinfo($db,host)] > 0} { + set ci "$ci host=$dbinfo($db,host)" + } + if {[string length $dbinfo($db,user)] > 0} { + set ci "$ci user=$dbinfo($db,user)" + } + if {[string length $dbinfo($db,pass)] > 0} { + set ci "$ci password=$dbinfo($db,pass)" + } +# puts "Construct conninfo $ci" + return $ci +} + +# ConnInfoParams +# Connection information for (perl) callable programs +proc {ConnInfoParams} {{db master}} { + global dbinfo + set ci "" + if {[string length $dbinfo($db,host)] > 0} { + set ci "$ci --host=$dbinfo($db,host)" + } + if {[string length $dbinfo($db,user)] > 0} { + set ci "$ci --user=$dbinfo($db,user)" + } + if {[string length $dbinfo($db,pass)] > 0} { + set ci "$ci --password=$dbinfo($db,pass)" + } +# puts "Construct conninfo $ci" + return $ci +} + +# ConnInfoMaster +# Connection information for (perl) callable programs +proc {ConnInfoMaster} {{db master}} { + global dbinfo + set ci $dbinfo($db,name) + if {[string length $dbinfo($db,host)] > 0} { + set ci "$ci --masterhost=$dbinfo($db,host)" + } + if {[string length $dbinfo($db,user)] > 0} { + set ci "$ci --masteruser=$dbinfo($db,user)" + } + if {[string length $dbinfo($db,pass)] > 0} { + set ci "$ci --masterpassword=$dbinfo($db,pass)" + } +# puts "Construct conninfo $ci" + return $ci +} + +# ConnInfoSlave +# Connection information for (perl) callable programs +proc {ConnInfoSlave} {{db slave}} { + global dbinfo + set ci $dbinfo($db,name) + if {[string length $dbinfo($db,host)] > 0} { + set ci "$ci --slavehost=$dbinfo($db,host)" + } + if {[string length $dbinfo($db,user)] > 0} { + set ci "$ci --slaveuser=$dbinfo($db,user)" + } + if {[string length $dbinfo($db,pass)] > 0} { + set ci "$ci --slavepassword=$dbinfo($db,pass)" + } +# puts "Construct conninfo $ci" + return $ci +} + + +SetDbInfo master master localhost +SetDbInfo slave slave localhost +set dbinfo(snapshot,name) "__Snapshot" + +set update "" + +set show(debug) 1 +set show(error) 1 + +set argi 0 +while {$argi < $argc} { +# puts "argi is $argi; argc is $argc" + set arg [lindex $argv $argi] + switch -glob -- $arg { + -h - + --host { + incr argi + set dbinfo(master,host) [lindex $argv $argi] + set dbinfo(slave,host) [lindex $argv $argi] + } + --masterhost { + incr argi + set dbinfo(master,host) [lindex $argv $argi] + } + --slavehost { + incr argi + set dbinfo(slave,host) [lindex $argv $argi] + } + -u - + --user { + incr argi + set dbinfo(master,user) [lindex $argv $argi] + set dbinfo(slave,user) [lindex $argv $argi] + } + --masteruser { + incr argi + set dbinfo(master,user) [lindex $argv $argi] + } + --slaveuser { + incr argi + set dbinfo(slave,user) [lindex $argv $argi] + } + -s - + --snapshot { + incr argi + set dbinfo(snapshot,name) [lindex $argv $argi] + } + -* { + elog ERROR "$argv0: invalid parameter '$arg'" + ShowUsage + exit 1 + } + default { + break + } + } + incr argi +} + +if {$argi < $argc} { + set dbinfo(master,name) [lindex $argv $argi] + incr argi +} +if {$argi < $argc} { + set dbinfo(slave,name) [lindex $argv $argi] + incr argi +} +if {$argi < $argc} { + elog "$argv0: too many parameters" + ShowUsage + exit 1 +} + +elog DEBUG "User is $dbinfo(master,user) $dbinfo(slave,user)" +elog DEBUG "Host is $dbinfo(master,host) $dbinfo(slave,host)" + +# +# TK layout +# + +wm title . "Async Replication" + +wm geom . 400x400 + +proc {CreateResultFrame} {b l w} { + pack [frame $b -borderwidth 10] -fill x + pack [button $b.run -text $l -command $l -width $w] -side left +# pack [entry $b.e -textvariable NewRow] -side left +} + +set t .top +pack [frame $t -borderwidth 10] -fill x +pack [frame $t.h -borderwidth 10] -fill x +pack [label $t.h.h -text "PostgreSQL Async Replication Server"] + +set b .b +pack [frame $b -borderwidth 10] -fill x +pack [frame $b.l -borderwidth 10] -fill x +pack [label $b.l.ml -text "Master"] -side left +pack [label $b.l.sl -text "Slave"] -side right +pack [entry $b.m -textvariable dbinfo(master,name) -width 25] -side left +pack [entry $b.s -textvariable dbinfo(slave,name) -width 25] -side right + +set b .u +pack [frame $b -borderwidth 10] -fill x +pack [button $b.run -text update -command Update -width 10] -side left +pack [entry $b.e -textvariable update -width 50] -side left + +set r [CreateResultFrame .r Replicate 10] + +set b .s +pack [frame $b -borderwidth 10] -fill x +pack [button $b.b -text Show -command Show -width 10] -side left +pack [label $b.e -text ""] -side left + +set b .button +pack [frame $b -borderwidth 10] -fill x + +pack [button $b.quit -text "Quit" -command Shutdown] + +# +# Functions mapped to buttons +# + +proc {Update} {} { + global dbinfo + global update + + elog DEBUG "Opening database [ConnInfo master]..." + set res [catch {set db [pg_connect -conninfo "[ConnInfo master]"]} msg] + if {$res} { + elog ERROR "Database '$dbinfo(master,name)' is not available: $res ($msg)" + } else { + elog DEBUG "Insert $update into database $dbinfo(master,name)..." + set res [pg_exec $db "insert into test select '$update', max(k)+1, max(l)+1 from test"] + elog DEBUG [pg_result $res -status] + catch {pg_disconnect $db} + } +} + +proc {Replicate} {} { + global dbinfo + global RSERV_BIN + + elog DEBUG "Replicating [ConnInfoCmd master]..." + exec "$RSERV_BIN/Replicate" --snapshot=$dbinfo(snapshot,name) [ConnInfoParams] [ConnInfoMaster] [ConnInfoSlave] +} + +proc {Show} {} { + global dbinfo + global update + + elog DEBUG "Opening database [ConnInfo slave]..." + set res [catch {set db [pg_connect -conninfo "[ConnInfo slave]"]} msg] + if {$res} { + elog ERROR "DB $dbinfo(slave,name) not available: $res ($msg)" + } else { + elog DEBUG "Select $update from database $dbinfo(slave,name)..." + set res [pg_exec $db "select i from test where i='$update'"] + if {[pg_result $res -status] != "PGRES_TUPLES_OK"} { + .s.e config -text "n/a" + } else { + set ntups [pg_result $res -numTuples] + if {$ntups <= 0} { + .s.e config -text "n/a" + } else { + for {set i 0} {$i < $ntups} {incr i} { + set val [pg_result $res -getTuple $i] + .s.e config -text $val + } + } + pg_result $res -clear + } + catch {pg_disconnect $db} + } +} + +proc {Shutdown} {} { + global dbinfo + exit +} diff --git a/contrib/rserv/SlaveAddTable.in b/contrib/rserv/SlaveAddTable.in new file mode 100644 index 00000000000..432e922aa21 --- /dev/null +++ b/contrib/rserv/SlaveAddTable.in @@ -0,0 +1,59 @@ +# -*- perl -*- +# SlaveAddTable +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Pg; +use Getopt::Long; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; + +if (defined($opt_help) || (scalar(@ARGV) < 3)) { + print "Usage: $0 --host=name --user=name --password=string slavedb table column\n"; + exit ((scalar(@ARGV) < 3)? 1: 0); +} + +my $dbname = $ARGV[0]; +my $table = $ARGV[1]; +my $keyname = $ARGV[2]; + +my $sinfo = "dbname=$dbname"; +$sinfo = "$sinfo host=$opt_host" if (defined($opt_host)); +$sinfo = "$sinfo user=$opt_user" if (defined($opt_user)); +$sinfo = "$sinfo password=$opt_password" if (defined($opt_password)); + +my $dbname = $ARGV[0]; +my $table = $ARGV[1]; +my $keyname = $ARGV[2]; + +my $conn = Pg::connectdb($sinfo); + +my $result = $conn->exec("BEGIN"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +$result = $conn->exec("select pgc.oid, pga.attnum from pg_class pgc" . + ", pg_attribute pga" . + " where pgc.relname = '$table' and pgc.oid = pga.attrelid" . + " and pga.attname = '$keyname'"); +die $conn->errorMessage if $result->resultStatus ne PGRES_TUPLES_OK; + +my @row = $result->fetchrow; +die "Can't find table/key\n" if ! defined $row[0] || ! defined $row[1]; + +$result = $conn->exec("insert into _RSERV_SLAVE_TABLES_ (tname, cname, reloid, key)" . + " values ('$table', '$keyname', $row[0], $row[1])"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +$result = $conn->exec("COMMIT"); +die $conn->errorMessage if $result->resultStatus ne PGRES_COMMAND_OK; + +exit(0); diff --git a/contrib/rserv/SlaveInit.in b/contrib/rserv/SlaveInit.in new file mode 100644 index 00000000000..753d4dddb5e --- /dev/null +++ b/contrib/rserv/SlaveInit.in @@ -0,0 +1,65 @@ +# -*- perl -*- +# SlaveInit +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use Pg; +use Getopt::Long; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "quiet!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; +my $quiet = $opt_quiet || 0; + +if (defined($opt_help) || (scalar(@ARGV) < 1)) { + print "Usage: $0 --host=name --user=name --password=string slavedb\n"; + exit ((scalar(@ARGV) < 1)? 1:0); +} + +my $slave = $ARGV[0] || "slave"; + +my $sinfo = "dbname=$slave"; +$sinfo = "$sinfo host=$opt_host" if (defined($opt_host)); +$sinfo = "$sinfo user=$opt_user" if (defined($opt_user)); +$sinfo = "$sinfo password=$opt_password" if (defined($opt_password)); + +sub RollbackAndQuit { + my $conn = shift @_; + + print STDERR $conn->errorMessage; + $conn->exec("ROLLBACK"); + exit (-1); +} + +print "Connecting to $sinfo\n"; +my $conn = Pg::connectdb($sinfo); +if ($conn->status != PGRES_CONNECTION_OK) { + print STDERR "Failed opening $sinfo\n"; + exit 1; +} + +my $result = $conn->exec("BEGIN"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("set transaction isolation level serializable"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("create table _RSERV_SLAVE_TABLES_" . + " (tname name, cname name, reloid oid, key int4)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("create table _RSERV_SLAVE_SYNC_" . + " (syncid int4, synctime timestamp)"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +$result = $conn->exec("COMMIT"); +RollbackAndQuit($conn) if ($result->resultStatus ne PGRES_COMMAND_OK); + +exit (0); diff --git a/contrib/rserv/SyncSyncID.in b/contrib/rserv/SyncSyncID.in new file mode 100644 index 00000000000..a5ab88bc534 --- /dev/null +++ b/contrib/rserv/SyncSyncID.in @@ -0,0 +1,48 @@ +# -*- perl -*- +# SyncSyncID +# Vadim Mikheev, (c) 2000, PostgreSQL Inc. + +eval '(exit $?0)' && eval 'exec perl -S $0 ${1+"$@"}' + & eval 'exec perl -S $0 $argv:q' + if 0; + +use lib "@LIBDIR@"; + +use Getopt::Long; +use RServ; + +$| = 1; + +$result = GetOptions("debug!", "verbose!", "quiet!", "help", + "host=s", "user=s", "password=s"); + +my $debug = $opt_debug || 0; +my $verbose = $opt_verbose || 0; +my $quiet = $opt_quiet || 0; + +if (defined($opt_help) || (scalar(@ARGV) < 2) || ($ARGV[1] !~ /^\d+$/)) { + print "Usage: $0 --host=name --user=name --password=string masterdb syncid\n"; + exit ((scalar(@ARGV) < 2)? 1:0); +} + +my $master = $ARGV[0] || "master"; +my $server = 0; +my $syncid = $ARGV[1] || die "SyncID not specified"; + +my $minfo = "dbname=$master"; +$minfo = "$minfo host=$opt_host" if (defined($opt_host)); +$minfo = "$minfo user=$opt_user" if (defined($opt_user)); +$minfo = "$minfo password=$opt_password" if (defined($opt_password)); + +my $conn = Pg::connectdb($minfo); + +$res = SyncSyncID($conn, $server, $syncid); + +if ($res == 0) +{ + printf STDERR "SyncID updated on $master\n" if ($verbose); + exit(0); +} + +printf STDERR "ERROR\n" unless ($quiet); +exit(1); diff --git a/contrib/rserv/master.sql.in b/contrib/rserv/master.sql.in new file mode 100644 index 00000000000..e52fc576077 --- /dev/null +++ b/contrib/rserv/master.sql.in @@ -0,0 +1,101 @@ +-- erServer +-- Master server setup for erServer demonstration implementation +-- (c) 2000 Vadim Mikheev, PostgreSQL Inc. +-- + +-- +-- Slave servers +-- +drop table _RSERV_SERVERS_; + +create table _RSERV_SERVERS_ +( + server int4, -- slave server id + host text, -- server' host + port int4, -- server' port + dbase text -- db name +); + + +-- +-- Tables to sync +-- +drop table _RSERV_TABLES_; + +create table _RSERV_TABLES_ +( + tname name, -- table name + cname name, -- column name + reloid oid, -- table oid + key int4 -- key attnum +); + + +-- +-- Log for inserts/updates/deletes to sync-ed tables +-- +drop table _RSERV_LOG_; + +create table _RSERV_LOG_ +( + reloid oid, + logid int4, -- xid of last update xaction + logtime timestamp, -- last update xaction start time + deleted int4, -- deleted or inserted/updated + key text -- +); + +-- This is to speedup lookup deleted tuples +create index _RSERV_LOG_INDX_DLT_ID_ on _RSERV_LOG_ (deleted, logid); + +-- This is to speedup cleanup +create index _RSERV_LOG_INDX_TM_ID_ on _RSERV_LOG_ (logtime, logid); + +-- This is to speedup trigger and lookup updated tuples +create index _RSERV_LOG_INDX_REL_KEY_ on _RSERV_LOG_ (reloid, key); + + +-- +-- How much each slave servers are sync-ed +-- +drop table _RSERV_SYNC_; + +create table _RSERV_SYNC_ +( + server int4, + syncid int4, -- from _rserv_sync_seq_ + synctime timestamp, -- + status int4, -- prepared (0) | applied + minid int4, -- min xid from serializable snapshot + maxid int4, -- max xid from serializable snapshot + active text -- list of active xactions +); + +create index _RSERV_SYNC_INDX_SRV_ID_ on _RSERV_SYNC_ (server, syncid); + +drop sequence _rserv_sync_seq_; +create sequence _rserv_sync_seq_; + +drop function _rserv_log_(); + +CREATE FUNCTION _rserv_log_() + RETURNS opaque + AS '_OBJWD_/rserv_DLSUFFIX_' + LANGUAGE 'c' +; + +drop function _rserv_sync_(int4); + +CREATE FUNCTION _rserv_sync_(int4) + RETURNS int4 + AS '_OBJWD_/rserv_DLSUFFIX_' + LANGUAGE 'c' +; + +drop function _rserv_debug_(int4); + +CREATE FUNCTION _rserv_debug_(int4) + RETURNS int4 + AS '_OBJWD_/rserv_DLSUFFIX_' + LANGUAGE 'c' +; diff --git a/contrib/rserv/regress.sh b/contrib/rserv/regress.sh new file mode 100755 index 00000000000..83d02ea3300 --- /dev/null +++ b/contrib/rserv/regress.sh @@ -0,0 +1,32 @@ +# regress.sh +# rserv regression test script +# (c) 2000 Thomas Lockhart, PostgreSQL Inc. + +dropdb master +dropdb slave + +createdb master +createdb slave + +MasterInit master +SlaveInit slave + +psql -c "create table t1 (i int, t text, d timestamp default text 'now');" master +MasterAddTable master t1 d + +psql -c "create table t1 (i int, t text, d timestamp default text 'now');" slave +SlaveAddTable slave t1 d + +psql -c "insert into t1 values (1, 'one');" master +psql -c "insert into t1 values (2, 'two');" master + +Replicate master slave +SyncSyncID master `GetSyncID --noverbose slave` + +psql -c "insert into t1 values (3, 'three');" master +psql -c "insert into t1 values (4, 'four');" master + +Replicate master slave +SyncSyncID master `GetSyncID --noverbose slave` + +exit diff --git a/contrib/rserv/rserv.c b/contrib/rserv/rserv.c new file mode 100644 index 00000000000..518dd68a539 --- /dev/null +++ b/contrib/rserv/rserv.c @@ -0,0 +1,319 @@ +/* rserv.c + * Support functions for erServer replication. + * (c) 2000 Vadim Mikheev, PostgreSQL Inc. + */ + +#include "executor/spi.h" /* this is what you need to work with SPI */ +#include "commands/trigger.h" /* -"- and triggers */ +#include "utils/tqual.h" /* -"- and SnapshotData */ +#include <ctype.h> /* tolower () */ + +#ifdef PG_FUNCTION_INFO_V1 +#define CurrentTriggerData ((TriggerData *) fcinfo->context) +#endif + +#ifdef PG_FUNCTION_INFO_V1 +PG_FUNCTION_INFO_V1(_rserv_log_); +PG_FUNCTION_INFO_V1(_rserv_sync_); +PG_FUNCTION_INFO_V1(_rserv_debug_); +Datum _rserv_log_(PG_FUNCTION_ARGS); +Datum _rserv_sync_(PG_FUNCTION_ARGS); +Datum _rserv_debug_(PG_FUNCTION_ARGS); +#else +HeapTuple _rserv_log_(void); +int32 _rserv_sync_(int32); +int32 _rserv_debug_(int32); +#endif + +static int debug = 0; + +static char* OutputValue(char *key, char *buf, int size); + +#ifdef PG_FUNCTION_INFO_V1 +Datum +_rserv_log_(PG_FUNCTION_ARGS) +#else +HeapTuple +_rserv_log_() +#endif +{ + Trigger *trigger; /* to get trigger name */ + int nargs; /* # of args specified in CREATE TRIGGER */ + char **args; /* argument: argnum */ + Relation rel; /* triggered relation */ + HeapTuple tuple; /* tuple to return */ + HeapTuple newtuple = NULL;/* tuple to return */ + TupleDesc tupdesc; /* tuple description */ + int keynum; + char *key; + char *okey; + char *newkey = NULL; + int deleted; + char sql[8192]; + char outbuf[8192]; + char oidbuf[64]; + int ret; + + /* Called by trigger manager ? */ + if (!CurrentTriggerData) + elog(ERROR, "_rserv_log_: triggers are not initialized"); + + /* Should be called for ROW trigger */ + if (TRIGGER_FIRED_FOR_STATEMENT(CurrentTriggerData->tg_event)) + elog(ERROR, "_rserv_log_: can't process STATEMENT events"); + + tuple = CurrentTriggerData->tg_trigtuple; + + trigger = CurrentTriggerData->tg_trigger; + nargs = trigger->tgnargs; + args = trigger->tgargs; + + if (nargs != 1) /* odd number of arguments! */ + elog(ERROR, "_rserv_log_: need in *one* argument"); + + keynum = atoi(args[0]); + + if (keynum < 0 && keynum != ObjectIdAttributeNumber) + elog(ERROR, "_rserv_log_: invalid keynum %d", keynum); + + rel = CurrentTriggerData->tg_relation; + tupdesc = rel->rd_att; + + deleted = (TRIGGER_FIRED_BY_DELETE(CurrentTriggerData->tg_event)) ? + 1 : 0; + + if (TRIGGER_FIRED_BY_UPDATE(CurrentTriggerData->tg_event)) + newtuple = CurrentTriggerData->tg_newtuple; + + /* + * Setting CurrentTriggerData to NULL prevents direct calls to trigger + * functions in queries. Normally, trigger functions have to be called + * by trigger manager code only. + */ + CurrentTriggerData = NULL; + + /* Connect to SPI manager */ + if ((ret = SPI_connect()) < 0) + elog(ERROR, "_rserv_log_: SPI_connect returned %d", ret); + + if (keynum == ObjectIdAttributeNumber) + { + sprintf(oidbuf, "%u", tuple->t_data->t_oid); + key = oidbuf; + } + else + key = SPI_getvalue(tuple, tupdesc, keynum); + + if (key == NULL) + elog(ERROR, "_rserv_log_: key must be not null"); + + if (newtuple && keynum != ObjectIdAttributeNumber) + { + newkey = SPI_getvalue(newtuple, tupdesc, keynum); + if (newkey == NULL) + elog(ERROR, "_rserv_log_: key must be not null"); + if (strcmp(newkey, key) == 0) + newkey = NULL; + else + deleted = 1; /* old key was deleted */ + } + + if (strpbrk(key, "\\ \n'")) + okey = OutputValue(key, outbuf, sizeof(outbuf)); + else + okey = key; + + sprintf(sql, "update _RSERV_LOG_ set logid = %d, logtime = now(), " + "deleted = %d where reloid = %u and key = '%s'", + GetCurrentTransactionId(), deleted, rel->rd_id, okey); + + if (debug) + elog(NOTICE, sql); + + ret = SPI_exec(sql, 0); + + if (ret < 0) + elog(ERROR, "_rserv_log_: SPI_exec(update) returned %d", ret); + + /* + * If no tuple was UPDATEd then do INSERT... + */ + if (SPI_processed > 1) + elog(ERROR, "_rserv_log_: duplicate tuples"); + else if (SPI_processed == 0) + { + sprintf(sql, "insert into _RSERV_LOG_ " + "(reloid, logid, logtime, deleted, key) " + "values (%u, %d, now(), %d, '%s')", + rel->rd_id, GetCurrentTransactionId(), + deleted, okey); + + if (debug) + elog(NOTICE, sql); + + ret = SPI_exec(sql, 0); + + if (ret < 0) + elog(ERROR, "_rserv_log_: SPI_exec(insert) returned %d", ret); + } + + if (okey != key && okey != outbuf) + pfree(okey); + + if (newkey) + { + if (strpbrk(newkey, "\\ \n'")) + okey = OutputValue(newkey, outbuf, sizeof(outbuf)); + else + okey = newkey; + + sprintf(sql, "insert into _RSERV_LOG_ " + "(reloid, logid, logtime, deleted, key) " + "values (%u, %d, now(), 0, '%s')", + rel->rd_id, GetCurrentTransactionId(), okey); + + if (debug) + elog(NOTICE, sql); + + ret = SPI_exec(sql, 0); + + if (ret < 0) + elog(ERROR, "_rserv_log_: SPI_exec returned %d", ret); + + if (okey != newkey && okey != outbuf) + pfree(okey); + } + + SPI_finish(); + +#ifdef PG_FUNCTION_INFO_V1 + return (PointerGetDatum(tuple)); +#else + return (tuple); +#endif +} + +#ifdef PG_FUNCTION_INFO_V1 +Datum +_rserv_sync_(PG_FUNCTION_ARGS) +#else +int32 +_rserv_sync_(int32 server) +#endif +{ +#ifdef PG_FUNCTION_INFO_V1 + int32 server = PG_GETARG_INT32(0); +#endif + char sql[8192]; + char buf[8192]; + char *active = buf; + uint32 xcnt; + int ret; + + if (SerializableSnapshot == NULL) + elog(ERROR, "_rserv_sync_: SerializableSnapshot is NULL"); + + buf[0] = 0; + for (xcnt = 0; xcnt < SerializableSnapshot->xcnt; xcnt++) + { + sprintf(buf + strlen(buf), "%s%u", (xcnt) ? ", " : "", + SerializableSnapshot->xip[xcnt]); + } + + if ((ret = SPI_connect()) < 0) + elog(ERROR, "_rserv_sync_: SPI_connect returned %d", ret); + + sprintf(sql, "insert into _RSERV_SYNC_ " + "(server, syncid, synctime, status, minid, maxid, active) " + "values (%u, currval('_rserv_sync_seq_'), now(), 0, %d, %d, '%s')", + server, SerializableSnapshot->xmin, SerializableSnapshot->xmax, active); + + ret = SPI_exec(sql, 0); + + if (ret < 0) + elog(ERROR, "_rserv_sync_: SPI_exec returned %d", ret); + + SPI_finish(); + + return (0); +} + +#ifdef PG_FUNCTION_INFO_V1 +Datum +_rserv_debug_(PG_FUNCTION_ARGS) +#else +int32 +_rserv_debug_(int32 newval) +#endif +{ +#ifdef PG_FUNCTION_INFO_V1 + int32 newval = PG_GETARG_INT32(0); +#endif + int32 oldval = debug; + + debug = newval; + + return (oldval); +} + +#define ExtendBy 1024 + +static char* +OutputValue(char *key, char *buf, int size) +{ + int i = 0; + char *out = buf; + char *subst = NULL; + int slen = 0; + + size--; + for ( ; ; ) + { + switch (*key) + { + case '\\': subst ="\\\\"; + slen = 2; + break; + case ' ': subst = "\\011"; + slen = 4; + break; + case '\n': subst = "\\012"; + slen = 4; + break; + case '\'': subst = "\\047"; + slen = 4; + break; + case '\0': out[i] = 0; + return(out); + default: slen = 1; + break; + } + + if (i + slen >= size) + { + if (out == buf) + { + out = (char*) palloc(size + ExtendBy); + strncpy(out, buf, i); + size += ExtendBy; + } + else + { + out = (char*) repalloc(out, size + ExtendBy); + size += ExtendBy; + } + } + + if (slen == 1) + out[i++] = *key; + else + { + memcpy(out + i, subst, slen); + i += slen; + } + key++; + } + + return(out); + +} diff --git a/contrib/rserv/slave.sql.in b/contrib/rserv/slave.sql.in new file mode 100644 index 00000000000..93ac5caced4 --- /dev/null +++ b/contrib/rserv/slave.sql.in @@ -0,0 +1,22 @@ +-- erServer +-- Slave server setup for erServer demonstration implementation +-- (c) 2000 Vadim Mikheev, PostgreSQL Inc. +-- + +drop table _RSERV_SLAVE_TABLES_; + +create table _RSERV_SLAVE_TABLES_ +( + tname name, -- table name + cname name, -- column name + reloid oid, -- table oid + key int4 -- key attnum +); + +drop table _RSERV_SLAVE_SYNC_; + +create table _RSERV_SLAVE_SYNC_ +( + syncid int4, + synctime timestamp +); -- GitLab