create placeholders for messages over MaxSize

this is vastly more useful than just omitting the messages with no
indication at all.
This commit is contained in:
Oswald Buddenhagen 2019-12-29 14:37:53 +01:00
parent 68a412115a
commit 70bad66129
8 changed files with 315 additions and 84 deletions

2
NEWS
View File

@ -15,6 +15,8 @@ The IMAP user query can be scripted now.
Added built-in support for macOS Keychain. Added built-in support for macOS Keychain.
Messages excluded by MaxSize will now result in placeholders.
The use of Master/Slave terminology has been deprecated. The use of Master/Slave terminology has been deprecated.
[1.3.0] [1.3.0]

5
TODO
View File

@ -61,9 +61,8 @@ messages.
use MULTIAPPEND and FETCH with multiple messages. use MULTIAPPEND and FETCH with multiple messages.
create dummies describing MIME structure of messages bigger than MaxSize. dummy messages resulting from MaxSize should contain a dump of the original
flagging the dummy would fetch the real message. possibly remove --renew. message's MIME structure and its (reasonably sized) text parts.
note that all interaction needs to happen on the near side probably.
don't SELECT boxes unless really needed; in particular not for appending, don't SELECT boxes unless really needed; in particular not for appending,
and in write-only mode not before changes are made. and in write-only mode not before changes are made.

View File

