diff --git a/src/sync.c b/src/sync.c index 4cbee88..429e843 100644 --- a/src/sync.c +++ b/src/sync.c @@ -95,7 +95,6 @@ make_flags( int flags, char *buf ) #define S_DEAD (1<<0) /* ephemeral: the entry was killed and should be ignored */ -#define S_DONE (1<<1) /* ephemeral: the entry was already synced */ #define S_DEL(ms) (1<<(2+(ms))) /* ephemeral: m/s message would be subject to expunge */ #define S_EXPIRED (1<<4) /* the entry is expired (slave message removal confirmed) */ #define S_EXPIRE (1<<5) /* the entry is being expired (slave message removal scheduled) */ @@ -146,7 +145,7 @@ typedef struct { void (*cb)( int sts, void *aux ), *aux; char *dname, *jname, *nname, *lname; FILE *jfp, *nfp; - sync_rec_t *srecs, **srecadd, **osrecadd; + sync_rec_t *srecs, **srecadd; channel_conf_t *chan; store_t *ctx[2]; driver_t *drv[2]; @@ -1203,84 +1202,9 @@ box_loaded( int sts, void *aux ) info( "Synchronizing...\n" ); - debug( "synchronizing new entries\n" ); - svars->osrecadd = svars->srecadd; - for (t = 0; t < 2; t++) { - for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) { - /* If we have a srec: - * - message is old (> 0) or expired (0) => ignore - * - message was skipped (-1) => ReNew - * - message was attempted, but failed (-2) => New - * If new have no srec, the message is always New. If messages were previously ignored - * due to being excessive, they would now appear to be newer than the messages that - * got actually synced, so make sure to look only at the newest ones. As some messages - * may be already propagated before an interruption, and maxuid logging is delayed, - * we need to track the newmaxuid separately. */ - srec = tmsg->srec; - if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW)) - : svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) { - debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] ); - if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) - debug( " -> not %sing - would be expunged anyway\n", str_hl[t] ); - else { - if (srec) { - srec->status |= S_DONE; - debug( " -> pair(%d,%d) exists\n", srec->uid[M], srec->uid[S] ); - } else { - srec = nfmalloc( sizeof(*srec) ); - srec->next = 0; - *svars->srecadd = srec; - svars->srecadd = &srec->next; - svars->nsrecs++; - srec->status = S_DONE; - srec->flags = 0; - srec->tuid[0] = 0; - srec->uid[1-t] = tmsg->uid; - srec->uid[t] = -2; - srec->msg[1-t] = tmsg; - srec->msg[t] = 0; - tmsg->srec = srec; - if (svars->newmaxuid[1-t] < tmsg->uid) - svars->newmaxuid[1-t] = tmsg->uid; - Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] ); - debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] ); - } - if (svars->maxuid[1-t] < tmsg->uid) { - /* We do this here for simplicity. However, logging must be delayed until - * all messages were propagated, as skipped messages could otherwise be - * logged before the propagation of messages with lower UIDs completes. */ - svars->maxuid[1-t] = tmsg->uid; - } - if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) { - if (tmsg->flags) { - srec->flags = tmsg->flags; - Fprintf( svars->jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags ); - debug( " -> updated flags to %u\n", tmsg->flags ); - } - for (t1 = 0; t1 < TUIDL; t1++) { - t2 = arc4_getbyte() & 0x3f; - srec->tuid[t1] = t2 < 26 ? t2 + 'A' : t2 < 52 ? t2 + 'a' - 26 : t2 < 62 ? t2 + '0' - 52 : t2 == 62 ? '+' : '/'; - } - Fprintf( svars->jfp, "# %d %d %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], srec->tuid ); - if (FSyncLevel >= FSYNC_THOROUGH) - fdatasync( fileno( svars->jfp ) ); - debug( " -> %sing message, TUID %." stringify(TUIDL) "s\n", str_hl[t], srec->tuid ); - } else { - if (srec->uid[t] == -1) { - debug( " -> not %sing - still too big\n", str_hl[t] ); - } else { - debug( " -> not %sing - too big\n", str_hl[t] ); - msg_copied_p2( svars, srec, t, -1 ); - } - } - } - } - } - } - debug( "synchronizing old entries\n" ); - for (srec = svars->srecs; srec != *svars->osrecadd; srec = srec->next) { - if (srec->status & (S_DEAD|S_DONE)) + for (srec = svars->srecs; srec; srec = srec->next) { + if (srec->status & S_DEAD) continue; debug( "pair (%d,%d)\n", srec->uid[M], srec->uid[S] ); no[M] = !srec->msg[M] && (svars->ctx[M]->opts & OPEN_OLD); @@ -1350,6 +1274,79 @@ box_loaded( int sts, void *aux ) } } + debug( "synchronizing new entries\n" ); + for (t = 0; t < 2; t++) { + for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) { + /* If we have a srec: + * - message is old (> 0) or expired (0) => ignore + * - message was skipped (-1) => ReNew + * - message was attempted, but failed (-2) => New + * If new have no srec, the message is always New. If messages were previously ignored + * due to being excessive, they would now appear to be newer than the messages that + * got actually synced, so make sure to look only at the newest ones. As some messages + * may be already propagated before an interruption, and maxuid logging is delayed, + * we need to track the newmaxuid separately. */ + srec = tmsg->srec; + if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW)) + : svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) { + debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] ); + if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) { + debug( " -> not %sing - would be expunged anyway\n", str_hl[t] ); + } else { + if (srec) { + debug( " -> pair(%d,%d) exists\n", srec->uid[M], srec->uid[S] ); + } else { + srec = nfmalloc( sizeof(*srec) ); + srec->next = 0; + *svars->srecadd = srec; + svars->srecadd = &srec->next; + svars->nsrecs++; + srec->status = 0; + srec->flags = 0; + srec->tuid[0] = 0; + srec->uid[1-t] = tmsg->uid; + srec->uid[t] = -2; + srec->msg[1-t] = tmsg; + srec->msg[t] = 0; + tmsg->srec = srec; + if (svars->newmaxuid[1-t] < tmsg->uid) + svars->newmaxuid[1-t] = tmsg->uid; + Fprintf( svars->jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] ); + debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] ); + } + if (svars->maxuid[1-t] < tmsg->uid) { + /* We do this here for simplicity. However, logging must be delayed until + * all messages were propagated, as skipped messages could otherwise be + * logged before the propagation of messages with lower UIDs completes. */ + svars->maxuid[1-t] = tmsg->uid; + } + if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) { + if (tmsg->flags) { + srec->flags = tmsg->flags; + Fprintf( svars->jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags ); + debug( " -> updated flags to %u\n", tmsg->flags ); + } + for (t1 = 0; t1 < TUIDL; t1++) { + t2 = arc4_getbyte() & 0x3f; + srec->tuid[t1] = t2 < 26 ? t2 + 'A' : t2 < 52 ? t2 + 'a' - 26 : t2 < 62 ? t2 + '0' - 52 : t2 == 62 ? '+' : '/'; + } + Fprintf( svars->jfp, "# %d %d %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], srec->tuid ); + if (FSyncLevel >= FSYNC_THOROUGH) + fdatasync( fileno( svars->jfp ) ); + debug( " -> %sing message, TUID %." stringify(TUIDL) "s\n", str_hl[t], srec->tuid ); + } else { + if (srec->uid[t] == -1) { + debug( " -> not %sing - still too big\n", str_hl[t] ); + } else { + debug( " -> not %sing - too big\n", str_hl[t] ); + msg_copied_p2( svars, srec, t, -1 ); + } + } + } + } + } + } + if ((svars->chan->ops[S] & (OP_NEW|OP_RENEW|OP_FLAGS)) && svars->chan->max_messages) { /* Note: When this branch is entered, we have loaded all slave messages. */ /* Expire excess messages. Important (flagged, unread, or unpropagated) messages @@ -1452,12 +1449,10 @@ box_loaded( int sts, void *aux ) } debug( "synchronizing flags\n" ); - for (srec = svars->srecs; srec != *svars->osrecadd; srec = srec->next) { - if (srec->status & (S_DEAD|S_DONE)) + for (srec = svars->srecs; srec; srec = srec->next) { + if ((srec->status & S_DEAD) || srec->uid[M] <= 0 || srec->uid[S] <= 0) continue; for (t = 0; t < 2; t++) { - if (srec->uid[t] <= 0) - continue; aflags = srec->aflags[t]; dflags = srec->dflags[t]; if (srec->status & S_DELETE) {