wrap message trashing into simple transactions

trashing many messages at once inevitably overtaxes m$ exchange, and the
connection breaks. without any progress tracking, it would restart from
scratch each time, which would lead to a) it never finishing and b) many
copies of the messages in the trash.

full transactions as we do for "proper" syncing would be over the top,
as it's not *that* bad if some messages get duplicated in the trash. so
we record only the messages for which trashing completed, thus allowing
some overlap between the attempts.
This commit is contained in:
Oswald Buddenhagen 2016-11-04 21:23:39 +01:00
parent 5b0c8cfa60
commit 2bba9b903c
2 changed files with 29 additions and 9 deletions

3
TODO
View File

@ -1,8 +1,5 @@
f{,data}sync() usage could be optimized by batching the calls. f{,data}sync() usage could be optimized by batching the calls.
add some marker about message being already [remotely] trashed.
real transactions would be certainly not particularly useful ...
make SSL (connect) timeouts produce a bit more than "Unidentified socket error". make SSL (connect) timeouts produce a bit more than "Unidentified socket error".
uidvalidity lock timeout handling would be a good idea. uidvalidity lock timeout handling would be a good idea.

View File

@ -175,6 +175,7 @@ typedef struct {
driver_t *drv[2]; driver_t *drv[2];
const char *orig_name[2]; const char *orig_name[2];
message_t *new_msgs[2]; message_t *new_msgs[2];
int_array_alloc_t trashed_msgs[2];
int state[2], ref_count, nsrecs, ret, lfd, existing, replayed; int state[2], ref_count, nsrecs, ret, lfd, existing, replayed;
int new_pending[2], flags_pending[2], trash_pending[2]; int new_pending[2], flags_pending[2], trash_pending[2];
int maxuid[2]; /* highest UID that was already propagated */ int maxuid[2]; /* highest UID that was already propagated */
@ -799,7 +800,7 @@ load_state( sync_vars_t *svars )
} }
if ((c = buf[0]) == '#' ? if ((c = buf[0]) == '#' ?
(t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) : (t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
c == '(' || c == ')' || c == '{' || c == '}' || c == '!' ? c == '(' || c == ')' || c == '{' || c == '}' || c == '[' || c == ']' || c == '!' ?
(sscanf( buf + 2, "%d", &t1 ) != 1) : (sscanf( buf + 2, "%d", &t1 ) != 1) :
c == '+' || c == '&' || c == '-' || c == '|' || c == '/' || c == '\\' ? c == '+' || c == '&' || c == '-' || c == '|' || c == '/' || c == '\\' ?
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) : (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
@ -816,6 +817,10 @@ load_state( sync_vars_t *svars )
svars->newuid[M] = t1; svars->newuid[M] = t1;
else if (c == '}') else if (c == '}')
svars->newuid[S] = t1; svars->newuid[S] = t1;
else if (c == '[')
*int_array_append( &svars->trashed_msgs[M] ) = t1;
else if (c == ']')
*int_array_append( &svars->trashed_msgs[S] ) = t1;
else if (c == '!') else if (c == '!')
svars->smaxxuid = t1; svars->smaxxuid = t1;
else if (c == '|') { else if (c == '|') {
@ -1920,6 +1925,11 @@ flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t )
} }
} }
typedef struct {
void *aux;
message_t *msg;
} trash_vars_t;
static void msg_trashed( int sts, void *aux ); static void msg_trashed( int sts, void *aux );
static void msg_rtrashed( int sts, int uid, copy_vars_t *vars ); static void msg_rtrashed( int sts, int uid, copy_vars_t *vars );
@ -1927,6 +1937,7 @@ static void
msgs_flags_set( sync_vars_t *svars, int t ) msgs_flags_set( sync_vars_t *svars, int t )
{ {
message_t *tmsg; message_t *tmsg;
trash_vars_t *tv;
copy_vars_t *cv; copy_vars_t *cv;
if (!(svars->state[t] & ST_SENT_FLAGS) || svars->flags_pending[t]) if (!(svars->state[t] & ST_SENT_FLAGS) || svars->flags_pending[t])
@ -1938,14 +1949,18 @@ msgs_flags_set( sync_vars_t *svars, int t )
(svars->ctx[t]->conf->trash || (svars->ctx[1-t]->conf->trash && svars->ctx[1-t]->conf->trash_remote_new))) { (svars->ctx[t]->conf->trash || (svars->ctx[1-t]->conf->trash && svars->ctx[1-t]->conf->trash_remote_new))) {
debug( "trashing in %s\n", str_ms[t] ); debug( "trashing in %s\n", str_ms[t] );
for (tmsg = svars->ctx[t]->msgs; tmsg; tmsg = tmsg->next) for (tmsg = svars->ctx[t]->msgs; tmsg; tmsg = tmsg->next)
if ((tmsg->flags & F_DELETED) && (t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) { if ((tmsg->flags & F_DELETED) && !find_int_array( svars->trashed_msgs[t].array, tmsg->uid ) &&
(t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) {
if (svars->ctx[t]->conf->trash) { if (svars->ctx[t]->conf->trash) {
if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) { if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) {
debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid ); debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid );
trash_total[t]++; trash_total[t]++;
stats(); stats();
svars->trash_pending[t]++; svars->trash_pending[t]++;
svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, AUX ); tv = nfmalloc( sizeof(*tv) );
tv->aux = AUX;
tv->msg = tmsg;
svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, tv );
if (check_cancel( svars )) if (check_cancel( svars ))
goto out; goto out;
} else } else
@ -1982,13 +1997,17 @@ msgs_flags_set( sync_vars_t *svars, int t )
static void static void
msg_trashed( int sts, void *aux ) msg_trashed( int sts, void *aux )
{ {
trash_vars_t *vars = (trash_vars_t *)aux;
DECL_SVARS; DECL_SVARS;
if (sts == DRV_MSG_BAD) if (sts == DRV_MSG_BAD)
sts = DRV_BOX_BAD; sts = DRV_BOX_BAD;
if (check_ret( sts, aux )) if (check_ret( sts, vars->aux ))
return; return;
INIT_SVARS(aux); INIT_SVARS(vars->aux);
debug( " -> trashed %s %d\n", str_ms[t], vars->msg->uid );
Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid );
free( vars );
trash_done[t]++; trash_done[t]++;
stats(); stats();
svars->trash_pending[t]--; svars->trash_pending[t]--;
@ -2008,8 +2027,10 @@ msg_rtrashed( int sts, int uid ATTR_UNUSED, copy_vars_t *vars )
free( vars ); free( vars );
return; return;
} }
free( vars );
t ^= 1; t ^= 1;
debug( " -> remote trashed %s %d\n", str_ms[t], vars->msg->uid );
Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid );
free( vars );
trash_done[t]++; trash_done[t]++;
stats(); stats();
svars->trash_pending[t]--; svars->trash_pending[t]--;
@ -2105,6 +2126,8 @@ sync_bail( sync_vars_t *svars )
{ {
sync_rec_t *srec, *nsrec; sync_rec_t *srec, *nsrec;
free( svars->trashed_msgs[M].array.data );
free( svars->trashed_msgs[S].array.data );
for (srec = svars->srecs; srec; srec = nsrec) { for (srec = svars->srecs; srec; srec = nsrec) {
nsrec = srec->next; nsrec = srec->next;
free( srec ); free( srec );