@ -80,7 +80,6 @@ typedef struct message {
#define OPEN_OLD (1<<0) // Paired messages *in* this store. #define OPEN_OLD (1<<0) // Paired messages *in* this store.
#define OPEN_NEW (1<<1) // Messages (possibly) not yet propagated *from* this store. #define OPEN_NEW (1<<1) // Messages (possibly) not yet propagated *from* this store.
#define OPEN_FLAGS (1<<2) // Note that fetch_msg() gets the flags regardless. #define OPEN_FLAGS (1<<2) // Note that fetch_msg() gets the flags regardless.
#define OPEN_OLD_SIZE (1<<3)
#define OPEN_NEW_SIZE (1<<4) #define OPEN_NEW_SIZE (1<<4)
#define OPEN_EXPUNGE (1<<5) #define OPEN_EXPUNGE (1<<5)
#define OPEN_SETFLAGS (1<<6) #define OPEN_SETFLAGS (1<<6)
@ -217,8 +216,10 @@ struct driver {
void (*load_box)( store_t *ctx, uint minuid, uint maxuid, uint finduid, uint pairuid, uint newuid, uint_array_t excs, void (*load_box)( store_t *ctx, uint minuid, uint maxuid, uint finduid, uint pairuid, uint newuid, uint_array_t excs,
void (*cb)( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux ), void *aux ); void (*cb)( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux ), void *aux );
/* Fetch the contents and flags of the given message from the current mailbox. */ /* Fetch the contents and flags of the given message from the current mailbox.
void (*fetch_msg)( store_t *ctx, message_t *msg, msg_data_t *data, * If minimal is non-zero, fetch only a placeholder for the requested message -
* ideally, this is precisely the header, but it may be more. */
void (*fetch_msg)( store_t *ctx, message_t *msg, msg_data_t *data, int minimal,
void (*cb)( int sts, void *aux ), void *aux ); void (*cb)( int sts, void *aux ), void *aux );
/* Store the given message to either the current mailbox or the trash folder. /* Store the given message to either the current mailbox or the trash folder.

View File

@ -1098,7 +1098,7 @@ parse_fetch_rsp( imap_store_t *ctx, list_t *list, char *s ATTR_UNUSED )
error( "IMAP error: unable to parse RFC822.SIZE\n" ); error( "IMAP error: unable to parse RFC822.SIZE\n" );
goto ffail; goto ffail;
} }
} else if (!strcmp( "BODY[]", name )) { } else if (!strcmp( "BODY[]", name ) || !strcmp( "BODY[HEADER]", name )) {
if (!is_atom( tmp )) { if (!is_atom( tmp )) {
error( "IMAP error: unable to parse BODY[]\n" ); error( "IMAP error: unable to parse BODY[]\n" );
goto ffail; goto ffail;
@ -2714,9 +2714,8 @@ imap_load_box( store_t *gctx, uint minuid, uint maxuid, uint finduid, uint pairu
ranges[0].last = maxuid; ranges[0].last = maxuid;
ranges[0].flags = 0; ranges[0].flags = 0;
uint nranges = 1; uint nranges = 1;
if (ctx->opts & (OPEN_OLD_SIZE | OPEN_NEW_SIZE)) if (ctx->opts & OPEN_NEW_SIZE)
imap_set_range( ranges, &nranges, shifted_bit( ctx->opts, OPEN_OLD_SIZE, WantSize), imap_set_range( ranges, &nranges, 0, WantSize, newuid );
shifted_bit( ctx->opts, OPEN_NEW_SIZE, WantSize), newuid );
if (ctx->opts & OPEN_FIND) if (ctx->opts & OPEN_FIND)
imap_set_range( ranges, &nranges, 0, WantTuids, finduid - 1 ); imap_set_range( ranges, &nranges, 0, WantTuids, finduid - 1 );
if (ctx->opts & OPEN_OLD_IDS) if (ctx->opts & OPEN_OLD_IDS)
@ -2811,7 +2810,7 @@ imap_submit_load_p3( imap_store_t *ctx, imap_load_box_state_t *sts )
static void imap_fetch_msg_p2( imap_store_t *, imap_cmd_t *, int ); static void imap_fetch_msg_p2( imap_store_t *, imap_cmd_t *, int );
static void static void
imap_fetch_msg( store_t *ctx, message_t *msg, msg_data_t *data, imap_fetch_msg( store_t *ctx, message_t *msg, msg_data_t *data, int minimal,
void (*cb)( int sts, void *aux ), void *aux ) void (*cb)( int sts, void *aux ), void *aux )
{ {
imap_cmd_fetch_msg_t *cmd; imap_cmd_fetch_msg_t *cmd;
@ -2821,9 +2820,10 @@ imap_fetch_msg( store_t *ctx, message_t *msg, msg_data_t *data,
cmd->msg_data = data; cmd->msg_data = data;
data->data = NULL; data->data = NULL;
imap_exec( (imap_store_t *)ctx, &cmd->gen.gen, imap_fetch_msg_p2, imap_exec( (imap_store_t *)ctx, &cmd->gen.gen, imap_fetch_msg_p2,
"UID FETCH %u (%s%sBODY.PEEK[])", msg->uid, "UID FETCH %u (%s%sBODY.PEEK[%s])", msg->uid,
!(msg->status & M_FLAGS) ? "FLAGS " : "", !(msg->status & M_FLAGS) ? "FLAGS " : "",
(data->date== -1) ? "INTERNALDATE " : "" ); (data->date== -1) ? "INTERNALDATE " : "",
minimal ? "HEADER" : "" );
} }
static void static void

View File

@ -1139,7 +1139,7 @@ maildir_scan( maildir_store_t *ctx, msg_t_array_alloc_t *msglist )
free( entry->base ); free( entry->base );
entry->base = nfstrndup( buf + bl + 4, (size_t)fnl ); entry->base = nfstrndup( buf + bl + 4, (size_t)fnl );
} }
int want_size = (uid > ctx->newuid) ? (ctx->opts & OPEN_NEW_SIZE) : (ctx->opts & OPEN_OLD_SIZE); int want_size = ((ctx->opts & OPEN_NEW_SIZE) && uid > ctx->newuid);
int want_tuid = ((ctx->opts & OPEN_FIND) && uid >= ctx->finduid); int want_tuid = ((ctx->opts & OPEN_FIND) && uid >= ctx->finduid);
int want_msgid = ((ctx->opts & OPEN_OLD_IDS) && uid <= ctx->pairuid); int want_msgid = ((ctx->opts & OPEN_OLD_IDS) && uid <= ctx->pairuid);
if (!want_size && !want_tuid && !want_msgid) if (!want_size && !want_tuid && !want_msgid)
@ -1533,7 +1533,7 @@ maildir_again( maildir_store_t *ctx, maildir_message_t *msg, const char *err, ..
} }
static void static void
maildir_fetch_msg( store_t *gctx, message_t *gmsg, msg_data_t *data, maildir_fetch_msg( store_t *gctx, message_t *gmsg, msg_data_t *data, int minimal ATTR_UNUSED,
void (*cb)( int sts, void *aux ), void *aux ) void (*cb)( int sts, void *aux ), void *aux )
{ {
maildir_store_t *ctx = (maildir_store_t *)gctx; maildir_store_t *ctx = (maildir_store_t *)gctx;

View File

@ -163,8 +163,16 @@ directory.
. .
.TP .TP
\fBMaxSize\fR \fIsize\fR[\fBk\fR|\fBm\fR][\fBb\fR] \fBMaxSize\fR \fIsize\fR[\fBk\fR|\fBm\fR][\fBb\fR]
Messages larger than that will not be propagated into this Store. Messages larger than \fIsize\fR will have only a small placeholder message
This is useful for weeding out messages with large attachments. propagated into this Store. To propagate the full message, it must be
flagged in either Store; that can be done retroactively, in which case
the \fBReNew\fR operation needs to be executed instead of \fBNew\fR.
This is useful for avoiding downloading messages with large attachments
unless they are actually needed.
Caveat: Setting a size limit on a Store you never read directly (which is
typically the case for servers) is not recommended, as you may never
notice that affected messages were not propagated to it.
.br
\fBK\fR and \fBM\fR can be appended to the size to specify KiBytes resp. \fBK\fR and \fBM\fR can be appended to the size to specify KiBytes resp.
MeBytes instead of bytes. \fBB\fR is accepted but superfluous. MeBytes instead of bytes. \fBB\fR is accepted but superfluous.
If \fIsize\fR is 0, the maximum message size is \fBunlimited\fR. If \fIsize\fR is 0, the maximum message size is \fBunlimited\fR.
@ -574,9 +582,8 @@ Select the synchronization operation(s) to perform:
.br .br
\fBNew\fR - propagate newly appeared messages. \fBNew\fR - propagate newly appeared messages.
.br .br
\fBReNew\fR - previously refused messages are re-evaluated for propagation. \fBReNew\fR - upgrade placeholders to full messages. Useful only with
Useful after flagging affected messages in the source Store or enlarging a configured \fBMaxSize\fR.
MaxSize in the destination Store.
.br .br
\fBDelete\fR - propagate message deletions. This applies only to messages that \fBDelete\fR - propagate message deletions. This applies only to messages that
are actually gone, i.e., were expunged. The affected messages in the remote are actually gone, i.e., were expunged. The affected messages in the remote

View File

@ -161,29 +161,75 @@ my @x10 = (
], ],
); );
my @O11 = ("MaxSize 1k\n", "MaxSize 1k\n", ""); my @O11 = ("MaxSize 1k\n", "MaxSize 1k\n", "Expunge Near");
#show("10", "11", "11"); #show("10", "11", "11");
my @X11 = ( my @X11 = (
[ 2, [ 3,
A, 1, "", B, 2, "*" ], A, 1, "", B, 2, "*", C, 3, "?" ],
[ 2, [ 3,
C, 1, "*", A, 2, "" ], C, 1, "*", A, 2, "", B, 3, "?" ],
[ 2, 0, 2, [ 3, 0, 3,
0, 1, "^", 1, 2, "", 2, 0, "^" ], 3, 1, "<", 1, 2, "", 2, 3, ">" ],
); );
test("max size", \@x10, \@X11, @O11); test("max size", \@x10, \@X11, @O11);
my @O22 = ("", "MaxSize 1k\n", ""); my @x22 = (
#show("11", "22", "22");
my @X22 = (
[ 3, [ 3,
A, 1, "", B, 2, "*", C, 3, "*" ], A, 1, "", B, 2, "*", C, 3, "?" ],
[ 2, [ 3,
C, 1, "*", A, 2, "" ], C, 1, "F*", A, 2, "", B, 3, "F?" ],
[ 3, 0, 2, [ 3, 0, 3,
3, 1, "", 1, 2, "", 2, 0, "^" ], 3, 1, "<", 1, 2, "", 2, 3, ">" ],
); );
test("near side max size", \@X11, \@X22, @O22);
#show("22", "22", "11");
my @X22 = (
[ 4,
A, 1, "", B, 2, "*", C, 3, "T?", C, 4, "F*" ],
[ 4,
C, 1, "F*", A, 2, "", B, 4, "*" ],
[ 4, 0, 4,
4, 1, "F", 3, 0, "T", 1, 2, "", 2, 4, "" ],
);
test("max size + flagging", \@x22, \@X22, @O11);
my @x23 = (
[ 2,
A, 1, "", B, 2, "F*" ],
[ 1,
C, 1, "F*" ],
[ 0, 0, 0,
],
);
my @X23 = (
[ 3,
A, 1, "", B, 2, "F*", C, 3, "F*" ],
[ 3,
C, 1, "F*", A, 2, "", B, 3, "F*" ],
[ 3, 0, 3,
3, 1, "F", 1, 2, "", 2, 3, "F" ]
);
test("max size + initial flagging", \@x23, \@X23, @O11);
my @x24 = (
[ 3,
A, 1, "", B, 2, "*", C, 3, "F*" ],
[ 1,
A, 1, "" ],
[ 3, 0, 1,
1, 1, "", 2, 0, "^", 3, 0, "^" ],
);
my @X24 = (
[ 3,
A, 1, "", B, 2, "*", C, 3, "F*" ],
[ 3,
A, 1, "", B, 2, "?", C, 3, "F*" ],
[ 3, 0, 3,
1, 1, "", 2, 2, ">", 3, 3, "F" ],
);
test("max size (pre-1.4 legacy)", \@x24, \@X24, @O11);
# expiration tests # expiration tests
@ -329,7 +375,7 @@ sub readbox($)
for my $d ("cur", "new") { for my $d ("cur", "new") {
opendir(DIR, $bn."/".$d) or next; opendir(DIR, $bn."/".$d) or next;
for my $f (grep(!/^\.\.?$/, readdir(DIR))) { for my $f (grep(!/^\.\.?$/, readdir(DIR))) {
my ($uid, $flg, $num); my ($uid, $flg, $ph, $num);
if ($f =~ /^\d+\.\d+_\d+\.[-[:alnum:]]+,U=(\d+):2,(.*)$/) { if ($f =~ /^\d+\.\d+_\d+\.[-[:alnum:]]+,U=(\d+):2,(.*)$/) {
($uid, $flg) = ($1, $2); ($uid, $flg) = ($1, $2);
} else { } else {
@ -339,7 +385,7 @@ sub readbox($)
open(FILE, "<", $bn."/".$d."/".$f) or die "Cannot read message '$f' in '$bn'.\n"; open(FILE, "<", $bn."/".$d."/".$f) or die "Cannot read message '$f' in '$bn'.\n";
my $sz = 0; my $sz = 0;
while (<FILE>) { while (<FILE>) {
/^Subject: (\d+)$/ && ($num = $1); /^Subject: (\[placeholder\] )?(\d+)$/ && ($ph = defined($1), $num = $2);
$sz += length($_); $sz += length($_);
} }
close FILE; close FILE;
@ -347,7 +393,7 @@ sub readbox($)
print STDERR "message '$f' in '$bn' has no identifier.\n"; print STDERR "message '$f' in '$bn' has no identifier.\n";
exit 1; exit 1;
} }
@{ $ms{$uid} } = ($num, $flg.($sz>1000?"*":"")); @{ $ms{$uid} } = ($num, $flg.($sz>1000?"*":"").($ph?"?":""));
} }
} }
return ($mu, %ms); return ($mu, %ms);
@ -455,9 +501,10 @@ sub mkbox($$@)
while (@ms) { while (@ms) {
my ($num, $uid, $flg) = (shift @ms, shift @ms, shift @ms); my ($num, $uid, $flg) = (shift @ms, shift @ms, shift @ms);
my $big = $flg =~ s/\*//; my $big = $flg =~ s/\*//;
my $ph = $flg =~ s/\?//;
open(FILE, ">", $bn."/".($flg =~ /S/ ? "cur" : "new")."/0.1_".$num.".local,U=".$uid.":2,".$flg) or open(FILE, ">", $bn."/".($flg =~ /S/ ? "cur" : "new")."/0.1_".$num.".local,U=".$uid.":2,".$flg) or
die "Cannot create message ".mn($num)." in mailbox $bn.\n"; die "Cannot create message ".mn($num)." in mailbox $bn.\n";
print FILE "From: foo\nTo: bar\nDate: Thu, 1 Jan 1970 00:00:00 +0000\nSubject: $num\n\n".(("A"x50)."\n")x($big*30); print FILE "From: foo\nTo: bar\nDate: Thu, 1 Jan 1970 00:00:00 +0000\nSubject: ".($ph?"[placeholder] ":"").$num."\n\n".(("A"x50)."\n")x($big*30);
close FILE; close FILE;
} }
} }

