Skip to content
Snippets Groups Projects
Select Git revision
  • benchmark-tools
  • postgres-lambda
  • master default
  • REL9_4_25
  • REL9_5_20
  • REL9_6_16
  • REL_10_11
  • REL_11_6
  • REL_12_1
  • REL_12_0
  • REL_12_RC1
  • REL_12_BETA4
  • REL9_4_24
  • REL9_5_19
  • REL9_6_15
  • REL_10_10
  • REL_11_5
  • REL_12_BETA3
  • REL9_4_23
  • REL9_5_18
  • REL9_6_14
  • REL_10_9
  • REL_11_4
23 results

basebackup.c

Blame
  • basebackup.c 30.99 KiB
    /*-------------------------------------------------------------------------
     *
     * basebackup.c
     *	  code for taking a base backup and streaming it to a standby
     *
     * Portions Copyright (c) 2010-2014, PostgreSQL Global Development Group
     *
     * IDENTIFICATION
     *	  src/backend/replication/basebackup.c
     *
     *-------------------------------------------------------------------------
     */
    #include "postgres.h"
    
    #include <sys/types.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <time.h>
    
    #include "access/xlog_internal.h"		/* for pg_start/stop_backup */
    #include "catalog/pg_type.h"
    #include "common/relpath.h"
    #include "lib/stringinfo.h"
    #include "libpq/libpq.h"
    #include "libpq/pqformat.h"
    #include "miscadmin.h"
    #include "nodes/pg_list.h"
    #include "pgstat.h"
    #include "replication/basebackup.h"
    #include "replication/walsender.h"
    #include "replication/walsender_private.h"
    #include "storage/fd.h"
    #include "storage/ipc.h"
    #include "utils/builtins.h"
    #include "utils/elog.h"
    #include "utils/ps_status.h"
    #include "pgtar.h"
    
    /*
     * Options of a base backup command (BaseBackupCmd), as filled in by
     * parse_basebackup_options().
     */
    typedef struct
    {
    	const char *label;			/* backup label; defaults to "base backup" */
    	bool		progress;		/* compute tablespace sizes before sending */
    	bool		fastcheckpoint; /* passed on to do_pg_start_backup() */
    	bool		nowait;			/* negated and passed to do_pg_stop_backup() */
    	bool		includewal;		/* append WAL files to the last tar stream */
    } basebackup_options;
    
    
    /* Prototypes for this file's static functions. */
    static int64 sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces);
    static int64 sendTablespace(char *path, bool sizeonly);
    static bool sendFile(char *readfilename, char *tarfilename,
    		 struct stat * statbuf, bool missing_ok);
    static void sendFileWithContent(const char *filename, const char *content);
    static void _tarWriteHeader(const char *filename, const char *linktarget,
    				struct stat * statbuf);
    static void send_int8_string(StringInfoData *buf, int64 intval);
    static void SendBackupHeader(List *tablespaces);
    static void base_backup_cleanup(int code, Datum arg);
    static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
    static void parse_basebackup_options(List *options, basebackup_options *opt);
    static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
    static int	compareWalFileNames(const void *a, const void *b);
    
    /*
     * Was the backup currently in-progress initiated in recovery mode?
     * Set by perform_base_backup() and compared against RecoveryInProgress()
     * by sendDir() to detect a promotion in the middle of a backup.
     */
    static bool backup_started_in_recovery = false;
    
    /*
     * Relative path ("./...") of temporary statistics directory.  Set by
     * perform_base_backup(); sendDir() uses it to skip that directory's contents.
     */
    static char *statrelpath = NULL;
    
    /*
     * Size of each block sent into the tar stream for larger files.
     * Also the size of the read buffer used when streaming WAL segments.
     */
    #define TAR_SEND_SIZE 32768
    
    /*
     * One entry per tar stream that will be sent: one for every tablespace found
     * in pg_tblspc, plus a final all-zero entry (oid and path NULL) standing for
     * the main data directory.
     */
    typedef struct
    {
    	char	   *oid;			/* name of the pg_tblspc entry, or NULL */
    	char	   *path;			/* symlink target, or NULL for the base dir */
    	char	   *rpath;			/* relative path within PGDATA, or NULL */
    	int64		size;			/* total size in bytes, or -1 if not computed */
    } tablespaceinfo;
    
    
    /*
     * Error-cleanup callback registered by perform_base_backup().
     *
     * If an ERROR or FATAL is raised once the backup has been started, this
     * makes sure the backup is aborted.  Neither argument is used.
     */
    static void
    base_backup_cleanup(int code, Datum arg)
    {
    	(void) code;
    	(void) arg;
    
    	do_pg_abort_backup();
    }
    
    /*
     * Actually do a base backup for the specified tablespaces.
     *
     * 'opt' holds the parsed options of the command; 'tblspcdir' is an already
     * opened handle on pg_tblspc, which is scanned to find the tablespaces.
     *
     * The steps are: start the backup, report the start position, send a header
     * describing every tablespace, stream one tar archive per tablespace (the
     * main data directory always last), stop the backup, optionally append the
     * required WAL segments and all timeline history files to the last archive,
     * and finally report the end position.
     *
     * Everything between do_pg_start_backup() and do_pg_stop_backup() must run
     * inside the PG_ENSURE_ERROR_CLEANUP block: any ERROR raised after the
     * backup has been started but outside that block would skip
     * base_backup_cleanup() and leave the backup un-aborted.
     *
     * This is split out mainly to avoid complaints about "variable might be
     * clobbered by longjmp" from stupider versions of gcc.
     */
    static void
    perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
    {
    	XLogRecPtr	startptr;
    	TimeLineID	starttli;
    	XLogRecPtr	endptr;
    	TimeLineID	endtli;
    	char	   *labelfile;
    	int			datadirpathlen;
    
    	datadirpathlen = strlen(DataDir);
    
    	backup_started_in_recovery = RecoveryInProgress();
    
    	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
    								  &labelfile);
    
    	/*
    	 * Once do_pg_start_backup() has been called, ensure that any failure
    	 * causes us to abort the backup.  For this reason, all work up to
    	 * do_pg_stop_backup() is done inside the error cleanup block.
    	 */
    	PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
    	{
    		List	   *tablespaces = NIL;
    		ListCell   *lc;
    		struct dirent *de;
    		tablespaceinfo *ti;
    
    		SendXlogRecPtrResult(startptr, starttli);
    
    		/*
    		 * Calculate the relative path of temporary statistics directory in
    		 * order to skip the files which are located in that directory later.
    		 */
    		if (is_absolute_path(pgstat_stat_directory) &&
    			strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
    			statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
    		else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
    			statrelpath = psprintf("./%s", pgstat_stat_directory);
    		else
    			statrelpath = pgstat_stat_directory;
    
    		/* Collect information about all tablespaces */
    		while ((de = ReadDir(tblspcdir, "pg_tblspc")) != NULL)
    		{
    			char		fullpath[MAXPGPATH];
    			char		linkpath[MAXPGPATH];
    			char	   *relpath = NULL;
    			int			rllen;
    
    			/* Skip special stuff */
    			if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
    				continue;
    
    			snprintf(fullpath, sizeof(fullpath), "pg_tblspc/%s", de->d_name);
    
    #if defined(HAVE_READLINK) || defined(WIN32)
    			rllen = readlink(fullpath, linkpath, sizeof(linkpath));
    			if (rllen < 0)
    			{
    				ereport(WARNING,
    						(errmsg("could not read symbolic link \"%s\": %m",
    								fullpath)));
    				continue;
    			}
    			else if (rllen >= sizeof(linkpath))
    			{
    				ereport(WARNING,
    						(errmsg("symbolic link \"%s\" target is too long",
    								fullpath)));
    				continue;
    			}
    			/* readlink() does not NUL-terminate its result */
    			linkpath[rllen] = '\0';
    
    			/*
    			 * Relpath holds the relative path of the tablespace directory
    			 * when it's located within PGDATA, or NULL if it's located
    			 * elsewhere.
    			 */
    			if (rllen > datadirpathlen &&
    				strncmp(linkpath, DataDir, datadirpathlen) == 0 &&
    				IS_DIR_SEP(linkpath[datadirpathlen]))
    				relpath = linkpath + datadirpathlen + 1;
    
    			ti = palloc(sizeof(tablespaceinfo));
    			ti->oid = pstrdup(de->d_name);
    			ti->path = pstrdup(linkpath);
    			ti->rpath = relpath ? pstrdup(relpath) : NULL;
    			ti->size = opt->progress ? sendTablespace(fullpath, true) : -1;
    			tablespaces = lappend(tablespaces, ti);
    #else
    
    			/*
    			 * If the platform does not have symbolic links, it should not be
    			 * possible to have tablespaces - clearly somebody else created
    			 * them. Warn about it and ignore.
    			 */
    			ereport(WARNING,
    					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    				  errmsg("tablespaces are not supported on this platform")));
    #endif
    		}
    
    		/*
    		 * Add a node for the base directory at the end.  palloc0 leaves oid,
    		 * path and rpath NULL, which is how the base directory is recognized
    		 * below and in SendBackupHeader().
    		 */
    		ti = palloc0(sizeof(tablespaceinfo));
    		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces) : -1;
    		tablespaces = lappend(tablespaces, ti);
    
    		/* Send tablespace header */
    		SendBackupHeader(tablespaces);
    
    		/* Send off our tablespaces one by one */
    		foreach(lc, tablespaces)
    		{
    			StringInfoData buf;
    
    			ti = (tablespaceinfo *) lfirst(lc);
    
    			/* Send CopyOutResponse message */
    			pq_beginmessage(&buf, 'H');
    			pq_sendbyte(&buf, 0);		/* overall format */
    			pq_sendint(&buf, 0, 2);		/* natts */
    			pq_endmessage(&buf);
    
    			if (ti->path == NULL)
    			{
    				struct stat statbuf;
    
    				/* In the main tar, include the backup_label first... */
    				sendFileWithContent(BACKUP_LABEL_FILE, labelfile);
    
    				/* ... then the bulk of the files ... */
    				sendDir(".", 1, false, tablespaces);
    
    				/* ... and pg_control after everything else. */
    				if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
    					ereport(ERROR,
    							(errcode_for_file_access(),
    							 errmsg("could not stat control file \"%s\": %m",
    									XLOG_CONTROL_FILE)));
    				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false);
    			}
    			else
    				sendTablespace(ti->path, false);
    
    			/*
    			 * If we're including WAL, and this is the main data directory we
    			 * don't terminate the tar stream here. Instead, we will append
    			 * the xlog files below and terminate it then. This is safe since
    			 * the main data directory is always sent *last*.
    			 */
    			if (opt->includewal && ti->path == NULL)
    			{
    				Assert(lnext(lc) == NULL);
    			}
    			else
    				pq_putemptymessage('c');		/* CopyDone */
    		}
    	}
    	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
    
    	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
    
    	if (opt->includewal)
    	{
    		/*
    		 * We've left the last tar file "open", so we can now append the
    		 * required WAL files to it.
    		 */
    		char		pathbuf[MAXPGPATH];
    		XLogSegNo	segno;
    		XLogSegNo	startsegno;
    		XLogSegNo	endsegno;
    		struct stat statbuf;
    		List	   *historyFileList = NIL;
    		List	   *walFileList = NIL;
    		char	  **walFiles;
    		int			nWalFiles;
    		char		firstoff[MAXFNAMELEN];
    		char		lastoff[MAXFNAMELEN];
    		DIR		   *dir;
    		struct dirent *de;
    		int			i;
    		ListCell   *lc;
    		TimeLineID	tli;
    
    		/*
    		 * I'd rather not worry about timelines here, so scan pg_xlog and
    		 * include all WAL files in the range between 'startptr' and 'endptr',
    		 * regardless of the timeline the file is stamped with. If there are
    		 * some spurious WAL files belonging to timelines that don't belong in
    		 * this server's history, they will be included too. Normally there
    		 * shouldn't be such files, but if there are, there's little harm in
    		 * including them.
    		 */
    		XLByteToSeg(startptr, startsegno);
    		XLogFileName(firstoff, ThisTimeLineID, startsegno);
    		XLByteToPrevSeg(endptr, endsegno);
    		XLogFileName(lastoff, ThisTimeLineID, endsegno);
    
    		dir = AllocateDir("pg_xlog");
    		if (!dir)
    			ereport(ERROR,
    				 (errmsg("could not open directory \"%s\": %m", "pg_xlog")));
    		while ((de = ReadDir(dir, "pg_xlog")) != NULL)
    		{
    			/*
    			 * Does it look like a WAL segment, and is it in the range?  The
    			 * comparisons skip the first 8 characters (the timeline part).
    			 */
    			if (strlen(de->d_name) == 24 &&
    				strspn(de->d_name, "0123456789ABCDEF") == 24 &&
    				strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
    				strcmp(de->d_name + 8, lastoff + 8) <= 0)
    			{
    				walFileList = lappend(walFileList, pstrdup(de->d_name));
    			}
    			/* Does it look like a timeline history file? */
    			else if (strlen(de->d_name) == 8 + strlen(".history") &&
    					 strspn(de->d_name, "0123456789ABCDEF") == 8 &&
    					 strcmp(de->d_name + 8, ".history") == 0)
    			{
    				historyFileList = lappend(historyFileList, pstrdup(de->d_name));
    			}
    		}
    		FreeDir(dir);
    
    		/*
    		 * Before we go any further, check that none of the WAL segments we
    		 * need were removed.
    		 */
    		CheckXLogRemoved(startsegno, ThisTimeLineID);
    
    		/*
    		 * Put the WAL filenames into an array, and sort. We send the files in
    		 * order from oldest to newest, to reduce the chance that a file is
    		 * recycled before we get a chance to send it over.
    		 */
    		nWalFiles = list_length(walFileList);
    		walFiles = palloc(nWalFiles * sizeof(char *));
    		i = 0;
    		foreach(lc, walFileList)
    		{
    			walFiles[i++] = lfirst(lc);
    		}
    		qsort(walFiles, nWalFiles, sizeof(char *), compareWalFileNames);
    
    		/*
    		 * There must be at least one xlog file in the pg_xlog directory,
    		 * since we are doing backup-including-xlog.
    		 */
    		if (nWalFiles < 1)
    			ereport(ERROR,
    					(errmsg("could not find any WAL files")));
    
    		/*
    		 * Sanity check: the first and last segment should cover startptr and
    		 * endptr, with no gaps in between.
    		 */
    		XLogFromFileName(walFiles[0], &tli, &segno);
    		if (segno != startsegno)
    		{
    			char		startfname[MAXFNAMELEN];
    
    			XLogFileName(startfname, ThisTimeLineID, startsegno);
    			ereport(ERROR,
    					(errmsg("could not find WAL file \"%s\"", startfname)));
    		}
    		for (i = 0; i < nWalFiles; i++)
    		{
    			XLogSegNo	currsegno = segno;
    			XLogSegNo	nextsegno = segno + 1;
    
    			/*
    			 * Each file must carry either the same segment number as the
    			 * previous one (same segment on another timeline) or the next.
    			 */
    			XLogFromFileName(walFiles[i], &tli, &segno);
    			if (!(nextsegno == segno || currsegno == segno))
    			{
    				char		nextfname[MAXFNAMELEN];
    
    				XLogFileName(nextfname, ThisTimeLineID, nextsegno);
    				ereport(ERROR,
    					  (errmsg("could not find WAL file \"%s\"", nextfname)));
    			}
    		}
    		if (segno != endsegno)
    		{
    			char		endfname[MAXFNAMELEN];
    
    			XLogFileName(endfname, ThisTimeLineID, endsegno);
    			ereport(ERROR,
    					(errmsg("could not find WAL file \"%s\"", endfname)));
    		}
    
    		/* Ok, we have everything we need. Send the WAL files. */
    		for (i = 0; i < nWalFiles; i++)
    		{
    			FILE	   *fp;
    			char		buf[TAR_SEND_SIZE];
    			size_t		cnt;
    			pgoff_t		len = 0;
    
    			snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFiles[i]);
    			XLogFromFileName(walFiles[i], &tli, &segno);
    
    			fp = AllocateFile(pathbuf, "rb");
    			if (fp == NULL)
    			{
    				/*
    				 * Most likely reason for this is that the file was already
    				 * removed by a checkpoint, so check for that to get a better
    				 * error message.
    				 */
    				CheckXLogRemoved(segno, tli);
    
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not open file \"%s\": %m", pathbuf)));
    			}
    
    			if (fstat(fileno(fp), &statbuf) != 0)
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not stat file \"%s\": %m",
    								pathbuf)));
    			if (statbuf.st_size != XLogSegSize)
    			{
    				CheckXLogRemoved(segno, tli);
    				ereport(ERROR,
    						(errcode_for_file_access(),
    					errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
    			}
    
    			_tarWriteHeader(pathbuf, NULL, &statbuf);
    
    			/* Never read past XLogSegSize bytes, even if the file grew */
    			while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
    			{
    				CheckXLogRemoved(segno, tli);
    				/* Send the chunk as a CopyData message */
    				if (pq_putmessage('d', buf, cnt))
    					ereport(ERROR,
    							(errmsg("base backup could not send data, aborting backup")));
    
    				len += cnt;
    				if (len == XLogSegSize)
    					break;
    			}
    
    			if (len != XLogSegSize)
    			{
    				CheckXLogRemoved(segno, tli);
    				ereport(ERROR,
    						(errcode_for_file_access(),
    					errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
    			}
    
    			/* XLogSegSize is a multiple of 512, so no need for padding */
    			FreeFile(fp);
    		}
    
    		/*
    		 * Send timeline history files too. Only the latest timeline history
    		 * file is required for recovery, and even that only if there happens
    		 * to be a timeline switch in the first WAL segment that contains the
    		 * checkpoint record, or if we're taking a base backup from a standby
    		 * server and the target timeline changes while the backup is taken.
    		 * But they are small and highly useful for debugging purposes, so
    		 * better include them all, always.
    		 */
    		foreach(lc, historyFileList)
    		{
    			char	   *fname = lfirst(lc);
    
    			snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
    
    			if (lstat(pathbuf, &statbuf) != 0)
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not stat file \"%s\": %m", pathbuf)));
    
    			sendFile(pathbuf, pathbuf, &statbuf, false);
    		}
    
    		/* Send CopyDone message for the last tar file */
    		pq_putemptymessage('c');
    	}
    	SendXlogRecPtrResult(endptr, endtli);
    }
    
    /*
     * qsort comparator for an array of WAL segment file names (char *).
     *
     * Orders the names by everything after their first 8 characters, i.e. by
     * the log/seg portion, so that the timeline portion is ignored.
     */
    static int
    compareWalFileNames(const void *a, const void *b)
    {
    	const char *lhs = *(const char *const *) a;
    	const char *rhs = *(const char *const *) b;
    
    	return strcmp(lhs + 8, rhs + 8);
    }
    
    /*
     * Fill *opt from the option list handed down by the parser.
     *
     * Recognized option names are "label", "progress", "fast", "nowait" and
     * "wal".  Giving any of them twice is reported as a syntax error, and an
     * unknown name raises an error as well.  When no label is supplied,
     * "base backup" is used.
     */
    static void
    parse_basebackup_options(List *options, basebackup_options *opt)
    {
    	ListCell   *cell;
    	bool		seen_label = false;
    	bool		seen_progress = false;
    	bool		seen_fast = false;
    	bool		seen_nowait = false;
    	bool		seen_wal = false;
    
    	/* Start with every flag off and no label */
    	MemSet(opt, 0, sizeof(*opt));
    
    	foreach(cell, options)
    	{
    		DefElem    *elem = (DefElem *) lfirst(cell);
    		const char *name = elem->defname;
    
    		if (strcmp(name, "label") == 0)
    		{
    			if (seen_label)
    				ereport(ERROR,
    						(errcode(ERRCODE_SYNTAX_ERROR),
    						 errmsg("duplicate option \"%s\"", name)));
    			opt->label = strVal(elem->arg);
    			seen_label = true;
    			continue;
    		}
    
    		if (strcmp(name, "progress") == 0)
    		{
    			if (seen_progress)
    				ereport(ERROR,
    						(errcode(ERRCODE_SYNTAX_ERROR),
    						 errmsg("duplicate option \"%s\"", name)));
    			opt->progress = true;
    			seen_progress = true;
    			continue;
    		}
    
    		if (strcmp(name, "fast") == 0)
    		{
    			if (seen_fast)
    				ereport(ERROR,
    						(errcode(ERRCODE_SYNTAX_ERROR),
    						 errmsg("duplicate option \"%s\"", name)));
    			opt->fastcheckpoint = true;
    			seen_fast = true;
    			continue;
    		}
    
    		if (strcmp(name, "nowait") == 0)
    		{
    			if (seen_nowait)
    				ereport(ERROR,
    						(errcode(ERRCODE_SYNTAX_ERROR),
    						 errmsg("duplicate option \"%s\"", name)));
    			opt->nowait = true;
    			seen_nowait = true;
    			continue;
    		}
    
    		if (strcmp(name, "wal") == 0)
    		{
    			if (seen_wal)
    				ereport(ERROR,
    						(errcode(ERRCODE_SYNTAX_ERROR),
    						 errmsg("duplicate option \"%s\"", name)));
    			opt->includewal = true;
    			seen_wal = true;
    			continue;
    		}
    
    		/* Anything else is not an option we know about */
    		elog(ERROR, "option \"%s\" not recognized",
    			 name);
    	}
    
    	if (opt->label == NULL)
    		opt->label = "base backup";
    }
    
    
    /*
     * SendBaseBackup() - send a complete base backup.
     *
     * Parses the command's options, marks this walsender as being in the
     * backup state, optionally updates the process title, opens pg_tblspc and
     * hands over to perform_base_backup().
     *
     * The function will put the system into backup mode like pg_start_backup()
     * does, so that the backup is consistent even though we read directly from
     * the filesystem, bypassing the buffer cache.
     */
    void
    SendBaseBackup(BaseBackupCmd *cmd)
    {
    	basebackup_options opt;
    	DIR		   *tblspcdir;
    
    	parse_basebackup_options(cmd->options, &opt);
    
    	WalSndSetState(WALSNDSTATE_BACKUP);
    
    	if (update_process_title)
    	{
    		char		title[50];
    
    		/* snprintf truncates over-long labels to fit the buffer */
    		snprintf(title, sizeof(title), "sending backup \"%s\"",
    				 opt.label);
    		set_ps_display(title, false);
    	}
    
    	/* Fail early if the tablespace directory cannot be opened */
    	tblspcdir = AllocateDir("pg_tblspc");
    	if (tblspcdir == NULL)
    		ereport(ERROR,
    				(errmsg("could not open directory \"%s\": %m", "pg_tblspc")));
    
    	perform_base_backup(&opt, tblspcdir);
    
    	FreeDir(tblspcdir);
    }
    
    /*
     * Append 'intval' to the message in 'buf' as a text-format column value:
     * a 4-byte length followed by the decimal digits (no terminating NUL).
     */
    static void
    send_int8_string(StringInfoData *buf, int64 intval)
    {
    	char		digits[32];
    	int			ndigits;
    
    	sprintf(digits, INT64_FORMAT, intval);
    	ndigits = strlen(digits);
    
    	pq_sendint(buf, ndigits, 4);
    	pq_sendbytes(buf, digits, ndigits);
    }
    
    /*
     * Send the result set describing the tablespaces about to be streamed.
     *
     * The row description has three columns (spcoid, spclocation, size).  One
     * data row is sent per entry of 'tablespaces'; the entry whose path is NULL
     * gets NULL in the first two columns.  The size is sent divided by 1024,
     * or as NULL when it is negative.  A CommandComplete message ends the set.
     */
    static void
    SendBackupHeader(List *tablespaces)
    {
    	StringInfoData msg;
    	ListCell   *cell;
    
    	/* RowDescription with 3 fields */
    	pq_beginmessage(&msg, 'T');
    	pq_sendint(&msg, 3, 2);
    
    	/* Column 1: spcoid */
    	pq_sendstring(&msg, "spcoid");
    	pq_sendint(&msg, 0, 4);		/* table oid */
    	pq_sendint(&msg, 0, 2);		/* attnum */
    	pq_sendint(&msg, OIDOID, 4);	/* type oid */
    	pq_sendint(&msg, 4, 2);		/* typlen */
    	pq_sendint(&msg, 0, 4);		/* typmod */
    	pq_sendint(&msg, 0, 2);		/* format code */
    
    	/* Column 2: spclocation */
    	pq_sendstring(&msg, "spclocation");
    	pq_sendint(&msg, 0, 4);		/* table oid */
    	pq_sendint(&msg, 0, 2);		/* attnum */
    	pq_sendint(&msg, TEXTOID, 4);	/* type oid */
    	pq_sendint(&msg, -1, 2);	/* typlen */
    	pq_sendint(&msg, 0, 4);		/* typmod */
    	pq_sendint(&msg, 0, 2);		/* format code */
    
    	/* Column 3: size */
    	pq_sendstring(&msg, "size");
    	pq_sendint(&msg, 0, 4);		/* table oid */
    	pq_sendint(&msg, 0, 2);		/* attnum */
    	pq_sendint(&msg, INT8OID, 4);	/* type oid */
    	pq_sendint(&msg, 8, 2);		/* typlen */
    	pq_sendint(&msg, 0, 4);		/* typmod */
    	pq_sendint(&msg, 0, 2);		/* format code */
    	pq_endmessage(&msg);
    
    	foreach(cell, tablespaces)
    	{
    		tablespaceinfo *spc = lfirst(cell);
    
    		/* One DataRow per tablespace, three columns each */
    		pq_beginmessage(&msg, 'D');
    		pq_sendint(&msg, 3, 2);
    
    		if (spc->path != NULL)
    		{
    			int			oidlen = strlen(spc->oid);
    			int			pathlen = strlen(spc->path);
    
    			pq_sendint(&msg, oidlen, 4);
    			pq_sendbytes(&msg, spc->oid, oidlen);
    			pq_sendint(&msg, pathlen, 4);
    			pq_sendbytes(&msg, spc->path, pathlen);
    		}
    		else
    		{
    			/* A length of -1 marks the column as NULL */
    			pq_sendint(&msg, -1, 4);
    			pq_sendint(&msg, -1, 4);
    		}
    
    		if (spc->size < 0)
    			pq_sendint(&msg, -1, 4);	/* NULL */
    		else
    			send_int8_string(&msg, spc->size / 1024);
    
    		pq_endmessage(&msg);
    	}
    
    	/* CommandComplete closes the result set */
    	pq_puttextmessage('C', "SELECT");
    }
    
    /*
     * Send a result set consisting of one row with two text-format columns:
     * "recptr", the given XLogRecPtr printed as two hexadecimal halves
     * separated by a slash, and "tli", the timeline ID printed in decimal.
     */
    static void
    SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
    {
    	StringInfoData msg;
    	char		text[MAXFNAMELEN];
    	int			textlen;
    
    	/* RowDescription with 2 fields */
    	pq_beginmessage(&msg, 'T');
    	pq_sendint(&msg, 2, 2);
    
    	/* Column 1: recptr */
    	pq_sendstring(&msg, "recptr");
    	pq_sendint(&msg, 0, 4);		/* table oid */
    	pq_sendint(&msg, 0, 2);		/* attnum */
    	pq_sendint(&msg, TEXTOID, 4);		/* type oid */
    	pq_sendint(&msg, -1, 2);
    	pq_sendint(&msg, 0, 4);
    	pq_sendint(&msg, 0, 2);
    
    	/* Column 2: tli */
    	pq_sendstring(&msg, "tli");
    	pq_sendint(&msg, 0, 4);		/* table oid */
    	pq_sendint(&msg, 0, 2);		/* attnum */
    
    	/*
    	 * int8 may seem like a surprising data type for this, but in theory int4
    	 * would not be wide enough for this, as TimeLineID is unsigned.
    	 */
    	pq_sendint(&msg, INT8OID, 4);		/* type oid */
    	pq_sendint(&msg, -1, 2);
    	pq_sendint(&msg, 0, 4);
    	pq_sendint(&msg, 0, 2);
    	pq_endmessage(&msg);
    
    	/* The single DataRow, two columns */
    	pq_beginmessage(&msg, 'D');
    	pq_sendint(&msg, 2, 2);
    
    	snprintf(text, sizeof(text), "%X/%X", (uint32) (ptr >> 32), (uint32) ptr);
    	textlen = strlen(text);
    	pq_sendint(&msg, textlen, 4);
    	pq_sendbytes(&msg, text, textlen);
    
    	snprintf(text, sizeof(text), "%u", tli);
    	textlen = strlen(text);
    	pq_sendint(&msg, textlen, 4);
    	pq_sendbytes(&msg, text, textlen);
    	pq_endmessage(&msg);
    
    	/* CommandComplete closes the result set */
    	pq_puttextmessage('C', "SELECT");
    }
    
    /*
     * Add a file that exists only in memory to the output tar stream.
     *
     * 'filename' is the member name to use and 'content' its NUL-terminated
     * contents.  A tar header is written from a made-up stat struct, followed by
     * the contents and zero padding up to the next 512-byte boundary.
     */
    static void
    sendFileWithContent(const char *filename, const char *content)
    {
    	struct stat st;
    	int			nbytes = strlen(content);
    	int			npad;
    
    	/*
    	 * Fake the stat fields for the injected file: owned by the effective
    	 * user and group (Windows has no such concept, so 0 there), modified
    	 * now, readable and writable by the owner only.
    	 */
    #ifdef WIN32
    	st.st_uid = 0;
    	st.st_gid = 0;
    #else
    	st.st_uid = geteuid();
    	st.st_gid = getegid();
    #endif
    	st.st_mtime = time(NULL);
    	st.st_mode = S_IRUSR | S_IWUSR;
    	st.st_size = nbytes;
    
    	_tarWriteHeader(filename, NULL, &st);
    
    	/* The contents go out as one CopyData message */
    	pq_putmessage('d', content, nbytes);
    
    	/* The tar format wants every member padded to a multiple of 512 */
    	npad = ((nbytes + 511) & ~511) - nbytes;
    	if (npad <= 0)
    		return;
    
    	{
    		char		zeros[512];
    
    		MemSet(zeros, 0, npad);
    		pq_putmessage('d', zeros, npad);
    	}
    }
    
    /*
     * Add the tablespace found at 'path' to the output tar stream.  With
     * 'sizeonly' set nothing is sent; the total length is merely computed.
     *
     * Returns the number of bytes the tablespace takes up in the tar stream,
     * or 0 if its version directory does not exist.
     *
     * Only used to send auxiliary tablespaces, not PGDATA.
     */
    static int64
    sendTablespace(char *path, bool sizeonly)
    {
    	char		verdir[MAXPGPATH];
    	struct stat st;
    
    	/*
    	 * A tablespace location can hold more than our own data; only the
    	 * version directory belonging to us is to be included.
    	 */
    	snprintf(verdir, sizeof(verdir), "%s/%s", path,
    			 TABLESPACE_VERSION_DIRECTORY);
    
    	if (lstat(verdir, &st) != 0)
    	{
    		if (errno != ENOENT)
    			ereport(ERROR,
    					(errcode_for_file_access(),
    					 errmsg("could not stat file or directory \"%s\": %m",
    							verdir)));
    
    		/* A tablespace that vanished during the scan is not an error */
    		return 0;
    	}
    
    	/* Emit a directory entry so that the permissions get restored */
    	if (!sizeonly)
    		_tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &st);
    
    	/* 512 bytes for that header, plus everything below the directory */
    	return 512 + sendDir(verdir, strlen(path), sizeonly, NIL);
    }
    
    /*
     * Include all files from the given directory in the output tar stream. If
     * 'sizeonly' is true, we just calculate a total length and return it, without
     * actually sending anything.
     *
     * 'path' is the directory to scan.  Member names in the tar are formed by
     * skipping the first 'basepathlen' + 1 characters of each entry's path.
     * Subdirectories are descended into recursively.
     *
     * Omit any directory in the tablespaces list, to avoid backing up
     * tablespaces twice when they were created inside PGDATA.
     *
     * Returns the number of bytes accounted for: 512 per tar header plus each
     * regular file's size rounded up to a multiple of 512.
     */
    static int64
    sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
    {
    	DIR		   *dir;
    	struct dirent *de;
    	char		pathbuf[MAXPGPATH];
    	struct stat statbuf;
    	int64		size = 0;
    
    	dir = AllocateDir(path);
    	while ((de = ReadDir(dir, path)) != NULL)
    	{
    		/* Skip special stuff */
    		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
    			continue;
    
    		/* Skip temporary files */
    		if (strncmp(de->d_name,
    					PG_TEMP_FILE_PREFIX,
    					strlen(PG_TEMP_FILE_PREFIX)) == 0)
    			continue;
    
    		/*
    		 * skip auto conf temporary file; the length passed covers the
    		 * terminating NUL too, so only an exact name matches
    		 */
    		if (strncmp(de->d_name,
    					PG_AUTOCONF_FILENAME ".tmp",
    					sizeof(PG_AUTOCONF_FILENAME) + 4) == 0)
    			continue;
    
    		/*
    		 * If there's a backup_label file, it belongs to a backup started by
    		 * the user with pg_start_backup(). It is *not* correct for this
    		 * backup, our backup_label is injected into the tar separately.
    		 */
    		if (strcmp(de->d_name, BACKUP_LABEL_FILE) == 0)
    			continue;
    
    		/*
    		 * Check if the postmaster has signaled us to exit, and abort with an
    		 * error in that case. The error handler further up will call
    		 * do_pg_abort_backup() for us. Also check that if the backup was
    		 * started while still in recovery, the server wasn't promoted.
    		 * do_pg_stop_backup() will check that too, but it's better to stop
    		 * the backup early than continue to the end and fail there.
    		 */
    		CHECK_FOR_INTERRUPTS();
    		if (RecoveryInProgress() != backup_started_in_recovery)
    			ereport(ERROR,
    					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    					 errmsg("the standby was promoted during online backup"),
    				 errhint("This means that the backup being taken is corrupt "
    						 "and should not be used. "
    						 "Try taking another online backup.")));
    
    		snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
    
    		/* Skip postmaster.pid and postmaster.opts in the data directory */
    		if (strcmp(pathbuf, "./postmaster.pid") == 0 ||
    			strcmp(pathbuf, "./postmaster.opts") == 0)
    			continue;
    
    		/* Skip pg_control here to back up it last */
    		if (strcmp(pathbuf, "./global/pg_control") == 0)
    			continue;
    
    		if (lstat(pathbuf, &statbuf) != 0)
    		{
    			if (errno != ENOENT)
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not stat file or directory \"%s\": %m",
    								pathbuf)));
    
    			/* If the file went away while scanning, it's no error. */
    			continue;
    		}
    
    		/*
    		 * Skip temporary statistics files. PG_STAT_TMP_DIR must be skipped
    		 * even when stats_temp_directory is set because PGSS_TEXT_FILE is
    		 * always created there.  Only a header for the entry itself is
    		 * written; its contents are not.
    		 */
    		if ((statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0) ||
    			strncmp(de->d_name, PG_STAT_TMP_DIR, strlen(PG_STAT_TMP_DIR)) == 0)
    		{
    			if (!sizeonly)
    				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
    			size += 512;
    			continue;
    		}
    
    		/*
    		 * Skip pg_replslot, not useful to copy. But include it as an empty
    		 * directory anyway, so we get permissions right.
    		 */
    		if (strcmp(de->d_name, "pg_replslot") == 0)
    		{
    			if (!sizeonly)
    				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
    			size += 512;		/* Size of the header just added */
    			continue;
    		}
    
    		/*
    		 * We can skip pg_xlog, the WAL segments need to be fetched from the
    		 * WAL archive anyway. But include it as an empty directory anyway, so
    		 * we get permissions right.
    		 */
    		if (strcmp(pathbuf, "./pg_xlog") == 0)
    		{
    			if (!sizeonly)
    			{
    				/* If pg_xlog is a symlink, write it as a directory anyway */
    #ifndef WIN32
    				if (S_ISLNK(statbuf.st_mode))
    #else
    				if (pgwin32_is_junction(pathbuf))
    #endif
    					statbuf.st_mode = S_IFDIR | S_IRWXU;
    				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
    			}
    			size += 512;		/* Size of the header just added */
    			continue;			/* don't recurse into pg_xlog */
    		}
    
    		/* Allow symbolic links in pg_tblspc only */
    		if (strcmp(path, "./pg_tblspc") == 0 &&
    #ifndef WIN32
    			S_ISLNK(statbuf.st_mode)
    #else
    			pgwin32_is_junction(pathbuf)
    #endif
    			)
    		{
    #if defined(HAVE_READLINK) || defined(WIN32)
    			char		linkpath[MAXPGPATH];
    			int			rllen;
    
    			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
    			if (rllen < 0)
    				ereport(ERROR,
    						(errcode_for_file_access(),
    						 errmsg("could not read symbolic link \"%s\": %m",
    								pathbuf)));
    			if (rllen >= sizeof(linkpath))
    				ereport(ERROR,
    						(errmsg("symbolic link \"%s\" target is too long",
    								pathbuf)));
    			/* readlink() does not NUL-terminate its result */
    			linkpath[rllen] = '\0';
    
    			if (!sizeonly)
    				_tarWriteHeader(pathbuf + basepathlen + 1, linkpath, &statbuf);
    			size += 512;		/* Size of the header just added */
    #else
    
    			/*
    			 * If the platform does not have symbolic links, it should not be
    			 * possible to have tablespaces - clearly somebody else created
    			 * them. Warn about it and ignore.
    			 */
    			ereport(WARNING,
    					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    				  errmsg("tablespaces are not supported on this platform")));
    			continue;
    #endif   /* HAVE_READLINK */
    		}
    		else if (S_ISDIR(statbuf.st_mode))
    		{
    			bool		skip_this_dir = false;
    			ListCell   *lc;
    
    			/*
    			 * Store a directory entry in the tar file so we can get the
    			 * permissions right.
    			 */
    			if (!sizeonly)
    				_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf);
    			size += 512;		/* Size of the header just added */
    
    			/*
    			 * Call ourselves recursively for a directory, unless it happens
    			 * to be a separate tablespace located within PGDATA.
    			 */
    			foreach(lc, tablespaces)
    			{
    				tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
    
    				/*
    				 * ti->rpath is the tablespace relative path within PGDATA, or
    				 * NULL if the tablespace has been properly located somewhere
    				 * else.
    				 *
    				 * Skip past the leading "./" in pathbuf when comparing.
    				 */
    				if (ti->rpath && strcmp(ti->rpath, pathbuf + 2) == 0)
    				{
    					skip_this_dir = true;
    					break;
    				}
    			}
    			if (!skip_this_dir)
    				size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces);
    		}
    		else if (S_ISREG(statbuf.st_mode))
    		{
    			bool		sent = false;
    
    			if (!sizeonly)
    				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
    								true);
    
    			/* A file that was not sent adds nothing to the total */
    			if (sent || sizeonly)
    			{
    				/* Add size, rounded up to 512byte block */
    				size += ((statbuf.st_size + 511) & ~511);
    				size += 512;	/* Size of the header of the file */
    			}
    		}
    		else
    			ereport(WARNING,
    					(errmsg("skipping special file \"%s\"", pathbuf)));
    	}
    	FreeDir(dir);
    	return size;
    }
    
    /*****
     * Functions for handling tar file format
     *
     * Copied from pg_dump, but modified to work with libpq for sending
     */
    
    
    /*
     * Largest file we are willing to store as a single tar member.
     *
     * The tar format itself cannot describe a member larger than 2^33-1 bytes
     * (just under 8 GB).  Where pgoff_t is too narrow to hold that value, the
     * limit is lowered to the largest positive value pgoff_t can represent.
     */
    #define MAX_TAR_MEMBER_FILELEN \
    	((((int64) 1) << Min(33, sizeof(pgoff_t) * 8 - 1)) - 1)
    
    /*
     * Given the member, write the TAR header & send the file.
     *
     * 'readfilename' is the path that is opened for reading, 'tarfilename' is
     * the member name stored in the tar header, and 'statbuf' supplies both the
     * header metadata and the number of data bytes to send (st_size).
     *
     * Exactly statbuf->st_size bytes of member data are sent, followed by zero
     * padding up to the next 512 byte boundary.  If the file turns out to be
     * shorter than st_size, the missing part is sent as zeros; if it is longer,
     * the extra data is not sent.
     *
     * If 'missing_ok' is true, will not throw an error if the file is not found.
     *
     * Throws an error if the file cannot be opened (other than the 'missing_ok'
     * case), if it is too large for the tar format, if reading it fails, or if
     * data cannot be sent to the client.
     *
     * Returns true if the file was successfully sent, false if 'missing_ok',
     * and the file did not exist.
     */
    static bool
    sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
    		 bool missing_ok)
    {
    	FILE	   *fp;
    	char		buf[TAR_SEND_SIZE];
    	size_t		cnt;
    	pgoff_t		len = 0;
    	size_t		pad;

    	fp = AllocateFile(readfilename, "rb");
    	if (fp == NULL)
    	{
    		if (errno == ENOENT && missing_ok)
    			return false;
    		ereport(ERROR,
    				(errcode_for_file_access(),
    				 errmsg("could not open file \"%s\": %m", readfilename)));
    	}

    	/*
    	 * Some compilers will throw a warning knowing this test can never be true
    	 * because pgoff_t can't exceed the compared maximum on their platform.
    	 */
    	if (statbuf->st_size > MAX_TAR_MEMBER_FILELEN)
    		ereport(ERROR,
    				(errmsg("archive member \"%s\" too large for tar format",
    						tarfilename)));

    	_tarWriteHeader(tarfilename, NULL, statbuf);

    	/* Never read more than the size that was announced in the header */
    	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
    	{
    		/* Send the chunk as a CopyData message */
    		if (pq_putmessage('d', buf, cnt))
    			ereport(ERROR,
    			   (errmsg("base backup could not send data, aborting backup")));

    		len += cnt;

    		if (len >= statbuf->st_size)
    		{
    			/*
    			 * Reached end of file. The file could be longer, if it was
    			 * extended while we were sending it, but for a base backup we can
    			 * ignore such extended data. It will be restored from WAL.
    			 */
    			break;
    		}
    	}

    	/*
    	 * fread() returns zero both at end of file and on a read error.  Only a
    	 * file that really is shorter than expected may be padded with zeros
    	 * below; a read failure must abort the backup instead of silently
    	 * shipping zero-filled data in place of the file's contents.
    	 */
    	if (ferror(fp))
    		ereport(ERROR,
    				(errcode_for_file_access(),
    				 errmsg("could not read file \"%s\": %m", readfilename)));

    	/* If the file was truncated while we were sending it, pad it with zeros */
    	if (len < statbuf->st_size)
    	{
    		MemSet(buf, 0, sizeof(buf));
    		while (len < statbuf->st_size)
    		{
    			cnt = Min(sizeof(buf), statbuf->st_size - len);
    			/* A failed send is as fatal here as in the main loop above */
    			if (pq_putmessage('d', buf, cnt))
    				ereport(ERROR,
    				   (errmsg("base backup could not send data, aborting backup")));
    			len += cnt;
    		}
    	}

    	/* Pad to 512 byte boundary, per tar format requirements */
    	pad = ((len + 511) & ~511) - len;
    	if (pad > 0)
    	{
    		MemSet(buf, 0, pad);
    		if (pq_putmessage('d', buf, pad))
    			ereport(ERROR,
    			   (errmsg("base backup could not send data, aborting backup")));
    	}

    	FreeFile(fp);

    	return true;
    }
    
    
    /*
     * Build the tar header for one archive member and send it to the client as
     * a single 'd' message.
     *
     * 'filename' and 'linktarget' are handed to tarCreateHeader() unchanged;
     * the member's size, mode, owner, group and modification time are taken
     * from 'statbuf'.  The result of pq_putmessage() is not examined here.
     */
    static void
    _tarWriteHeader(const char *filename, const char *linktarget,
    				struct stat * statbuf)
    {
    	char		header[512];

    	tarCreateHeader(header, filename, linktarget,
    					statbuf->st_size, statbuf->st_mode,
    					statbuf->st_uid, statbuf->st_gid,
    					statbuf->st_mtime);

    	pq_putmessage('d', header, sizeof(header));
    }