diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c index cd7da900b4f08afcdcf6b71c08325dceaad6a203..452e3a187dfdd612c01b4accb8e42e719aa93f99 100644 --- a/src/backend/storage/file/buffile.c +++ b/src/backend/storage/file/buffile.c @@ -6,7 +6,7 @@ * Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.1 1999/10/13 15:02:29 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.2 1999/10/16 19:49:26 tgl Exp $ * * NOTES: * @@ -27,10 +27,7 @@ * * BufFile also supports temporary files that exceed the OS file size limit * (by opening multiple fd.c temporary files). This is an essential feature - * for sorts and hashjoins on large amounts of data. It is possible to have - * more than one BufFile reading/writing the same temp file, although the - * caller is responsible for avoiding ill effects from buffer overlap when - * this is done. + * for sorts and hashjoins on large amounts of data. *------------------------------------------------------------------------- */ @@ -48,33 +45,24 @@ #define MAX_PHYSICAL_FILESIZE (RELSEG_SIZE * BLCKSZ) /* - * To handle multiple BufFiles on a single logical temp file, we use this - * data structure representing a logical file (which can be made up of - * multiple physical files to get around the OS file size limit). + * This data structure represents a buffered file that consists of one or + * more physical files (each accessed through a virtual file descriptor + * managed by fd.c). */ -typedef struct LogicalFile +struct BufFile { - int refCount; /* number of BufFiles using me */ - bool isTemp; /* can only add files if this is TRUE */ int numFiles; /* number of physical files in set */ /* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */ - File *files; /* palloc'd array with numFiles entries */ long *offsets; /* palloc'd array with numFiles entries */ /* offsets[i] is the current seek position of files[i]. We use this * to avoid making redundant FileSeek calls. */ -} LogicalFile; -/* - * A single file buffer looks like this. - */ -struct BufFile -{ - LogicalFile *logFile; /* the underlying LogicalFile */ + bool isTemp; /* can only add files if this is TRUE */ bool dirty; /* does buffer need to be written? */ /* - * "current pos" is position of start of buffer within LogicalFile. + * "current pos" is position of start of buffer within the logical file. * Position as seen by user of BufFile is (curFile, curOffset + pos). */ int curFile; /* file index (0..n) part of current pos */ @@ -84,30 +72,33 @@ struct BufFile char buffer[BLCKSZ]; }; -static LogicalFile *makeLogicalFile(File firstfile); -static void extendLogicalFile(LogicalFile *file); -static void deleteLogicalFile(LogicalFile *file); +static BufFile *makeBufFile(File firstfile); +static void extendBufFile(BufFile *file); static void BufFileLoadBuffer(BufFile *file); static void BufFileDumpBuffer(BufFile *file); static int BufFileFlush(BufFile *file); /* - * Create a LogicalFile with one component file and refcount 1. + * Create a BufFile given the first underlying physical file. * NOTE: caller must set isTemp true if appropriate. */ -static LogicalFile * -makeLogicalFile(File firstfile) +static BufFile * +makeBufFile(File firstfile) { - LogicalFile *file = (LogicalFile *) palloc(sizeof(LogicalFile)); + BufFile *file = (BufFile *) palloc(sizeof(BufFile)); - file->refCount = 1; - file->isTemp = false; file->numFiles = 1; file->files = (File *) palloc(sizeof(File)); file->files[0] = firstfile; file->offsets = (long *) palloc(sizeof(long)); file->offsets[0] = 0L; + file->isTemp = false; + file->dirty = false; + file->curFile = 0; + file->curOffset = 0L; + file->pos = 0; + file->nbytes = 0; return file; } @@ -116,7 +107,7 @@ makeLogicalFile(File firstfile) * Add another component temp file. */ static void -extendLogicalFile(LogicalFile *file) +extendBufFile(BufFile *file) { File pfile; @@ -133,21 +124,6 @@ extendLogicalFile(LogicalFile *file) file->numFiles++; } -/* - * Close and delete a LogicalFile when its refCount has gone to zero. - */ -static void -deleteLogicalFile(LogicalFile *file) -{ - int i; - - for (i = 0; i < file->numFiles; i++) - FileClose(file->files[i]); - pfree(file->files); - pfree(file->offsets); - pfree(file); -} - /* * Create a BufFile for a new temporary file (which will expand to become * multiple temporary files if more than MAX_PHYSICAL_FILESIZE bytes are @@ -156,24 +132,16 @@ deleteLogicalFile(LogicalFile *file) BufFile * BufFileCreateTemp(void) { - BufFile *bfile = (BufFile *) palloc(sizeof(BufFile)); + BufFile *file; File pfile; - LogicalFile *lfile; pfile = OpenTemporaryFile(); Assert(pfile >= 0); - lfile = makeLogicalFile(pfile); - lfile->isTemp = true; - - bfile->logFile = lfile; - bfile->dirty = false; - bfile->curFile = 0; - bfile->curOffset = 0L; - bfile->pos = 0; - bfile->nbytes = 0; + file = makeBufFile(pfile); + file->isTemp = true; - return bfile; + return file; } /* @@ -186,42 +154,7 @@ BufFileCreateTemp(void) BufFile * BufFileCreate(File file) { - BufFile *bfile = (BufFile *) palloc(sizeof(BufFile)); - LogicalFile *lfile; - - lfile = makeLogicalFile(file); - - bfile->logFile = lfile; - bfile->dirty = false; - bfile->curFile = 0; - bfile->curOffset = 0L; - bfile->pos = 0; - bfile->nbytes = 0; - - return bfile; -} - -/* - * Create an additional BufFile accessing the same underlying file as an - * existing BufFile. This is useful for having multiple read/write access - * positions in a single temporary file. Note the caller is responsible - * for avoiding trouble due to overlapping buffer positions! (Caller may - * assume that buffer size is BLCKSZ...) - */ -BufFile * -BufFileReaccess(BufFile *file) -{ - BufFile *bfile = (BufFile *) palloc(sizeof(BufFile)); - - bfile->logFile = file->logFile; - bfile->logFile->refCount++; - bfile->dirty = false; - bfile->curFile = 0; - bfile->curOffset = 0L; - bfile->pos = 0; - bfile->nbytes = 0; - - return bfile; + return makeBufFile(file); } /* @@ -232,16 +165,21 @@ BufFileReaccess(BufFile *file) void BufFileClose(BufFile *file) { + int i; + /* flush any unwritten data */ BufFileFlush(file); - /* close the underlying (with delete if it's a temp file) */ - if (--(file->logFile->refCount) <= 0) - deleteLogicalFile(file->logFile); + /* close the underlying file(s) (with delete if it's a temp file) */ + for (i = 0; i < file->numFiles; i++) + FileClose(file->files[i]); /* release the buffer space */ + pfree(file->files); + pfree(file->offsets); pfree(file); } -/* BufFileLoadBuffer +/* + * BufFileLoadBuffer * * Load some data into buffer, if possible, starting from curOffset. * At call, must have dirty = false, pos and nbytes = 0. @@ -250,7 +188,6 @@ BufFileClose(BufFile *file) static void BufFileLoadBuffer(BufFile *file) { - LogicalFile *lfile = file->logFile; File thisfile; /* @@ -261,30 +198,33 @@ BufFileLoadBuffer(BufFile *file) * MAX_PHYSICAL_FILESIZE. */ if (file->curOffset >= MAX_PHYSICAL_FILESIZE && - file->curFile+1 < lfile->numFiles) + file->curFile+1 < file->numFiles) { file->curFile++; file->curOffset = 0L; } - thisfile = lfile->files[file->curFile]; /* - * May need to reposition physical file, if more than one BufFile - * is using it. + * May need to reposition physical file. */ - if (file->curOffset != lfile->offsets[file->curFile]) + thisfile = file->files[file->curFile]; + if (file->curOffset != file->offsets[file->curFile]) { if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset) return; /* seek failed, read nothing */ - lfile->offsets[file->curFile] = file->curOffset; + file->offsets[file->curFile] = file->curOffset; } + /* + * Read whatever we can get, up to a full bufferload. + */ file->nbytes = FileRead(thisfile, file->buffer, sizeof(file->buffer)); if (file->nbytes < 0) file->nbytes = 0; - lfile->offsets[file->curFile] += file->nbytes; + file->offsets[file->curFile] += file->nbytes; /* we choose not to advance curOffset here */ } -/* BufFileDumpBuffer +/* + * BufFileDumpBuffer * * Dump buffer contents starting at curOffset. * At call, should have dirty = true, nbytes > 0. @@ -293,7 +233,6 @@ BufFileLoadBuffer(BufFile *file) static void BufFileDumpBuffer(BufFile *file) { - LogicalFile *lfile = file->logFile; int wpos = 0; int bytestowrite; File thisfile; @@ -307,10 +246,10 @@ BufFileDumpBuffer(BufFile *file) /* * Advance to next component file if necessary and possible. */ - if (file->curOffset >= MAX_PHYSICAL_FILESIZE && lfile->isTemp) + if (file->curOffset >= MAX_PHYSICAL_FILESIZE && file->isTemp) { - while (file->curFile+1 >= lfile->numFiles) - extendLogicalFile(lfile); + while (file->curFile+1 >= file->numFiles) + extendBufFile(file); file->curFile++; file->curOffset = 0L; } @@ -319,28 +258,27 @@ BufFileDumpBuffer(BufFile *file) * to write as much as asked... */ bytestowrite = file->nbytes - wpos; - if (lfile->isTemp) + if (file->isTemp) { long availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset; if ((long) bytestowrite > availbytes) bytestowrite = (int) availbytes; } - thisfile = lfile->files[file->curFile]; /* - * May need to reposition physical file, if more than one BufFile - * is using it. + * May need to reposition physical file. */ - if (file->curOffset != lfile->offsets[file->curFile]) + thisfile = file->files[file->curFile]; + if (file->curOffset != file->offsets[file->curFile]) { if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset) return; /* seek failed, give up */ - lfile->offsets[file->curFile] = file->curOffset; + file->offsets[file->curFile] = file->curOffset; } bytestowrite = FileWrite(thisfile, file->buffer, bytestowrite); if (bytestowrite <= 0) return; /* failed to write */ - lfile->offsets[file->curFile] += bytestowrite; + file->offsets[file->curFile] += bytestowrite; file->curOffset += bytestowrite; wpos += bytestowrite; } @@ -363,7 +301,8 @@ BufFileDumpBuffer(BufFile *file) file->nbytes = 0; } -/* BufFileRead +/* + * BufFileRead * * Like fread() except we assume 1-byte element size. */ @@ -409,7 +348,8 @@ BufFileRead(BufFile *file, void *ptr, size_t size) return nread; } -/* BufFileWrite +/* + * BufFileWrite * * Like fwrite() except we assume 1-byte element size. */ @@ -458,7 +398,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size) return nwritten; } -/* BufFileFlush +/* + * BufFileFlush * * Like fflush() */ @@ -475,9 +416,15 @@ BufFileFlush(BufFile *file) return 0; } -/* BufFileSeek +/* + * BufFileSeek * - * Like fseek(). Result is 0 if OK, EOF if not. + * Like fseek(), except that target position needs two values in order to + * work when logical filesize exceeds maximum value representable by long. + * We do not support relative seeks across more than LONG_MAX, however. + * + * Result is 0 if OK, EOF if not. Logical position is not moved if an + * impossible seek is attempted. */ int BufFileSeek(BufFile *file, int fileno, long offset, int whence) @@ -487,7 +434,7 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence) switch (whence) { case SEEK_SET: - if (fileno < 0 || fileno >= file->logFile->numFiles || + if (fileno < 0 || fileno >= file->numFiles || offset < 0) return EOF; newFile = fileno; @@ -516,11 +463,11 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence) return EOF; newOffset += MAX_PHYSICAL_FILESIZE; } - if (file->logFile->isTemp) + if (file->isTemp) { while (newOffset > MAX_PHYSICAL_FILESIZE) { - if (++newFile >= file->logFile->numFiles) + if (++newFile >= file->numFiles) return EOF; newOffset -= MAX_PHYSICAL_FILESIZE; } @@ -548,9 +495,44 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence) return 0; } -extern void +void BufFileTell(BufFile *file, int *fileno, long *offset) { *fileno = file->curFile; *offset = file->curOffset + file->pos; } + +/* + * BufFileSeekBlock --- block-oriented seek + * + * Performs absolute seek to the start of the n'th BLCKSZ-sized block of + * the file. Note that users of this interface will fail if their files + * exceed BLCKSZ * LONG_MAX bytes, but that is quite a lot; we don't work + * with tables bigger than that, either... + * + * Result is 0 if OK, EOF if not. Logical position is not moved if an + * impossible seek is attempted. + */ +int +BufFileSeekBlock(BufFile *file, long blknum) +{ + return BufFileSeek(file, + (int) (blknum / RELSEG_SIZE), + (blknum % RELSEG_SIZE) * BLCKSZ, + SEEK_SET); +} + +/* + * BufFileTellBlock --- block-oriented tell + * + * Any fractional part of a block in the current seek position is ignored. + */ +long +BufFileTellBlock(BufFile *file) +{ + long blknum; + + blknum = (file->curOffset + file->pos) / BLCKSZ; + blknum += file->curFile * RELSEG_SIZE; + return blknum; +} diff --git a/src/backend/utils/sort/Makefile b/src/backend/utils/sort/Makefile index f2fb18dc6ce3905ea807e20b83cb4c03b3f7b9fd..d411a89c735a3b1e667a67c20a6b6ebedb8fb6ee 100644 --- a/src/backend/utils/sort/Makefile +++ b/src/backend/utils/sort/Makefile @@ -4,7 +4,7 @@ # Makefile for utils/sort # # IDENTIFICATION -# $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.5 1998/04/06 00:27:37 momjian Exp $ +# $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.6 1999/10/16 19:49:27 tgl Exp $ # #------------------------------------------------------------------------- @@ -13,7 +13,7 @@ include ../../../Makefile.global CFLAGS += -I../.. -OBJS = lselect.o psort.o +OBJS = logtape.o lselect.o psort.o all: SUBSYS.o diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c new file mode 100644 index 0000000000000000000000000000000000000000..8d5d34c00a73aec9239d2efd90cc1040b3a14f69 --- /dev/null +++ b/src/backend/utils/sort/logtape.c @@ -0,0 +1,903 @@ +/*------------------------------------------------------------------------- + * + * logtape.c + * Management of "logical tapes" within temporary files. + * + * This module exists to support sorting via multiple merge passes (see + * psort.c). Merging is an ideal algorithm for tape devices, but if we + * implement it on disk by creating a separate file for each "tape", + * there is an annoying problem: the peak space usage is at least twice + * the volume of actual data to be sorted. (This must be so because each + * datum will appear in both the input and output tapes of the final + * merge pass. For seven-tape polyphase merge, which is otherwise a + * pretty good algorithm, peak usage is more like 4x actual data volume.) + * + * We can work around this problem by recognizing that any one tape + * dataset (with the possible exception of the final output) is written + * and read exactly once in a perfectly sequential manner. Therefore, + * a datum once read will not be required again, and we can recycle its + * space for use by the new tape dataset(s) being generated. In this way, + * the total space usage is essentially just the actual data volume, plus + * insignificant bookkeeping and start/stop overhead. + * + * Few OSes allow arbitrary parts of a file to be released back to the OS, + * so we have to implement this space-recycling ourselves within a single + * logical file. logtape.c exists to perform this bookkeeping and provide + * the illusion of N independent tape devices to psort.c. Note that + * logtape.c itself depends on buffile.c to provide a "logical file" of + * larger size than the underlying OS may support. + * + * For simplicity, we allocate and release space in the underlying file + * in BLCKSZ-size blocks. Space allocation boils down to keeping track + * of which blocks in the underlying file belong to which logical tape, + * plus any blocks that are free (recycled and not yet reused). Normally + * there are not very many free blocks, so we just keep those in a list. + * The blocks in each logical tape are remembered using a method borrowed + * from the Unix HFS filesystem: we store data block numbers in an + * "indirect block". If an indirect block fills up, we write it out to + * the underlying file and remember its location in a second-level indirect + * block. In the same way second-level blocks are remembered in third- + * level blocks, and so on if necessary (of course we're talking huge + * amounts of data here). The topmost indirect block of a given logical + * tape is never actually written out to the physical file, but all lower- + * level indirect blocks will be. + * + * The initial write pass is guaranteed to fill the underlying file + * perfectly sequentially, no matter how data is divided into logical tapes. + * Once we begin merge passes, the access pattern becomes considerably + * less predictable --- but the seeking involved should be comparable to + * what would happen if we kept each logical tape in a separate file, + * so there's no serious performance penalty paid to obtain the space + * savings of recycling. We try to localize the write accesses by always + * writing to the lowest-numbered free block when we have a choice; it's + * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO + * policy for free blocks would be better?) + * + * Since all the bookkeeping and buffer memory is allocated with palloc(), + * and the underlying file(s) are made with OpenTemporaryFile, all resources + * for a logical tape set are certain to be cleaned up even if processing + * is aborted by elog(ERROR). To avoid confusion, the caller should take + * care that all calls for a single LogicalTapeSet are made in the same + * palloc context. + * + * Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/utils/sort/logtape.c,v 1.1 1999/10/16 19:49:27 tgl Exp $ + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "storage/buffile.h" +#include "utils/logtape.h" + +/* + * Block indexes are "long"s, so we can fit this many per indirect block. + * NB: we assume this is an exact fit! + */ +#define BLOCKS_PER_INDIR_BLOCK (BLCKSZ / sizeof(long)) + +/* + * We use a struct like this for each active indirection level of each + * logical tape. If the indirect block is not the highest level of its + * tape, the "nextup" link points to the next higher level. Only the + * "ptrs" array is written out if we have to dump the indirect block to + * disk. If "ptrs" is not completely full, we store -1L in the first + * unused slot at completion of the write phase for the logical tape. + */ +typedef struct IndirectBlock +{ + int nextSlot; /* next pointer slot to write or read */ + struct IndirectBlock *nextup; /* parent indirect level, or NULL if top */ + long ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */ +} IndirectBlock; + +/* + * This data structure represents a single "logical tape" within the set + * of logical tapes stored in the same file. We must keep track of the + * current partially-read-or-written data block as well as the active + * indirect block level(s). + */ +typedef struct LogicalTape +{ + IndirectBlock *indirect; /* bottom of my indirect-block hierarchy */ + bool writing; /* T while in write phase */ + bool frozen; /* T if blocks should not be freed when read */ + bool dirty; /* does buffer need to be written? */ + /* + * The total data volume in the logical tape is numFullBlocks * BLCKSZ + * + lastBlockBytes. BUT: we do not update lastBlockBytes during writing, + * only at completion of a write phase. + */ + long numFullBlocks; /* number of complete blocks in log tape */ + int lastBlockBytes; /* valid bytes in last (incomplete) block */ + /* + * Buffer for current data block. Note we don't bother to store the + * actual file block number of the data block (during the write phase + * it hasn't been assigned yet, and during read we don't care anymore). + * But we do need the relative block number so we can detect end-of-tape + * while reading. + */ + long curBlockNumber; /* this block's logical blk# within tape */ + int pos; /* next read/write position in buffer */ + int nbytes; /* total # of valid bytes in buffer */ + char buffer[BLCKSZ]; +} LogicalTape; + +/* + * This data structure represents a set of related "logical tapes" sharing + * space in a single underlying file. (But that "file" may be multiple files + * if needed to escape OS limits on file size; buffile.c handles that for us.) + * The number of tapes is fixed at creation. + */ +struct LogicalTapeSet +{ + BufFile *pfile; /* underlying file for whole tape set */ + long nFileBlocks; /* # of blocks used in underlying file */ + /* + * We store the numbers of recycled-and-available blocks in freeBlocks[]. + * When there are no such blocks, we extend the underlying file. Note + * that the block numbers in freeBlocks are always in *decreasing* order, + * so that removing the last entry gives us the lowest free block. + */ + long *freeBlocks; /* resizable array */ + int nFreeBlocks; /* # of currently free blocks */ + int freeBlocksLen; /* current allocated length of freeBlocks[] */ + /* + * tapes[] is declared size 1 since C wants a fixed size, but actually + * it is of length nTapes. + */ + int nTapes; /* # of logical tapes in set */ + LogicalTape *tapes[1]; /* must be last in struct! */ +}; + +static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer); +static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer); +static long ltsGetFreeBlock(LogicalTapeSet *lts); +static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum); +static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect, + long blocknum); +static long ltsRewindIndirectBlock(LogicalTapeSet *lts, + IndirectBlock *indirect, + bool freezing); +static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts, + IndirectBlock *indirect); +static long ltsRecallNextBlockNum(LogicalTapeSet *lts, + IndirectBlock *indirect, + bool frozen); +static long ltsRecallPrevBlockNum(LogicalTapeSet *lts, + IndirectBlock *indirect); +static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt); + + +/* + * Write a block-sized buffer to the specified block of the underlying file. + * + * NB: should not attempt to write beyond current end of file (ie, create + * "holes" in file), since BufFile doesn't allow that. The first write pass + * must write blocks sequentially. + * + * No need for an error return convention; we elog() on any error. + */ +static void +ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer) +{ + if (BufFileSeekBlock(lts->pfile, blocknum) != 0 || + BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ) + elog(ERROR, "ltsWriteBlock: failed to write block %ld of temporary file\n\t\tPerhaps out of disk space?", + blocknum); +} + +/* + * Read a block-sized buffer from the specified block of the underlying file. + * + * No need for an error return convention; we elog() on any error. This + * module should never attempt to read a block it doesn't know is there. + */ +static void +ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer) +{ + if (BufFileSeekBlock(lts->pfile, blocknum) != 0 || + BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ) + elog(ERROR, "ltsReadBlock: failed to read block %ld of temporary file", + blocknum); +} + +/* + * Select a currently unused block for writing to. + * + * NB: should only be called when writer is ready to write immediately, + * to ensure that first write pass is sequential. + */ +static long +ltsGetFreeBlock(LogicalTapeSet *lts) +{ + /* If there are multiple free blocks, we select the one appearing last + * in freeBlocks[]. If there are none, assign the next block at the end + * of the file. + */ + if (lts->nFreeBlocks > 0) + return lts->freeBlocks[--lts->nFreeBlocks]; + else + return lts->nFileBlocks++; +} + +/* + * Return a block# to the freelist. + */ +static void +ltsReleaseBlock(LogicalTapeSet *lts, long blocknum) +{ + int ndx; + long *ptr; + + /* + * Enlarge freeBlocks array if full. + */ + if (lts->nFreeBlocks >= lts->freeBlocksLen) + { + lts->freeBlocksLen *= 2; + lts->freeBlocks = (long *) repalloc(lts->freeBlocks, + lts->freeBlocksLen * sizeof(long)); + } + /* + * Insert blocknum into array, preserving decreasing order (so that + * ltsGetFreeBlock returns the lowest available block number). + * This could get fairly slow if there were many free blocks, but + * we don't expect there to be very many at one time. + */ + ndx = lts->nFreeBlocks++; + ptr = lts->freeBlocks + ndx; + while (ndx > 0 && ptr[-1] < blocknum) + { + ptr[0] = ptr[-1]; + ndx--, ptr--; + } + ptr[0] = blocknum; +} + +/* + * These routines manipulate indirect-block hierarchies. All are recursive + * so that they don't have any specific limit on the depth of hierarchy. + */ + +/* + * Record a data block number in a logical tape's lowest indirect block, + * or record an indirect block's number in the next higher indirect level. + */ +static void +ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect, + long blocknum) +{ + if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK) + { + /* + * This indirect block is full, so dump it out and recursively + * save its address in the next indirection level. Create a + * new indirection level if there wasn't one before. + */ + long indirblock = ltsGetFreeBlock(lts); + + ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs); + if (indirect->nextup == NULL) + { + indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock)); + indirect->nextup->nextSlot = 0; + indirect->nextup->nextup = NULL; + } + ltsRecordBlockNum(lts, indirect->nextup, indirblock); + /* + * Reset to fill another indirect block at this level. + */ + indirect->nextSlot = 0; + } + indirect->ptrs[indirect->nextSlot++] = blocknum; +} + +/* + * Reset a logical tape's indirect-block hierarchy after a write pass + * to prepare for reading. We dump out partly-filled blocks except + * at the top of the hierarchy, and we rewind each level to the start. + * This call returns the first data block number, or -1L if the tape + * is empty. + * + * Unless 'freezing' is true, release indirect blocks to the free pool after + * reading them. + */ +static long +ltsRewindIndirectBlock(LogicalTapeSet *lts, + IndirectBlock *indirect, + bool freezing) +{ + /* Insert sentinel if block is not full */ + if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK) + indirect->ptrs[indirect->nextSlot] = -1L; + /* + * If block is not topmost, write it out, and recurse to obtain + * address of first block in this hierarchy level. Read that one in. + */ + if (indirect->nextup != NULL) + { + long indirblock = ltsGetFreeBlock(lts); + + ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs); + ltsRecordBlockNum(lts, indirect->nextup, indirblock); + indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing); + Assert(indirblock != -1L); + ltsReadBlock(lts, indirblock, (void *) indirect->ptrs); + if (! freezing) + ltsReleaseBlock(lts, indirblock); + } + /* + * Reset my next-block pointer, and then fetch a block number if any. + */ + indirect->nextSlot = 0; + if (indirect->ptrs[0] == -1L) + return -1L; + return indirect->ptrs[indirect->nextSlot++]; +} + +/* + * Rewind a previously-frozen indirect-block hierarchy for another read pass. + * This call returns the first data block number, or -1L if the tape + * is empty. + */ +static long +ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts, + IndirectBlock *indirect) +{ + /* + * If block is not topmost, recurse to obtain + * address of first block in this hierarchy level. Read that one in. + */ + if (indirect->nextup != NULL) + { + long indirblock; + + indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup); + Assert(indirblock != -1L); + ltsReadBlock(lts, indirblock, (void *) indirect->ptrs); + } + /* + * Reset my next-block pointer, and then fetch a block number if any. + */ + indirect->nextSlot = 0; + if (indirect->ptrs[0] == -1L) + return -1L; + return indirect->ptrs[indirect->nextSlot++]; +} + +/* + * Obtain next data block number in the forward direction, or -1L if no more. + * + * Unless 'frozen' is true, release indirect blocks to the free pool after + * reading them. + */ +static long +ltsRecallNextBlockNum(LogicalTapeSet *lts, + IndirectBlock *indirect, + bool frozen) +{ + if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK || + indirect->ptrs[indirect->nextSlot] == -1L) + { + long indirblock; + + if (indirect->nextup == NULL) + return -1L; /* nothing left at this level */ + indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen); + if (indirblock == -1L) + return -1L; /* nothing left at this level */ + ltsReadBlock(lts, indirblock, (void *) indirect->ptrs); + if (! frozen) + ltsReleaseBlock(lts, indirblock); + indirect->nextSlot = 0; + } + if (indirect->ptrs[indirect->nextSlot] == -1L) + return -1L; + return indirect->ptrs[indirect->nextSlot++]; +} + +/* + * Obtain next data block number in the reverse direction, or -1L if no more. + * + * Note this fetches the block# before the one last returned, no matter which + * direction of call returned that one. If we fail, no change in state. + * + * This routine can only be used in 'frozen' state, so there's no need to + * pass a parameter telling whether to release blocks ... we never do. + */ +static long +ltsRecallPrevBlockNum(LogicalTapeSet *lts, + IndirectBlock *indirect) +{ + if (indirect->nextSlot <= 1) + { + long indirblock; + + if (indirect->nextup == NULL) + return -1L; /* nothing left at this level */ + indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup); + if (indirblock == -1L) + return -1L; /* nothing left at this level */ + ltsReadBlock(lts, indirblock, (void *) indirect->ptrs); + /* The previous block would only have been written out if full, + * so we need not search it for a -1 sentinel. + */ + indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK+1; + } + indirect->nextSlot--; + return indirect->ptrs[indirect->nextSlot-1]; +} + + +/* + * Create a set of logical tapes in a temporary underlying file. + * + * Each tape is initialized in write state. + */ +LogicalTapeSet * +LogicalTapeSetCreate(int ntapes) +{ + LogicalTapeSet *lts; + LogicalTape *lt; + int i; + + /* + * Create top-level struct. First LogicalTape pointer is already + * counted in sizeof(LogicalTapeSet). + */ + Assert(ntapes > 0); + lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) + + (ntapes-1) * sizeof(LogicalTape *)); + lts->pfile = BufFileCreateTemp(); + lts->nFileBlocks = 0L; + lts->freeBlocksLen = 32; /* reasonable initial guess */ + lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long)); + lts->nFreeBlocks = 0; + lts->nTapes = ntapes; + /* + * Create per-tape structs, including first-level indirect blocks. + */ + for (i = 0; i < ntapes; i++) + { + lt = (LogicalTape *) palloc(sizeof(LogicalTape)); + lts->tapes[i] = lt; + lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock)); + lt->indirect->nextSlot = 0; + lt->indirect->nextup = NULL; + lt->writing = true; + lt->frozen = false; + lt->dirty = false; + lt->numFullBlocks = 0L; + lt->lastBlockBytes = 0; + lt->curBlockNumber = 0L; + lt->pos = 0; + lt->nbytes = 0; + } + return lts; +} + +/* + * Close a logical tape set and release all resources. + */ +void LogicalTapeSetClose(LogicalTapeSet *lts) +{ + LogicalTape *lt; + IndirectBlock *ib, + *nextib; + int i; + + BufFileClose(lts->pfile); + for (i = 0; i < lts->nTapes; i++) + { + lt = lts->tapes[i]; + for (ib = lt->indirect; ib != NULL; ib = nextib) + { + nextib = ib->nextup; + pfree(ib); + } + pfree(lt); + } + pfree(lts->freeBlocks); + pfree(lts); +} + +/* + * Dump the dirty buffer of a logical tape. + */ +static void +ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt) +{ + long datablock = ltsGetFreeBlock(lts); + + Assert(lt->dirty); + ltsWriteBlock(lts, datablock, (void *) lt->buffer); + ltsRecordBlockNum(lts, lt->indirect, datablock); + lt->dirty = false; + /* Caller must do other state update as needed */ +} + +/* + * Write to a logical tape. + * + * There are no error returns; we elog() on failure. + */ +void +LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, + void *ptr, size_t size) +{ + LogicalTape *lt; + size_t nthistime; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + Assert(lt->writing); + + while (size > 0) + { + if (lt->pos >= BLCKSZ) + { + /* Buffer full, dump it out */ + if (lt->dirty) + { + ltsDumpBuffer(lts, lt); + } + else + { + /* Hmm, went directly from reading to writing? */ + elog(ERROR, "LogicalTapeWrite: impossible state"); + } + lt->numFullBlocks++; + lt->curBlockNumber++; + lt->pos = 0; + lt->nbytes = 0; + } + + nthistime = BLCKSZ - lt->pos; + if (nthistime > size) + nthistime = size; + Assert(nthistime > 0); + + memcpy(lt->buffer + lt->pos, ptr, nthistime); + + lt->dirty = true; + lt->pos += nthistime; + if (lt->nbytes < lt->pos) + lt->nbytes = lt->pos; + ptr = (void *) ((char *) ptr + nthistime); + size -= nthistime; + } +} + +/* + * Rewind logical tape and switch from writing to reading or vice versa. + * + * Unless the tape has been "frozen" in read state, forWrite must be the + * opposite of the previous tape state. + */ +void +LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite) +{ + LogicalTape *lt; + long datablocknum; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + + if (! forWrite) + { + if (lt->writing) + { + /* + * Completion of a write phase. Flush last partial data + * block, flush any partial indirect blocks, rewind for + * normal (destructive) read. + */ + if (lt->dirty) + ltsDumpBuffer(lts, lt); + lt->lastBlockBytes = lt->nbytes; + lt->writing = false; + datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false); + } + else + { + /* + * This is only OK if tape is frozen; we rewind for (another) + * read pass. + */ + Assert(lt->frozen); + datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect); + } + /* Read the first block, or reset if tape is empty */ + lt->curBlockNumber = 0L; + lt->pos = 0; + lt->nbytes = 0; + if (datablocknum != -1L) + { + ltsReadBlock(lts, datablocknum, (void *) lt->buffer); + if (! lt->frozen) + ltsReleaseBlock(lts, datablocknum); + lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? + BLCKSZ : lt->lastBlockBytes; + } + } + else + { + /* + * Completion of a read phase. Rewind and prepare for write. + * + * NOTE: we assume the caller has read the tape to the end; + * otherwise untouched data and indirect blocks will not have + * been freed. We could add more code to free any unread blocks, + * but in current usage of this module it'd be useless code. + */ + IndirectBlock *ib, + *nextib; + + Assert(! lt->writing && ! lt->frozen); + /* Must truncate the indirect-block hierarchy down to one level. */ + for (ib = lt->indirect->nextup; ib != NULL; ib = nextib) + { + nextib = ib->nextup; + pfree(ib); + } + lt->indirect->nextSlot = 0; + lt->indirect->nextup = NULL; + lt->writing = true; + lt->dirty = false; + lt->numFullBlocks = 0L; + lt->lastBlockBytes = 0; + lt->curBlockNumber = 0L; + lt->pos = 0; + lt->nbytes = 0; + } +} + +/* + * Read from a logical tape. + * + * Early EOF is indicated by return value less than #bytes requested. + */ +size_t +LogicalTapeRead(LogicalTapeSet *lts, int tapenum, + void *ptr, size_t size) +{ + LogicalTape *lt; + size_t nread = 0; + size_t nthistime; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + Assert(! lt->writing); + + while (size > 0) + { + if (lt->pos >= lt->nbytes) + { + /* Try to load more data into buffer. */ + long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, + lt->frozen); + + if (datablocknum == -1L) + break; /* EOF */ + lt->curBlockNumber++; + lt->pos = 0; + ltsReadBlock(lts, datablocknum, (void *) lt->buffer); + if (! lt->frozen) + ltsReleaseBlock(lts, datablocknum); + lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? + BLCKSZ : lt->lastBlockBytes; + if (lt->nbytes <= 0) + break; /* EOF (possible here?) */ + } + + nthistime = lt->nbytes - lt->pos; + if (nthistime > size) + nthistime = size; + Assert(nthistime > 0); + + memcpy(ptr, lt->buffer + lt->pos, nthistime); + + lt->pos += nthistime; + ptr = (void *) ((char *) ptr + nthistime); + size -= nthistime; + nread += nthistime; + } + + return nread; +} + +/* + * "Freeze" the contents of a tape so that it can be read multiple times + * and/or read backwards. Once a tape is frozen, its contents will not + * be released until the LogicalTapeSet is destroyed. This is expected + * to be used only for the final output pass of a merge. + * + * This *must* be called just at the end of a write pass, before the + * tape is rewound (after rewind is too late!). It performs a rewind + * and switch to read mode "for free". An immediately following rewind- + * for-read call is OK but not necessary. + */ +void +LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum) +{ + LogicalTape *lt; + long datablocknum; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + Assert(lt->writing); + + /* + * Completion of a write phase. Flush last partial data + * block, flush any partial indirect blocks, rewind for + * nondestructive read. + */ + if (lt->dirty) + ltsDumpBuffer(lts, lt); + lt->lastBlockBytes = lt->nbytes; + lt->writing = false; + lt->frozen = true; + datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true); + /* Read the first block, or reset if tape is empty */ + lt->curBlockNumber = 0L; + lt->pos = 0; + lt->nbytes = 0; + if (datablocknum != -1L) + { + ltsReadBlock(lts, datablocknum, (void *) lt->buffer); + lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? + BLCKSZ : lt->lastBlockBytes; + } +} + +/* + * Backspace the tape a given number of bytes. (We also support a more + * general seek interface, see below.) + * + * *Only* a frozen-for-read tape can be backed up; we don't support + * random access during write, and an unfrozen read tape may have + * already discarded the desired data! + * + * Return value is TRUE if seek successful, FALSE if there isn't that much + * data before the current point (in which case there's no state change). + */ +bool +LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size) +{ + LogicalTape *lt; + long nblocks; + int newpos; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + Assert(lt->frozen); + + /* + * Easy case for seek within current block. + */ + if (size <= (size_t) lt->pos) + { + lt->pos -= (int) size; + return true; + } + /* + * Not-so-easy case. Figure out whether it's possible at all. + */ + size -= (size_t) lt->pos; /* part within this block */ + nblocks = size / BLCKSZ; + size = size % BLCKSZ; + if (size) + { + nblocks++; + newpos = (int) (BLCKSZ - size); + } + else + newpos = 0; + if (nblocks > lt->curBlockNumber) + return false; /* a seek too far... */ + /* + * OK, we need to back up nblocks blocks. This implementation + * would be pretty inefficient for long seeks, but we really + * aren't expecting that (a seek over one tuple is typical). + */ + while (nblocks-- > 0) + { + long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect); + + if (datablocknum == -1L) + elog(ERROR, "LogicalTapeBackspace: unexpected end of tape"); + lt->curBlockNumber--; + if (nblocks == 0) + { + ltsReadBlock(lts, datablocknum, (void *) lt->buffer); + lt->nbytes = BLCKSZ; + } + } + lt->pos = newpos; + return true; +} + +/* + * Seek to an arbitrary position in a logical tape. + * + * *Only* a frozen-for-read tape can be seeked. + * + * Return value is TRUE if seek successful, FALSE if there isn't that much + * data in the tape (in which case there's no state change). + */ +bool +LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, + long blocknum, int offset) +{ + LogicalTape *lt; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + Assert(lt->frozen); + Assert(offset >= 0 && offset <= BLCKSZ); + + /* + * Easy case for seek within current block. + */ + if (blocknum == lt->curBlockNumber && offset <= lt->nbytes) + { + lt->pos = offset; + return true; + } + /* + * Not-so-easy case. Figure out whether it's possible at all. + */ + if (blocknum < 0 || blocknum > lt->numFullBlocks || + (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes)) + return false; + /* + * OK, advance or back up to the target block. This implementation + * would be pretty inefficient for long seeks, but we really + * aren't expecting that (a seek over one tuple is typical). + */ + while (lt->curBlockNumber > blocknum) + { + long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect); + + if (datablocknum == -1L) + elog(ERROR, "LogicalTapeSeek: unexpected end of tape"); + if (--lt->curBlockNumber == blocknum) + ltsReadBlock(lts, datablocknum, (void *) lt->buffer); + } + while (lt->curBlockNumber < blocknum) + { + long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect, + lt->frozen); + + if (datablocknum == -1L) + elog(ERROR, "LogicalTapeSeek: unexpected end of tape"); + if (++lt->curBlockNumber == blocknum) + ltsReadBlock(lts, datablocknum, (void *) lt->buffer); + } + lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ? + BLCKSZ : lt->lastBlockBytes; + lt->pos = offset; + return true; +} + +/* + * Obtain current position in a form suitable for a later LogicalTapeSeek. + * + * NOTE: it'd be OK to do this during write phase with intention of using + * the position for a seek after freezing. Not clear if anyone needs that. + */ +void +LogicalTapeTell(LogicalTapeSet *lts, int tapenum, + long *blocknum, int *offset) +{ + LogicalTape *lt; + + Assert(tapenum >= 0 && tapenum < lts->nTapes); + lt = lts->tapes[tapenum]; + *blocknum = lt->curBlockNumber; + *offset = lt->pos; +} diff --git a/src/backend/utils/sort/psort.c b/src/backend/utils/sort/psort.c index 14db10c119837607cdce1bfa043faf793697bbb9..67cdfc292e873c992f58563b0dad1d7c7bc2e37d 100644 --- a/src/backend/utils/sort/psort.c +++ b/src/backend/utils/sort/psort.c @@ -1,136 +1,146 @@ -/* +/*------------------------------------------------------------------------- + * * psort.c * Polyphase merge sort. * - * Copyright (c) 1994, Regents of the University of California - * - * $Id: psort.c,v 1.57 1999/10/13 15:02:31 tgl Exp $ + * See Knuth, volume 3, for more than you want to know about this algorithm. * * NOTES - * Sorts the first relation into the second relation. * - * The old psort.c's routines formed a temporary relation from the merged - * sort files. This version keeps the files around instead of generating the - * relation from them, and provides interface functions to the file so that - * you can grab tuples, mark a position in the file, restore a position in the - * file. You must now explicitly call an interface function to end the sort, - * psort_end, when you are done. - * Now most of the global variables are stuck in the Sort nodes, and - * accessed from there (they are passed to all the psort routines) so that - * each sort running has its own separate state. This is facilitated by having - * the Sort nodes passed in to all the interface functions. - * The one global variable that all the sorts still share is SortMemory. - * You should now be allowed to run two or more psorts concurrently, - * so long as the memory they eat up is not greater than SORTMEM, the initial - * value of SortMemory. -Rex 2.15.1995 + * This needs to be generalized to handle index tuples as well as heap tuples, + * so that the near-duplicate code in nbtsort.c can be eliminated. Also, + * I think it's got memory leak problems. * - * Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future. + * Copyright (c) 1994, Regents of the University of California * - * Arguments? Variables? - * MAXMERGE, MAXTAPES + * IDENTIFICATION + * $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.58 1999/10/16 19:49:27 tgl Exp $ * + *------------------------------------------------------------------------- */ + #include <math.h> -#include <sys/types.h> -#include <unistd.h> #include "postgres.h" #include "access/heapam.h" +#include "access/relscan.h" #include "executor/execdebug.h" #include "executor/executor.h" #include "miscadmin.h" +#include "utils/logtape.h" +#include "utils/lselect.h" #include "utils/psort.h" +#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */ + +struct tape +{ + int tp_dummy; /* (D) */ + int tp_fib; /* (A) */ + int tp_tapenum; /* (TAPE) */ + struct tape *tp_prev; +}; + +/* + * Private state of a Psort operation. The "psortstate" field in a Sort node + * points to one of these. This replaces a lot of global variables that used + * to be here... + */ +typedef struct Psortstate +{ + LeftistContextData treeContext; + + int TapeRange; /* number of tapes less 1 (T) */ + int Level; /* Knuth's l */ + int TotalDummy; /* sum of tp_dummy across all tapes */ + struct tape Tape[MAXTAPES]; + + LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ + + int BytesRead; /* I/O statistics (useless) */ + int BytesWritten; + int tupcount; + + struct leftist *Tuples; /* current tuple tree */ + + int psort_grab_tape; /* tape number of finished output data */ + long psort_current; /* array index (only used if not tape) */ + /* psort_saved(_offset) holds marked position for mark and restore */ + long psort_saved; /* could be tape block#, or array index */ + int psort_saved_offset; /* lower bits of psort_saved, if tape */ + bool using_tape_files; + bool all_fetched; /* this is for cursors */ + + HeapTuple *memtuples; +} Psortstate; + +/* + * PS - Macro to access and cast psortstate from a Sort node + */ +#define PS(N) ((Psortstate *)(N)->psortstate) + static bool createfirstrun(Sort *node); -static bool createrun(Sort *node, BufFile *file); -static void destroytape(BufFile *file); -static void dumptuples(BufFile *file, Sort *node); -static BufFile *gettape(void); +static bool createrun(Sort *node, int desttapenum); +static void dumptuples(Sort *node, int desttapenum); static void initialrun(Sort *node); static void inittapes(Sort *node); static void merge(Sort *node, struct tape * dest); -static BufFile *mergeruns(Sort *node); +static int mergeruns(Sort *node); static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup); - -/* - * tlenzero used to delimit runs; both vars below must have - * the same size as HeapTuple->t_len - */ -static unsigned int tlenzero = 0; -static unsigned int tlendummy; - /* these are used by _psort_cmp, and are set just before calling qsort() */ static TupleDesc PsortTupDesc; static ScanKey PsortKeys; static int PsortNkeys; /* - * old psort global variables - * - * (These are the global variables from the old psort. They are still used, - * but are now accessed from Sort nodes using the PS macro. Note that while - * these variables will be accessed by PS(node)->whatever, they will still - * be called by their original names within the comments! -Rex 2.10.1995) + * tlenzero is used to write a zero to delimit runs, tlendummy is used + * to read in length words that we don't care about. * - * LeftistContextData treeContext; - * - * static int TapeRange; number of tapes - 1 (T) - * static int Level; (l) - * static int TotalDummy; summation of tp_dummy - * static struct tape *Tape; - * - * static int BytesRead; to keep track of # of IO - * static int BytesWritten; - * - * struct leftist *Tuples; current tuples in memory - * - * BufFile *psort_grab_file; this holds tuples grabbed - * from merged sort runs - * long psort_current; current file position - * long psort_saved; file position saved for - * mark and restore + * both vars must have the same size as HeapTuple->t_len */ +static unsigned int tlenzero = 0; +static unsigned int tlendummy; -/* - * PS - Macro to access and cast psortstate from a Sort node - */ -#define PS(N) ((Psortstate *)N->psortstate) /* - * psort_begin - polyphase merge sort entry point. Sorts the subplan - * into a temporary file psort_grab_file. After - * this is called, calling the interface function - * psort_grabtuple iteratively will get you the sorted - * tuples. psort_end then finishes the sort off, after - * all the tuples have been grabbed. + * psort_begin * - * Allocates and initializes sort node's psort state. + * polyphase merge sort entry point. Sorts the subplan + * into memory or a temporary file. After + * this is called, calling the interface function + * psort_grabtuple iteratively will get you the sorted + * tuples. psort_end releases storage when done. + * + * Allocates and initializes sort node's psort state. */ bool psort_begin(Sort *node, int nkeys, ScanKey key) { - - node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate)); - AssertArg(nkeys >= 1); AssertArg(key[0].sk_attno != 0); AssertArg(key[0].sk_procedure != 0); - PS(node)->BytesRead = 0; - PS(node)->BytesWritten = 0; + node->psortstate = (void *) palloc(sizeof(struct Psortstate)); + PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node)); PS(node)->treeContext.nKeys = nkeys; PS(node)->treeContext.scanKeys = key; PS(node)->treeContext.sortMem = SortMem * 1024; - PS(node)->Tuples = NULL; + PS(node)->tapeset = NULL; + + PS(node)->BytesRead = 0; + PS(node)->BytesWritten = 0; PS(node)->tupcount = 0; + PS(node)->Tuples = NULL; + PS(node)->using_tape_files = false; PS(node)->all_fetched = false; - PS(node)->psort_grab_file = NULL; + PS(node)->psort_grab_tape = -1; + PS(node)->memtuples = NULL; initialrun(node); @@ -138,12 +148,12 @@ psort_begin(Sort *node, int nkeys, ScanKey key) if (PS(node)->tupcount == 0) return false; - if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL) - PS(node)->psort_grab_file = mergeruns(node); + if (PS(node)->using_tape_files && PS(node)->psort_grab_tape == -1) + PS(node)->psort_grab_tape = mergeruns(node); - PS(node)->psort_current = 0; - PS(node)->psort_saved_fileno = 0; + PS(node)->psort_current = 0L; PS(node)->psort_saved = 0L; + PS(node)->psort_saved_offset = 0; return true; } @@ -151,8 +161,8 @@ psort_begin(Sort *node, int nkeys, ScanKey key) /* * inittapes - initializes the tapes * - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270) - * Returns: - * number of allocated tapes + * + * This is called only if we have found we don't have room to sort in memory. */ static void inittapes(Sort *node) @@ -163,16 +173,14 @@ inittapes(Sort *node) Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); - /* - * ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid - * number of tapes to initialize.\n"); - */ + PS(node)->tapeset = LogicalTapeSetCreate(MAXTAPES); tp = PS(node)->Tape; - for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++) + for (i = 0; i < MAXTAPES; i++) { tp->tp_dummy = 1; tp->tp_fib = 1; + tp->tp_tapenum = i; tp->tp_prev = tp - 1; tp++; } @@ -181,10 +189,6 @@ inittapes(Sort *node) tp->tp_fib = 0; PS(node)->Tape[0].tp_prev = tp; - if (PS(node)->TapeRange <= 1) - elog(ERROR, "inittapes: Could only allocate %d < 3 tapes\n", - PS(node)->TapeRange + 1); - PS(node)->Level = 1; PS(node)->TotalDummy = PS(node)->TapeRange; @@ -194,9 +198,9 @@ inittapes(Sort *node) /* * PUTTUP - writes the next tuple * ENDRUN - mark end of run - * GETLEN - reads the length of the next tuple + * TRYGETLEN - reads the length of the next tuple, if any + * GETLEN - reads the length of the next tuple, must be one * ALLOCTUP - returns space for the new tuple - * SETTUPLEN - stores the length into the tuple * GETTUP - reads the tuple * * Note: @@ -204,31 +208,47 @@ inittapes(Sort *node) */ -#define PUTTUP(NODE, TUP, FP) \ +#define PUTTUP(NODE, TUP, TAPE) \ ( \ (TUP)->t_len += HEAPTUPLESIZE, \ - ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \ - BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \ - BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \ + PS(NODE)->BytesWritten += (TUP)->t_len, \ + LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)(TUP), (TUP)->t_len), \ + LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)&((TUP)->t_len), sizeof(tlendummy)), \ (TUP)->t_len -= HEAPTUPLESIZE \ ) -#define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero)) -#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero)) -#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN)) -#define FREE(x) pfree((char *) x) -#define GETTUP(NODE, TUP, LEN, FP) \ -( \ - IncrProcessed(), \ - ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \ - BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \ - (TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \ - BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \ -) +#define ENDRUN(NODE, TAPE) \ + LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void *)&tlenzero, sizeof(tlenzero)) + +#define TRYGETLEN(NODE, LEN, TAPE) \ + (LogicalTapeRead(PS(NODE)->tapeset, (TAPE), \ + (void *) &(LEN), sizeof(tlenzero)) == sizeof(tlenzero) \ + && (LEN) != 0) -#define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE) +#define GETLEN(NODE, LEN, TAPE) \ + do { \ + if (! TRYGETLEN(NODE, LEN, TAPE)) \ + elog(ERROR, "psort: unexpected end of data"); \ + } while(0) -#define rewind(FP) BufFileSeek(FP, 0, 0L, SEEK_SET) +static void GETTUP(Sort *node, HeapTuple tup, unsigned int len, int tape) +{ + IncrProcessed(); + PS(node)->BytesRead += len; + if (LogicalTapeRead(PS(node)->tapeset, tape, + ((char *) tup) + sizeof(tlenzero), + len - sizeof(tlenzero)) != len - sizeof(tlenzero)) + elog(ERROR, "psort: unexpected end of data"); + tup->t_len = len - HEAPTUPLESIZE; + tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE); + if (LogicalTapeRead(PS(node)->tapeset, tape, + (void *) &tlendummy, + sizeof(tlendummy)) != sizeof(tlendummy)) + elog(ERROR, "psort: unexpected end of data"); +} + +#define ALLOCTUP(LEN) ((HeapTuple) palloc(LEN)) +#define FREE(x) pfree((char *) (x)) /* * USEMEM - record use of memory FREEMEM - record @@ -268,10 +288,10 @@ inittapes(Sort *node) static void initialrun(Sort *node) { - /* struct tuple *tup; */ struct tape *tp; int baseruns; /* D:(a) */ int extrapasses; /* EOF */ + int tapenum; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); @@ -284,8 +304,8 @@ initialrun(Sort *node) extrapasses = 0; } else -/* all tuples fetched */ { + /* all tuples fetched */ if (!PS(node)->using_tape_files) /* empty or sorted in * memory */ return; @@ -297,8 +317,9 @@ initialrun(Sort *node) */ if (PS(node)->Tuples == NULL) { - PS(node)->psort_grab_file = PS(node)->Tape->tp_file; - rewind(PS(node)->psort_grab_file); + PS(node)->psort_grab_tape = PS(node)->Tape[0].tp_tapenum; + /* freeze and rewind the finished output tape */ + LogicalTapeFreeze(PS(node)->tapeset, PS(node)->psort_grab_tape); return; } extrapasses = 2; @@ -334,19 +355,20 @@ initialrun(Sort *node) { if (--extrapasses) { - dumptuples(tp->tp_file, node); - ENDRUN(tp->tp_file); + dumptuples(node, tp->tp_tapenum); + ENDRUN(node, tp->tp_tapenum); continue; } else break; } - if ((bool) createrun(node, tp->tp_file) == false) + if (createrun(node, tp->tp_tapenum) == false) extrapasses = 1 + (PS(node)->Tuples != NULL); /* D2 */ } - for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--) - rewind(tp->tp_file); /* D. */ + /* End of step D2: rewind all output tapes to prepare for merging */ + for (tapenum = 0; tapenum < PS(node)->TapeRange; tapenum++) + LogicalTapeRewind(PS(node)->tapeset, tapenum, false); } /* @@ -374,7 +396,7 @@ createfirstrun(Sort *node) Assert(PS(node)->memtuples == NULL); Assert(PS(node)->tupcount == 0); if (LACKMEM(node)) - elog(ERROR, "psort: LACKMEM in createfirstrun"); + elog(ERROR, "psort: LACKMEM before createfirstrun"); memtuples = palloc(t_free * sizeof(HeapTuple)); @@ -439,7 +461,7 @@ createfirstrun(Sort *node) for (t = t_last - 1; t >= 0; t--) puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext); pfree(memtuples); - foundeor = !createrun(node, PS(node)->Tape->tp_file); + foundeor = ! createrun(node, PS(node)->Tape->tp_tapenum); } else { @@ -451,8 +473,10 @@ createfirstrun(Sort *node) } /* - * createrun - places the next run on file, grabbing the tuples by - * executing the subplan passed in + * createrun + * + * Create the next run and write it to desttapenum, grabbing the tuples by + * executing the subplan passed in * * Uses: * Tuples, which should contain any tuples for this run @@ -462,7 +486,7 @@ createfirstrun(Sort *node) * Tuples contains the tuples for the following run upon exit */ static bool -createrun(Sort *node, BufFile *file) +createrun(Sort *node, int desttapenum) { HeapTuple lasttuple; HeapTuple tup; @@ -492,7 +516,7 @@ createrun(Sort *node, BufFile *file) } lasttuple = gettuple(&PS(node)->Tuples, &junk, &PS(node)->treeContext); - PUTTUP(node, lasttuple, file); + PUTTUP(node, lasttuple, desttapenum); TRACEOUT(createrun, lasttuple); } @@ -545,8 +569,8 @@ createrun(Sort *node, BufFile *file) FREE(lasttuple); TRACEMEM(createrun); } - dumptuples(file, node); - ENDRUN(file); /* delimit the end of the run */ + dumptuples(node, desttapenum); + ENDRUN(node, desttapenum); /* delimit the end of the run */ t_last++; /* put tuples for the next run into leftist tree */ @@ -573,28 +597,31 @@ createrun(Sort *node, BufFile *file) * (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271) * * Returns: - * file of tuples in order + * tape number of finished tape containing all tuples in order */ -static BufFile * +static int mergeruns(Sort *node) { struct tape *tp; Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); - Assert(PS(node)->using_tape_files == true); + Assert(PS(node)->using_tape_files); tp = PS(node)->Tape + PS(node)->TapeRange; merge(node, tp); - rewind(tp->tp_file); while (--PS(node)->Level != 0) { + /* rewind output tape to use as new input */ + LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, false); tp = tp->tp_prev; - rewind(tp->tp_file); + /* rewind new output tape and prepare it for write pass */ + LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, true); merge(node, tp); - rewind(tp->tp_file); } - return tp->tp_file; + /* freeze and rewind the final output tape */ + LogicalTapeFreeze(PS(node)->tapeset, tp->tp_tapenum); + return tp->tp_tapenum; } /* @@ -608,7 +635,7 @@ merge(Sort *node, struct tape * dest) struct tape *lasttp; /* (TAPE[P]) */ struct tape *tp; struct leftist *tuples; - BufFile *destfile; + int desttapenum; int times; /* runs left to merge */ int outdummy; /* complete dummy runs */ short fromtape; @@ -616,7 +643,7 @@ merge(Sort *node, struct tape * dest) Assert(node != (Sort *) NULL); Assert(PS(node) != (Psortstate *) NULL); - Assert(PS(node)->using_tape_files == true); + Assert(PS(node)->using_tape_files); lasttp = dest->tp_prev; times = lasttp->tp_fib; @@ -641,19 +668,18 @@ merge(Sort *node, struct tape * dest) /* do not add the outdummy runs yet */ times -= outdummy; } - destfile = dest->tp_file; + desttapenum = dest->tp_tapenum; while (times-- != 0) { /* merge one run */ tuples = NULL; if (PS(node)->TotalDummy == 0) for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) { - GETLEN(tuplen, tp->tp_file); + GETLEN(node, tuplen, tp->tp_tapenum); tup = ALLOCTUP(tuplen); USEMEM(node, tuplen); TRACEMEM(merge); - SETTUPLEN(tup, tuplen); - GETTUP(node, tup, tuplen, tp->tp_file); + GETTUP(node, tup, tuplen, tp->tp_tapenum); puttuple(&tuples, tup, tp - PS(node)->Tape, &PS(node)->treeContext); } @@ -668,12 +694,11 @@ merge(Sort *node, struct tape * dest) } else { - GETLEN(tuplen, tp->tp_file); + GETLEN(node, tuplen, tp->tp_tapenum); tup = ALLOCTUP(tuplen); USEMEM(node, tuplen); TRACEMEM(merge); - SETTUPLEN(tup, tuplen); - GETTUP(node, tup, tuplen, tp->tp_file); + GETTUP(node, tup, tuplen, tp->tp_tapenum); puttuple(&tuples, tup, tp - PS(node)->Tape, &PS(node)->treeContext); } @@ -683,38 +708,34 @@ merge(Sort *node, struct tape * dest) { /* possible optimization by using count in tuples */ tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext); - PUTTUP(node, tup, destfile); + PUTTUP(node, tup, desttapenum); FREEMEM(node, tup->t_len); FREE(tup); TRACEMEM(merge); - GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file); - if (tuplen == 0) - ; - else + if (TRYGETLEN(node, tuplen, PS(node)->Tape[fromtape].tp_tapenum)) { tup = ALLOCTUP(tuplen); USEMEM(node, tuplen); TRACEMEM(merge); - SETTUPLEN(tup, tuplen); - GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file); + GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_tapenum); puttuple(&tuples, tup, fromtape, &PS(node)->treeContext); } } - ENDRUN(destfile); + ENDRUN(node, desttapenum); } PS(node)->TotalDummy += outdummy; } /* - * dumptuples - stores all the tuples in tree into file + * dumptuples - stores all the tuples remaining in tree to dest tape */ static void -dumptuples(BufFile *file, Sort *node) +dumptuples(Sort *node, int desttapenum) { + LeftistContext context = &PS(node)->treeContext; + struct leftist **treep = &PS(node)->Tuples; struct leftist *tp; struct leftist *newp; - struct leftist **treep = &PS(node)->Tuples; - LeftistContext context = &PS(node)->treeContext; HeapTuple tup; Assert(PS(node)->using_tape_files); @@ -728,7 +749,7 @@ dumptuples(BufFile *file, Sort *node) else newp = lmerge(tp->lt_left, tp->lt_right, context); pfree(tp); - PUTTUP(node, tup, file); + PUTTUP(node, tup, desttapenum); FREEMEM(node, tup->t_len); FREE(tup); @@ -760,11 +781,10 @@ psort_grabtuple(Sort *node, bool *should_free) { if (PS(node)->all_fetched) return NULL; - if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0) + if (TRYGETLEN(node, tuplen, PS(node)->psort_grab_tape)) { tup = ALLOCTUP(tuplen); - SETTUPLEN(tup, tuplen); - GETTUP(node, tup, tuplen, PS(node)->psort_grab_file); + GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape); return tup; } else @@ -786,10 +806,11 @@ psort_grabtuple(Sort *node, bool *should_free) * length word. If seek fails we must have a completely empty * file. */ - if (BufFileSeek(PS(node)->psort_grab_file, 0, - - (long) (2 * sizeof(tlendummy)), SEEK_CUR)) + if (! LogicalTapeBackspace(PS(node)->tapeset, + PS(node)->psort_grab_tape, + 2 * sizeof(tlendummy))) return NULL; - GETLEN(tuplen, PS(node)->psort_grab_file); + GETLEN(node, tuplen, PS(node)->psort_grab_tape); PS(node)->all_fetched = false; } else @@ -798,28 +819,29 @@ psort_grabtuple(Sort *node, bool *should_free) * Back up and fetch prev tuple's ending length word. * If seek fails, assume we are at start of file. */ - if (BufFileSeek(PS(node)->psort_grab_file, 0, - - (long) sizeof(tlendummy), SEEK_CUR)) + if (! LogicalTapeBackspace(PS(node)->tapeset, + PS(node)->psort_grab_tape, + sizeof(tlendummy))) return NULL; - GETLEN(tuplen, PS(node)->psort_grab_file); - if (tuplen == 0) - elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan"); + GETLEN(node, tuplen, PS(node)->psort_grab_tape); /* * Back up to get ending length word of tuple before it. */ - if (BufFileSeek(PS(node)->psort_grab_file, 0, - - (long) (tuplen + 2*sizeof(tlendummy)), SEEK_CUR)) + if (! LogicalTapeBackspace(PS(node)->tapeset, + PS(node)->psort_grab_tape, + tuplen + 2*sizeof(tlendummy))) { /* If fail, presumably the prev tuple is the first in the file. * Back up so that it becomes next to read in forward direction * (not obviously right, but that is what in-memory case does) */ - if (BufFileSeek(PS(node)->psort_grab_file, 0, - - (long) (tuplen + sizeof(tlendummy)), SEEK_CUR)) + if (! LogicalTapeBackspace(PS(node)->tapeset, + PS(node)->psort_grab_tape, + tuplen + sizeof(tlendummy))) elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan"); return NULL; } - GETLEN(tuplen, PS(node)->psort_grab_file); + GETLEN(node, tuplen, PS(node)->psort_grab_tape); } /* @@ -827,12 +849,12 @@ psort_grabtuple(Sort *node, bool *should_free) * Note: GETTUP expects we are positioned after the initial length * word of the tuple, so back up to that point. */ - if (BufFileSeek(PS(node)->psort_grab_file, 0, - - (long) tuplen, SEEK_CUR)) + if (! LogicalTapeBackspace(PS(node)->tapeset, + PS(node)->psort_grab_tape, + tuplen)) elog(ERROR, "psort_grabtuple: too big tuple len in backward scan"); tup = ALLOCTUP(tuplen); - SETTUPLEN(tup, tuplen); - GETTUP(node, tup, tuplen, PS(node)->psort_grab_file); + GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape); return tup; } else @@ -880,9 +902,10 @@ psort_markpos(Sort *node) Assert(PS(node) != (Psortstate *) NULL); if (PS(node)->using_tape_files == true) - BufFileTell(PS(node)->psort_grab_file, - & PS(node)->psort_saved_fileno, - & PS(node)->psort_saved); + LogicalTapeTell(PS(node)->tapeset, + PS(node)->psort_grab_tape, + & PS(node)->psort_saved, + & PS(node)->psort_saved_offset); else PS(node)->psort_saved = PS(node)->psort_current; } @@ -898,46 +921,41 @@ psort_restorepos(Sort *node) Assert(PS(node) != (Psortstate *) NULL); if (PS(node)->using_tape_files == true) - BufFileSeek(PS(node)->psort_grab_file, - PS(node)->psort_saved_fileno, - PS(node)->psort_saved, - SEEK_SET); + { + if (! LogicalTapeSeek(PS(node)->tapeset, + PS(node)->psort_grab_tape, + PS(node)->psort_saved, + PS(node)->psort_saved_offset)) + elog(ERROR, "psort_restorepos failed"); + } else PS(node)->psort_current = PS(node)->psort_saved; } /* - * psort_end - unlinks the tape files, and cleans up. Should not be - * called unless psort_grabtuple has returned a NULL. + * psort_end + * + * Release resources and clean up. */ void psort_end(Sort *node) { - struct tape *tp; - - if (!node->cleaned) + /* node->cleaned is probably redundant? */ + if (!node->cleaned && PS(node) != (Psortstate *) NULL) { + if (PS(node)->tapeset) + LogicalTapeSetClose(PS(node)->tapeset); + if (PS(node)->memtuples) + pfree(PS(node)->memtuples); - /* - * I'm changing this because if we are sorting a relation with no - * tuples, psortstate is NULL. - */ - if (PS(node) != (Psortstate *) NULL) - { - if (PS(node)->using_tape_files == true) - for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--) - destroytape(tp->tp_file); - else if (PS(node)->memtuples) - pfree(PS(node)->memtuples); - - NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ); - NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ); + /* XXX what about freeing leftist tree and tuples in memory? */ - pfree((void *) node->psortstate); - node->psortstate = NULL; + NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ); + NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ); - node->cleaned = TRUE; - } + pfree((void *) node->psortstate); + node->psortstate = NULL; + node->cleaned = TRUE; } } @@ -951,46 +969,22 @@ psort_rescan(Sort *node) if (((Plan *) node)->lefttree->chgParam != NULL) { psort_end(node); - node->cleaned = false; + node->cleaned = false; /* huh? */ } else if (PS(node) != (Psortstate *) NULL) { PS(node)->all_fetched = false; PS(node)->psort_current = 0; - PS(node)->psort_saved_fileno = 0; PS(node)->psort_saved = 0L; + PS(node)->psort_saved_offset = 0; if (PS(node)->using_tape_files == true) - rewind(PS(node)->psort_grab_file); + LogicalTapeRewind(PS(node)->tapeset, + PS(node)->psort_grab_tape, + false); } } -/* - * gettape - returns an open stream for writing/reading - * - * Returns: - * Open stream for writing/reading. - * NULL if unable to open temporary file. - * - * There used to be a lot of cruft here to try to ensure that we destroyed - * all the tape files; but it didn't really work. Now we rely on fd.c to - * clean up temp files if an error occurs. - */ -static BufFile * -gettape() -{ - return BufFileCreateTemp(); -} - -/* - * destroytape - unlinks the tape - */ -static void -destroytape(BufFile *file) -{ - BufFileClose(file); -} - static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup) { diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h index 2416d645cfe56e8b7f8eb040a4e49d0395b4cdcc..bc472b9d14ebbeec6c9ed58eeea2b13cb773f75c 100644 --- a/src/include/storage/buffile.h +++ b/src/include/storage/buffile.h @@ -17,7 +17,7 @@ * * Copyright (c) 1994, Regents of the University of California * - * $Id: buffile.h,v 1.1 1999/10/13 15:02:32 tgl Exp $ + * $Id: buffile.h,v 1.2 1999/10/16 19:49:27 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -37,11 +37,12 @@ typedef struct BufFile BufFile; extern BufFile *BufFileCreateTemp(void); extern BufFile *BufFileCreate(File file); -extern BufFile *BufFileReaccess(BufFile *file); extern void BufFileClose(BufFile *file); extern size_t BufFileRead(BufFile *file, void *ptr, size_t size); extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size); extern int BufFileSeek(BufFile *file, int fileno, long offset, int whence); extern void BufFileTell(BufFile *file, int *fileno, long *offset); +extern int BufFileSeekBlock(BufFile *file, long blknum); +extern long BufFileTellBlock(BufFile *file); #endif /* BUFFILE_H */ diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h new file mode 100644 index 0000000000000000000000000000000000000000..16fc2c12f672e942e648b508763c790415106a7b --- /dev/null +++ b/src/include/utils/logtape.h @@ -0,0 +1,41 @@ +/*------------------------------------------------------------------------- + * + * logtape.h + * Management of "logical tapes" within temporary files. + * + * See logtape.c for explanations. + * + * Copyright (c) 1994, Regents of the University of California + * + * $Id: logtape.h,v 1.1 1999/10/16 19:49:28 tgl Exp $ + * + *------------------------------------------------------------------------- + */ + +#ifndef LOGTAPE_H +#define LOGTAPE_H + +/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */ + +typedef struct LogicalTapeSet LogicalTapeSet; + +/* + * prototypes for functions in logtape.c + */ + +extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes); +extern void LogicalTapeSetClose(LogicalTapeSet *lts); +extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum, + void *ptr, size_t size); +extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum, + void *ptr, size_t size); +extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite); +extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum); +extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, + size_t size); +extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum, + long blocknum, int offset); +extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum, + long *blocknum, int *offset); + +#endif /* LOGTAPE_H */ diff --git a/src/include/utils/psort.h b/src/include/utils/psort.h index 9a100bad0d8b6ae5a5c38620ebe60bb0fe39e8cb..5f7a638442deb3c6c19141b34bdd1fff782d05ae 100644 --- a/src/include/utils/psort.h +++ b/src/include/utils/psort.h @@ -1,101 +1,21 @@ /*------------------------------------------------------------------------- * * psort.h - * - * + * Polyphase merge sort. * * Copyright (c) 1994, Regents of the University of California * - * $Id: psort.h,v 1.22 1999/10/13 15:02:28 tgl Exp $ + * $Id: psort.h,v 1.23 1999/10/16 19:49:28 tgl Exp $ * *------------------------------------------------------------------------- */ #ifndef PSORT_H #define PSORT_H -#include "access/relscan.h" +#include "access/htup.h" +#include "access/skey.h" #include "nodes/plannodes.h" -#include "storage/buffile.h" -#include "utils/lselect.h" - -#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */ - -struct tape -{ - int tp_dummy; /* (D) */ - int tp_fib; /* (A) */ - BufFile *tp_file; /* (TAPE) */ - struct tape *tp_prev; -}; - -struct cmplist -{ - int cp_attn; /* attribute number */ - int cp_num; /* comparison function code */ - int cp_rev; /* invert comparison flag */ - struct cmplist *cp_next; /* next in chain */ -}; - -/* This structure preserves the state of psort between calls from different - * nodes to its interface functions. Basically, it includes all of the global - * variables in psort. In case you were wondering, pointers to these structures - * are included in Sort node structures. -Rex 2.6.1995 - */ -typedef struct Psortstate -{ - LeftistContextData treeContext; - - int TapeRange; - int Level; - int TotalDummy; - struct tape Tape[MAXTAPES]; - - int BytesRead; - int BytesWritten; - int tupcount; - - struct leftist *Tuples; - - BufFile *psort_grab_file; - long psort_current; /* array index (only used if not tape) */ - int psort_saved_fileno; /* upper bits of psort_saved, if tape */ - long psort_saved; /* could be file offset, or array index */ - bool using_tape_files; - bool all_fetched; /* this is for cursors */ - - HeapTuple *memtuples; -} Psortstate; - -#ifdef EBUG -#include "storage/buf.h" -#include "storage/bufmgr.h" - -#define PDEBUG(PROC, S1)\ -elog(DEBUG, "%s:%d>> PROC: %s.", __FILE__, __LINE__, S1) - -#define PDEBUG2(PROC, S1, D1)\ -elog(DEBUG, "%s:%d>> PROC: %s %d.", __FILE__, __LINE__, S1, D1) - -#define PDEBUG4(PROC, S1, D1, S2, D2)\ -elog(DEBUG, "%s:%d>> PROC: %s %d, %s %d.", __FILE__, __LINE__, S1, D1, S2, D2) - -#define VDEBUG(VAR, FMT)\ -elog(DEBUG, "%s:%d>> VAR =FMT", __FILE__, __LINE__, VAR) - -#define ASSERT(EXPR, STR)\ -if (!(EXPR)) elog(FATAL, "%s:%d>> %s", __FILE__, __LINE__, STR) - -#define TRACE(VAL, CODE)\ -if (1) CODE; else - -#else -#define PDEBUG(MSG) -#define VDEBUG(VAR, FMT) -#define ASSERT(EXPR, MSG) -#define TRACE(VAL, CODE) -#endif -/* psort.c */ extern bool psort_begin(Sort *node, int nkeys, ScanKey key); extern HeapTuple psort_grabtuple(Sort *node, bool *should_free); extern void psort_markpos(Sort *node);