major overhaul of flag change propagation and MaxMessages handling:

- wrap message (un)expirations into transactions
- no redundand flag propagations in conjunction with expirations
- better prepared for the upcoming async operation
This commit is contained in:
Oswald Buddenhagen 2006-02-02 17:03:01 +00:00
parent ab11737b33
commit 19128f1587
2 changed files with 141 additions and 98 deletions

View File

@ -118,7 +118,6 @@ typedef struct group_conf {
#define M_RECENT (1<<0) /* unsyncable flag; maildir_* depend on this being 1<<0 */ #define M_RECENT (1<<0) /* unsyncable flag; maildir_* depend on this being 1<<0 */
#define M_DEAD (1<<1) /* expunged */ #define M_DEAD (1<<1) /* expunged */
#define M_FLAGS (1<<2) /* flags fetched */ #define M_FLAGS (1<<2) /* flags fetched */
#define M_EXPIRE (1<<3) /* kicked out by MaxMessages */
typedef struct message { typedef struct message {
struct message *next; struct message *next;

View File

@ -79,17 +79,21 @@ make_flags( int flags, char *buf )
} }
#define S_DEAD (1<<0) #define S_DEAD (1<<0)
#define S_EXPIRED (1<<1) #define S_DONE (1<<1)
#define S_DEL(ms) (1<<(2+(ms))) #define S_DEL(ms) (1<<(2+(ms)))
#define S_EXP_S (1<<4) #define S_EXPIRED (1<<4)
#define S_DONE (1<<6) #define S_EXPIRE (1<<5)
#define S_NEXPIRE (1<<6)
#define S_EXP_S (1<<7)
#define mvBit(in,ib,ob) ((unsigned char)(((unsigned)in) * (ob) / (ib)))
typedef struct sync_rec { typedef struct sync_rec {
struct sync_rec *next; struct sync_rec *next;
/* string_list_t *keywords; */ /* string_list_t *keywords; */
int uid[2]; int uid[2];
message_t *msg[2]; message_t *msg[2];
unsigned char flags, status; unsigned char status, flags, aflags[2], dflags[2];
} sync_rec_t; } sync_rec_t;
static void static void
@ -186,11 +190,11 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
char *dname, *jname, *nname, *lname, *s, *cmname, *csname; char *dname, *jname, *nname, *lname, *s, *cmname, *csname;
FILE *dfp, *jfp, *nfp; FILE *dfp, *jfp, *nfp;
int opts[2]; int opts[2];
int nom, nos, del[2], ex[2]; int nom, nos, del[2], ex[2], nex;
int muidval, suidval, smaxxuid, maxuid[2], minwuid, maxwuid; int muidval, suidval, smaxxuid, maxuid[2], minwuid, maxwuid;
int t1, t2, t3, t, uid, nmsgs; int t1, t2, t3, t, uid, nmsgs;
int lfd, ret, line, sline, todel, delt, *mexcs, nmexcs, rmexcs; int lfd, ret, line, sline, todel, *mexcs, nmexcs, rmexcs;
unsigned char nflags; unsigned char nflags, sflags, aflags, dflags;
msg_data_t msgdata; msg_data_t msgdata;
struct stat st; struct stat st;
struct flock lck; struct flock lck;
@ -291,7 +295,7 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
s = fbuf; s = fbuf;
if (*s == 'X') { if (*s == 'X') {
s++; s++;
srec->status = S_EXPIRED; srec->status = S_EXPIRE | S_EXPIRED;
} else } else
srec->status = 0; srec->status = 0;
srec->flags = parse_flags( s ); srec->flags = parse_flags( s );
@ -337,7 +341,7 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
} }
if (buf[0] == '(' || buf[0] == ')' ? if (buf[0] == '(' || buf[0] == ')' ?
(sscanf( buf + 2, "%d", &t1 ) != 1) : (sscanf( buf + 2, "%d", &t1 ) != 1) :
buf[0] == '-' || buf[0] == '|' ? buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ?
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) : (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
(sscanf( buf + 2, "%d %d %d", &t1, &t2, &t3 ) != 3)) (sscanf( buf + 2, "%d %d %d", &t1, &t2, &t3 ) != 3))
{ {
@ -395,10 +399,26 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
srec->flags = t3; srec->flags = t3;
break; break;
case '~': case '~':
debug( "expired now %d\n", t3 ); debug( "expire now %d\n", t3 );
if (t3)
srec->status |= S_EXPIRE;
else
srec->status &= ~S_EXPIRE;
break;
case '\\':
t3 = (srec->status & S_EXPIRED);
debug( "expire back to %d\n", t3 / S_EXPIRED );
if (t3)
srec->status |= S_EXPIRE;
else
srec->status &= ~S_EXPIRE;
break;
case '/':
t3 = (srec->status & S_EXPIRE);
debug( "expired now %d\n", t3 / S_EXPIRE );
if (t3) { if (t3) {
if (smaxxuid < t2) if (smaxxuid < srec->uid[S])
smaxxuid = t2; smaxxuid = srec->uid[S];
srec->status |= S_EXPIRED; srec->status |= S_EXPIRED;
} else } else
srec->status &= ~S_EXPIRED; srec->status &= ~S_EXPIRED;
@ -455,6 +475,12 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
} }
if ((chan->ops[S] & (OP_NEW|OP_RENEW)) && chan->max_messages) if ((chan->ops[S] & (OP_NEW|OP_RENEW)) && chan->max_messages)
opts[S] |= OPEN_OLD|OPEN_NEW|OPEN_FLAGS; opts[S] |= OPEN_OLD|OPEN_NEW|OPEN_FLAGS;
if (line)
for (srec = recs; srec; srec = srec->next)
if (!(srec->status & S_DEAD) && ((mvBit(srec->status, S_EXPIRE, S_EXPIRED) ^ srec->status) & S_EXPIRED)) {
opts[S] |= OPEN_OLD|OPEN_FLAGS;
break;
}
driver[M]->prepare_opts( ctx[M], opts[M] ); driver[M]->prepare_opts( ctx[M], opts[M] );
driver[S]->prepare_opts( ctx[S], opts[S] ); driver[S]->prepare_opts( ctx[S], opts[S] );
@ -678,23 +704,18 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
} else { } else {
del[M] = nom && (srec->uid[M] > 0); del[M] = nom && (srec->uid[M] > 0);
del[S] = nos && (srec->uid[S] > 0); del[S] = nos && (srec->uid[S] > 0);
if (srec->msg[M] && (srec->msg[M]->flags & F_DELETED))
srec->status |= S_DEL(M);
if (srec->msg[S] && (srec->msg[S]->flags & F_DELETED))
srec->status |= S_DEL(S);
nflags = srec->flags;
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
int unex; srec->aflags[t] = srec->dflags[t] = 0;
unsigned char sflags, aflags, dflags; if (srec->msg[t] && (srec->msg[t]->flags & F_DELETED))
srec->status |= S_DEL(t);
/* excludes (push) c.3) d.2) d.3) d.4) / (pull) b.3) d.7) d.8) d.9) */ /* excludes (push) c.3) d.2) d.3) d.4) / (pull) b.3) d.7) d.8) d.9) */
if (!srec->uid[t]) { if (!srec->uid[t]) {
/* b.1) / c.1) */ /* b.1) / c.1) */
debug( " no more %s\n", str_ms[t] ); debug( " no more %s\n", str_ms[t] );
} else if (del[1-t]) { } else if (del[1-t]) {
/* c.4) d.9) / b.4) d.4) */ /* c.4) d.9) / b.4) d.4) */
if (srec->msg[t] && srec->msg[t]->flags != nflags) if (srec->msg[t] && (srec->msg[t]->status & M_FLAGS) && srec->msg[t]->flags != srec->flags)
info( "Info: conflicting changes in (%d,%d)\n", srec->uid[M], srec->uid[S] ); info( "Info: conflicting changes in (%d,%d)\n", srec->uid[M], srec->uid[S] );
if (chan->ops[t] & OP_DELETE) { if (chan->ops[t] & OP_DELETE) {
debug( " %sing delete\n", str_hl[t] ); debug( " %sing delete\n", str_hl[t] );
@ -719,103 +740,126 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
/* a) & b.3) / c.3) */ /* a) & b.3) / c.3) */
if (chan->ops[t] & OP_FLAGS) { if (chan->ops[t] & OP_FLAGS) {
sflags = srec->msg[1-t]->flags; sflags = srec->msg[1-t]->flags;
aflags = sflags & ~nflags; if ((srec->status & (S_EXPIRE|S_EXPIRED)) && !t)
dflags = ~sflags & nflags; sflags &= ~F_DELETED;
unex = 0; srec->aflags[t] = sflags & ~srec->flags;
if (srec->status & S_EXPIRED) { srec->dflags[t] = ~sflags & srec->flags;
if (!t) {
if ((aflags & ~F_DELETED) || dflags)
info( "Info: Flags of expired message changed in (%d,%d)\n", srec->uid[M], srec->uid[S] );
continue;
} else {
if ((sflags & F_FLAGGED) && !(sflags & F_DELETED)) {
unex = 1;
dflags |= F_DELETED;
} else
continue;
}
}
if ((chan->ops[t] & OP_EXPUNGE) && (sflags & F_DELETED) &&
(!ctx[t]->conf->trash || ctx[t]->conf->trash_only_new))
{
aflags &= F_DELETED;
dflags = 0;
}
if (DFlags & DEBUG) { if (DFlags & DEBUG) {
char afbuf[16], dfbuf[16]; /* enlarge when support for keywords is added */ char afbuf[16], dfbuf[16]; /* enlarge when support for keywords is added */
make_flags( aflags, afbuf ); make_flags( srec->aflags[t], afbuf );
make_flags( dflags, dfbuf ); make_flags( srec->dflags[t], dfbuf );
debug( " %sing flags: +%s -%s\n", str_hl[t], afbuf, dfbuf ); debug( " %sing flags: +%s -%s\n", str_hl[t], afbuf, dfbuf );
} }
switch ((aflags | dflags) ? driver[t]->set_flags( ctx[t], srec->msg[t], srec->uid[t], aflags, dflags ) : DRV_OK) {
case DRV_STORE_BAD: ret = SYNC_BAD(t); goto finish;
case DRV_BOX_BAD: ret = SYNC_FAIL; goto finish;
default: /* ok */ break;
case DRV_OK:
if (aflags & F_DELETED)
srec->status |= S_DEL(t);
else if (dflags & F_DELETED)
srec->status &= ~S_DEL(t);
nflags = (nflags | aflags) & ~dflags;
if (unex) {
debug( "unexpiring pair(%d,%d)\n", srec->uid[M], srec->uid[S] );
/* log last, so deletion can't be misinterpreted! */
Fprintf( jfp, "~ %d %d 0\n", srec->uid[M], srec->uid[S] );
srec->status &= ~S_EXPIRED;
}
}
} else } else
debug( " not %sing flags\n", str_hl[t] ); debug( " not %sing flags\n", str_hl[t] );
} /* else b.4) / c.4) */ } /* else b.4) / c.4) */
} }
}
}
if ((chan->ops[S] & (OP_NEW|OP_RENEW|OP_FLAGS)) && chan->max_messages) {
/* Flagged and not yet synced messages older than the first not
* expired message are not counted. */
todel = ctx[S]->count - chan->max_messages;
debug( "scheduling %d excess messages for expiration\n", todel );
for (tmsg = ctx[S]->msgs; tmsg && todel > 0; tmsg = tmsg->next)
if (!(tmsg->status & M_DEAD) && (srec = tmsg->srec) &&
((tmsg->flags | srec->aflags[S]) & ~srec->dflags[S] & F_DELETED) &&
!(srec->status & (S_EXPIRE|S_EXPIRED)))
todel--;
debug( "%d non-deleted excess messages\n", todel );
for (tmsg = ctx[S]->msgs; tmsg; tmsg = tmsg->next) {
if (tmsg->status & M_DEAD)
continue;
if (!(srec = tmsg->srec) || srec->uid[M] <= 0)
todel--;
else {
nflags = (tmsg->flags | srec->aflags[S]) & ~srec->dflags[S];
if (!(nflags & F_DELETED) || (srec->status & (S_EXPIRE|S_EXPIRED))) {
if (nflags & F_FLAGGED)
todel--;
else if (!(tmsg->status & M_RECENT) &&
(todel > 0 ||
((srec->status & (S_EXPIRE|S_EXPIRED)) == (S_EXPIRE|S_EXPIRED)) ||
((srec->status & (S_EXPIRE|S_EXPIRED)) && (tmsg->flags & F_DELETED)))) {
srec->status |= S_NEXPIRE;
debug( " pair(%d,%d)\n", srec->uid[M], srec->uid[S] );
todel--;
}
}
}
}
debug( "%d excess messages remain\n", todel );
for (srec = recs; srec; srec = srec->next) {
if ((srec->status & (S_DEAD|S_DONE)) || !srec->msg[S])
continue;
nex = (srec->status / S_NEXPIRE) & 1;
if (nex != ((srec->status / S_EXPIRED) & 1)) {
if (nex != ((srec->status / S_EXPIRE) & 1)) {
Fprintf( jfp, "~ %d %d %d\n", srec->uid[M], srec->uid[S], nex );
debug( " pair(%d,%d): %d (pre)\n", srec->uid[M], srec->uid[S], nex );
srec->status = (srec->status & ~S_EXPIRE) | (nex * S_EXPIRE);
} else
debug( " pair(%d,%d): %d (pending)\n", srec->uid[M], srec->uid[S], nex );
}
}
}
debug( "synchronizing flags\n" );
for (srec = recs; srec != *osrecadd; srec = srec->next) {
if (srec->status & (S_DEAD|S_DONE))
continue;
for (t = 0; t < 2; t++) {
aflags = srec->aflags[t];
dflags = srec->dflags[t];
if (t && ((mvBit(srec->status, S_EXPIRE, S_EXPIRED) ^ srec->status) & S_EXPIRED)) {
if (srec->status & S_NEXPIRE)
aflags |= F_DELETED;
else
dflags |= F_DELETED;
}
if ((chan->ops[t] & OP_EXPUNGE) && (((srec->msg[t] ? srec->msg[t]->flags : 0) | aflags) & ~dflags & F_DELETED) &&
(!ctx[t]->conf->trash || ctx[t]->conf->trash_only_new))
{
srec->aflags[t] &= F_DELETED;
aflags &= F_DELETED;
srec->dflags[t] = dflags = 0;
}
if (srec->msg[t] && (srec->msg[t]->status & M_FLAGS)) {
aflags &= ~srec->msg[t]->flags;
dflags &= srec->msg[t]->flags;
}
switch ((aflags | dflags) ? driver[t]->set_flags( ctx[t], srec->msg[t], srec->uid[t], aflags, dflags ) : DRV_OK) {
case DRV_STORE_BAD: ret = SYNC_BAD(t); goto finish;
case DRV_BOX_BAD: ret = SYNC_FAIL; goto finish;
default: /* ok */ srec->aflags[t] = srec->dflags[t] = 0; break;
case DRV_OK:
if (aflags & F_DELETED)
srec->status |= S_DEL(t);
else if (dflags & F_DELETED)
srec->status &= ~S_DEL(t);
if (t) {
nex = (srec->status / S_NEXPIRE) & 1;
if (nex != ((srec->status / S_EXPIRED) & 1)) {
if (nex && (smaxxuid < srec->uid[S]))
smaxxuid = srec->uid[S];
Fprintf( jfp, "/ %d %d\n", srec->uid[M], srec->uid[S] );
debug( " pair(%d,%d): expired %d (commit)\n", srec->uid[M], srec->uid[S], nex );
srec->status = (srec->status & ~S_EXPIRED) | (nex * S_EXPIRED);
} else if (nex != ((srec->status / S_EXPIRE) & 1)) {
Fprintf( jfp, "\\ %d %d\n", srec->uid[M], srec->uid[S] );
debug( " pair(%d,%d): expire %d (cancel)\n", srec->uid[M], srec->uid[S], nex );
srec->status = (srec->status & ~S_EXPIRE) | (nex * S_EXPIRE);
}
}
}
}
nflags = (srec->flags | srec->aflags[M] | srec->aflags[S]) & ~(srec->dflags[M] | srec->dflags[S]);
if (srec->flags != nflags) { if (srec->flags != nflags) {
debug( " updating flags (%u -> %u)\n", srec->flags, nflags ); debug( " pair(%d,%d): updating flags (%u -> %u)\n", srec->uid[M], srec->uid[S], srec->flags, nflags );
srec->flags = nflags; srec->flags = nflags;
Fprintf( jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], nflags ); Fprintf( jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], nflags );
} }
} }
}
if ((chan->ops[S] & (OP_NEW|OP_RENEW)) && chan->max_messages) {
debug( "expiring excess entries\n" );
todel = ctx[S]->count - chan->max_messages;
for (tmsg = ctx[S]->msgs; tmsg && todel > 0; tmsg = tmsg->next)
if (!(tmsg->status & M_DEAD) && (tmsg->flags & F_DELETED))
todel--;
delt = 0;
for (tmsg = ctx[S]->msgs; tmsg && todel > 0; tmsg = tmsg->next) {
if ((tmsg->status & M_DEAD) || (tmsg->flags & F_DELETED))
continue;
if ((tmsg->flags & F_FLAGGED) || !tmsg->srec || tmsg->srec->uid[M] <= 0) /* add M_DESYNCED? */
todel--;
else if (!(tmsg->status & M_RECENT)) {
tmsg->status |= M_EXPIRE;
delt++;
todel--;
}
}
if (delt) {
for (srec = recs; srec; srec = srec->next) {
if (srec->status & (S_DEAD|S_EXPIRED))
continue;
if (srec->msg[S] && (srec->msg[S]->status & M_EXPIRE)) {
debug( " expiring pair(%d,%d)\n", srec->uid[M], srec->uid[S] );
/* log first, so deletion can't be misinterpreted! */
Fprintf( jfp, "~ %d %d 1\n", srec->uid[M], srec->uid[S] );
if (smaxxuid < srec->uid[S])
smaxxuid = srec->uid[S];
srec->status |= S_EXPIRED;
switch (driver[S]->set_flags( ctx[S], srec->msg[S], 0, F_DELETED, 0 )) {
case DRV_STORE_BAD: ret = SYNC_BAD(S); goto finish;
case DRV_BOX_BAD: ret = SYNC_FAIL; goto finish;
default: /* ok */ break;
case DRV_OK: srec->status |= S_DEL(S);
}
}
}
}
}
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
ex[t] = 0; ex[t] = 0;