From e9efc49b6c1747da7256d0c276b97689f337f871 Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Mon, 20 Jul 2020 20:53:21 +0200 Subject: [PATCH] do away with newmaxuid now that expiration order is determined by a single loop ordered by far-side UIDs, it is no longer necessary to accurately track the highest seen UID. as a side effect, this fixes a problem reported (way too long ago) by Yuri D'Elia: we failed to up newmaxuid for messages we produced ourselves, so we would keep enumerating the same messages until we also propagated externally generated messages from that mailbox - which might have been never for the server side of archive/trash mailboxes. --- src/run-tests.pl | 18 ++++++------- src/sync.c | 66 ++++++++++++++++++++++++++++-------------------- 2 files changed, 47 insertions(+), 37 deletions(-) diff --git a/src/run-tests.pl b/src/run-tests.pl index 77e0abc..4c257ee 100755 --- a/src/run-tests.pl +++ b/src/run-tests.pl @@ -61,7 +61,7 @@ my @X01 = ( A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, "FT", I, 9, "", J, 10, "" ], [ 10, A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", G, 7, "FT", H, 8, "T", J, 9, "", I, 10, "" ], - [ 9, 0, 9, + [ 10, 0, 10, 1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 5, "T", 6, 0, "", 7, 7, "FT", 0, 8, "", 10, 9, "", 9, 10, "" ], ); test("full", \@x01, \@X01, @O01); @@ -73,7 +73,7 @@ my @X02 = ( A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", I, 9, "", J, 10, "" ], [ 10, A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ], - [ 9, 0, 9, + [ 10, 0, 10, 1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 10, 9, "", 9, 10, "" ], ); test("full + expunge both", \@x01, \@X02, @O02); @@ -85,7 +85,7 @@ my @X03 = ( A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, "FT", I, 9, "", J, 10, "" ], [ 10, A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ], - [ 9, 0, 9, + [ 10, 0, 10, 1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 0, "T", 6, 0, "", 7, 0, "T", 10, 9, "", 9, 10, "" ], ); test("full + expunge near side", \@x01, \@X03, @O03); @@ -133,7 +133,7 @@ my @X07 = ( A, 1, "F", B, 2, "", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "F", G, 7, "FT", I, 9, "", J, 10, "" ], [ 10, A, 1, "", B, 2, "F", C, 3, "F", D, 4, "", E, 5, "", G, 7, "", H, 8, "", J, 9, "", I, 10, "" ], - [ 9, 0, 9, + [ 10, 0, 10, 1, 1, "", 2, 2, "", 3, 3, "", 4, 4, "", 5, 5, "", 6, 6, "", 7, 7, "", 8, 8, "", 10, 9, "", 9, 10, "" ], ); test("new", \@x01, \@X07, @O07); @@ -168,7 +168,7 @@ my @X11 = ( A, 1, "", B, 2, "*" ], [ 2, C, 1, "*", A, 2, "" ], - [ 2, 0, 1, + [ 2, 0, 2, 0, 1, "^", 1, 2, "", 2, 0, "^" ], ); test("max size", \@x10, \@X11, @O11); @@ -180,7 +180,7 @@ my @X22 = ( A, 1, "", B, 2, "*", C, 3, "*" ], [ 2, C, 1, "*", A, 2, "" ], - [ 2, 0, 1, + [ 3, 0, 2, 3, 1, "", 1, 2, "", 2, 0, "^" ], ); test("near side max size", \@X11, \@X22, @O22); @@ -203,7 +203,7 @@ my @X31 = ( A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ], [ 5, A, 1, "F", B, 2, "", D, 3, "", E, 4, "S", F, 5, "" ], - [ 6, 3, 0, + [ 6, 3, 5, 1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ], ); test("max messages", \@x30, \@X31, @O31); @@ -215,7 +215,7 @@ my @X32 = ( A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ], [ 4, A, 1, "F", D, 2, "", E, 3, "S", F, 4, "" ], - [ 6, 3, 0, + [ 6, 3, 4, 1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ], ); test("max messages vs. unread", \@x30, \@X32, @O32); @@ -236,7 +236,7 @@ my @X51 = ( A, 1, "S", B, 2, "FS", C, 3, "S", D, 4, "", E, 5, "", F, 6, "" ], [ 6, B, 2, "FS", D, 4, "", E, 5, "", F, 6, "" ], - [ 6, 3, 0, + [ 6, 3, 6, 2, 2, "FS", 4, 4, "", 5, 5, "", 6, 6, "" ], ); test("max messages + expunge", \@x50, \@X51, @O51); diff --git a/src/sync.c b/src/sync.c index 18a381b..166d7fb 100644 --- a/src/sync.c +++ b/src/sync.c @@ -167,7 +167,6 @@ typedef struct { uint ref_count, nsrecs, opts[2]; uint new_pending[2], flags_pending[2], trash_pending[2]; uint maxuid[2]; // highest UID that was already propagated - uint newmaxuid[2]; // highest UID that is currently being propagated uint uidval[2]; // UID validity value uint newuidval[2]; // UID validity obtained from driver uint finduid[2]; // TUID lookup makes sense only for UIDs >= this @@ -285,6 +284,8 @@ match_tuids( sync_vars_t *svars, int t, message_t *msgs ) srec->msg[t] = tmsg; ntmsg = tmsg->next; srec->uid[t] = tmsg->uid; + if (tmsg->uid == svars->maxuid[t] + 1) + svars->maxuid[t] = tmsg->uid; srec->status = 0; srec->tuid[0] = 0; } @@ -613,7 +614,7 @@ clean_strdup( const char *s ) } -#define JOURNAL_VERSION "3" +#define JOURNAL_VERSION "4" static int prepare_state( sync_vars_t *svars ) @@ -853,8 +854,6 @@ load_state( sync_vars_t *svars ) svars->maxxfuid = minwuid - 1; } - svars->newmaxuid[F] = svars->maxuid[F]; - svars->newmaxuid[N] = svars->maxuid[N]; int line = 0; if ((jfp = fopen( svars->jname, "r" ))) { if (!lock_state( svars )) @@ -884,17 +883,17 @@ load_state( sync_vars_t *svars ) uint t1, t2, t3; if ((c = buf[0]) == '#' ? (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) : - c == 'S' || c == '!' ? + c == '!' ? (sscanf( buf + 2, "%u", &t1 ) != 1) : - c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '|' ? + c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '|' ? (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) : (sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3)) { error( "Error: malformed journal entry at %s:%d\n", svars->jname, line ); goto jbail; } - if (c == 'S') - svars->maxuid[t1] = svars->newmaxuid[t1]; + if (c == 'N') + svars->maxuid[t1] = t2; else if (c == 'F') svars->finduid[t1] = t2; else if (c == 'T') @@ -908,10 +907,6 @@ load_state( sync_vars_t *svars ) srec = nfcalloc( sizeof(*srec) ); srec->uid[F] = t1; srec->uid[N] = t2; - if (svars->newmaxuid[F] < t1) - svars->newmaxuid[F] = t1; - if (svars->newmaxuid[N] < t2) - svars->newmaxuid[N] = t2; debug( " new entry(%u,%u)\n", t1, t2 ); srec->status = S_PENDING; *svars->srecadd = srec; @@ -935,7 +930,8 @@ load_state( sync_vars_t *svars ) break; case '=': debug( "aborted\n" ); - svars->maxxfuid = srec->uid[F]; + if (svars->maxxfuid < srec->uid[F]) + svars->maxxfuid = srec->uid[F]; srec->status = S_DEAD; break; case '#': @@ -949,12 +945,16 @@ load_state( sync_vars_t *svars ) case '<': debug( "far side now %u\n", t3 ); srec->uid[F] = t3; + if (t3 == svars->maxuid[F] + 1) + svars->maxuid[F] = t3; srec->status &= ~S_PENDING; srec->tuid[0] = 0; break; case '>': debug( "near side now %u\n", t3 ); srec->uid[N] = t3; + if (t3 == svars->maxuid[N] + 1) + svars->maxuid[N] = t3; srec->status &= ~S_PENDING; srec->tuid[0] = 0; break; @@ -1575,22 +1575,26 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux for (t = 0; t < 2; t++) { debug( "synchronizing new messages on %s\n", str_fn[1-t] ); for (tmsg = svars->msgs[1-t]; tmsg; tmsg = tmsg->next) { - // If messages were previously ignored due to being excessive, they would now - // appear to be newer than the messages that got actually synced, so increment - // newmaxuid immediately to make sure we always look only at the newest ones. - // However, committing it to maxuid must be delayed until all messages were - // propagated, to ensure that all pending messages are still loaded next time - // in case of interruption - in particular skipping big messages would otherwise - // up the limit too early. srec = tmsg->srec; if (srec) { if (srec->status & S_SKIPPED) { // The message was skipped due to being too big. + // We must have already seen the UID, but we might have been interrupted. + if (svars->maxuid[1-t] < tmsg->uid) + svars->maxuid[1-t] = tmsg->uid; if (!(svars->chan->ops[t] & OP_RENEW)) continue; } else { if (!(svars->chan->ops[t] & OP_NEW)) continue; + // This catches messages: + // - that are actually new + // - whose propagation got interrupted + // - whose propagation was completed, but not logged yet + // - that aren't actually new, but a result of syncing, and the instant + // maxuid upping was prevented by the presence of actually new messages + if (svars->maxuid[1-t] < tmsg->uid) + svars->maxuid[1-t] = tmsg->uid; if (!(srec->status & S_PENDING)) continue; // Nothing to do - the message is paired or expired // Propagation was scheduled, but we got interrupted @@ -1606,13 +1610,14 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux } else { if (!(svars->chan->ops[t] & OP_NEW)) continue; - if (tmsg->uid <= svars->newmaxuid[1-t]) { + if (tmsg->uid <= svars->maxuid[1-t]) { // The message should be already paired. It's not, so it was: // - previously paired, but the entry was expired and pruned => ignore // - attempted, but failed => ignore (the wisdom of this is debatable) // - ignored, as it would have been expunged anyway => ignore (even if undeleted) continue; } + svars->maxuid[1-t] = tmsg->uid; debug( "new message %u\n", tmsg->uid ); if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) { @@ -1628,8 +1633,6 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux srec->uid[1-t] = tmsg->uid; srec->msg[1-t] = tmsg; tmsg->srec = srec; - if (svars->newmaxuid[1-t] < tmsg->uid) - svars->newmaxuid[1-t] = tmsg->uid; JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" ); } if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) { @@ -1748,7 +1751,8 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux // If we have so many new messages that some of them are instantly expired, // but some are still propagated because they are important, we need to // ensure explicitly that the bulk fetch limit is upped. - svars->maxxfuid = srec->uid[F]; + if (svars->maxxfuid < srec->uid[F]) + svars->maxxfuid = srec->uid[F]; srec->msg[F]->srec = NULL; srec->status = S_DEAD; } @@ -1849,6 +1853,8 @@ msg_copied( int sts, uint uid, copy_vars_t *vars ) } else { JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], uid), "%sed message", str_hl[t] ); vars->srec->uid[t] = uid; + if (uid == svars->maxuid[t] + 1) + svars->maxuid[t] = uid; vars->srec->status &= ~S_PENDING; vars->srec->tuid[0] = 0; } @@ -1918,10 +1924,6 @@ msgs_copied( sync_vars_t *svars, int t ) if (svars->new_pending[t]) goto out; - if (svars->maxuid[1-t] != svars->newmaxuid[1-t]) { - svars->maxuid[1-t] = svars->newmaxuid[1-t]; - JLOG( "S %d", 1-t, "commit maxuid of %s", str_fn[1-t] ); - } sync_close( svars, 1-t ); if (check_cancel( svars )) goto out; @@ -2159,6 +2161,14 @@ box_closed_p2( sync_vars_t *svars, int t ) // the operations are idempotent, and we're about to commit the new state // right afterwards anyway. + for (t = 0; t < 2; t++) { + // Committing maxuid is delayed until all messages were propagated, to + // ensure that all pending messages are still loaded next time in case + // of interruption - in particular skipping big messages would otherwise + // up the limit too early. + JLOG( "N %d %u", (t, svars->maxuid[t]), "up maxuid of %s", str_fn[t] ); + } + if (((svars->state[F] | svars->state[N]) & ST_DID_EXPUNGE) || svars->chan->max_messages) { debug( "purging obsolete entries\n" ); for (srec = svars->srecs; srec; srec = srec->next) {