diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 34b74f6c3844a3a172cd660357f8ccbc48f57f3c..fbb6489915ec4068e77805829f4058b1dd6f82bf 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -38,7 +38,7 @@ /* * We don't want to log each fetching of a value from a sequence, * so we pre-log a few fetches in advance. In the event of - * crash we can lose as much as we pre-logged. + * crash we can lose (skip over) as many values as we pre-logged. */ #define SEQ_LOG_VALS 32 @@ -73,7 +73,7 @@ typedef struct SeqTableData int64 cached; /* last value already cached for nextval */ /* if last != cached, we have not used up all the cached values */ int64 increment; /* copy of sequence's increment field */ - /* note that increment is zero until we first do read_info() */ + /* note that increment is zero until we first do read_seq_tuple() */ } SeqTableData; typedef SeqTableData *SeqTable; @@ -90,7 +90,8 @@ static void fill_seq_with_data(Relation rel, HeapTuple tuple); static int64 nextval_internal(Oid relid); static Relation open_share_lock(SeqTable seq); static void init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel); -static Form_pg_sequence read_info(SeqTable elm, Relation rel, Buffer *buf); +static Form_pg_sequence read_seq_tuple(SeqTable elm, Relation rel, + Buffer *buf, HeapTuple seqtuple); static void init_params(List *options, bool isInit, Form_pg_sequence new, List **owned_by); static void do_setval(Oid relid, int64 next, bool iscalled); @@ -187,7 +188,7 @@ DefineSequence(CreateSeqStmt *seq) case SEQ_COL_LOG: coldef->typeName = makeTypeNameFromOid(INT8OID, -1); coldef->colname = "log_cnt"; - value[i - 1] = Int64GetDatum((int64) 1); + value[i - 1] = Int64GetDatum((int64) 0); break; case SEQ_COL_CYCLE: coldef->typeName = makeTypeNameFromOid(BOOLOID, -1); @@ -247,10 +248,8 @@ ResetSequence(Oid seq_relid) SeqTable elm; Form_pg_sequence seq; Buffer buf; - Page page; + HeapTupleData seqtuple; HeapTuple tuple; - HeapTupleData tupledata; - ItemId lp; /* * Read the old sequence. This does a bit more work than really @@ -258,18 +257,12 @@ ResetSequence(Oid seq_relid) * indeed a sequence. */ init_sequence(seq_relid, &elm, &seq_rel); - read_info(elm, seq_rel, &buf); + (void) read_seq_tuple(elm, seq_rel, &buf, &seqtuple); /* * Copy the existing sequence tuple. */ - page = BufferGetPage(buf); - lp = PageGetItemId(page, FirstOffsetNumber); - Assert(ItemIdIsNormal(lp)); - - tupledata.t_data = (HeapTupleHeader) PageGetItem(page, lp); - tupledata.t_len = ItemIdGetLength(lp); - tuple = heap_copytuple(&tupledata); + tuple = heap_copytuple(&seqtuple); /* Now we're done with the old page */ UnlockReleaseBuffer(buf); @@ -281,7 +274,7 @@ ResetSequence(Oid seq_relid) seq = (Form_pg_sequence) GETSTRUCT(tuple); seq->last_value = seq->start_value; seq->is_called = false; - seq->log_cnt = 1; + seq->log_cnt = 0; /* * Create a new storage file for the sequence. We want to keep the @@ -378,12 +371,6 @@ fill_seq_with_data(Relation rel, HeapTuple tuple) xl_seq_rec xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; - Form_pg_sequence newseq = (Form_pg_sequence) GETSTRUCT(tuple); - - /* We do not log first nextval call, so "advance" sequence here */ - /* Note we are scribbling on local tuple, not the disk buffer */ - newseq->is_called = true; - newseq->log_cnt = 0; xlrec.node = rel->rd_node; rdata[0].data = (char *) &xlrec; @@ -419,7 +406,7 @@ AlterSequence(AlterSeqStmt *stmt) SeqTable elm; Relation seqrel; Buffer buf; - Page page; + HeapTupleData seqtuple; Form_pg_sequence seq; FormData_pg_sequence new; List *owned_by; @@ -442,8 +429,7 @@ AlterSequence(AlterSeqStmt *stmt) stmt->sequence->relname); /* lock page' buffer and read tuple into new sequence structure */ - seq = read_info(elm, seqrel, &buf); - page = BufferGetPage(buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); /* Copy old values of options into workspace */ memcpy(&new, seq, sizeof(FormData_pg_sequence)); @@ -456,10 +442,10 @@ AlterSequence(AlterSeqStmt *stmt) elm->cached = elm->last; /* Now okay to update the on-disk tuple */ - memcpy(seq, &new, sizeof(FormData_pg_sequence)); - START_CRIT_SECTION(); + memcpy(seq, &new, sizeof(FormData_pg_sequence)); + MarkBufferDirty(buf); /* XLOG stuff */ @@ -468,6 +454,7 @@ AlterSequence(AlterSeqStmt *stmt) xl_seq_rec xlrec; XLogRecPtr recptr; XLogRecData rdata[2]; + Page page = BufferGetPage(buf); xlrec.node = seqrel->rd_node; rdata[0].data = (char *) &xlrec; @@ -475,9 +462,8 @@ AlterSequence(AlterSeqStmt *stmt) rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -541,6 +527,7 @@ nextval_internal(Oid relid) Relation seqrel; Buffer buf; Page page; + HeapTupleData seqtuple; Form_pg_sequence seq; int64 incby, maxv, @@ -579,7 +566,7 @@ nextval_internal(Oid relid) } /* lock page' buffer and read tuple */ - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); page = BufferGetPage(buf); last = next = result = seq->last_value; @@ -591,9 +578,8 @@ nextval_internal(Oid relid) if (!seq->is_called) { - rescnt++; /* last_value if not called */ + rescnt++; /* return last_value if not is_called */ fetch--; - log--; } /* @@ -606,7 +592,7 @@ nextval_internal(Oid relid) * checkpoint would fail to advance the sequence past the logged values. * In this case we may as well fetch extra values. */ - if (log < fetch) + if (log < fetch || !seq->is_called) { /* forced log to satisfy local demand for values */ fetch = log = fetch + SEQ_LOG_VALS; @@ -697,8 +683,18 @@ nextval_internal(Oid relid) last_used_seq = elm; + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); + /* + * We must mark the buffer dirty before doing XLogInsert(); see notes in + * SyncOneBuffer(). However, we don't apply the desired changes just yet. + * This looks like a violation of the buffer update protocol, but it is + * in fact safe because we hold exclusive lock on the buffer. Any other + * process, including a checkpoint, that tries to examine the buffer + * contents will block until we release the lock, and then will see the + * final state that we install below. + */ MarkBufferDirty(buf); /* XLOG stuff */ @@ -708,20 +704,26 @@ nextval_internal(Oid relid) XLogRecPtr recptr; XLogRecData rdata[2]; - xlrec.node = seqrel->rd_node; - rdata[0].data = (char *) &xlrec; - rdata[0].len = sizeof(xl_seq_rec); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + /* + * We don't log the current state of the tuple, but rather the state + * as it would appear after "log" more fetches. This lets us skip + * that many future WAL records, at the cost that we lose those + * sequence values if we crash. + */ /* set values that will be saved in xlog */ seq->last_value = next; seq->is_called = true; seq->log_cnt = 0; - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + xlrec.node = seqrel->rd_node; + rdata[0].data = (char *) &xlrec; + rdata[0].len = sizeof(xl_seq_rec); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); + + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -731,7 +733,7 @@ nextval_internal(Oid relid) PageSetTLI(page, ThisTimeLineID); } - /* update on-disk data */ + /* Now update sequence tuple to the intended final state */ seq->last_value = last; /* last fetched number */ seq->is_called = true; seq->log_cnt = log; /* how much is logged */ @@ -830,6 +832,7 @@ do_setval(Oid relid, int64 next, bool iscalled) SeqTable elm; Relation seqrel; Buffer buf; + HeapTupleData seqtuple; Form_pg_sequence seq; /* open and AccessShareLock sequence */ @@ -846,7 +849,7 @@ do_setval(Oid relid, int64 next, bool iscalled) PreventCommandIfReadOnly("setval()"); /* lock page' buffer and read tuple */ - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); if ((next < seq->min_value) || (next > seq->max_value)) { @@ -874,8 +877,13 @@ do_setval(Oid relid, int64 next, bool iscalled) /* In any case, forget any future cached numbers */ elm->cached = elm->last; + /* ready to change the on-disk (or really, in-buffer) tuple */ START_CRIT_SECTION(); + seq->last_value = next; /* last fetched number */ + seq->is_called = iscalled; + seq->log_cnt = 0; + MarkBufferDirty(buf); /* XLOG stuff */ @@ -892,14 +900,8 @@ do_setval(Oid relid, int64 next, bool iscalled) rdata[0].buffer = InvalidBuffer; rdata[0].next = &(rdata[1]); - /* set values that will be saved in xlog */ - seq->last_value = next; - seq->is_called = true; - seq->log_cnt = 0; - - rdata[1].data = (char *) page + ((PageHeader) page)->pd_upper; - rdata[1].len = ((PageHeader) page)->pd_special - - ((PageHeader) page)->pd_upper; + rdata[1].data = (char *) seqtuple.t_data; + rdata[1].len = seqtuple.t_len; rdata[1].buffer = InvalidBuffer; rdata[1].next = NULL; @@ -909,11 +911,6 @@ do_setval(Oid relid, int64 next, bool iscalled) PageSetTLI(page, ThisTimeLineID); } - /* save info in sequence relation */ - seq->last_value = next; /* last fetched number */ - seq->is_called = iscalled; - seq->log_cnt = (iscalled) ? 0 : 1; - END_CRIT_SECTION(); UnlockReleaseBuffer(buf); @@ -1066,13 +1063,20 @@ init_sequence(Oid relid, SeqTable *p_elm, Relation *p_rel) } -/* Given an opened relation, lock the page buffer and find the tuple */ +/* + * Given an opened sequence relation, lock the page buffer and find the tuple + * + * *buf receives the reference to the pinned-and-ex-locked buffer + * *seqtuple receives the reference to the sequence tuple proper + * (this arg should point to a local variable of type HeapTupleData) + * + * Function's return value points to the data payload of the tuple + */ static Form_pg_sequence -read_info(SeqTable elm, Relation rel, Buffer *buf) +read_seq_tuple(SeqTable elm, Relation rel, Buffer *buf, HeapTuple seqtuple) { Page page; ItemId lp; - HeapTupleData tuple; sequence_magic *sm; Form_pg_sequence seq; @@ -1088,7 +1092,10 @@ read_info(SeqTable elm, Relation rel, Buffer *buf) lp = PageGetItemId(page, FirstOffsetNumber); Assert(ItemIdIsNormal(lp)); - tuple.t_data = (HeapTupleHeader) PageGetItem(page, lp); + + /* Note we currently only bother to set these two fields of *seqtuple */ + seqtuple->t_data = (HeapTupleHeader) PageGetItem(page, lp); + seqtuple->t_len = ItemIdGetLength(lp); /* * Previous releases of Postgres neglected to prevent SELECT FOR UPDATE on @@ -1098,15 +1105,15 @@ read_info(SeqTable elm, Relation rel, Buffer *buf) * bit update, ie, don't bother to WAL-log it, since we can certainly do * this again if the update gets lost. */ - if (HeapTupleHeaderGetXmax(tuple.t_data) != InvalidTransactionId) + if (HeapTupleHeaderGetXmax(seqtuple->t_data) != InvalidTransactionId) { - HeapTupleHeaderSetXmax(tuple.t_data, InvalidTransactionId); - tuple.t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; - tuple.t_data->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmax(seqtuple->t_data, InvalidTransactionId); + seqtuple->t_data->t_infomask &= ~HEAP_XMAX_COMMITTED; + seqtuple->t_data->t_infomask |= HEAP_XMAX_INVALID; SetBufferCommitInfoNeedsSave(*buf); } - seq = (Form_pg_sequence) GETSTRUCT(&tuple); + seq = (Form_pg_sequence) GETSTRUCT(seqtuple); /* this is a handy place to update our copy of the increment */ elm->increment = seq->increment_by; @@ -1210,6 +1217,13 @@ init_params(List *options, bool isInit, defel->defname); } + /* + * We must reset log_cnt when isInit or when changing any parameters + * that would affect future nextval allocations. + */ + if (isInit) + new->log_cnt = 0; + /* INCREMENT BY */ if (increment_by != NULL) { @@ -1218,6 +1232,7 @@ init_params(List *options, bool isInit, ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("INCREMENT must not be zero"))); + new->log_cnt = 0; } else if (isInit) new->increment_by = 1; @@ -1227,30 +1242,39 @@ init_params(List *options, bool isInit, { new->is_cycled = intVal(is_cycled->arg); Assert(BoolIsValid(new->is_cycled)); + new->log_cnt = 0; } else if (isInit) new->is_cycled = false; /* MAXVALUE (null arg means NO MAXVALUE) */ if (max_value != NULL && max_value->arg) + { new->max_value = defGetInt64(max_value); + new->log_cnt = 0; + } else if (isInit || max_value != NULL) { if (new->increment_by > 0) new->max_value = SEQ_MAXVALUE; /* ascending seq */ else new->max_value = -1; /* descending seq */ + new->log_cnt = 0; } /* MINVALUE (null arg means NO MINVALUE) */ if (min_value != NULL && min_value->arg) + { new->min_value = defGetInt64(min_value); + new->log_cnt = 0; + } else if (isInit || min_value != NULL) { if (new->increment_by > 0) new->min_value = 1; /* ascending seq */ else new->min_value = SEQ_MINVALUE; /* descending seq */ + new->log_cnt = 0; } /* crosscheck min/max */ @@ -1312,13 +1336,12 @@ init_params(List *options, bool isInit, else new->last_value = new->start_value; new->is_called = false; - new->log_cnt = 1; + new->log_cnt = 0; } else if (isInit) { new->last_value = new->start_value; new->is_called = false; - new->log_cnt = 1; } /* crosscheck RESTART (or current value, if changing MIN/MAX) */ @@ -1361,6 +1384,7 @@ init_params(List *options, bool isInit, errmsg("CACHE (%s) must be greater than zero", buf))); } + new->log_cnt = 0; } else if (isInit) new->cache_value = 1; @@ -1473,6 +1497,7 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) SeqTable elm; Relation seqrel; Buffer buf; + HeapTupleData seqtuple; Form_pg_sequence seq; /* open and AccessShareLock sequence */ @@ -1500,7 +1525,7 @@ pg_sequence_parameters(PG_FUNCTION_ARGS) memset(isnull, 0, sizeof(isnull)); - seq = read_info(elm, seqrel, &buf); + seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); values[0] = Int64GetDatum(seq->start_value); values[1] = Int64GetDatum(seq->min_value); @@ -1555,7 +1580,7 @@ seq_redo(XLogRecPtr lsn, XLogRecord *record) item = (char *) xlrec + sizeof(xl_seq_rec); itemsz = record->xl_len - sizeof(xl_seq_rec); - itemsz = MAXALIGN(itemsz); + if (PageAddItem(localpage, (Item) item, itemsz, FirstOffsetNumber, false, false) == InvalidOffsetNumber) elog(PANIC, "seq_redo: failed to add item to page"); diff --git a/src/test/regress/expected/sequence.out b/src/test/regress/expected/sequence.out index 578778c312972b3eb81fce157bdccdb50311093a..87feb08b14afa302928bf17f7409bde3a8441ea2 100644 --- a/src/test/regress/expected/sequence.out +++ b/src/test/regress/expected/sequence.out @@ -170,7 +170,7 @@ ALTER TABLE foo_seq RENAME TO foo_seq_new; SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 1 | f | f + foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 0 | f | f (1 row) SELECT nextval('foo_seq_new'); @@ -188,7 +188,7 @@ SELECT nextval('foo_seq_new'); SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 32 | f | t + foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 31 | f | t (1 row) DROP SEQUENCE foo_seq_new; diff --git a/src/test/regress/expected/sequence_1.out b/src/test/regress/expected/sequence_1.out index 22399ab68dc5e0b9da4a61ce3ec0af98dfcb58b5..124967ede112027da53e4a91b69955039d7fd37a 100644 --- a/src/test/regress/expected/sequence_1.out +++ b/src/test/regress/expected/sequence_1.out @@ -170,7 +170,7 @@ ALTER TABLE foo_seq RENAME TO foo_seq_new; SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 1 | f | f + foo_seq | 1 | 1 | 1 | 9223372036854775807 | 1 | 1 | 0 | f | f (1 row) SELECT nextval('foo_seq_new'); @@ -188,7 +188,7 @@ SELECT nextval('foo_seq_new'); SELECT * FROM foo_seq_new; sequence_name | last_value | start_value | increment_by | max_value | min_value | cache_value | log_cnt | is_cycled | is_called ---------------+------------+-------------+--------------+---------------------+-----------+-------------+---------+-----------+----------- - foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 31 | f | t + foo_seq | 2 | 1 | 1 | 9223372036854775807 | 1 | 1 | 32 | f | t (1 row) DROP SEQUENCE foo_seq_new; @@ -234,6 +234,9 @@ DROP SEQUENCE myseq2; -- -- Alter sequence -- +ALTER SEQUENCE IF EXISTS sequence_test2 RESTART WITH 24 + INCREMENT BY 4 MAXVALUE 36 MINVALUE 5 CYCLE; +NOTICE: relation "sequence_test2" does not exist, skipping CREATE SEQUENCE sequence_test2 START WITH 32; SELECT nextval('sequence_test2'); nextval