rework maxuid tracking yet again
re-introduce newmaxuid, but now it's not used at all until the state is committed. this simplifies the new-message loop, esp. in view of a soon significantly increased number of branches in it.
This commit is contained in:
parent
2f4b71c56e
commit
be9625725c
|
@ -939,7 +939,9 @@ sub test($$$$)
|
||||||
# Generic syncing tests
|
# Generic syncing tests
|
||||||
|
|
||||||
my @x01 = (
|
my @x01 = (
|
||||||
I, 0, 0,
|
I, 0, I,
|
||||||
|
R, "*", "", "", # Skipped/failed messages to prevent maxuid topping
|
||||||
|
S, "", "", "*",
|
||||||
A, "*F", "*", "*",
|
A, "*F", "*", "*",
|
||||||
B, "*", "*", "*F",
|
B, "*", "*", "*F",
|
||||||
C, "*FS", "*", "*F",
|
C, "*FS", "*", "*F",
|
||||||
|
@ -999,7 +1001,7 @@ test("full + expunge near side", \@x01, \@X03, \@O03);
|
||||||
|
|
||||||
my @O04 = ("", "", "Sync Pull\n");
|
my @O04 = ("", "", "Sync Pull\n");
|
||||||
my @X04 = (
|
my @X04 = (
|
||||||
K, 0, 0,
|
K, 0, I,
|
||||||
A, "", "+F", "+F",
|
A, "", "+F", "+F",
|
||||||
C, "", "+FS", "+S",
|
C, "", "+FS", "+S",
|
||||||
E, "", "+T", "+T",
|
E, "", "+T", "+T",
|
||||||
|
@ -1011,7 +1013,7 @@ test("pull", \@x01, \@X04, \@O04);
|
||||||
|
|
||||||
my @O05 = ("", "", "Sync Flags\n");
|
my @O05 = ("", "", "Sync Flags\n");
|
||||||
my @X05 = (
|
my @X05 = (
|
||||||
I, 0, 0,
|
I, 0, I,
|
||||||
A, "", "+F", "+F",
|
A, "", "+F", "+F",
|
||||||
B, "+F", "+F", "",
|
B, "+F", "+F", "",
|
||||||
C, "", "+FS", "+S",
|
C, "", "+FS", "+S",
|
||||||
|
@ -1022,7 +1024,7 @@ test("flags", \@x01, \@X05, \@O05);
|
||||||
|
|
||||||
my @O06 = ("", "", "Sync Delete\n");
|
my @O06 = ("", "", "Sync Delete\n");
|
||||||
my @X06 = (
|
my @X06 = (
|
||||||
I, 0, 0,
|
I, 0, I,
|
||||||
G, "+T", ">", "",
|
G, "+T", ">", "",
|
||||||
I, "", "<", "+T",
|
I, "", "<", "+T",
|
||||||
);
|
);
|
||||||
|
@ -1038,7 +1040,7 @@ test("new", \@x01, \@X07, \@O07);
|
||||||
|
|
||||||
my @O08 = ("", "", "Sync PushFlags PullDelete\n");
|
my @O08 = ("", "", "Sync PushFlags PullDelete\n");
|
||||||
my @X08 = (
|
my @X08 = (
|
||||||
I, 0, 0,
|
I, 0, I,
|
||||||
B, "+F", "+F", "",
|
B, "+F", "+F", "",
|
||||||
C, "", "+F", "",
|
C, "", "+F", "",
|
||||||
I, "", "<", "+T",
|
I, "", "<", "+T",
|
||||||
|
@ -1162,6 +1164,30 @@ my @X38 = (
|
||||||
);
|
);
|
||||||
test("max messages + expunge", \@x38, \@X38, \@O38);
|
test("max messages + expunge", \@x38, \@X38, \@O38);
|
||||||
|
|
||||||
|
# Test for legacy/tampered states with inaccurate maxuid tracking
|
||||||
|
|
||||||
|
# Joined post-push & post-pull state to have just one test -
|
||||||
|
# there is no way how this could have occurred naturally.
|
||||||
|
my @x60 = (
|
||||||
|
0, C, 0,
|
||||||
|
A, "*S", "", "_",
|
||||||
|
B, "*FS", "*FS", "*FS",
|
||||||
|
C, "*S", "", "_",
|
||||||
|
D, "*", "*", "*",
|
||||||
|
E, "*", "*", "*",
|
||||||
|
F, "*", "", "",
|
||||||
|
G, "*", "", "",
|
||||||
|
H, "", "", "*",
|
||||||
|
I, "", "", "_",
|
||||||
|
J, "*", "*", "*",
|
||||||
|
);
|
||||||
|
|
||||||
|
my @O61 = ("", "", "Sync Flags\n"); # Need to fetch old messages
|
||||||
|
my @X61 = (
|
||||||
|
E, C, E,
|
||||||
|
);
|
||||||
|
test("maxuid topping", \@x60, \@X61, \@O61);
|
||||||
|
|
||||||
# Trashing
|
# Trashing
|
||||||
|
|
||||||
my @x10 = (
|
my @x10 = (
|
||||||
|
|
55
src/sync.c
55
src/sync.c
|
@ -968,8 +968,8 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||||
JLOG( "| %u %u", (svars->uidval[F], svars->uidval[N]), "new UIDVALIDITYs" );
|
JLOG( "| %u %u", (svars->uidval[F], svars->uidval[N]), "new UIDVALIDITYs" );
|
||||||
}
|
}
|
||||||
|
|
||||||
svars->oldmaxuid[F] = svars->maxuid[F];
|
svars->oldmaxuid[F] = svars->newmaxuid[F];
|
||||||
svars->oldmaxuid[N] = svars->maxuid[N];
|
svars->oldmaxuid[N] = svars->newmaxuid[N];
|
||||||
|
|
||||||
info( "Synchronizing...\n" );
|
info( "Synchronizing...\n" );
|
||||||
for (t = 0; t < 2; t++)
|
for (t = 0; t < 2; t++)
|
||||||
|
@ -1100,16 +1100,22 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||||
|
|
||||||
for (t = 0; t < 2; t++) {
|
for (t = 0; t < 2; t++) {
|
||||||
debug( "synchronizing new messages on %s\n", str_fn[t^1] );
|
debug( "synchronizing new messages on %s\n", str_fn[t^1] );
|
||||||
|
int topping = 1;
|
||||||
for (tmsg = svars->msgs[t^1]; tmsg; tmsg = tmsg->next) {
|
for (tmsg = svars->msgs[t^1]; tmsg; tmsg = tmsg->next) {
|
||||||
if (tmsg->status & M_DEAD)
|
if (tmsg->status & M_DEAD)
|
||||||
continue;
|
continue;
|
||||||
srec = tmsg->srec;
|
srec = tmsg->srec;
|
||||||
if (srec) {
|
if (srec) {
|
||||||
|
// This covers legacy (or somehow corrupted) state files which
|
||||||
|
// failed to track maxuid properly.
|
||||||
|
// Note that this doesn't work in the presence of skipped or
|
||||||
|
// failed messages. We could start keeping zombie entries, but
|
||||||
|
// this wouldn't help with legacy state files.
|
||||||
|
if (topping && svars->newmaxuid[t^1] < tmsg->uid)
|
||||||
|
svars->newmaxuid[t^1] = tmsg->uid;
|
||||||
|
|
||||||
if (srec->status & S_SKIPPED) {
|
if (srec->status & S_SKIPPED) {
|
||||||
// Pre-1.4 legacy only: The message was skipped due to being too big.
|
// Pre-1.4 legacy only: The message was skipped due to being too big.
|
||||||
// We must have already seen the UID, but we might have been interrupted.
|
|
||||||
if (svars->maxuid[t^1] < tmsg->uid)
|
|
||||||
svars->maxuid[t^1] = tmsg->uid;
|
|
||||||
if (!(svars->chan->ops[t] & OP_RENEW))
|
if (!(svars->chan->ops[t] & OP_RENEW))
|
||||||
continue;
|
continue;
|
||||||
srec->status = S_PENDING;
|
srec->status = S_PENDING;
|
||||||
|
@ -1124,27 +1130,18 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||||
} else {
|
} else {
|
||||||
if (!(svars->chan->ops[t] & OP_NEW))
|
if (!(svars->chan->ops[t] & OP_NEW))
|
||||||
continue;
|
continue;
|
||||||
// This catches messages:
|
|
||||||
// - that are actually new
|
|
||||||
// - whose propagation got interrupted
|
|
||||||
// - whose propagation was completed, but not logged yet
|
|
||||||
// - that aren't actually new, but a result of syncing, and the instant
|
|
||||||
// maxuid upping was prevented by the presence of actually new messages
|
|
||||||
if (svars->maxuid[t^1] < tmsg->uid)
|
|
||||||
svars->maxuid[t^1] = tmsg->uid;
|
|
||||||
if (!(srec->status & S_PENDING))
|
if (!(srec->status & S_PENDING))
|
||||||
continue; // Nothing to do - the message is paired or expired
|
continue; // Nothing to do - the message is paired or expired
|
||||||
// Propagation was scheduled, but we got interrupted
|
// Propagation was scheduled, but we got interrupted
|
||||||
debug( "unpropagated old message %u\n", tmsg->uid );
|
debug( "unpropagated old message %u\n", tmsg->uid );
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
|
|
||||||
JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing - would be expunged anyway" );
|
|
||||||
tmsg->srec = NULL;
|
|
||||||
srec->status = S_DEAD;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
|
// The 1st unknown message which should be known marks the end
|
||||||
|
// of the synced range; more known messages may follow (from an
|
||||||
|
// unidirectional sync in the opposite direction).
|
||||||
|
if (t == F || tmsg->uid > svars->maxxfuid)
|
||||||
|
topping = 0;
|
||||||
|
|
||||||
if (!(svars->chan->ops[t] & OP_NEW))
|
if (!(svars->chan->ops[t] & OP_NEW))
|
||||||
continue;
|
continue;
|
||||||
if (tmsg->uid <= svars->maxuid[t^1]) {
|
if (tmsg->uid <= svars->maxuid[t^1]) {
|
||||||
|
@ -1154,14 +1151,8 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||||
// - ignored, as it would have been expunged anyway => ignore (even if undeleted)
|
// - ignored, as it would have been expunged anyway => ignore (even if undeleted)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
svars->maxuid[t^1] = tmsg->uid;
|
|
||||||
debug( "new message %u\n", tmsg->uid );
|
debug( "new message %u\n", tmsg->uid );
|
||||||
|
|
||||||
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
|
|
||||||
debug( "-> ignoring - would be expunged anyway\n" );
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
srec = nfzalloc( sizeof(*srec) );
|
srec = nfzalloc( sizeof(*srec) );
|
||||||
*svars->srecadd = srec;
|
*svars->srecadd = srec;
|
||||||
svars->srecadd = &srec->next;
|
svars->srecadd = &srec->next;
|
||||||
|
@ -1170,8 +1161,19 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||||
srec->uid[t^1] = tmsg->uid;
|
srec->uid[t^1] = tmsg->uid;
|
||||||
srec->msg[t^1] = tmsg;
|
srec->msg[t^1] = tmsg;
|
||||||
tmsg->srec = srec;
|
tmsg->srec = srec;
|
||||||
|
if (svars->newmaxuid[t^1] < tmsg->uid)
|
||||||
|
svars->newmaxuid[t^1] = tmsg->uid;
|
||||||
JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" );
|
JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" );
|
||||||
}
|
}
|
||||||
|
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
|
||||||
|
// Yes, we may nuke fresh entries, created only for newmaxuid tracking.
|
||||||
|
// It would be lighter on the journal to log a (compressed) skip, but
|
||||||
|
// this rare case does not justify additional complexity.
|
||||||
|
JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing - would be expunged anyway" );
|
||||||
|
tmsg->srec = NULL;
|
||||||
|
srec->status = S_DEAD;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!(tmsg->flags & F_FLAGGED) && tmsg->size > svars->chan->stores[t]->max_size &&
|
if (!(tmsg->flags & F_FLAGGED) && tmsg->size > svars->chan->stores[t]->max_size &&
|
||||||
!(srec->status & (S_DUMMY(F) | S_DUMMY(N) | S_UPGRADE))) {
|
!(srec->status & (S_DUMMY(F) | S_DUMMY(N) | S_UPGRADE))) {
|
||||||
srec->status |= S_DUMMY(t);
|
srec->status |= S_DUMMY(t);
|
||||||
|
@ -1760,6 +1762,7 @@ box_closed_p2( sync_vars_t *svars, int t )
|
||||||
// ensure that all pending messages are still loaded next time in case
|
// ensure that all pending messages are still loaded next time in case
|
||||||
// of interruption - in particular skipping messages would otherwise
|
// of interruption - in particular skipping messages would otherwise
|
||||||
// up the limit too early.
|
// up the limit too early.
|
||||||
|
svars->maxuid[t] = svars->newmaxuid[t];
|
||||||
if (svars->maxuid[t] != svars->oldmaxuid[t])
|
if (svars->maxuid[t] != svars->oldmaxuid[t])
|
||||||
PC_JLOG( "N %d %u", (t, svars->maxuid[t]), "up maxuid of %s", str_fn[t] );
|
PC_JLOG( "N %d %u", (t, svars->maxuid[t]), "up maxuid of %s", str_fn[t] );
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,7 @@ typedef struct {
|
||||||
uint new_pending[2], flags_pending[2], trash_pending[2];
|
uint new_pending[2], flags_pending[2], trash_pending[2];
|
||||||
uint maxuid[2]; // highest UID that was already propagated
|
uint maxuid[2]; // highest UID that was already propagated
|
||||||
uint oldmaxuid[2]; // highest UID that was already propagated before this run
|
uint oldmaxuid[2]; // highest UID that was already propagated before this run
|
||||||
|
uint newmaxuid[2]; // highest UID that is currently being propagated
|
||||||
uint uidval[2]; // UID validity value
|
uint uidval[2]; // UID validity value
|
||||||
uint newuidval[2]; // UID validity obtained from driver
|
uint newuidval[2]; // UID validity obtained from driver
|
||||||
uint finduid[2]; // TUID lookup makes sense only for UIDs >= this
|
uint finduid[2]; // TUID lookup makes sense only for UIDs >= this
|
||||||
|
|
|
@ -255,6 +255,8 @@ load_state( sync_vars_t *svars )
|
||||||
svars->maxxfuid = minwuid - 1;
|
svars->maxxfuid = minwuid - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
svars->newmaxuid[F] = svars->maxuid[F];
|
||||||
|
svars->newmaxuid[N] = svars->maxuid[N];
|
||||||
int line = 0;
|
int line = 0;
|
||||||
if ((jfp = fopen( svars->jname, "r" ))) {
|
if ((jfp = fopen( svars->jname, "r" ))) {
|
||||||
if (!lock_state( svars ))
|
if (!lock_state( svars ))
|
||||||
|
@ -319,7 +321,7 @@ load_state( sync_vars_t *svars )
|
||||||
goto jbail;
|
goto jbail;
|
||||||
}
|
}
|
||||||
if (c == 'N') {
|
if (c == 'N') {
|
||||||
svars->maxuid[t1] = t2;
|
svars->maxuid[t1] = svars->newmaxuid[t1] = t2;
|
||||||
debug( " maxuid of %s now %u\n", str_fn[t1], t2 );
|
debug( " maxuid of %s now %u\n", str_fn[t1], t2 );
|
||||||
} else if (c == 'F') {
|
} else if (c == 'F') {
|
||||||
svars->finduid[t1] = t2;
|
svars->finduid[t1] = t2;
|
||||||
|
@ -335,6 +337,10 @@ load_state( sync_vars_t *svars )
|
||||||
srec = nfzalloc( sizeof(*srec) );
|
srec = nfzalloc( sizeof(*srec) );
|
||||||
srec->uid[F] = t1;
|
srec->uid[F] = t1;
|
||||||
srec->uid[N] = t2;
|
srec->uid[N] = t2;
|
||||||
|
if (svars->newmaxuid[F] < t1)
|
||||||
|
svars->newmaxuid[F] = t1;
|
||||||
|
if (svars->newmaxuid[N] < t2)
|
||||||
|
svars->newmaxuid[N] = t2;
|
||||||
debug( " new entry(%u,%u)\n", t1, t2 );
|
debug( " new entry(%u,%u)\n", t1, t2 );
|
||||||
srec->status = S_PENDING;
|
srec->status = S_PENDING;
|
||||||
*svars->srecadd = srec;
|
*svars->srecadd = srec;
|
||||||
|
@ -508,8 +514,8 @@ void
|
||||||
assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
|
assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
|
||||||
{
|
{
|
||||||
srec->uid[t] = uid;
|
srec->uid[t] = uid;
|
||||||
if (uid == svars->maxuid[t] + 1)
|
if (uid == svars->newmaxuid[t] + 1)
|
||||||
svars->maxuid[t] = uid;
|
svars->newmaxuid[t] = uid;
|
||||||
srec->status &= ~(S_PENDING | S_UPGRADE);
|
srec->status &= ~(S_PENDING | S_UPGRADE);
|
||||||
srec->tuid[0] = 0;
|
srec->tuid[0] = 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user