make expiration target side configurable

REFMAIL: 87k0fauw7q.fsf@wavexx.thregr.org
This commit is contained in:
Oswald Buddenhagen 2022-05-05 20:31:43 +02:00
parent abb596709b
commit 8566283c59
8 changed files with 199 additions and 67 deletions

2
NEWS
View File

@ -12,6 +12,8 @@ they are flagged on the source side.
Renamed the ReNew/--renew/-N options to Upgrade/--upgrade/-u Renamed the ReNew/--renew/-N options to Upgrade/--upgrade/-u
and Delete/--delete/-d to Gone/--gone/-g. and Delete/--delete/-d to Gone/--gone/-g.
Made the Channel side to expire with MaxMessages configurable.
MaxMessages and MaxSize can be used together now. MaxMessages and MaxSize can be used together now.
The unfiltered list of mailboxes in each Store can be printed now. The unfiltered list of mailboxes in each Store can be printed now.

View File

@ -261,6 +261,17 @@ getopt_helper( conffile_t *cfile, int *cops, channel_conf_t *conf )
conf->use_internal_date = parse_bool( cfile ); conf->use_internal_date = parse_bool( cfile );
} else if (!strcasecmp( "MaxMessages", cfile->cmd )) { } else if (!strcasecmp( "MaxMessages", cfile->cmd )) {
conf->max_messages = parse_int( cfile ); conf->max_messages = parse_int( cfile );
} else if (!strcasecmp( "ExpireSide", cfile->cmd )) {
arg = cfile->val;
if (!strcasecmp( "Far", arg )) {
conf->expire_side = F;
} else if (!strcasecmp( "Near", arg )) {
conf->expire_side = N;
} else {
error( "%s:%d: invalid ExpireSide argument '%s'\n",
cfile->file, cfile->line, arg );
cfile->err = 1;
}
} else if (!strcasecmp( "ExpireUnread", cfile->cmd )) { } else if (!strcasecmp( "ExpireUnread", cfile->cmd )) {
conf->expire_unread = parse_bool( cfile ); conf->expire_unread = parse_bool( cfile );
} else { } else {
@ -481,6 +492,7 @@ load_config( const char *where )
gcops = 0; gcops = 0;
glob_ok = 1; glob_ok = 1;
global_conf.expire_side = N;
global_conf.expire_unread = -1; global_conf.expire_unread = -1;
reloop: reloop:
while (getcline( &cfile )) { while (getcline( &cfile )) {
@ -505,6 +517,7 @@ load_config( const char *where )
channel = nfzalloc( sizeof(*channel) ); channel = nfzalloc( sizeof(*channel) );
channel->name = nfstrdup( cfile.val ); channel->name = nfstrdup( cfile.val );
channel->max_messages = global_conf.max_messages; channel->max_messages = global_conf.max_messages;
channel->expire_side = global_conf.expire_side;
channel->expire_unread = global_conf.expire_unread; channel->expire_unread = global_conf.expire_unread;
channel->use_internal_date = global_conf.use_internal_date; channel->use_internal_date = global_conf.use_internal_date;
cops = 0; cops = 0;

View File

@ -579,6 +579,12 @@ case you need to enable this option.
(Global default: \fBno\fR). (Global default: \fBno\fR).
. .
.TP .TP
\fBExpireSide\fR \fBFar\fR|\fBNear\fR
Selects on which side messages should be expired when \fBMaxMessages\fR is
configured.
(Global default: \fBNear\fR).
.
.TP
\fBSync\fR {\fBNone\fR|[\fBPull\fR] [\fBPush\fR] [\fBNew\fR] [\fBOld\fR] [\fBUpgrade\fR] [\fBGone\fR] [\fBFlags\fR] [\fBFull\fR]} \fBSync\fR {\fBNone\fR|[\fBPull\fR] [\fBPush\fR] [\fBNew\fR] [\fBOld\fR] [\fBUpgrade\fR] [\fBGone\fR] [\fBFlags\fR] [\fBFull\fR]}
Select the synchronization operation(s) to perform: Select the synchronization operation(s) to perform:
.br .br
@ -693,7 +699,8 @@ date\fR) is actually the arrival time, but it is usually close enough.
. .
.P .P
\fBSync\fR, \fBCreate\fR, \fBRemove\fR, \fBExpunge\fR, \fBExpungeSolo\fR, \fBSync\fR, \fBCreate\fR, \fBRemove\fR, \fBExpunge\fR, \fBExpungeSolo\fR,
\fBMaxMessages\fR, \fBExpireUnread\fR, and \fBCopyArrivalDate\fR \fBMaxMessages\fR, \fBExpireUnread\fR, \fBExpireSide\fR,
and \fBCopyArrivalDate\fR
can be used before any section for a global effect. can be used before any section for a global effect.
The global settings are overridden by Channel-specific options, The global settings are overridden by Channel-specific options,
which in turn are overridden by command line switches. which in turn are overridden by command line switches.

View File

@ -266,12 +266,28 @@ sub parse_chan($;$)
} }
$$ss{max_pulled} = resolv_msg($$ics[0], $cs, "far"); $$ss{max_pulled} = resolv_msg($$ics[0], $cs, "far");
$$ss{max_expired} = resolv_msg($$ics[1], $cs, "far"); $$ss{max_expired_far} = resolv_msg($$ics[1], $cs, "far");
$$ss{max_pushed} = resolv_msg($$ics[2], $cs, "near"); $$ss{max_pushed} = resolv_msg($$ics[2], $cs, "near");
$$ss{max_expired_near} = 0;
return $cs; return $cs;
} }
sub flip_chan($)
{
my ($cs) = @_;
($$cs{far}, $$cs{near}) = ($$cs{near}, $$cs{far});
($$cs{far_trash}, $$cs{near_trash}) = ($$cs{near_trash}, $$cs{far_trash});
my $ss = $$cs{state};
($$ss{max_pulled}, $$ss{max_pushed}) = ($$ss{max_pushed}, $$ss{max_pulled});
($$ss{max_expired_far}, $$ss{max_expired_near}) = ($$ss{max_expired_near}, $$ss{max_expired_far});
for my $ent (@{$$ss{entries}}) {
($$ent[0], $$ent[1]) = ($$ent[1], $$ent[0]);
$$ent[2] =~ tr/<>/></;
}
}
sub qm($) sub qm($)
{ {
@ -442,8 +458,9 @@ sub readstate(;$)
my @ents; my @ents;
my %ss = ( my %ss = (
max_pulled => 0, max_pulled => 0,
max_expired => 0, max_expired_far => 0,
max_pushed => 0, max_pushed => 0,
max_expired_near => 0,
entries => \@ents entries => \@ents
); );
my ($far_val, $near_val) = (0, 0); my ($far_val, $near_val) = (0, 0);
@ -452,7 +469,8 @@ sub readstate(;$)
'NearUidValidity' => \$near_val, 'NearUidValidity' => \$near_val,
'MaxPulledUid' => \$ss{max_pulled}, 'MaxPulledUid' => \$ss{max_pulled},
'MaxPushedUid' => \$ss{max_pushed}, 'MaxPushedUid' => \$ss{max_pushed},
'MaxExpiredFarUid' => \$ss{max_expired} 'MaxExpiredFarUid' => \$ss{max_expired_far},
'MaxExpiredNearUid' => \$ss{max_expired_near}
); );
OUTER: while (1) { OUTER: while (1) {
while (@$ls) { while (@$ls) {
@ -473,6 +491,7 @@ sub readstate(;$)
return; return;
} }
delete $hdr{'MaxExpiredFarUid'}; # optional field delete $hdr{'MaxExpiredFarUid'}; # optional field
delete $hdr{'MaxExpiredNearUid'}; # ditto
my @ky = keys %hdr; my @ky = keys %hdr;
if (@ky) { if (@ky) {
print STDERR "Keys missing from sync state header: @ky\n"; print STDERR "Keys missing from sync state header: @ky\n";
@ -537,8 +556,12 @@ sub mkstate($)
open(FILE, ">", "near/.mbsyncstate") or open(FILE, ">", "near/.mbsyncstate") or
die "Cannot create sync state.\n"; die "Cannot create sync state.\n";
print FILE "FarUidValidity 1\nMaxPulledUid ".$$ss{max_pulled}."\n". print FILE "FarUidValidity 1\nMaxPulledUid ".$$ss{max_pulled}."\n".
"NearUidValidity 1\nMaxExpiredFarUid ".$$ss{max_expired}. "NearUidValidity 1\nMaxPushedUid ".$$ss{max_pushed}."\n";
"\nMaxPushedUid ".$$ss{max_pushed}."\n\n"; print FILE "MaxExpiredFarUid ".$$ss{max_expired_far}."\n"
if ($$ss{max_expired_far});
print FILE "MaxExpiredNearUid ".$$ss{max_expired_near}."\n"
if ($$ss{max_expired_near});
print FILE "\n";
for my $ent (@{$$ss{entries}}) { for my $ent (@{$$ss{entries}}) {
print FILE $$ent[0]." ".$$ent[1]." ".$$ent[2]."\n"; print FILE $$ent[0]." ".$$ent[1]." ".$$ent[2]."\n";
} }
@ -646,8 +669,9 @@ sub cmpstate($$)
return 0 if ($ss == $ref_ss); return 0 if ($ss == $ref_ss);
my $ret = 0; my $ret = 0;
for my $h (['MaxPulledUid', 'max_pulled'], for my $h (['MaxPulledUid', 'max_pulled'],
['MaxExpiredFarUid', 'max_expired'], ['MaxExpiredFarUid', 'max_expired_far'],
['MaxPushedUid', 'max_pushed']) { ['MaxPushedUid', 'max_pushed'],
['MaxExpiredNearUid', 'max_expired_near']) {
my ($hn, $sn) = @$h; my ($hn, $sn) = @$h;
my ($got, $want) = ($$ss{$sn}, $$ref_ss{$sn}); my ($got, $want) = ($$ss{$sn}, $$ref_ss{$sn});
if ($got != $want) { if ($got != $want) {
@ -735,7 +759,8 @@ sub printstate($)
my ($ss) = @_; my ($ss) = @_;
return if (!$ss); return if (!$ss);
print " [ ".$$ss{max_pulled}.", ".$$ss{max_expired}.", ".$$ss{max_pushed}.",\n "; print " [ ".$$ss{max_pulled}.", ".$$ss{max_expired_far}.", ".
$$ss{max_pushed}.", ".$$ss{max_expired_near}.",\n ";
my $frst = 1; my $frst = 1;
for my $ent (@{$$ss{entries}}) { for my $ent (@{$$ss{entries}}) {
if ($frst) { if ($frst) {
@ -884,10 +909,10 @@ sub test_impl($$$$)
} }
} }
# $title, \@source_state, \@target_state, \@channel_configs # $title, \@source_state, \@target_state, \@channel_configs, $flip_sides
sub test($$$$) sub test($$$$;$)
{ {
my ($ttl, $isx, $itx, $sfx) = @_; my ($ttl, $isx, $itx, $sfx, $flip) = @_;
if (@match) { if (@match) {
if ($start) { if ($start) {
@ -899,10 +924,16 @@ sub test($$$$)
} }
print "Testing: ".$ttl." ...\n"; print "Testing: ".$ttl." ...\n";
# We don't flip the Store configs, as inverting the Channel config
# would be unreasonably complex.
writecfg($sfx); writecfg($sfx);
my $sx = parse_chan($isx); my $sx = parse_chan($isx);
my $tx = parse_chan($itx, $sx); my $tx = parse_chan($itx, $sx);
if ($flip) {
flip_chan($sx);
flip_chan($tx);
}
test_impl(0, $sx, $tx, $sfx); test_impl(0, $sx, $tx, $sfx);
test_impl(1, $sx, $tx, $sfx); test_impl(1, $sx, $tx, $sfx);
@ -1263,6 +1294,9 @@ my @X33 = (
); );
test("max messages + expire - full", \@x33, \@X33, \@O31); test("max messages + expire - full", \@x33, \@X33, \@O31);
my @O31a = ("", "", "MaxMessages 3\nExpireSide Far\n");
test("max messages + expire far - full", \@x33, \@X33, \@O31a, 1);
my @O34 = ("", "", "Sync New\nMaxMessages 3\n"); my @O34 = ("", "", "Sync New\nMaxMessages 3\n");
my @X34 = ( my @X34 = (
I, F, I, I, F, I,
@ -1465,6 +1499,9 @@ my @X61 = (
); );
test("maxuid topping", \@x60, \@X61, \@O61); test("maxuid topping", \@x60, \@X61, \@O61);
my @O62 = ("", "", "Sync Flags\nExpireSide Far");
test("maxuid topping (expire far)", \@x60, \@X61, \@O62, 1);
# Tests for refreshing previously skipped/failed/expired messages. # Tests for refreshing previously skipped/failed/expired messages.
# We don't know the flags at the time of the (hypothetical) previous # We don't know the flags at the time of the (hypothetical) previous
# sync, so we can't know why a particular message is missing. # sync, so we can't know why a particular message is missing.
@ -1688,6 +1725,24 @@ my @X82a = (
); );
test("weird old + expire + expunge near", \@x80a, \@X82a, \@O72a); test("weird old + expire + expunge near", \@x80a, \@X82a, \@O72a);
my @O82b = ("", "", "Sync New Old\nMaxMessages 3\nExpireUnread yes\nExpireSide Far\nExpunge Far\n");
my @X82b = (
U, L, D,
E, "", "/", "/",
F, "", ">", "",
G, "", "", "/",
L, "", "/", "",
N, "", "/", "/",
O, "", ">", "",
P, "", "", "/",
T, "", "", "/",
D, "", "*F", "*F",
H, "*", "*", "",
Q, "*", "*", "",
U, "*", "*", "",
);
test("weird old + expire far + expunge far", \@x80a, \@X82b, \@O82b, 1);
my @X83 = ( my @X83 = (
S, L, Q, S, L, Q,
E, "", "<", "", E, "", "<", "",
@ -1816,6 +1871,24 @@ my @X13 = (
); );
test("trash new remotely", \@x10, \@X13, \@O13); test("trash new remotely", \@x10, \@X13, \@O13);
my @O14 = ("Trash far_trash\n", "",
"Sync Flags Gone\nMaxMessages 20\nExpireUnread yes\nExpireSide Far\nMaxSize 1k\nExpunge Both\n");
my @X14 = (
K, A, K,
A, "", "/", "/",
B, "/", "/", "",
C, "/", "/", "#/",
D, "", "/", "#/",
E, "/", "/", "",
F, "/", "/", "/",
G, "/", "/", "#/",
H, "/", "/", "/",
I, "/", "/", "#/",
L, "/", "", "",
M, "", "", "#/",
);
test("trash near (expire far)", \@x10, \@X14, \@O14, 1);
# Test "mirroring" expunges. # Test "mirroring" expunges.
my @xa0 = ( my @xa0 = (

View File

@ -625,14 +625,15 @@ box_opened2( sync_vars_t *svars, int t )
// The latter would also apply when the expired box is the source, // The latter would also apply when the expired box is the source,
// but it's more natural to treat it as read-only in that case. // but it's more natural to treat it as read-only in that case.
// OP_UPGRADE makes sense only for legacy S_SKIPPED entries. // OP_UPGRADE makes sense only for legacy S_SKIPPED entries.
if ((chan->ops[N] & (OP_OLD | OP_NEW | OP_UPGRADE | OP_FLAGS)) && chan->max_messages) int xt = chan->expire_side;
if ((chan->ops[xt] & (OP_OLD | OP_NEW | OP_UPGRADE | OP_FLAGS)) && chan->max_messages)
svars->any_expiring = 1; svars->any_expiring = 1;
if (svars->any_expiring) { if (svars->any_expiring) {
opts[N] |= OPEN_PAIRED | OPEN_FLAGS; opts[xt] |= OPEN_PAIRED | OPEN_FLAGS;
if (any_dummies[N]) if (any_dummies[xt])
opts[F] |= OPEN_PAIRED | OPEN_FLAGS; opts[xt^1] |= OPEN_PAIRED | OPEN_FLAGS;
else if (chan->ops[N] & (OP_OLD | OP_NEW | OP_UPGRADE)) else if (chan->ops[xt] & (OP_OLD | OP_NEW | OP_UPGRADE))
opts[F] |= OPEN_FLAGS; opts[xt^1] |= OPEN_FLAGS;
} }
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
svars->opts[t] = svars->drv[t]->prepare_load_box( ctx[t], opts[t] ); svars->opts[t] = svars->drv[t]->prepare_load_box( ctx[t], opts[t] );
@ -651,36 +652,37 @@ box_opened2( sync_vars_t *svars, int t )
} }
ARRAY_INIT( &mexcs ); ARRAY_INIT( &mexcs );
if ((svars->opts[F] & OPEN_PAIRED) && !(svars->opts[F] & OPEN_OLD) && chan->max_messages) { if ((svars->opts[xt^1] & OPEN_PAIRED) && !(svars->opts[xt^1] & OPEN_OLD) && chan->max_messages) {
/* When messages have been expired on the near side, the far side fetch is split into /* When messages have been expired on one side, the other side's fetch is split into
* two ranges: The bulk fetch which corresponds with the most recent messages, and an * two ranges: The bulk fetch which corresponds with the most recent messages, and an
* exception list of messages which would have been expired if they weren't important. */ * exception list of messages which would have been expired if they weren't important. */
debug( "preparing far side selection - max expired far uid is %u\n", svars->maxxfuid ); debug( "preparing %s selection - max expired %s uid is %u\n",
str_fn[xt^1], str_fn[xt^1], svars->maxxfuid );
/* First, find out the lower bound for the bulk fetch. */ /* First, find out the lower bound for the bulk fetch. */
minwuid = svars->maxxfuid + 1; minwuid = svars->maxxfuid + 1;
/* Next, calculate the exception fetch. */ /* Next, calculate the exception fetch. */
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;
if (!srec->uid[F]) if (!srec->uid[xt^1])
continue; // No message; other state is irrelevant continue; // No message; other state is irrelevant
if (srec->uid[F] >= minwuid) if (srec->uid[xt^1] >= minwuid)
continue; // Message is in non-expired range continue; // Message is in non-expired range
if ((svars->opts[F] & OPEN_NEW) && srec->uid[F] > svars->maxuid[F]) if ((svars->opts[xt^1] & OPEN_NEW) && srec->uid[xt^1] > svars->maxuid[xt^1])
continue; // Message is in expired range, but new range overlaps that continue; // Message is in expired range, but new range overlaps that
if (!srec->uid[N] && !(srec->status & S_PENDING)) if (!srec->uid[xt] && !(srec->status & S_PENDING))
continue; // Only actually paired up messages matter continue; // Only actually paired up messages matter
// The pair is alive, but outside the bulk range // The pair is alive, but outside the bulk range
*uint_array_append( &mexcs ) = srec->uid[F]; *uint_array_append( &mexcs ) = srec->uid[xt^1];
} }
sort_uint_array( mexcs.array ); sort_uint_array( mexcs.array );
} else { } else {
minwuid = 1; minwuid = 1;
} }
sync_ref( svars ); sync_ref( svars );
load_box( svars, F, minwuid, mexcs.array ); load_box( svars, xt^1, minwuid, mexcs.array );
if (!check_cancel( svars )) if (!check_cancel( svars ))
load_box( svars, N, 1, (uint_array_t){ NULL, 0 } ); load_box( svars, xt, 1, (uint_array_t){ NULL, 0 } );
sync_deref( svars ); sync_deref( svars );
} }
@ -747,6 +749,16 @@ cmp_srec_far( const void *a, const void *b )
return au > bu ? 1 : -1; // Can't subtract, the result might not fit into signed int. return au > bu ? 1 : -1; // Can't subtract, the result might not fit into signed int.
} }
static int
cmp_srec_near( const void *a, const void *b )
{
uint au = (*(const alive_srec_t *)a).srec->uid[N];
uint bu = (*(const alive_srec_t *)b).srec->uid[N];
assert( au && bu );
assert( au != bu );
return au > bu ? 1 : -1; // Can't subtract, the result might not fit into signed int.
}
typedef struct { typedef struct {
void *aux; void *aux;
sync_rec_t *srec; sync_rec_t *srec;
@ -877,6 +889,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
svars->oldmaxuid[N] = svars->newmaxuid[N]; svars->oldmaxuid[N] = svars->newmaxuid[N];
info( "Synchronizing...\n" ); info( "Synchronizing...\n" );
int xt = svars->chan->expire_side;
for (t = 0; t < 2; t++) for (t = 0; t < 2; t++)
svars->good_flags[t] = (uchar)svars->drv[t]->get_supported_flags( svars->ctx[t] ); svars->good_flags[t] = (uchar)svars->drv[t]->get_supported_flags( svars->ctx[t] );
@ -953,15 +966,16 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
} else if (del[t^1]) { } else if (del[t^1]) {
// The source was newly expunged, so possibly propagate the deletion. // The source was newly expunged, so possibly propagate the deletion.
// The target may be in an unknown state (not fetched). // The target may be in an unknown state (not fetched).
if ((t == F) && (srec->status & (S_EXPIRE|S_EXPIRED))) { if ((t != xt) && (srec->status & (S_EXPIRE | S_EXPIRED))) {
/* Don't propagate deletion resulting from expiration. */ /* Don't propagate deletion resulting from expiration. */
if (~srec->status & (S_EXPIRE | S_EXPIRED)) { if (~srec->status & (S_EXPIRE | S_EXPIRED)) {
// An expiration was interrupted, but the message was expunged since. // An expiration was interrupted, but the message was expunged since.
srec->status |= S_EXPIRE | S_EXPIRED; // Override failed unexpiration attempts. srec->status |= S_EXPIRE | S_EXPIRED; // Override failed unexpiration attempts.
JLOG( "~ %u %u %u", (srec->uid[F], srec->uid[N], srec->status), "forced expiration commit" ); JLOG( "~ %u %u %u", (srec->uid[F], srec->uid[N], srec->status), "forced expiration commit" );
} }
JLOG( "> %u %u 0", (srec->uid[F], srec->uid[N]), "near side expired, orphaning far side" ); JLOG( "%c %u %u 0", ("<>"[xt], srec->uid[F], srec->uid[N]),
srec->uid[N] = 0; "%s expired, orphaning %s", (str_fn[xt], str_fn[xt^1]) );
srec->uid[xt] = 0;
} else { } else {
if (srec->msg[t] && (srec->msg[t]->status & M_FLAGS) && if (srec->msg[t] && (srec->msg[t]->status & M_FLAGS) &&
// Ignore deleted flag, as that's what we'll change ourselves ... // Ignore deleted flag, as that's what we'll change ourselves ...
@ -988,7 +1002,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
doflags: doflags:
if (svars->chan->ops[t] & OP_FLAGS) { if (svars->chan->ops[t] & OP_FLAGS) {
sflags = sanitize_flags( sflags, svars, t ); sflags = sanitize_flags( sflags, svars, t );
if ((t == F) && (srec->status & (S_EXPIRE|S_EXPIRED))) { if ((t != xt) && (srec->status & (S_EXPIRE | S_EXPIRED))) {
/* Don't propagate deletion resulting from expiration. */ /* Don't propagate deletion resulting from expiration. */
debug( " near side expiring\n" ); debug( " near side expiring\n" );
sflags &= ~F_DELETED; sflags &= ~F_DELETED;
@ -1054,7 +1068,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
// debug( "not re-propagating orphaned message %u\n", tmsg->uid ); // debug( "not re-propagating orphaned message %u\n", tmsg->uid );
continue; continue;
} }
if (t == F || !(srec->status & S_EXPIRED)) { if (t != xt || !(srec->status & S_EXPIRED)) {
// Orphans are essentially deletion propagation transactions which // Orphans are essentially deletion propagation transactions which
// were interrupted midway through by not expunging the target. We // were interrupted midway through by not expunging the target. We
// don't re-propagate these, as it would be illogical, and also // don't re-propagate these, as it would be illogical, and also
@ -1100,7 +1114,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
// The 1st unknown message which should be known marks the end // The 1st unknown message which should be known marks the end
// of the synced range; more known messages may follow (from an // of the synced range; more known messages may follow (from an
// unidirectional sync in the opposite direction). // unidirectional sync in the opposite direction).
if (t == F || tmsg->uid > svars->maxxfuid) if (t != xt || tmsg->uid > svars->maxxfuid)
topping = 0; topping = 0;
const char *what; const char *what;
@ -1163,50 +1177,50 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
} }
if (svars->any_expiring) { if (svars->any_expiring) {
// Note: When this branch is entered, we have loaded all near side messages.
/* Expire excess messages. Important (flagged, unread, or unpropagated) messages /* Expire excess messages. Important (flagged, unread, or unpropagated) messages
* older than the first not expired message are not counted towards the total. */ * older than the first not expired message are not counted towards the total. */
// Note: When this branch is entered, we have loaded all expire-side messages.
debug( "preparing message expiration\n" ); debug( "preparing message expiration\n" );
alive_srec_t *arecs = nfmalloc( sizeof(*arecs) * svars->nsrecs ); alive_srec_t *arecs = nfmalloc( sizeof(*arecs) * svars->nsrecs );
int alive = 0; int alive = 0;
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;
// We completely ignore unpaired near-side messages, as we cannot expire // We completely ignore unpaired expire-side messages, as we cannot expire
// them without data loss; consequently, we also don't count them. // them without data loss; consequently, we also don't count them.
// Note that we also ignore near-side messages we're currently propagating, // Note that we also ignore expire-side messages we're currently propagating,
// which delays expiration of some messages by one cycle. Otherwise, we'd // which delays expiration of some messages by one cycle. Otherwise, we'd
// have to sequence flag updating after message propagation to avoid a race // have to sequence flag updating after message propagation to avoid a race
// with external expunging, and that seems unreasonably expensive. // with external expunging, and that seems unreasonably expensive.
if (!srec->uid[F]) if (!srec->uid[xt^1])
continue; continue;
if (!(srec->status & S_PENDING)) { if (!(srec->status & S_PENDING)) {
// We ignore unpaired far-side messages, as there is obviously nothing // We ignore unpaired far-side messages, as there is obviously nothing
// to expire in the first place. // to expire in the first place.
if (!srec->msg[N]) if (!srec->msg[xt])
continue; continue;
nflags = srec->msg[N]->flags; nflags = srec->msg[xt]->flags;
if (srec->status & S_DUMMY(N)) { if (srec->status & S_DUMMY(xt)) {
if (!srec->msg[F]) if (!srec->msg[xt^1])
continue; continue;
// We need to pull in the real Flagged and Seen even if flag // We need to pull in the real Flagged and Seen even if flag
// propagation was not requested, as the placeholder's ones are // propagation was not requested, as the placeholder's ones are
// useless (except for un-seeing). // useless (except for un-seeing).
// This results in the somewhat weird situation that messages // This results in the somewhat weird situation that messages
// which are not visibly flagged remain unexpired. // which are not visibly flagged remain unexpired.
sflags = srec->msg[F]->flags; sflags = srec->msg[xt^1]->flags;
aflags = (sflags & ~srec->flags) & (F_SEEN | F_FLAGGED); aflags = (sflags & ~srec->flags) & (F_SEEN | F_FLAGGED);
dflags = (~sflags & srec->flags) & F_SEEN; dflags = (~sflags & srec->flags) & F_SEEN;
nflags = (nflags & (~(F_SEEN | F_FLAGGED) | (srec->flags & F_SEEN)) & ~dflags) | aflags; nflags = (nflags & (~(F_SEEN | F_FLAGGED) | (srec->flags & F_SEEN)) & ~dflags) | aflags;
} }
nflags = (nflags | srec->aflags[N]) & ~srec->dflags[N]; nflags = (nflags | srec->aflags[xt]) & ~srec->dflags[xt];
} else { } else {
if (srec->status & S_UPGRADE) { if (srec->status & S_UPGRADE) {
// The dummy's F & S flags are mostly masked out anyway, // The dummy's F & S flags are mostly masked out anyway,
// but we may be pulling in the real ones. // but we may be pulling in the real ones.
nflags = (srec->pflags | srec->aflags[N]) & ~srec->dflags[N]; nflags = (srec->pflags | srec->aflags[xt]) & ~srec->dflags[xt];
} else { } else {
nflags = srec->msg[F]->flags; nflags = srec->msg[xt^1]->flags;
} }
} }
if (!(nflags & F_DELETED) || (srec->status & (S_EXPIRE | S_EXPIRED))) { if (!(nflags & F_DELETED) || (srec->status & (S_EXPIRE | S_EXPIRED))) {
@ -1216,7 +1230,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
} }
// Sort such that the messages which have been in the // Sort such that the messages which have been in the
// complete store longest expire first. // complete store longest expire first.
qsort( arecs, alive, sizeof(*arecs), cmp_srec_far ); qsort( arecs, alive, sizeof(*arecs), (xt == F) ? cmp_srec_near : cmp_srec_far );
int todel = alive - svars->chan->max_messages; int todel = alive - svars->chan->max_messages;
debug( "%d alive messages, %d excess - expiring\n", alive, todel ); debug( "%d alive messages, %d excess - expiring\n", alive, todel );
int unseen = 0; int unseen = 0;
@ -1230,7 +1244,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
todel--; todel--;
} else if (todel > 0 || } else if (todel > 0 ||
((srec->status & (S_EXPIRE | S_EXPIRED)) == (S_EXPIRE | S_EXPIRED)) || ((srec->status & (S_EXPIRE | S_EXPIRED)) == (S_EXPIRE | S_EXPIRED)) ||
((srec->status & (S_EXPIRE | S_EXPIRED)) && (srec->msg[N]->flags & F_DELETED))) { ((srec->status & (S_EXPIRE | S_EXPIRED)) && (srec->msg[xt]->flags & F_DELETED))) {
/* The message is excess or was already (being) expired. */ /* The message is excess or was already (being) expired. */
srec->status |= S_NEXPIRE; srec->status |= S_NEXPIRE;
debug( " expiring pair(%u,%u)\n", srec->uid[F], srec->uid[N] ); debug( " expiring pair(%u,%u)\n", srec->uid[F], srec->uid[N] );
@ -1241,7 +1255,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
if (svars->chan->expire_unread < 0 && unseen * 2 > svars->chan->max_messages) { if (svars->chan->expire_unread < 0 && unseen * 2 > svars->chan->max_messages) {
error( "%s: %d unread messages in excess of MaxMessages (%d).\n" error( "%s: %d unread messages in excess of MaxMessages (%d).\n"
"Please set ExpireUnread to decide outcome. Skipping mailbox.\n", "Please set ExpireUnread to decide outcome. Skipping mailbox.\n",
svars->orig_name[N], unseen, svars->chan->max_messages ); svars->orig_name[xt], unseen, svars->chan->max_messages );
svars->ret |= SYNC_FAIL; svars->ret |= SYNC_FAIL;
cancel_sync( svars ); cancel_sync( svars );
return; return;
@ -1272,9 +1286,9 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
// If we have so many new messages that some of them are instantly expired, // If we have so many new messages that some of them are instantly expired,
// but some are still propagated because they are important, we need to // but some are still propagated because they are important, we need to
// ensure explicitly that the bulk fetch limit is upped. // ensure explicitly that the bulk fetch limit is upped.
if (svars->maxxfuid < srec->uid[F]) if (svars->maxxfuid < srec->uid[xt^1])
svars->maxxfuid = srec->uid[F]; svars->maxxfuid = srec->uid[xt^1];
srec->msg[F]->srec = NULL; srec->msg[xt^1]->srec = NULL;
} }
} }
} }
@ -1307,7 +1321,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
} }
} else { } else {
/* The trigger is an expiration transaction being ongoing ... */ /* The trigger is an expiration transaction being ongoing ... */
if ((t == N) && ((shifted_bit(srec->status, S_EXPIRE, S_EXPIRED) ^ srec->status) & S_EXPIRED)) { if ((t == xt) && ((shifted_bit(srec->status, S_EXPIRE, S_EXPIRED) ^ srec->status) & S_EXPIRED)) {
// ... but the actual action derives from the wanted state - // ... but the actual action derives from the wanted state -
// so that canceled transactions are rolled back as well. // so that canceled transactions are rolled back as well.
if (srec->status & S_NEXPIRE) if (srec->status & S_NEXPIRE)
@ -1529,14 +1543,14 @@ flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t )
(str_hl[t], fmt_lone_flags( nflags ).str, fmt_lone_flags( srec->flags ).str) ); (str_hl[t], fmt_lone_flags( nflags ).str, fmt_lone_flags( srec->flags ).str) );
srec->flags = nflags; srec->flags = nflags;
} }
if (t == N) { if (t == svars->chan->expire_side) {
uchar ex = (srec->status / S_EXPIRE) & 1; uchar ex = (srec->status / S_EXPIRE) & 1;
uchar exd = (srec->status / S_EXPIRED) & 1; uchar exd = (srec->status / S_EXPIRED) & 1;
if (ex != exd) { if (ex != exd) {
uchar nex = (srec->status / S_NEXPIRE) & 1; uchar nex = (srec->status / S_NEXPIRE) & 1;
if (nex == ex) { if (nex == ex) {
if (nex && svars->maxxfuid < srec->uid[F]) if (nex && svars->maxxfuid < srec->uid[t^1])
svars->maxxfuid = srec->uid[F]; svars->maxxfuid = srec->uid[t^1];
srec->status = (srec->status & ~S_EXPIRED) | (nex * S_EXPIRED); srec->status = (srec->status & ~S_EXPIRED) | (nex * S_EXPIRED);
JLOG( "~ %u %u %d", (srec->uid[F], srec->uid[N], srec->status & S_LOGGED), JLOG( "~ %u %u %d", (srec->uid[F], srec->uid[N], srec->status & S_LOGGED),
"expired %d - commit", nex ); "expired %d - commit", nex );
@ -1582,6 +1596,7 @@ msgs_flags_set( sync_vars_t *svars, int t )
only_solo = 0; only_solo = 0;
else else
goto skip; goto skip;
int xt = svars->chan->expire_side;
int expunge_other = (svars->chan->ops[t^1] & OP_EXPUNGE); int expunge_other = (svars->chan->ops[t^1] & OP_EXPUNGE);
// Driver-wise, this makes sense only if (svars->opts[t] & OPEN_UID_EXPUNGE), // Driver-wise, this makes sense only if (svars->opts[t] & OPEN_UID_EXPUNGE),
// but the trashing loop uses the result as well. // but the trashing loop uses the result as well.
@ -1603,7 +1618,7 @@ msgs_flags_set( sync_vars_t *svars, int t )
debugn( "(orphaned) " ); debugn( "(orphaned) " );
} else if (expunge_other && (srec->status & S_DEL(t^1))) { } else if (expunge_other && (srec->status & S_DEL(t^1))) {
debugn( "(orphaning) " ); debugn( "(orphaning) " );
} else if (t == N && (srec->status & (S_EXPIRE | S_EXPIRED))) { } else if (t == xt && (srec->status & (S_EXPIRE | S_EXPIRED))) {
// Expiration overrides mirroring, as otherwise the combination // Expiration overrides mirroring, as otherwise the combination
// makes no sense at all. // makes no sense at all.
debugn( "(expire) " ); debugn( "(expire) " );
@ -1644,7 +1659,7 @@ msgs_flags_set( sync_vars_t *svars, int t )
} }
debugn( " message %u ", tmsg->uid ); debugn( " message %u ", tmsg->uid );
if ((srec = tmsg->srec)) { if ((srec = tmsg->srec)) {
if (t == N && (srec->status & (S_EXPIRE | S_EXPIRED))) { if (t == xt && (srec->status & (S_EXPIRE | S_EXPIRED))) {
// Don't trash messages that are deleted only due to expiring. // Don't trash messages that are deleted only due to expiring.
// However, this is an unlikely configuration to start with ... // However, this is an unlikely configuration to start with ...
debug( "is expired\n" ); debug( "is expired\n" );
@ -1814,12 +1829,17 @@ box_closed_p2( sync_vars_t *svars, int t )
} }
debug( "purging obsolete entries\n" ); debug( "purging obsolete entries\n" );
int xt = svars->chan->expire_side;
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;
if (!srec->uid[N] || (srec->status & S_GONE(N))) { if ((srec->status & S_EXPIRED) &&
if (!srec->uid[F] || (srec->status & S_GONE(F)) || (!srec->uid[xt] || (srec->status & S_GONE(xt))) &&
((srec->status & S_EXPIRED) && svars->maxuid[F] >= srec->uid[F] && svars->maxxfuid >= srec->uid[F])) { svars->maxuid[xt^1] >= srec->uid[xt^1] && svars->maxxfuid >= srec->uid[xt^1]) {
PC_JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing expired" );
srec->status = S_DEAD;
} else if (!srec->uid[N] || (srec->status & S_GONE(N))) {
if (!srec->uid[F] || (srec->status & S_GONE(F))) {
PC_JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing" ); PC_JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing" );
srec->status = S_DEAD; srec->status = S_DEAD;
} else if (srec->uid[N] && (srec->status & S_DEL(F))) { } else if (srec->uid[N] && (srec->status & S_DEL(F))) {

View File

@ -57,6 +57,7 @@ typedef struct channel_conf {
string_list_t *patterns; string_list_t *patterns;
int ops[2]; int ops[2];
int max_messages; // For near side only. int max_messages; // For near side only.
int expire_side;
signed char expire_unread; signed char expire_unread;
char use_internal_date; char use_internal_date;
} channel_conf_t; } channel_conf_t;

View File

@ -11,8 +11,8 @@
BIT_ENUM( BIT_ENUM(
S_DEAD, // ephemeral: the entry was killed and should be ignored S_DEAD, // ephemeral: the entry was killed and should be ignored
S_EXPIRE, // the entry is being expired (near side message removal scheduled) S_EXPIRE, // the entry is being expired (expire-side message removal scheduled)
S_EXPIRED, // the entry is expired (near side message removal confirmed) S_EXPIRED, // the entry is expired (expire-side message removal confirmed)
S_NEXPIRE, // temporary: new expiration state S_NEXPIRE, // temporary: new expiration state
S_PENDING, // the entry is new and awaits propagation (possibly a retry) S_PENDING, // the entry is new and awaits propagation (possibly a retry)
S_DUMMY(2), // f/n message is only a placeholder S_DUMMY(2), // f/n message is only a placeholder
@ -62,7 +62,7 @@ typedef struct {
uint uidval[2]; // UID validity value uint uidval[2]; // UID validity value
uint newuidval[2]; // UID validity obtained from driver uint newuidval[2]; // UID validity obtained from driver
uint finduid[2]; // TUID lookup makes sense only for UIDs >= this uint finduid[2]; // TUID lookup makes sense only for UIDs >= this
uint maxxfuid; // highest expired UID on far side uint maxxfuid; // highest expired UID on full side
uchar good_flags[2], bad_flags[2], can_crlf[2]; uchar good_flags[2], bad_flags[2], can_crlf[2];
} sync_vars_t; } sync_vars_t;

View File

@ -126,6 +126,7 @@ load_state( sync_vars_t *svars )
char fbuf[16]; // enlarge when support for keywords is added char fbuf[16]; // enlarge when support for keywords is added
char buf[128], buf1[64], buf2[64]; char buf[128], buf1[64], buf2[64];
int xt = svars->chan->expire_side;
if ((jfp = fopen( svars->dname, "r" ))) { if ((jfp = fopen( svars->dname, "r" ))) {
if (!lock_state( svars )) if (!lock_state( svars ))
goto jbail; goto jbail;
@ -148,6 +149,8 @@ load_state( sync_vars_t *svars )
error( "Error: invalid sync state header in %s\n", svars->dname ); error( "Error: invalid sync state header in %s\n", svars->dname );
goto jbail; goto jbail;
} }
if (maxxnuid && xt != N)
goto sidefail;
goto gothdr; goto gothdr;
} }
uint uid; uint uid;
@ -164,8 +167,19 @@ load_state( sync_vars_t *svars )
} else if (!strcmp( buf1, "MaxPushedUid" )) { } else if (!strcmp( buf1, "MaxPushedUid" )) {
svars->maxuid[N] = uid; svars->maxuid[N] = uid;
} else if (!strcmp( buf1, "MaxExpiredFarUid" ) || !strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) { } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || !strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) {
if (xt != N) {
sidefail:
error( "Error: state file %s does not match ExpireSide setting\n", svars->dname );
goto jbail;
}
svars->maxxfuid = uid;
} else if (!strcmp( buf1, "MaxExpiredNearUid" )) {
if (xt != F)
goto sidefail;
svars->maxxfuid = uid; svars->maxxfuid = uid;
} else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) { // Pre-1.3 legacy } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) { // Pre-1.3 legacy
if (xt != N)
goto sidefail;
maxxnuid = uid; maxxnuid = uid;
} else { } else {
error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line ); error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line );
@ -397,8 +411,8 @@ load_state( sync_vars_t *svars )
break; break;
case '~': case '~':
srec->status = (srec->status & ~S_LOGGED) | t3; srec->status = (srec->status & ~S_LOGGED) | t3;
if ((srec->status & S_EXPIRED) && svars->maxxfuid < srec->uid[F]) if ((srec->status & S_EXPIRED) && svars->maxxfuid < srec->uid[xt^1])
svars->maxxfuid = srec->uid[F]; svars->maxxfuid = srec->uid[xt^1];
debug( "status now %s\n", fmt_sts( srec->status ).str ); debug( "status now %s\n", fmt_sts( srec->status ).str );
break; break;
case '_': case '_':
@ -490,7 +504,9 @@ save_state( sync_vars_t *svars )
"FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid %u\nMaxPushedUid %u\n", "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid %u\nMaxPushedUid %u\n",
svars->uidval[F], svars->uidval[N], svars->maxuid[F], svars->maxuid[N] ); svars->uidval[F], svars->uidval[N], svars->maxuid[F], svars->maxuid[N] );
if (svars->maxxfuid) if (svars->maxxfuid)
Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid ); Fprintf( svars->nfp,
svars->chan->expire_side == N ? "MaxExpiredFarUid %u\n" : "MaxExpiredNearUid %u\n",
svars->maxxfuid );
Fprintf( svars->nfp, "\n" ); Fprintf( svars->nfp, "\n" );
for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) { for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)