diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 926bb895fa4e42544ed9e6f975b58356de8dc987..9eca63cbbfa2e5c79673ca08e30599baf59451c0 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -49,6 +49,7 @@ #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" +#include "storage/large_object.h" #include "storage/latch.h" #include "storage/pmsignal.h" #include "storage/predicate.h" @@ -4352,6 +4353,7 @@ WriteControlFile(void) ControlFile->indexMaxKeys = INDEX_MAX_KEYS; ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE; + ControlFile->loblksize = LOBLKSIZE; #ifdef HAVE_INT64_TIMESTAMP ControlFile->enableIntTimes = true; @@ -4545,6 +4547,13 @@ ReadControlFile(void) " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.", ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE), errhint("It looks like you need to recompile or initdb."))); + if (ControlFile->loblksize != LOBLKSIZE) + ereport(FATAL, + (errmsg("database files are incompatible with server"), + errdetail("The database cluster was initialized with LOBLKSIZE %d," + " but the server was compiled with LOBLKSIZE %d.", + ControlFile->loblksize, (int) LOBLKSIZE), + errhint("It looks like you need to recompile or initdb."))); #ifdef HAVE_INT64_TIMESTAMP if (ControlFile->enableIntTimes != true) diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c index 57ec1c2a6f97aff4b5a571cc9835c21240aee156..091814230113ecbe3cc8c0a3ba5872dd2fcb3ef5 100644 --- a/src/backend/storage/large_object/inv_api.c +++ b/src/backend/storage/large_object/inv_api.c @@ -173,13 +173,38 @@ myLargeObjectExists(Oid loid, Snapshot snapshot) } -static int32 -getbytealen(bytea *data) +/* + * Extract data field from a pg_largeobject tuple, detoasting if needed + * and verifying that the length is sane. Returns data pointer (a bytea *), + * data length, and an indication of whether to pfree the data pointer. + */ +static void +getdatafield(Form_pg_largeobject tuple, + bytea **pdatafield, + int *plen, + bool *pfreeit) { - Assert(!VARATT_IS_EXTENDED(data)); - if (VARSIZE(data) < VARHDRSZ) - elog(ERROR, "invalid VARSIZE(data)"); - return (VARSIZE(data) - VARHDRSZ); + bytea *datafield; + int len; + bool freeit; + + datafield = &(tuple->data); /* see note at top of file */ + freeit = false; + if (VARATT_IS_EXTENDED(datafield)) + { + datafield = (bytea *) + heap_tuple_untoast_attr((struct varlena *) datafield); + freeit = true; + } + len = VARSIZE(datafield) - VARHDRSZ; + if (len < 0 || len > LOBLKSIZE) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d", + tuple->loid, tuple->pageno, len))); + *pdatafield = datafield; + *plen = len; + *pfreeit = freeit; } @@ -366,20 +391,14 @@ inv_getsize(LargeObjectDesc *obj_desc) { Form_pg_largeobject data; bytea *datafield; + int len; bool pfreeit; if (HeapTupleHasNulls(tuple)) /* paranoia */ elog(ERROR, "null field found in pg_largeobject"); data = (Form_pg_largeobject) GETSTRUCT(tuple); - datafield = &(data->data); /* see note at top of file */ - pfreeit = false; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } - lastbyte = (uint64) data->pageno * LOBLKSIZE + getbytealen(datafield); + getdatafield(data, &datafield, &len, &pfreeit); + lastbyte = (uint64) data->pageno * LOBLKSIZE + len; if (pfreeit) pfree(datafield); } @@ -506,15 +525,7 @@ inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes) off = (int) (obj_desc->offset - pageoff); Assert(off >= 0 && off < LOBLKSIZE); - datafield = &(data->data); /* see note at top of file */ - pfreeit = false; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } - len = getbytealen(datafield); + getdatafield(data, &datafield, &len, &pfreeit); if (len > off) { n = len - off; @@ -630,16 +641,7 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes) * * First, load old data into workbuf */ - datafield = &(olddata->data); /* see note at top of file */ - pfreeit = false; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } - len = getbytealen(datafield); - Assert(len <= LOBLKSIZE); + getdatafield(olddata, &datafield, &len, &pfreeit); memcpy(workb, VARDATA(datafield), len); if (pfreeit) pfree(datafield); @@ -815,19 +817,11 @@ inv_truncate(LargeObjectDesc *obj_desc, int64 len) if (olddata != NULL && olddata->pageno == pageno) { /* First, load old data into workbuf */ - bytea *datafield = &(olddata->data); /* see note at top of - * file */ - bool pfreeit = false; + bytea *datafield; int pagelen; + bool pfreeit; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } - pagelen = getbytealen(datafield); - Assert(pagelen <= LOBLKSIZE); + getdatafield(olddata, &datafield, &pagelen, &pfreeit); memcpy(workb, VARDATA(datafield), pagelen); if (pfreeit) pfree(datafield); diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 24e211668d158726003b0c66c83da6eed0d37e20..f81502466e59771add1bee2acdea74e305a2f236 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -287,6 +287,8 @@ main(int argc, char *argv[]) ControlFile.indexMaxKeys); printf(_("Maximum size of a TOAST chunk: %u\n"), ControlFile.toast_max_chunk_size); + printf(_("Size of a large-object chunk: %u\n"), + ControlFile.loblksize); printf(_("Date/time type storage: %s\n"), (ControlFile.enableIntTimes ? _("64-bit integers") : _("floating-point numbers"))); printf(_("Float4 argument passing: %s\n"), diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c index d11280e1d4d7319ef4e4764e6e04caf3394c3972..915a1ed64634ee1bace75a5358f2a3975d1c3aae 100644 --- a/src/bin/pg_resetxlog/pg_resetxlog.c +++ b/src/bin/pg_resetxlog/pg_resetxlog.c @@ -52,6 +52,7 @@ #include "catalog/catversion.h" #include "catalog/pg_control.h" #include "common/fe_memutils.h" +#include "storage/large_object.h" #include "pg_getopt.h" @@ -536,6 +537,7 @@ GuessControlValues(void) ControlFile.nameDataLen = NAMEDATALEN; ControlFile.indexMaxKeys = INDEX_MAX_KEYS; ControlFile.toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE; + ControlFile.loblksize = LOBLKSIZE; #ifdef HAVE_INT64_TIMESTAMP ControlFile.enableIntTimes = true; #else @@ -620,6 +622,8 @@ PrintControlValues(bool guessed) ControlFile.indexMaxKeys); printf(_("Maximum size of a TOAST chunk: %u\n"), ControlFile.toast_max_chunk_size); + printf(_("Size of a large-object chunk: %u\n"), + ControlFile.loblksize); printf(_("Date/time type storage: %s\n"), (ControlFile.enableIntTimes ? _("64-bit integers") : _("floating-point numbers"))); printf(_("Float4 argument passing: %s\n"), diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 4579eb6cef6abf49c8ec5134d882dc02891732b0..ba79d253ae5ff587bfb2afec4624e5bd99a7c93c 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -21,7 +21,7 @@ /* Version identifier for this pg_control format */ -#define PG_CONTROL_VERSION 941 +#define PG_CONTROL_VERSION 942 /* * Body of CheckPoint XLOG records. This is declared here because we keep @@ -207,6 +207,7 @@ typedef struct ControlFileData uint32 indexMaxKeys; /* max number of columns in an index */ uint32 toast_max_chunk_size; /* chunk size in TOAST tables */ + uint32 loblksize; /* chunk size in pg_largeobject */ /* flag indicating internal format of timestamp, interval, time */ bool enableIntTimes; /* int64 storage enabled? */ diff --git a/src/include/storage/large_object.h b/src/include/storage/large_object.h index 0d81a4bc1b6d4636978c94fa636a6d4b5bef4079..30438a98cf3ce087e58e30b1081a7d80ca1c54ef 100644 --- a/src/include/storage/large_object.h +++ b/src/include/storage/large_object.h @@ -66,6 +66,8 @@ typedef struct LargeObjectDesc * Also, it seems to be a smart move to make the page size be a power of 2, * since clients will often be written to send data in power-of-2 blocks. * This avoids unnecessary tuple updates caused by partial-page writes. + * + * NB: Changing LOBLKSIZE requires an initdb. */ #define LOBLKSIZE (BLCKSZ / 4)