View File

@ -136,20 +136,23 @@ make_flags( uchar flags, char *buf )
#define S_EXPIRE (1<<0) // the entry is being expired (near side message removal scheduled) #define S_EXPIRE (1<<0) // the entry is being expired (near side message removal scheduled)
#define S_EXPIRED (1<<1) // the entry is expired (near side message removal confirmed) #define S_EXPIRED (1<<1) // the entry is expired (near side message removal confirmed)
#define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry) #define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry)
#define S_SKIPPED (1<<3) // the entry was not propagated (message is too big) #define S_DUMMY(fn) (1<<(3+(fn))) // f/n message is only a placeholder
#define S_SKIPPED (1<<5) // pre-1.4 legacy: the entry was not propagated (message is too big)
#define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored #define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored
// Ephemeral working set. // Ephemeral working set.
#define W_NEXPIRE (1<<0) // temporary: new expiration state #define W_NEXPIRE (1<<0) // temporary: new expiration state
#define W_DELETE (1<<1) // ephemeral: flags propagation is a deletion #define W_DELETE (1<<1) // ephemeral: flags propagation is a deletion
#define W_DEL(fn) (1<<(2+(fn))) // ephemeral: f/n message would be subject to expunge #define W_DEL(fn) (1<<(2+(fn))) // ephemeral: f/n message would be subject to expunge
#define W_UPGRADE (1<<4) // ephemeral: upgrading placeholder, do not apply MaxSize
#define W_PURGE (1<<5) // ephemeral: placeholder is being nuked
typedef struct sync_rec { typedef struct sync_rec {
struct sync_rec *next; struct sync_rec *next;
/* string_list_t *keywords; */ /* string_list_t *keywords; */
uint uid[2]; uint uid[2];
message_t *msg[2]; message_t *msg[2];
uchar status, wstate, flags, aflags[2], dflags[2]; uchar status, wstate, flags, pflags, aflags[2], dflags[2];
char tuid[TUIDL]; char tuid[TUIDL];
} sync_rec_t; } sync_rec_t;
@ -274,6 +277,7 @@ assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid )
if (uid == svars->maxuid[t] + 1) if (uid == svars->maxuid[t] + 1)
svars->maxuid[t] = uid; svars->maxuid[t] = uid;
srec->status &= ~S_PENDING; srec->status &= ~S_PENDING;
srec->wstate &= ~W_UPGRADE;
srec->tuid[0] = 0; srec->tuid[0] = 0;
} }
@ -353,6 +357,7 @@ typedef struct copy_vars {
sync_rec_t *srec; /* also ->tuid */ sync_rec_t *srec; /* also ->tuid */
message_t *msg; message_t *msg;
msg_data_t data; msg_data_t data;
int minimal;
} copy_vars_t; } copy_vars_t;
static void msg_fetched( int sts, void *aux ); static void msg_fetched( int sts, void *aux );
@ -365,7 +370,7 @@ copy_msg( copy_vars_t *vars )
t ^= 1; t ^= 1;
vars->data.flags = vars->msg->flags; vars->data.flags = vars->msg->flags;
vars->data.date = svars->chan->use_internal_date ? -1 : 0; vars->data.date = svars->chan->use_internal_date ? -1 : 0;
svars->drv[t]->fetch_msg( svars->ctx[t], vars->msg, &vars->data, msg_fetched, vars ); svars->drv[t]->fetch_msg( svars->ctx[t], vars->msg, &vars->data, vars->minimal, msg_fetched, vars );
} }
static void msg_stored( int sts, uint uid, void *aux ); static void msg_stored( int sts, uint uid, void *aux );
@ -405,8 +410,10 @@ copy_msg_convert( int in_cr, int out_cr, copy_vars_t *vars )
{ {
char *in_buf = vars->data.data; char *in_buf = vars->data.data;
uint in_len = vars->data.len; uint in_len = vars->data.len;
uint idx = 0, sbreak = 0, ebreak = 0; uint idx = 0, sbreak = 0, ebreak = 0, break2 = 0;
uint lines = 0, hdr_crs = 0, bdy_crs = 0, app_cr = 0, extra = 0; uint lines = 0, hdr_crs = 0, bdy_crs = 0, app_cr = 0, extra = 0;
uint add_subj = 0;
if (vars->srec) { if (vars->srec) {
nloop: ; nloop: ;
uint start = idx; uint start = idx;
@ -416,14 +423,29 @@ copy_msg_convert( int in_cr, int out_cr, copy_vars_t *vars )
if (c == '\r') { if (c == '\r') {
line_crs++; line_crs++;
} else if (c == '\n') { } else if (c == '\n') {
if (starts_with_upper( in_buf + start, (int)(in_len - start), "X-TUID: ", 8 )) { if (!ebreak && starts_with_upper( in_buf + start, (int)(in_len - start), "X-TUID: ", 8 )) {
extra = (sbreak = start) - (ebreak = idx); extra = (sbreak = start) - (ebreak = idx);
goto oke; if (!vars->minimal)
goto oke;
} else {
if (!break2 && vars->minimal && !strncasecmp( in_buf + start, "Subject:", 8 )) {
break2 = start + 8;
if (in_buf[break2] == ' ')
break2++;
}
lines++;
hdr_crs += line_crs;
} }
lines++;
hdr_crs += line_crs;
if (idx - line_crs - 1 == start) { if (idx - line_crs - 1 == start) {
sbreak = ebreak = start; if (!ebreak)
sbreak = ebreak = start;
if (vars->minimal) {
in_len = idx;
if (!break2) {
break2 = start;
add_subj = 1;
}
}
goto oke; goto oke;
} }
goto nloop; goto nloop;
@ -449,10 +471,36 @@ copy_msg_convert( int in_cr, int out_cr, copy_vars_t *vars )
extra += lines; extra += lines;
} }
uint dummy_msg_len = 0;
char dummy_msg_buf[180];
static const char dummy_pfx[] = "[placeholder] ";
static const char dummy_subj[] = "Subject: [placeholder] (No Subject)";
static const char dummy_msg[] =
"Having a size of %s, this message is over the MaxSize limit.%s"
"Flag it and sync again (Sync mode ReNew) to fetch its real contents.%s";
if (vars->minimal) {
char sz[32];
if (vars->msg->size < 1024000)
sprintf( sz, "%dKiB", (int)(vars->msg->size >> 10) );
else
sprintf( sz, "%.1fMiB", vars->msg->size / 1048576. );
const char *nl = app_cr ? "\r\n" : "\n";
dummy_msg_len = (uint)sprintf( dummy_msg_buf, dummy_msg, sz, nl, nl );
extra += dummy_msg_len;
extra += add_subj ? strlen(dummy_subj) + app_cr + 1 : strlen(dummy_pfx);
}
vars->data.len = in_len + extra; vars->data.len = in_len + extra;
char *out_buf = vars->data.data = nfmalloc( vars->data.len ); char *out_buf = vars->data.data = nfmalloc( vars->data.len );
idx = 0; idx = 0;
if (vars->srec) { if (vars->srec) {
if (break2 && break2 < sbreak) {
copy_msg_bytes( &out_buf, in_buf, &idx, break2, in_cr, out_cr );
memcpy( out_buf, dummy_pfx, strlen(dummy_pfx) );
out_buf += strlen(dummy_pfx);
}
copy_msg_bytes( &out_buf, in_buf, &idx, sbreak, in_cr, out_cr ); copy_msg_bytes( &out_buf, in_buf, &idx, sbreak, in_cr, out_cr );
memcpy( out_buf, "X-TUID: ", 8 ); memcpy( out_buf, "X-TUID: ", 8 );
@ -463,9 +511,26 @@ copy_msg_convert( int in_cr, int out_cr, copy_vars_t *vars )
*out_buf++ = '\r'; *out_buf++ = '\r';
*out_buf++ = '\n'; *out_buf++ = '\n';
idx = ebreak; idx = ebreak;
if (break2 >= sbreak) {
copy_msg_bytes( &out_buf, in_buf, &idx, break2, in_cr, out_cr );
if (!add_subj) {
memcpy( out_buf, dummy_pfx, strlen(dummy_pfx) );
out_buf += strlen(dummy_pfx);
} else {
memcpy( out_buf, dummy_subj, strlen(dummy_subj) );
out_buf += strlen(dummy_subj);
if (app_cr)
*out_buf++ = '\r';
*out_buf++ = '\n';
}
}
} }
copy_msg_bytes( &out_buf, in_buf, &idx, in_len, in_cr, out_cr ); copy_msg_bytes( &out_buf, in_buf, &idx, in_len, in_cr, out_cr );
if (vars->minimal)
memcpy( out_buf, dummy_msg_buf, dummy_msg_len );
free( in_buf ); free( in_buf );
return 1; return 1;
} }
@ -648,6 +713,33 @@ clean_strdup( const char *s )
} }
static sync_rec_t *
upgrade_srec( sync_vars_t *svars, sync_rec_t *srec )
{
// Create an entry and append it to the current one.
sync_rec_t *nsrec = nfcalloc( sizeof(*nsrec) );
nsrec->next = srec->next;
srec->next = nsrec;
if (svars->srecadd == &srec->next)
svars->srecadd = &nsrec->next;
// Move the placeholder to the new entry.
int t = (srec->status & S_DUMMY(F)) ? F : N;
nsrec->uid[t] = srec->uid[t];
srec->uid[t] = 0;
if (srec->msg[t]) { // NULL during journal replay; is assigned later.
nsrec->msg[t] = srec->msg[t];
nsrec->msg[t]->srec = nsrec;
srec->msg[t] = NULL;
}
// Mark the original entry for upgrade.
srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING;
srec->wstate |= W_UPGRADE;
// Mark the placeholder for nuking.
nsrec->wstate = W_PURGE;
nsrec->aflags[t] = F_DELETED;
return nsrec;
}
static int static int
prepare_state( sync_vars_t *svars ) prepare_state( sync_vars_t *svars )
{ {
@ -741,7 +833,8 @@ save_state( sync_vars_t *svars )
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;
make_flags( srec->flags, fbuf ); make_flags( srec->flags, fbuf );
Fprintf( svars->nfp, "%u %u %s%s\n", srec->uid[F], srec->uid[N], Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], srec->uid[N],
(srec->status & S_DUMMY(F)) ? "<" : (srec->status & S_DUMMY(N)) ? ">" : "",
(srec->status & S_SKIPPED) ? "^" : (srec->status & S_EXPIRED) ? "~" : "", fbuf ); (srec->status & S_SKIPPED) ? "^" : (srec->status & S_EXPIRED) ? "~" : "", fbuf );
} }
@ -836,7 +929,14 @@ load_state( sync_vars_t *svars )
srec->uid[F] = t1; srec->uid[F] = t1;
srec->uid[N] = t2; srec->uid[N] = t2;
s = fbuf; s = fbuf;
if (*s == '^') { if (*s == '<') {
s++;
srec->status = S_DUMMY(F);
} else if (*s == '>') {
s++;
srec->status = S_DUMMY(N);
}
if (*s == '^') { // Pre-1.4 legacy
s++; s++;
srec->status = S_SKIPPED; srec->status = S_SKIPPED;
} else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) { } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) {
@ -850,8 +950,9 @@ load_state( sync_vars_t *svars )
srec->status = S_SKIPPED; srec->status = S_SKIPPED;
} }
srec->flags = parse_flags( s ); srec->flags = parse_flags( s );
debug( " entry (%u,%u,%u,%s)\n", srec->uid[F], srec->uid[N], srec->flags, debug( " entry (%u,%u,%u,%s%s)\n", srec->uid[F], srec->uid[N], srec->flags,
(srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_EXPIRED) ? "XPIRE" : "" ); (srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_EXPIRED) ? "XPIRE" : "",
(srec->status & S_DUMMY(F)) ? ",F-DUMMY" : (srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" );
*svars->srecadd = srec; *svars->srecadd = srec;
svars->srecadd = &srec->next; svars->srecadd = &srec->next;
svars->nsrecs++; svars->nsrecs++;
@ -919,14 +1020,16 @@ load_state( sync_vars_t *svars )
} }
buf[ll] = 0; buf[ll] = 0;
int tn; int tn;
uint t1, t2, t3; uint t1, t2, t3, t4;
if ((c = buf[0]) == '#' ? if ((c = buf[0]) == '#' ?
(tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) : (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
c == '!' ? c == '!' ?
(sscanf( buf + 2, "%u", &t1 ) != 1) : (sscanf( buf + 2, "%u", &t1 ) != 1) :
c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '|' ? c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ?
(sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) : (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) :
(sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3)) c != '^' ?
(sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3) :
(sscanf( buf + 2, "%u %u %u %u", &t1, &t2, &t3, &t4 ) != 4))
{ {
error( "Error: malformed journal entry at %s:%d\n", svars->jname, line ); error( "Error: malformed journal entry at %s:%d\n", svars->jname, line );
goto jbail; goto jbail;
@ -992,11 +1095,24 @@ load_state( sync_vars_t *svars )
case '*': case '*':
debug( "flags now %u\n", t3 ); debug( "flags now %u\n", t3 );
srec->flags = (uchar)t3; srec->flags = (uchar)t3;
srec->aflags[F] = srec->aflags[N] = 0;
srec->wstate &= ~W_PURGE;
break; break;
case '~': case '~':
debug( "status now %#x\n", t3 ); debug( "status now %#x\n", t3 );
srec->status = (uchar)t3; srec->status = (uchar)t3;
break; break;
case '_':
debug( "has placeholder now\n" );
srec->status = S_PENDING; // Pre-1.4 legacy only
srec->status |= !srec->uid[F] ? S_DUMMY(F) : S_DUMMY(N);
break;
case '^':
debug( "is being upgraded, flags %u, srec flags %u\n", t3, t4 );
srec->pflags = (uchar)t3;
srec->flags = (uchar)t4;
srec = upgrade_srec( svars, srec );
break;
default: default:
error( "Error: unrecognized journal entry at %s:%d\n", svars->jname, line ); error( "Error: unrecognized journal entry at %s:%d\n", svars->jname, line );
goto jbail; goto jbail;
@ -1275,18 +1391,17 @@ box_opened2( sync_vars_t *svars, int t )
} }
if (chan->ops[t] & (OP_NEW|OP_RENEW)) { if (chan->ops[t] & (OP_NEW|OP_RENEW)) {
opts[t] |= OPEN_APPEND; opts[t] |= OPEN_APPEND;
if (chan->ops[t] & OP_RENEW) if (chan->ops[t] & OP_NEW) {
opts[1-t] |= OPEN_OLD;
if (chan->ops[t] & OP_NEW)
opts[1-t] |= OPEN_NEW; opts[1-t] |= OPEN_NEW;
if (chan->ops[t] & OP_EXPUNGE) // Don't propagate doomed msgs if (chan->stores[t]->max_size != UINT_MAX)
opts[1-t] |= OPEN_FLAGS;
if (chan->stores[t]->max_size != UINT_MAX) {
if (chan->ops[t] & OP_RENEW)
opts[1-t] |= OPEN_FLAGS|OPEN_OLD_SIZE;
if (chan->ops[t] & OP_NEW)
opts[1-t] |= OPEN_FLAGS|OPEN_NEW_SIZE; opts[1-t] |= OPEN_FLAGS|OPEN_NEW_SIZE;
} }
if (chan->ops[t] & OP_RENEW) {
opts[t] |= OPEN_OLD|OPEN_FLAGS|OPEN_SETFLAGS;
opts[1-t] |= OPEN_OLD|OPEN_FLAGS;
}
if (chan->ops[t] & OP_EXPUNGE) // Don't propagate doomed msgs
opts[1-t] |= OPEN_FLAGS;
} }
if (chan->ops[t] & OP_EXPUNGE) { if (chan->ops[t] & OP_EXPUNGE) {
opts[t] |= OPEN_EXPUNGE; opts[t] |= OPEN_EXPUNGE;
@ -1312,6 +1427,15 @@ box_opened2( sync_vars_t *svars, int t )
else else
warn( "Warning: sync record (%u,%u) has stray TUID. Ignoring.\n", srec->uid[F], srec->uid[N] ); warn( "Warning: sync record (%u,%u) has stray TUID. Ignoring.\n", srec->uid[F], srec->uid[N] );
} }
if (srec->wstate & W_PURGE) {
t = srec->uid[F] ? F : N;
opts[t] |= OPEN_SETFLAGS;
}
if (srec->wstate & W_UPGRADE) {
t = !srec->uid[F] ? F : N;
opts[t] |= OPEN_APPEND;
opts[1-t] |= OPEN_OLD;
}
} }
svars->opts[F] = svars->drv[F]->prepare_load_box( ctx[F], opts[F] ); svars->opts[F] = svars->drv[F]->prepare_load_box( ctx[F], opts[F] );
svars->opts[N] = svars->drv[N]->prepare_load_box( ctx[N], opts[N] ); svars->opts[N] = svars->drv[N]->prepare_load_box( ctx[N], opts[N] );
@ -1583,6 +1707,12 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
debug( " near side expiring\n" ); debug( " near side expiring\n" );
sflags &= ~F_DELETED; sflags &= ~F_DELETED;
} }
if (srec->status & S_DUMMY(1-t)) {
// For placeholders, don't propagate:
// - Seen, because the real contents were obviously not seen yet
// - Flagged, because it's just a request to upgrade
sflags &= ~(F_SEEN|F_FLAGGED);
}
srec->aflags[t] = sflags & ~srec->flags; srec->aflags[t] = sflags & ~srec->flags;
srec->dflags[t] = ~sflags & srec->flags; srec->dflags[t] = ~sflags & srec->flags;
if ((DFlags & DEBUG_SYNC) && (srec->aflags[t] || srec->dflags[t])) { if ((DFlags & DEBUG_SYNC) && (srec->aflags[t] || srec->dflags[t])) {
@ -1594,6 +1724,41 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
} }
} }
} }
sync_rec_t *nsrec = srec;
if (((srec->status & S_DUMMY(F)) && (svars->chan->ops[F] & OP_RENEW)) ||
((srec->status & S_DUMMY(N)) && (svars->chan->ops[N] & OP_RENEW))) {
// Flagging the message on either side causes an upgrade of the dummy.
// We ignore flag resets, because that corner case is not worth it.
ushort muflags = srec->msg[F] ? srec->msg[F]->flags : 0;
ushort suflags = srec->msg[N] ? srec->msg[N]->flags : 0;
if ((muflags | suflags) & F_FLAGGED) {
t = (srec->status & S_DUMMY(F)) ? F : N;
// We calculate the flags for the replicated message already now,
// because after an interruption the dummy may be already gone.
srec->pflags = ((srec->msg[t]->flags & ~(F_SEEN|F_FLAGGED)) | srec->aflags[t]) & ~srec->dflags[t];
// Consequently, the srec's flags are committed right away as well.
srec->flags = (srec->flags | srec->aflags[t]) & ~srec->dflags[t];
JLOG( "^ %u %u %u %u", (srec->uid[F], srec->uid[N], srec->pflags, srec->flags), "upgrading placeholder" );
nsrec = upgrade_srec( svars, srec );
}
}
// This is separated, because the upgrade can come from the journal.
if (srec->wstate & W_UPGRADE) {
t = !srec->uid[F] ? F : N;
tmsg = srec->msg[1-t];
if ((svars->chan->ops[t] & OP_EXPUNGE) && (srec->pflags & F_DELETED)) {
JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing upgrade - would be expunged anyway" );
tmsg->srec = NULL;
srec->status = S_DEAD;
} else {
// Pretend that the source message has the adjusted flags of the dummy.
tmsg->flags = srec->pflags;
tmsg->status = M_FLAGS;
any_new[t] = 1;
}
}
srec = nsrec;
} }
} }
@ -1603,12 +1768,20 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
srec = tmsg->srec; srec = tmsg->srec;
if (srec) { if (srec) {
if (srec->status & S_SKIPPED) { if (srec->status & S_SKIPPED) {
// The message was skipped due to being too big. // Pre-1.4 legacy only: The message was skipped due to being too big.
// We must have already seen the UID, but we might have been interrupted. // We must have already seen the UID, but we might have been interrupted.
if (svars->maxuid[1-t] < tmsg->uid) if (svars->maxuid[1-t] < tmsg->uid)
svars->maxuid[1-t] = tmsg->uid; svars->maxuid[1-t] = tmsg->uid;
if (!(svars->chan->ops[t] & OP_RENEW)) if (!(svars->chan->ops[t] & OP_RENEW))
continue; continue;
srec->status = S_PENDING;
// The message size was not queried, so this won't be dummified below.
if (!(tmsg->flags & F_FLAGGED)) {
srec->status |= S_DUMMY(t);
JLOG( "_ %u %u", (srec->uid[F], srec->uid[N]), "placeholder only - was previously skipped" );
} else {
JLOG( "~ %u %u %u", (srec->uid[F], srec->uid[N], srec->status), "was previously skipped" );
}
} else { } else {
if (!(svars->chan->ops[t] & OP_NEW)) if (!(svars->chan->ops[t] & OP_NEW))
continue; continue;
@ -1623,8 +1796,8 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
if (!(srec->status & S_PENDING)) if (!(srec->status & S_PENDING))
continue; // Nothing to do - the message is paired or expired continue; // Nothing to do - the message is paired or expired
// Propagation was scheduled, but we got interrupted // Propagation was scheduled, but we got interrupted
debug( "unpropagated old message %u\n", tmsg->uid );
} }
debug( "unpropagated old message %u\n", tmsg->uid );
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) { if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing - would be expunged anyway" ); JLOG( "- %u %u", (srec->uid[F], srec->uid[N]), "killing - would be expunged anyway" );
@ -1660,20 +1833,12 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
tmsg->srec = srec; tmsg->srec = srec;
JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" ); JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" );
} }
if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) { if (!(tmsg->flags & F_FLAGGED) && tmsg->size > svars->chan->stores[t]->max_size &&
if (srec->status != S_PENDING) { !(srec->wstate & W_UPGRADE) && !(srec->status & (S_DUMMY(F)|S_DUMMY(N)))) {
srec->status = S_PENDING; srec->status |= S_DUMMY(t);
JLOG( "~ %u %u %u", (srec->uid[F], srec->uid[N], srec->status), "was too big" ); JLOG( "_ %u %u", (srec->uid[F], srec->uid[N]), "placeholder only - too big" );
}
any_new[t] = 1;
} else {
if (srec->status == S_SKIPPED) {
debug( "-> still too big\n" );
} else {
srec->status = S_SKIPPED;
JLOG( "~ %u %u %u", (srec->uid[F], srec->uid[N], srec->status), "skipping - too big" );
}
} }
any_new[t] = 1;
} }
} }
@ -1790,14 +1955,22 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
debug( "synchronizing flags\n" ); debug( "synchronizing flags\n" );
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if ((srec->status & S_DEAD) || !srec->uid[F] || !srec->uid[N]) if (srec->status & S_DEAD)
continue; continue;
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
if (!srec->uid[t])
continue;
aflags = srec->aflags[t]; aflags = srec->aflags[t];
dflags = srec->dflags[t]; dflags = srec->dflags[t];
if (srec->wstate & W_DELETE) { if (srec->wstate & (W_DELETE|W_PURGE)) {
if (!aflags) { if (!aflags) {
/* This deletion propagation goes the other way round. */ // This deletion propagation goes the other way round, or
// this deletion of a dummy happens on the other side.
continue;
}
if (!srec->msg[t] && (svars->opts[t] & OPEN_OLD)) {
// The message disappeared. This can happen, because the wstate may
// come from the journal, and things could have happened meanwhile.
continue; continue;
} }
} else { } else {
@ -1874,7 +2047,7 @@ msg_copied( int sts, uint uid, copy_vars_t *vars )
sync_rec_t *srec = vars->srec; sync_rec_t *srec = vars->srec;
switch (sts) { switch (sts) {
case SYNC_OK: case SYNC_OK:
if (vars->msg->flags != srec->flags) { if (!(srec->wstate & W_UPGRADE) && vars->msg->flags != srec->flags) {
srec->flags = vars->msg->flags; srec->flags = vars->msg->flags;
JLOG( "* %u %u %u", (srec->uid[F], srec->uid[N], srec->flags), "%sed with flags", str_hl[t] ); JLOG( "* %u %u %u", (srec->uid[F], srec->uid[N], srec->flags), "%sed with flags", str_hl[t] );
} }
@ -1937,6 +2110,7 @@ msgs_copied( sync_vars_t *svars, int t )
cv->aux = AUX; cv->aux = AUX;
cv->srec = srec; cv->srec = srec;
cv->msg = tmsg; cv->msg = tmsg;
cv->minimal = (srec->status & S_DUMMY(t));
copy_msg( cv ); copy_msg( cv );
svars->state[t] &= ~ST_SENDING_NEW; svars->state[t] &= ~ST_SENDING_NEW;
if (check_cancel( svars )) if (check_cancel( svars ))
@ -2085,6 +2259,7 @@ msgs_flags_set( sync_vars_t *svars, int t )
cv->aux = INV_AUX; cv->aux = INV_AUX;
cv->srec = NULL; cv->srec = NULL;
cv->msg = tmsg; cv->msg = tmsg;
cv->minimal = 0;
copy_msg( cv ); copy_msg( cv );
if (check_cancel( svars )) if (check_cancel( svars ))
goto out; goto out;