rework flag propagation during placeholder upgrade
don't implicitly propagate flags with upgrades. the user asked for replacing the body, so do just that. if they also asked for flag propagation, handle it like the case without upgrade as far as possible. this makes async parallel flag propagation in the opposite direction robust, while still being reasonably simple.
This commit is contained in:
parent
3d90507a75
commit
0da273686f
@ -1066,20 +1066,24 @@ my @X21 = (
|
||||
test("max size", \@x20, \@X21, \@O21);
|
||||
|
||||
my @x22 = (
|
||||
C, 0, B,
|
||||
E, 0, B,
|
||||
A, "*", "", "",
|
||||
B, "**", "", "",
|
||||
C, "*?", "*<", "*F*",
|
||||
B, "*PR*", "", "",
|
||||
C, "*PR?", "*<DP", "*DFP*",
|
||||
D, "*PR?", "*<DP", "*DP*",
|
||||
E, "*PR*", "*>DP", "*DP?",
|
||||
A, "", "*", "*",
|
||||
B, "", "*>", "*F?",
|
||||
B, "", "*>DP", "*DFP?",
|
||||
);
|
||||
|
||||
my @X22 = (
|
||||
C, 0, B,
|
||||
B, "", ">->", "^*",
|
||||
B, "", ">->D+R", "^PR*",
|
||||
B, "", "", "&1/",
|
||||
C, "^F*", "<-<+F", "",
|
||||
C, "^FPR*", "<-<D+FR", "-D+R",
|
||||
C, "&1+T", "^", "&",
|
||||
D, "", "-D+R", "-D+R",
|
||||
E, "", "-D+R", "-D+R",
|
||||
);
|
||||
test("max size + flagging", \@x22, \@X22, \@O21);
|
||||
|
||||
|
95
src/sync.c
95
src/sync.c
@ -289,12 +289,21 @@ msg_fetched( int sts, void *aux )
|
||||
return;
|
||||
}
|
||||
|
||||
vars->data.flags = sanitize_flags( vars->data.flags, svars, t );
|
||||
if (srec && !(srec->status & S_UPGRADE)) {
|
||||
if (vars->data.flags) {
|
||||
srec->pflags = vars->data.flags;
|
||||
JLOG( "%% %u %u %u", (srec->uid[F], srec->uid[N], srec->pflags),
|
||||
"%sing with flags %s", (str_hl[t], fmt_lone_flags( srec->pflags ).str) );
|
||||
if (srec && (srec->status & S_UPGRADE)) {
|
||||
vars->data.flags = (srec->pflags | srec->aflags[t]) & ~srec->dflags[t];
|
||||
if (srec->aflags[t] || srec->dflags[t]) {
|
||||
JLOG( "$ %u %u %u %u", (srec->uid[F], srec->uid[N], srec->aflags[t], srec->dflags[t]),
|
||||
"%sing upgrade with flags: +%s -%s",
|
||||
(str_hl[t], fmt_flags( srec->aflags[t] ).str, fmt_flags( srec->dflags[t] ).str) );
|
||||
}
|
||||
} else {
|
||||
vars->data.flags = sanitize_flags( vars->data.flags, svars, t );
|
||||
if (srec) {
|
||||
if (vars->data.flags) {
|
||||
srec->pflags = vars->data.flags;
|
||||
JLOG( "%% %u %u %u", (srec->uid[F], srec->uid[N], srec->pflags),
|
||||
"%sing with flags %s", (str_hl[t], fmt_lone_flags( srec->pflags ).str) );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1008,7 +1017,23 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||
for (t = 0; t < 2; t++) {
|
||||
if (srec->msg[t] && (srec->msg[t]->flags & F_DELETED))
|
||||
srec->status |= S_DEL(t);
|
||||
if (del[t]) {
|
||||
if (srec->status & S_UPGRADE) {
|
||||
// Such records hold orphans by definition, so the del[] cases are irrelevant.
|
||||
if (srec->uid[t]) {
|
||||
// Direction towards the source message.
|
||||
// The placeholder was already detached, so use its saved flags instead.
|
||||
sflags = srec->pflags;
|
||||
goto doflags;
|
||||
}
|
||||
// Direction towards the copy.
|
||||
if (srec->msg[t^1]) {
|
||||
// Flag propagation along placeholder upgrades must be explicitly requested,
|
||||
// and is, at the source, handled like any other flag update.
|
||||
sflags = srec->msg[t^1]->flags;
|
||||
goto doflags;
|
||||
}
|
||||
debug( " no %s\n", str_fn[t^1] );
|
||||
} else if (del[t]) {
|
||||
// The target was newly expunged, so there is nothing to update.
|
||||
// The deletion is propagated in the opposite iteration.
|
||||
} else if (!srec->uid[t]) {
|
||||
@ -1044,8 +1069,11 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||
debug( " no %s\n", str_fn[t^1] );
|
||||
} else {
|
||||
// We have a source. The target may be in an unknown state.
|
||||
sflags = srec->msg[t^1]->flags;
|
||||
|
||||
doflags:
|
||||
if (svars->chan->ops[t] & OP_FLAGS) {
|
||||
sflags = sanitize_flags( srec->msg[t^1]->flags, svars, t );
|
||||
sflags = sanitize_flags( sflags, svars, t );
|
||||
if ((t == F) && (srec->status & (S_EXPIRE|S_EXPIRED))) {
|
||||
/* Don't propagate deletion resulting from expiration. */
|
||||
debug( " near side expiring\n" );
|
||||
@ -1076,32 +1104,15 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||
ushort suflags = srec->msg[N] ? srec->msg[N]->flags : 0;
|
||||
if ((muflags | suflags) & F_FLAGGED) {
|
||||
t = (srec->status & S_DUMMY(F)) ? F : N;
|
||||
// We calculate the flags for the replicated message already now,
|
||||
// because after an interruption the dummy may be already gone.
|
||||
srec->pflags = ((srec->msg[t]->flags & ~(F_SEEN|F_FLAGGED)) | srec->aflags[t]) & ~srec->dflags[t];
|
||||
// Consequently, the srec's flags are committed right away as well.
|
||||
srec->flags = (srec->flags | srec->aflags[t]) & ~srec->dflags[t];
|
||||
JLOG( "^ %u %u %u %u", (srec->uid[F], srec->uid[N], srec->pflags, srec->flags),
|
||||
"upgrading %s placeholder, dummy's flags %s, srec flags %s",
|
||||
(str_fn[t], fmt_lone_flags( srec->pflags ).str, fmt_lone_flags( srec->flags ).str) );
|
||||
// We save away the dummy's flags, because after an
|
||||
// interruption it may be already gone. Filtering as above.
|
||||
srec->pflags = srec->msg[t]->flags & ~(F_SEEN | F_FLAGGED);
|
||||
JLOG( "^ %u %u %u", (srec->uid[F], srec->uid[N], srec->pflags),
|
||||
"upgrading %s placeholder, dummy's flags %s",
|
||||
(str_fn[t], fmt_lone_flags( srec->pflags ).str) );
|
||||
nsrec = upgrade_srec( svars, srec, t );
|
||||
}
|
||||
}
|
||||
// This is separated, because the upgrade can come from the journal.
|
||||
if (srec->status & S_UPGRADE) {
|
||||
t = !srec->uid[F] ? F : N;
|
||||
tmsg = srec->msg[t^1];
|
||||
if ((svars->chan->ops[t] & OP_EXPUNGE) && (srec->pflags & F_DELETED)) {
|
||||
JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing upgrade - would be expunged anyway" );
|
||||
tmsg->srec = NULL;
|
||||
srec->status = S_DEAD;
|
||||
} else {
|
||||
// Pretend that the source message has the adjusted flags of the dummy.
|
||||
tmsg->flags = srec->pflags;
|
||||
tmsg->status |= M_FLAGS;
|
||||
any_new[t] = 1;
|
||||
}
|
||||
}
|
||||
srec = nsrec; // Minor optimization: skip freshly created placeholder entry.
|
||||
}
|
||||
}
|
||||
@ -1136,12 +1147,30 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||
"was previously skipped" );
|
||||
}
|
||||
} else {
|
||||
if (!(svars->chan->ops[t] & OP_NEW))
|
||||
if (!(svars->chan->ops[t] & OP_NEW) && !(srec->status & S_UPGRADE))
|
||||
continue;
|
||||
if (!(srec->status & S_PENDING))
|
||||
continue; // Nothing to do - the message is paired or expired
|
||||
// Propagation was scheduled, but we got interrupted
|
||||
debug( "unpropagated old message %u\n", tmsg->uid );
|
||||
|
||||
if (srec->status & S_UPGRADE) {
|
||||
if ((svars->chan->ops[t] & OP_EXPUNGE) &&
|
||||
((srec->pflags | srec->aflags[t]) & ~srec->dflags[t] & F_DELETED)) {
|
||||
// We can't just kill the entry, as we may be propagating flags
|
||||
// (in particular, F_DELETED) towards the real message.
|
||||
// No dummy is actually present, but pretend there is, so the
|
||||
// real message is considered new when trashing.
|
||||
srec->status = (srec->status & ~(S_PENDING | S_UPGRADE)) | S_DUMMY(t);
|
||||
JLOG( "~ %u %u %d", (srec->uid[F], srec->uid[N], srec->status & S_LOGGED),
|
||||
"canceling placeholder upgrade - would be expunged anyway" );
|
||||
continue;
|
||||
}
|
||||
// Prevent the driver from "completing" the flags, as we'll ignore them anyway.
|
||||
tmsg->status |= M_FLAGS;
|
||||
any_new[t] = 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// The 1st unknown message which should be known marks the end
|
||||
@ -1183,7 +1212,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
|
||||
continue;
|
||||
}
|
||||
if (!(tmsg->flags & F_FLAGGED) && tmsg->size > svars->chan->stores[t]->max_size &&
|
||||
!(srec->status & (S_DUMMY(F) | S_DUMMY(N) | S_UPGRADE))) {
|
||||
!(srec->status & (S_DUMMY(F) | S_DUMMY(N)))) {
|
||||
srec->status |= S_DUMMY(t);
|
||||
JLOG( "_ %u %u", (srec->uid[F], srec->uid[N]), "placeholder only - too big" );
|
||||
}
|
||||
|
@ -308,9 +308,10 @@ load_state( sync_vars_t *svars )
|
||||
case '*':
|
||||
case '%':
|
||||
case '~':
|
||||
case '^':
|
||||
bad = sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3;
|
||||
break;
|
||||
case '^':
|
||||
case '$':
|
||||
bad = sscanf( buf + 2, "%u %u %u %u", &t1, &t2, &t3, &t4 ) != 4;
|
||||
break;
|
||||
default:
|
||||
@ -411,11 +412,17 @@ load_state( sync_vars_t *svars )
|
||||
case '^':
|
||||
tn = (srec->status & S_DUMMY(F)) ? F : N;
|
||||
srec->pflags = (uchar)t3;
|
||||
srec->flags = (uchar)t4;
|
||||
debug( "upgrading %s placeholder, dummy's flags %s, srec flags %s\n",
|
||||
str_fn[tn], fmt_lone_flags( t3 ).str, fmt_lone_flags( t4 ).str );
|
||||
debug( "upgrading %s placeholder, dummy's flags %s\n",
|
||||
str_fn[tn], fmt_lone_flags( t3 ).str );
|
||||
srec = upgrade_srec( svars, srec, tn );
|
||||
break;
|
||||
case '$':
|
||||
tn = !srec->uid[F] ? F : N;
|
||||
srec->aflags[tn] = (uchar)t3;
|
||||
srec->dflags[tn] = (uchar)t4;
|
||||
debug( "flag update for %s now +%s -%s\n",
|
||||
str_fn[tn], fmt_flags( t3 ).str, fmt_flags( t4 ).str );
|
||||
break;
|
||||
default:
|
||||
assert( !"Unhandled journal entry" );
|
||||
}
|
||||
@ -522,8 +529,12 @@ assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
|
||||
if (uid == svars->newmaxuid[t] + 1)
|
||||
svars->newmaxuid[t] = uid;
|
||||
if (uid) {
|
||||
if (!(srec->status & S_UPGRADE))
|
||||
if (srec->status & S_UPGRADE) {
|
||||
srec->flags = (srec->flags | srec->aflags[t]) & ~srec->dflags[t];
|
||||
srec->aflags[t] = srec->dflags[t] = 0; // Cleanup after journal replay
|
||||
} else {
|
||||
srec->flags = srec->pflags;
|
||||
}
|
||||
}
|
||||
srec->status &= ~(S_PENDING | S_UPGRADE);
|
||||
srec->tuid[0] = 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user