move handling of new messages after that of old ones

i.e., move it back. whatever the original reason was, it's now gone.

this order is way more natural, which allows us to remove the osrecadd
and S_DONE hacks.
This commit is contained in:
Oswald Buddenhagen 2013-11-24 18:26:11 +01:00
parent fe3d19b7eb
commit 49a32910a7

View File

@ -95,7 +95,6 @@ make_flags( int flags, char *buf )
#define S_DEAD (1<<0) /* ephemeral: the entry was killed and should be ignored */ #define S_DEAD (1<<0) /* ephemeral: the entry was killed and should be ignored */
#define S_DONE (1<<1) /* ephemeral: the entry was already synced */
#define S_DEL(ms) (1<<(2+(ms))) /* ephemeral: m/s message would be subject to expunge */ #define S_DEL(ms) (1<<(2+(ms))) /* ephemeral: m/s message would be subject to expunge */
#define S_EXPIRED (1<<4) /* the entry is expired (slave message removal confirmed) */ #define S_EXPIRED (1<<4) /* the entry is expired (slave message removal confirmed) */
#define S_EXPIRE (1<<5) /* the entry is being expired (slave message removal scheduled) */ #define S_EXPIRE (1<<5) /* the entry is being expired (slave message removal scheduled) */
@ -146,7 +145,7 @@ typedef struct {
void (*cb)( int sts, void *aux ), *aux; void (*cb)( int sts, void *aux ), *aux;
char *dname, *jname, *nname, *lname; char *dname, *jname, *nname, *lname;
FILE *jfp, *nfp; FILE *jfp, *nfp;
sync_rec_t *srecs, **srecadd, **osrecadd; sync_rec_t *srecs, **srecadd;
channel_conf_t *chan; channel_conf_t *chan;
store_t *ctx[2]; store_t *ctx[2];
driver_t *drv[2]; driver_t *drv[2];
@ -1203,84 +1202,9 @@ box_loaded( int sts, void *aux )
info( "Synchronizing...\n" ); info( "Synchronizing...\n" );
debug( "synchronizing new entries\n" );
svars->osrecadd = svars->srecadd;
for (t = 0; t < 2; t++) {
for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) {
/* If we have a srec:
* - message is old (> 0) or expired (0) => ignore
* - message was skipped (-1) => ReNew
* - message was attempted, but failed (-2) => New
* If new have no srec, the message is always New. If messages were previously ignored
* due to being excessive, they would now appear to be newer than the messages that
* got actually synced, so make sure to look only at the newest ones. As some messages
* may be already propagated before an interruption, and maxuid logging is delayed,
* we need to track the newmaxuid separately. */
srec = tmsg->srec;
if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW))
: svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) {
debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] );
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED))
debug( " -> not %sing - would be expunged anyway\n", str_hl[t] );
else {
if (srec) {
srec->status |= S_DONE;
debug( " -> pair(%d,%d) exists\n", srec->uid[M], srec->uid[S] );
} else {
srec = nfmalloc( sizeof(*srec) );
srec->next = 0;
*svars->srecadd = srec;
svars->srecadd = &srec->next;
svars->nsrecs++;
srec->status = S_DONE;
srec->flags = 0;
srec->tuid[0] = 0;
srec->uid[1-t] = tmsg->uid;
srec->uid[t] = -2;
srec->msg[1-t] = tmsg;
srec->msg[t] = 0;
tmsg->srec = srec;
if (svars->newmaxuid[1-t] < tmsg->uid)
svars->newmaxuid[1-t] = tmsg->uid;
Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] );
debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] );
}
if (svars->maxuid[1-t] < tmsg->uid) {
/* We do this here for simplicity. However, logging must be delayed until
* all messages were propagated, as skipped messages could otherwise be
* logged before the propagation of messages with lower UIDs completes. */
svars->maxuid[1-t] = tmsg->uid;
}
if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) {
if (tmsg->flags) {
srec->flags = tmsg->flags;
Fprintf( svars->jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags );
debug( " -> updated flags to %u\n", tmsg->flags );
}
for (t1 = 0; t1 < TUIDL; t1++) {
t2 = arc4_getbyte() & 0x3f;
srec->tuid[t1] = t2 < 26 ? t2 + 'A' : t2 < 52 ? t2 + 'a' - 26 : t2 < 62 ? t2 + '0' - 52 : t2 == 62 ? '+' : '/';
}
Fprintf( svars->jfp, "# %d %d %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], srec->tuid );
if (FSyncLevel >= FSYNC_THOROUGH)
fdatasync( fileno( svars->jfp ) );
debug( " -> %sing message, TUID %." stringify(TUIDL) "s\n", str_hl[t], srec->tuid );
} else {
if (srec->uid[t] == -1) {
debug( " -> not %sing - still too big\n", str_hl[t] );
} else {
debug( " -> not %sing - too big\n", str_hl[t] );
msg_copied_p2( svars, srec, t, -1 );
}
}
}
}
}
}
debug( "synchronizing old entries\n" ); debug( "synchronizing old entries\n" );
for (srec = svars->srecs; srec != *svars->osrecadd; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & (S_DEAD|S_DONE)) if (srec->status & S_DEAD)
continue; continue;
debug( "pair (%d,%d)\n", srec->uid[M], srec->uid[S] ); debug( "pair (%d,%d)\n", srec->uid[M], srec->uid[S] );
no[M] = !srec->msg[M] && (svars->ctx[M]->opts & OPEN_OLD); no[M] = !srec->msg[M] && (svars->ctx[M]->opts & OPEN_OLD);
@ -1350,6 +1274,79 @@ box_loaded( int sts, void *aux )
} }
} }
debug( "synchronizing new entries\n" );
for (t = 0; t < 2; t++) {
for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) {
/* If we have a srec:
* - message is old (> 0) or expired (0) => ignore
* - message was skipped (-1) => ReNew
* - message was attempted, but failed (-2) => New
* If new have no srec, the message is always New. If messages were previously ignored
* due to being excessive, they would now appear to be newer than the messages that
* got actually synced, so make sure to look only at the newest ones. As some messages
* may be already propagated before an interruption, and maxuid logging is delayed,
* we need to track the newmaxuid separately. */
srec = tmsg->srec;
if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW))
: svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) {
debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] );
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
debug( " -> not %sing - would be expunged anyway\n", str_hl[t] );
} else {
if (srec) {
debug( " -> pair(%d,%d) exists\n", srec->uid[M], srec->uid[S] );
} else {
srec = nfmalloc( sizeof(*srec) );
srec->next = 0;
*svars->srecadd = srec;
svars->srecadd = &srec->next;
svars->nsrecs++;
srec->status = 0;
srec->flags = 0;
srec->tuid[0] = 0;
srec->uid[1-t] = tmsg->uid;
srec->uid[t] = -2;
srec->msg[1-t] = tmsg;
srec->msg[t] = 0;
tmsg->srec = srec;
if (svars->newmaxuid[1-t] < tmsg->uid)
svars->newmaxuid[1-t] = tmsg->uid;
Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] );
debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] );
}
if (svars->maxuid[1-t] < tmsg->uid) {
/* We do this here for simplicity. However, logging must be delayed until
* all messages were propagated, as skipped messages could otherwise be
* logged before the propagation of messages with lower UIDs completes. */
svars->maxuid[1-t] = tmsg->uid;
}
if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) {
if (tmsg->flags) {
srec->flags = tmsg->flags;
Fprintf( svars->jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags );
debug( " -> updated flags to %u\n", tmsg->flags );
}
for (t1 = 0; t1 < TUIDL; t1++) {
t2 = arc4_getbyte() & 0x3f;
srec->tuid[t1] = t2 < 26 ? t2 + 'A' : t2 < 52 ? t2 + 'a' - 26 : t2 < 62 ? t2 + '0' - 52 : t2 == 62 ? '+' : '/';
}
Fprintf( svars->jfp, "# %d %d %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], srec->tuid );
if (FSyncLevel >= FSYNC_THOROUGH)
fdatasync( fileno( svars->jfp ) );
debug( " -> %sing message, TUID %." stringify(TUIDL) "s\n", str_hl[t], srec->tuid );
} else {
if (srec->uid[t] == -1) {
debug( " -> not %sing - still too big\n", str_hl[t] );
} else {
debug( " -> not %sing - too big\n", str_hl[t] );
msg_copied_p2( svars, srec, t, -1 );
}
}
}
}
}
}
if ((svars->chan->ops[S] & (OP_NEW|OP_RENEW|OP_FLAGS)) && svars->chan->max_messages) { if ((svars->chan->ops[S] & (OP_NEW|OP_RENEW|OP_FLAGS)) && svars->chan->max_messages) {
/* Note: When this branch is entered, we have loaded all slave messages. */ /* Note: When this branch is entered, we have loaded all slave messages. */
/* Expire excess messages. Important (flagged, unread, or unpropagated) messages /* Expire excess messages. Important (flagged, unread, or unpropagated) messages
@ -1452,12 +1449,10 @@ box_loaded( int sts, void *aux )
} }
debug( "synchronizing flags\n" ); debug( "synchronizing flags\n" );
for (srec = svars->srecs; srec != *svars->osrecadd; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & (S_DEAD|S_DONE)) if ((srec->status & S_DEAD) || srec->uid[M] <= 0 || srec->uid[S] <= 0)
continue; continue;
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
if (srec->uid[t] <= 0)
continue;
aflags = srec->aflags[t]; aflags = srec->aflags[t];
dflags = srec->dflags[t]; dflags = srec->dflags[t];
if (srec->status & S_DELETE) { if (srec->status & S_DELETE) {