From b1842617f7700c56d3eede8ce6c5afdacf433fca Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Sat, 30 Nov 2013 13:03:12 +0100 Subject: [PATCH] make MaxMessages work for new mails as well this helps enormously on the first sync of a 100k message box with a limit of 1k messages. it also happens to make the syncing idempotent. in a few conditionals we now explicitly test for max_messages being enabled, not smaxxuid != 0, as after the initial fetch with no important messages smaxxuid is zero, but we still have to skip over 99k messages in the above case. --- src/run-tests.pl | 24 +++-------- src/sync.c | 109 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 95 insertions(+), 38 deletions(-) diff --git a/src/run-tests.pl b/src/run-tests.pl index 42cd4b1..7210cb3 100755 --- a/src/run-tests.pl +++ b/src/run-tests.pl @@ -174,7 +174,7 @@ test("slave max size", \@X11, \@X22, @O22); my @x30 = ( [ 0, - 1, 0, "F", 2, 0, "S", 3, 0, "S", 4, 0, "", 5, 0, "" ], + 1, 0, "F", 2, 0, "", 3, 0, "S", 4, 0, "", 5, 0, "S", 6, 0, "" ], [ 0, ], [ 0, 0, 0, @@ -184,26 +184,16 @@ my @x30 = ( my @O31 = ("", "", "MaxMessages 3\n"); #show("30", "31", "31"); my @X31 = ( + [ 6, + 1, 1, "F", 2, 2, "", 3, 3, "S", 4, 4, "", 5, 5, "S", 6, 6, "" ], [ 5, - 1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ], - [ 5, - 1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ], - [ 5, 0, 0, - 1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ], + 1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ], + [ 6, 2, 0, + 1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ], ); test("max messages", \@x30, \@X31, @O31); -my @O41 = ("", "", "MaxMessages 3\nExpunge Both\n"); -#show("40", "41", "41"); -my @X41 = ( - [ 5, - 1, 1, "F", 2, 2, "S", 3, 3, "S", 4, 4, "", 5, 5, "" ], - [ 5, - 1, 1, "F", 3, 3, "S", 4, 4, "", 5, 5, "" ], - [ 5, 2, 0, - 1, 1, "F", 3, 3, "S", 4, 4, "", 5, 5, "" ], -); -test("max messages catch-up", \@X31, \@X41, @O41); +test("max messages verification", \@X31, \@X31, @O31); my @x50 = ( [ 6, diff --git a/src/sync.c b/src/sync.c index 24a3bb6..4cbee88 100644 --- a/src/sync.c +++ b/src/sync.c @@ -155,8 +155,10 @@ typedef struct { int flags_total[2], flags_done[2]; int trash_total[2], trash_done[2]; int maxuid[2]; /* highest UID that was already propagated */ + int newmaxuid[2]; /* highest UID that is currently being propagated */ int uidval[2]; /* UID validity value */ int newuid[2]; /* TUID lookup makes sense only for UIDs >= this */ + int mmaxxuid; /* highest expired UID on master during new message propagation */ int smaxxuid; /* highest expired UID on slave */ } sync_vars_t; @@ -806,6 +808,9 @@ box_selected( int sts, void *aux ) goto bail; } } + svars->newmaxuid[M] = svars->maxuid[M]; + svars->newmaxuid[S] = svars->maxuid[S]; + svars->mmaxxuid = INT_MAX; line = 0; if ((jfp = fopen( svars->jname, "r" ))) { if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) { @@ -829,7 +834,7 @@ box_selected( int sts, void *aux ) } if (buf[0] == '#' ? (t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) : - buf[0] == '(' || buf[0] == ')' || buf[0] == '{' || buf[0] == '}' ? + buf[0] == '(' || buf[0] == ')' || buf[0] == '{' || buf[0] == '}' || buf[0] == '!' ? (sscanf( buf + 2, "%d", &t1 ) != 1) : buf[0] == '+' || buf[0] == '&' || buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ? (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) : @@ -846,6 +851,8 @@ box_selected( int sts, void *aux ) svars->newuid[M] = t1; else if (buf[0] == '}') svars->newuid[S] = t1; + else if (buf[0] == '!') + svars->smaxxuid = t1; else if (buf[0] == '|') { svars->uidval[M] = t1; svars->uidval[S] = t2; @@ -853,6 +860,10 @@ box_selected( int sts, void *aux ) srec = nfmalloc( sizeof(*srec) ); srec->uid[M] = t1; srec->uid[S] = t2; + if (svars->newmaxuid[M] < t1) + svars->newmaxuid[M] = t1; + if (svars->newmaxuid[S] < t2) + svars->newmaxuid[S] = t2; debug( " new entry(%d,%d)\n", t1, t2 ); srec->msg[M] = srec->msg[S] = 0; srec->status = 0; @@ -876,6 +887,8 @@ box_selected( int sts, void *aux ) switch (buf[0]) { case '-': debug( "killed\n" ); + if (srec->msg[M]) + srec->msg[M]->srec = 0; srec->status = S_DEAD; break; case '#': @@ -1014,7 +1027,7 @@ box_selected( int sts, void *aux ) mexcs = 0; nmexcs = rmexcs = 0; if (svars->ctx[M]->opts & OPEN_OLD) { - if (svars->smaxxuid) { + if (chan->max_messages) { /* When messages have been expired on the slave, the master fetch is split into * two ranges: The bulk fetch which corresponds with the most recent messages, and an * exception list of messages which would have been expired if they weren't important. */ @@ -1198,10 +1211,14 @@ box_loaded( int sts, void *aux ) * - message is old (> 0) or expired (0) => ignore * - message was skipped (-1) => ReNew * - message was attempted, but failed (-2) => New - * If new have no srec, the message is always New. */ + * If new have no srec, the message is always New. If messages were previously ignored + * due to being excessive, they would now appear to be newer than the messages that + * got actually synced, so make sure to look only at the newest ones. As some messages + * may be already propagated before an interruption, and maxuid logging is delayed, + * we need to track the newmaxuid separately. */ srec = tmsg->srec; if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW)) - : (svars->chan->ops[t] & OP_NEW)) { + : svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) { debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] ); if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) debug( " -> not %sing - would be expunged anyway\n", str_hl[t] ); @@ -1220,7 +1237,11 @@ box_loaded( int sts, void *aux ) srec->tuid[0] = 0; srec->uid[1-t] = tmsg->uid; srec->uid[t] = -2; + srec->msg[1-t] = tmsg; + srec->msg[t] = 0; tmsg->srec = srec; + if (svars->newmaxuid[1-t] < tmsg->uid) + svars->newmaxuid[1-t] = tmsg->uid; Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] ); debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] ); } @@ -1346,6 +1367,10 @@ box_loaded( int sts, void *aux ) alive++; } } + for (tmsg = svars->ctx[M]->msgs; tmsg; tmsg = tmsg->next) { + if ((srec = tmsg->srec) && srec->tuid[0] && !(tmsg->flags & F_DELETED)) + alive++; + } todel = alive - svars->chan->max_messages; debug( "%d alive messages, %d excess - expiring\n", alive, todel ); for (tmsg = svars->ctx[S]->msgs; tmsg; tmsg = tmsg->next) { @@ -1353,6 +1378,7 @@ box_loaded( int sts, void *aux ) continue; if (!(srec = tmsg->srec) || srec->uid[M] <= 0) { /* We did not push the message, so it must be kept. */ + debug( " old pair(%d,%d) unpropagated\n", srec->uid[M], srec->uid[S] ); todel--; } else { nflags = (tmsg->flags | srec->aflags[S]) & ~srec->dflags[S]; @@ -1360,13 +1386,32 @@ box_loaded( int sts, void *aux ) /* The message is not deleted, or is already (being) expired. */ if ((nflags & F_FLAGGED) || !(nflags & F_SEEN)) { /* Important messages are always kept. */ + debug( " old pair(%d,%d) important\n", srec->uid[M], srec->uid[S] ); todel--; } else if (todel > 0 || ((srec->status & (S_EXPIRE|S_EXPIRED)) == (S_EXPIRE|S_EXPIRED)) || ((srec->status & (S_EXPIRE|S_EXPIRED)) && (tmsg->flags & F_DELETED))) { /* The message is excess or was already (being) expired. */ srec->status |= S_NEXPIRE; - debug( " pair(%d,%d)\n", srec->uid[M], srec->uid[S] ); + debug( " old pair(%d,%d) expired\n", srec->uid[M], srec->uid[S] ); + todel--; + } + } + } + } + for (tmsg = svars->ctx[M]->msgs; tmsg; tmsg = tmsg->next) { + if ((srec = tmsg->srec) && srec->tuid[0]) { + nflags = tmsg->flags; + if (!(nflags & F_DELETED)) { + if ((nflags & F_FLAGGED) || !(nflags & F_SEEN)) { + /* Important messages are always fetched. */ + debug( " new pair(%d,%d) important\n", srec->uid[M], srec->uid[S] ); + todel--; + } else if (todel > 0) { + /* The message is excess. */ + srec->status |= S_NEXPIRE; + debug( " new pair(%d,%d) expired\n", srec->uid[M], srec->uid[S] ); + svars->mmaxxuid = srec->uid[M]; todel--; } } @@ -1374,23 +1419,34 @@ box_loaded( int sts, void *aux ) } debug( "%d excess messages remain\n", todel ); for (srec = svars->srecs; srec; srec = srec->next) { - if ((srec->status & (S_DEAD|S_DONE)) || !srec->msg[S]) + if (srec->status & S_DEAD) continue; - nex = (srec->status / S_NEXPIRE) & 1; - if (nex != ((srec->status / S_EXPIRED) & 1)) { - /* The record needs a state change ... */ - if (nex != ((srec->status / S_EXPIRE) & 1)) { - /* ... and we need to start a transaction. */ - Fprintf( svars->jfp, "~ %d %d %d\n", srec->uid[M], srec->uid[S], nex ); - debug( " pair(%d,%d): %d (pre)\n", srec->uid[M], srec->uid[S], nex ); - srec->status = (srec->status & ~S_EXPIRE) | (nex * S_EXPIRE); + if (!srec->tuid[0]) { + if (!srec->msg[S]) + continue; + nex = (srec->status / S_NEXPIRE) & 1; + if (nex != ((srec->status / S_EXPIRED) & 1)) { + /* The record needs a state change ... */ + if (nex != ((srec->status / S_EXPIRE) & 1)) { + /* ... and we need to start a transaction. */ + Fprintf( svars->jfp, "~ %d %d %d\n", srec->uid[M], srec->uid[S], nex ); + debug( " pair(%d,%d): %d (pre)\n", srec->uid[M], srec->uid[S], nex ); + srec->status = (srec->status & ~S_EXPIRE) | (nex * S_EXPIRE); + } else { + /* ... but the "right" transaction is already pending. */ + debug( " pair(%d,%d): %d (pending)\n", srec->uid[M], srec->uid[S], nex ); + } } else { - /* ... but the "right" transaction is already pending. */ - debug( " pair(%d,%d): %d (pending)\n", srec->uid[M], srec->uid[S], nex ); + /* Note: the "wrong" transaction may be pending here, + * e.g.: S_NEXPIRE = 0, S_EXPIRE = 1, S_EXPIRED = 0. */ } } else { - /* Note: the "wrong" transaction may be pending here, - * e.g.: S_NEXPIRE = 0, S_EXPIRE = 1, S_EXPIRED = 0. */ + if (srec->status & S_NEXPIRE) { + Fprintf( svars->jfp, "- %d %d\n", srec->uid[M], srec->uid[S] ); + debug( " pair(%d,%d): 1 (abort)\n", srec->uid[M], srec->uid[S] ); + srec->msg[M]->srec = 0; + srec->status = S_DEAD; + } } } } @@ -1515,6 +1571,16 @@ msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int t, int uid ) srec->uid[t] = uid; srec->tuid[0] = 0; } + if (t == S && svars->mmaxxuid < srec->uid[M]) { + /* If we have so many new messages that some of them are instantly expired, + * but some are still propagated because they are important, we need to + * ensure explicitly that the bulk fetch limit is upped. */ + svars->mmaxxuid = INT_MAX; + if (svars->smaxxuid < srec->uid[S] - 1) { + svars->smaxxuid = srec->uid[S] - 1; + Fprintf( svars->jfp, "! %d\n", svars->smaxxuid ); + } + } } static void msgs_found_new( int sts, void *aux ); @@ -1735,13 +1801,14 @@ box_closed_p2( sync_vars_t *svars, int t ) if (!(svars->state[1-t] & ST_CLOSED)) return; - if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || svars->smaxxuid) { + if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || svars->chan->max_messages) { /* This cleanup is not strictly necessary, as the next full sync would throw out the dead entries anyway. But ... */ + debug( "purging obsolete entries\n" ); minwuid = INT_MAX; - if (svars->smaxxuid) { - debug( "preparing entry purge - max expired slave uid is %d\n", svars->smaxxuid ); + if (svars->chan->max_messages) { + debug( " max expired slave uid is %d\n", svars->smaxxuid ); for (srec = svars->srecs; srec; srec = srec->next) { if (srec->status & S_DEAD) continue;