diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 634b4444f9f72140a0fef17e4fb675d5ef300ed1..0e2bfa106a741d99ff83d84459152c62d0c3ec87 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -20,32 +20,25 @@ * the desired number of worker processes, which each enter WaitForCommands(). * * The master process dispatches an individual work item to one of the worker - * processes in DispatchJobForTocEntry(). That calls - * AH->MasterStartParallelItemPtr, a routine of the output format. This - * function's arguments are the parents archive handle AH (containing the full - * catalog information), the TocEntry that the worker should work on and a - * T_Action value indicating whether this is a backup or a restore task. The - * function simply converts the TocEntry assignment into a command string that - * is then sent over to the worker process. In the simplest case that would be - * something like "DUMP 1234", with 1234 being the TocEntry id. - * + * processes in DispatchJobForTocEntry(). We send a command string such as + * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID. * The worker process receives and decodes the command and passes it to the * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr, * which are routines of the current archive format. That routine performs - * the required action (dump or restore) and returns a malloc'd status string. - * The status string is passed back to the master where it is interpreted by - * AH->MasterEndParallelItemPtr, another format-specific routine. That - * function can update format-specific information on the master's side, - * depending on the reply from the worker process. In the end it returns a - * status code, which we pass to the ParallelCompletionPtr callback function - * that was passed to DispatchJobForTocEntry(). The callback function does - * state updating for the master control logic in pg_backup_archiver.c. + * the required action (dump or restore) and returns an integer status code. + * This is passed back to the master where we pass it to the + * ParallelCompletionPtr callback function that was passed to + * DispatchJobForTocEntry(). The callback function does state updating + * for the master control logic in pg_backup_archiver.c. * - * Remember that we have forked off the workers only after we have read in - * the catalog. That's why our worker processes can also access the catalog - * information. (In the Windows case, the workers are threads in the same - * process. To avoid problems, they work with cloned copies of the Archive - * data structure; see RunWorker().) + * In principle additional archive-format-specific information might be needed + * in commands or worker status responses, but so far that hasn't proved + * necessary, since workers have full copies of the ArchiveHandle/TocEntry + * data structures. Remember that we have forked off the workers only after + * we have read in the catalog. That's why our worker processes can also + * access the catalog information. (In the Windows case, the workers are + * threads in the same process. To avoid problems, they work with cloned + * copies of the Archive data structure; see RunWorker().) * * In the master process, the workerStatus field for each worker has one of * the following values: @@ -1073,6 +1066,110 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) free(pstate); } +/* + * These next four functions handle construction and parsing of the command + * strings and response strings for parallel workers. + * + * Currently, these can be the same regardless of which archive format we are + * processing. In future, we might want to let format modules override these + * functions to add format-specific data to a command or response. + */ + +/* + * buildWorkerCommand: format a command string to send to a worker. + * + * The string is built in the caller-supplied buffer of size buflen. + */ +static void +buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act, + char *buf, int buflen) +{ + if (act == ACT_DUMP) + snprintf(buf, buflen, "DUMP %d", te->dumpId); + else if (act == ACT_RESTORE) + snprintf(buf, buflen, "RESTORE %d", te->dumpId); + else + Assert(false); +} + +/* + * parseWorkerCommand: interpret a command string in a worker. + */ +static void +parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act, + const char *msg) +{ + DumpId dumpId; + int nBytes; + + if (messageStartsWith(msg, "DUMP ")) + { + *act = ACT_DUMP; + sscanf(msg, "DUMP %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } + else if (messageStartsWith(msg, "RESTORE ")) + { + *act = ACT_RESTORE; + sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes); + Assert(nBytes == strlen(msg)); + *te = getTocEntryByDumpId(AH, dumpId); + Assert(*te != NULL); + } + else + exit_horribly(modulename, + "unrecognized command received from master: \"%s\"\n", + msg); +} + +/* + * buildWorkerResponse: format a response string to send to the master. + * + * The string is built in the caller-supplied buffer of size buflen. + */ +static void +buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status, + char *buf, int buflen) +{ + snprintf(buf, buflen, "OK %d %d %d", + te->dumpId, + status, + status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); +} + +/* + * parseWorkerResponse: parse the status message returned by a worker. + * + * Returns the integer status code, and may update fields of AH and/or te. + */ +static int +parseWorkerResponse(ArchiveHandle *AH, TocEntry *te, + const char *msg) +{ + DumpId dumpId; + int nBytes, + n_errors; + int status = 0; + + if (messageStartsWith(msg, "OK ")) + { + sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes); + + Assert(dumpId == te->dumpId); + Assert(nBytes == strlen(msg)); + + AH->public.n_errors += n_errors; + } + else + exit_horribly(modulename, + "invalid message received from worker: \"%s\"\n", + msg); + + return status; +} + /* * Dispatch a job to some free worker. * @@ -1091,18 +1188,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH, void *callback_data) { int worker; - char *arg; + char buf[256]; /* Get a worker, waiting if none are idle */ while ((worker = GetIdleWorker(pstate)) == NO_SLOT) WaitForWorkers(AH, pstate, WFW_ONE_IDLE); /* Construct and send command string */ - arg = (AH->MasterStartParallelItemPtr) (AH, te, act); - - sendMessageToWorker(pstate, worker, arg); + buildWorkerCommand(AH, te, act, buf, sizeof(buf)); - /* XXX aren't we leaking string here? (no, because it's static. Ick.) */ + sendMessageToWorker(pstate, worker, buf); /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; @@ -1220,10 +1315,10 @@ static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]) { char *command; - DumpId dumpId; - int nBytes; - char *str; TocEntry *te; + T_Action act; + int status = 0; + char buf[256]; for (;;) { @@ -1233,47 +1328,29 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) return; } - if (messageStartsWith(command, "DUMP ")) - { - /* Decode the command */ - sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes); - Assert(nBytes == strlen(command) - strlen("DUMP ")); - te = getTocEntryByDumpId(AH, dumpId); - Assert(te != NULL); + /* Decode the command */ + parseWorkerCommand(AH, &te, &act, command); + if (act == ACT_DUMP) + { /* Acquire lock on this table within the worker's session */ lockTableForWorker(AH, te); /* Perform the dump command */ - str = (AH->WorkerJobDumpPtr) (AH, te); - - /* Return status to master */ - sendMessageToMaster(pipefd, str); - - /* we are responsible for freeing the status string */ - free(str); + status = (AH->WorkerJobDumpPtr) (AH, te); } - else if (messageStartsWith(command, "RESTORE ")) + else if (act == ACT_RESTORE) { - /* Decode the command */ - sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes); - Assert(nBytes == strlen(command) - strlen("RESTORE ")); - te = getTocEntryByDumpId(AH, dumpId); - Assert(te != NULL); - /* Perform the restore command */ - str = (AH->WorkerJobRestorePtr) (AH, te); - - /* Return status to master */ - sendMessageToMaster(pipefd, str); - - /* we are responsible for freeing the status string */ - free(str); + status = (AH->WorkerJobRestorePtr) (AH, te); } else - exit_horribly(modulename, - "unrecognized command received from master: \"%s\"\n", - command); + Assert(false); + + /* Return status to master */ + buildWorkerResponse(AH, te, act, status, buf, sizeof(buf)); + + sendMessageToMaster(pipefd, buf); /* command was pg_malloc'd and we are responsible for free()ing it. */ free(command); @@ -1286,9 +1363,9 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2]) * If do_wait is true, wait to get a status message; otherwise, just return * immediately if there is none available. * - * When we get a status message, we let MasterEndParallelItemPtr process it, - * then pass the resulting status code to the callback function that was - * specified to DispatchJobForTocEntry, then reset the worker status to IDLE. + * When we get a status message, we pass the status code to the callback + * function that was specified to DispatchJobForTocEntry, then reset the + * worker status to IDLE. * * Returns true if we collected a status message, else false. * @@ -1318,29 +1395,10 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) { ParallelSlot *slot = &pstate->parallelSlot[worker]; TocEntry *te = slot->te; - char *statusString; int status; - if (messageStartsWith(msg, "OK RESTORE ")) - { - statusString = msg + strlen("OK RESTORE "); - status = - (AH->MasterEndParallelItemPtr) - (AH, te, statusString, ACT_RESTORE); - slot->callback(AH, te, status, slot->callback_data); - } - else if (messageStartsWith(msg, "OK DUMP ")) - { - statusString = msg + strlen("OK DUMP "); - status = - (AH->MasterEndParallelItemPtr) - (AH, te, statusString, ACT_DUMP); - slot->callback(AH, te, status, slot->callback_data); - } - else - exit_horribly(modulename, - "invalid message received from worker: \"%s\"\n", - msg); + status = parseWorkerResponse(AH, te, msg); + slot->callback(AH, te, status, slot->callback_data); slot->workerStatus = WRKR_IDLE; slot->te = NULL; } @@ -1364,8 +1422,8 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) * WFW_ONE_IDLE: wait for at least one worker to be idle * WFW_ALL_IDLE: wait for all workers to be idle * - * Any received results are passed to MasterEndParallelItemPtr and then - * to the callback specified to DispatchJobForTocEntry. + * Any received results are passed to the callback specified to + * DispatchJobForTocEntry. * * This function is executed in the master process. */ diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index 123aa5dc848f2a13e52ef32160918449da5e8985..97d34a52974fbe4502bc67b7e4265c93634fb0ab 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -161,12 +161,8 @@ typedef void (*PrintTocDataPtr) (ArchiveHandle *AH, TocEntry *te); typedef void (*ClonePtr) (ArchiveHandle *AH); typedef void (*DeClonePtr) (ArchiveHandle *AH); -typedef char *(*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te); -typedef char *(*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te); -typedef char *(*MasterStartParallelItemPtr) (ArchiveHandle *AH, TocEntry *te, - T_Action act); -typedef int (*MasterEndParallelItemPtr) (ArchiveHandle *AH, TocEntry *te, - const char *str, T_Action act); +typedef int (*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te); +typedef int (*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te); typedef size_t (*CustomOutPtr) (ArchiveHandle *AH, const void *buf, size_t len); @@ -266,9 +262,6 @@ struct _archiveHandle StartBlobPtr StartBlobPtr; EndBlobPtr EndBlobPtr; - MasterStartParallelItemPtr MasterStartParallelItemPtr; - MasterEndParallelItemPtr MasterEndParallelItemPtr; - SetupWorkerPtr SetupWorkerPtr; WorkerJobDumpPtr WorkerJobDumpPtr; WorkerJobRestorePtr WorkerJobRestorePtr; diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index c4f487a7cab87490760e6811eefad7144b60883c..5388c08b29d1754e4494d448b800f93bde60aad4 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -61,9 +61,7 @@ static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); -static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act); -static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act); -char *_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te); +static int _WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te); typedef struct { @@ -133,9 +131,6 @@ InitArchiveFmt_Custom(ArchiveHandle *AH) AH->ClonePtr = _Clone; AH->DeClonePtr = _DeClone; - AH->MasterStartParallelItemPtr = _MasterStartParallelItem; - AH->MasterEndParallelItemPtr = _MasterEndParallelItem; - /* no parallel dump in the custom archive, only parallel restore */ AH->WorkerJobDumpPtr = NULL; AH->WorkerJobRestorePtr = _WorkerJobRestoreCustom; @@ -808,73 +803,13 @@ _DeClone(ArchiveHandle *AH) } /* - * This function is executed in the child of a parallel backup for the - * custom format archive and dumps the actual data. - */ -char * -_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) -{ - /* - * short fixed-size string + some ID so far, this needs to be malloc'ed - * instead of static because we work with threads on windows - */ - const int buflen = 64; - char *buf = (char *) pg_malloc(buflen); - int status; - - status = parallel_restore(AH, te); - - snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, - status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); - - return buf; -} - -/* - * This function is executed in the parent process. Depending on the desired - * action (dump or restore) it creates a string that is understood by the - * _WorkerJobDump /_WorkerJobRestore functions of the dump format. - */ -static char * -_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act) -{ - /* - * A static char is okay here, even on Windows because we call this - * function only from one process (the master). - */ - static char buf[64]; /* short fixed-size string + number */ - - /* no parallel dump in the custom archive format */ - Assert(act == ACT_RESTORE); - - snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId); - - return buf; -} - -/* - * This function is executed in the parent process. It analyzes the response of - * the _WorkerJobDump / _WorkerJobRestore functions of the dump format. + * This function is executed in the child of a parallel restore from a + * custom-format archive and restores the actual data for one TOC entry. */ static int -_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act) +_WorkerJobRestoreCustom(ArchiveHandle *AH, TocEntry *te) { - DumpId dumpId; - int nBytes, - status, - n_errors; - - /* no parallel dump in the custom archive */ - Assert(act == ACT_RESTORE); - - sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes); - - Assert(nBytes == strlen(str)); - Assert(dumpId == te->dumpId); - - AH->public.n_errors += n_errors; - - return status; + return parallel_restore(AH, te); } /*-------------------------------------------------- diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index 8071259acba86c0afc7db7850d4d178f4c9ea123..ae443717cff66287fbb31082d0322aefb9950543 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -89,11 +89,8 @@ static void _LoadBlobs(ArchiveHandle *AH); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); -static char *_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act); -static int _MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, - const char *str, T_Action act); -static char *_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te); -static char *_WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te); +static int _WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te); +static int _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te); static void setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename); @@ -140,9 +137,6 @@ InitArchiveFmt_Directory(ArchiveHandle *AH) AH->WorkerJobRestorePtr = _WorkerJobRestoreDirectory; AH->WorkerJobDumpPtr = _WorkerJobDumpDirectory; - AH->MasterStartParallelItemPtr = _MasterStartParallelItem; - AH->MasterEndParallelItemPtr = _MasterEndParallelItem; - /* Set up our private context */ ctx = (lclContext *) pg_malloc0(sizeof(lclContext)); AH->formatData = (void *) ctx; @@ -754,53 +748,12 @@ _DeClone(ArchiveHandle *AH) } /* - * This function is executed in the parent process. Depending on the desired - * action (dump or restore) it creates a string that is understood by the - * _WorkerJobDump /_WorkerJobRestore functions of the dump format. - */ -static char * -_MasterStartParallelItem(ArchiveHandle *AH, TocEntry *te, T_Action act) -{ - /* - * A static char is okay here, even on Windows because we call this - * function only from one process (the master). - */ - static char buf[64]; - - if (act == ACT_DUMP) - snprintf(buf, sizeof(buf), "DUMP %d", te->dumpId); - else if (act == ACT_RESTORE) - snprintf(buf, sizeof(buf), "RESTORE %d", te->dumpId); - - return buf; -} - -/* - * This function is executed in the child of a parallel backup for the - * directory archive and dumps the actual data. - * - * We are currently returning only the DumpId so theoretically we could - * make this function returning an int (or a DumpId). However, to - * facilitate further enhancements and because sooner or later we need to - * convert this to a string and send it via a message anyway, we stick with - * char *. It is parsed on the other side by the _EndMasterParallel() - * function of the respective dump format. + * This function is executed in the child of a parallel backup for a + * directory-format archive and dumps the actual data for one TOC entry. */ -static char * +static int _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te) { - /* - * short fixed-size string + some ID so far, this needs to be malloc'ed - * instead of static because we work with threads on windows - */ - const int buflen = 64; - char *buf = (char *) pg_malloc(buflen); - lclTocEntry *tctx = (lclTocEntry *) te->formatData; - - /* This should never happen */ - if (!tctx) - exit_horribly(modulename, "error during backup\n"); - /* * This function returns void. We either fail and die horribly or * succeed... A failure will be detected by the parent when the child dies @@ -808,63 +761,15 @@ _WorkerJobDumpDirectory(ArchiveHandle *AH, TocEntry *te) */ WriteDataChunksForTocEntry(AH, te); - snprintf(buf, buflen, "OK DUMP %d", te->dumpId); - - return buf; + return 0; } /* - * This function is executed in the child of a parallel backup for the - * directory archive and dumps the actual data. - */ -static char * -_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te) -{ - /* - * short fixed-size string + some ID so far, this needs to be malloc'ed - * instead of static because we work with threads on windows - */ - const int buflen = 64; - char *buf = (char *) pg_malloc(buflen); - int status; - - status = parallel_restore(AH, te); - - snprintf(buf, buflen, "OK RESTORE %d %d %d", te->dumpId, status, - status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0); - - return buf; -} - -/* - * This function is executed in the parent process. It analyzes the response of - * the _WorkerJobDumpDirectory/_WorkerJobRestoreDirectory functions of the - * respective dump format. + * This function is executed in the child of a parallel restore from a + * directory-format archive and restores the actual data for one TOC entry. */ static int -_MasterEndParallelItem(ArchiveHandle *AH, TocEntry *te, const char *str, T_Action act) +_WorkerJobRestoreDirectory(ArchiveHandle *AH, TocEntry *te) { - DumpId dumpId; - int nBytes, - n_errors; - int status = 0; - - if (act == ACT_DUMP) - { - sscanf(str, "%d%n", &dumpId, &nBytes); - - Assert(dumpId == te->dumpId); - Assert(nBytes == strlen(str)); - } - else if (act == ACT_RESTORE) - { - sscanf(str, "%d %d %d%n", &dumpId, &status, &n_errors, &nBytes); - - Assert(dumpId == te->dumpId); - Assert(nBytes == strlen(str)); - - AH->public.n_errors += n_errors; - } - - return status; + return parallel_restore(AH, te); } diff --git a/src/bin/pg_dump/pg_backup_tar.c b/src/bin/pg_dump/pg_backup_tar.c index 8dfc6a98de1117e62c74654a72aa18c394efe36b..9cadd0c4a459b714742e6cf512e09e61a6febf9c 100644 --- a/src/bin/pg_dump/pg_backup_tar.c +++ b/src/bin/pg_dump/pg_backup_tar.c @@ -152,9 +152,6 @@ InitArchiveFmt_Tar(ArchiveHandle *AH) AH->ClonePtr = NULL; AH->DeClonePtr = NULL; - AH->MasterStartParallelItemPtr = NULL; - AH->MasterEndParallelItemPtr = NULL; - AH->WorkerJobDumpPtr = NULL; AH->WorkerJobRestorePtr = NULL;