From 2bba9b903c7a4f26bd30cf9aac19b9bfd458916c Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Fri, 4 Nov 2016 21:23:39 +0100 Subject: [PATCH] wrap message trashing into simple transactions trashing many messages at once inevitably overtaxes m$ exchange, and the connection breaks. without any progress tracking, it would restart from scratch each time, which would lead to a) it never finishing and b) many copies of the messages in the trash. full transactions as we do for "proper" syncing would be over the top, as it's not *that* bad if some messages get duplicated in the trash. so we record only the messages for which trashing completed, thus allowing some overlap between the attempts. --- TODO | 3 --- src/sync.c | 35 +++++++++++++++++++++++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/TODO b/TODO index bfa068b..f991c43 100644 --- a/TODO +++ b/TODO @@ -1,8 +1,5 @@ f{,data}sync() usage could be optimized by batching the calls. -add some marker about message being already [remotely] trashed. -real transactions would be certainly not particularly useful ... - make SSL (connect) timeouts produce a bit more than "Unidentified socket error". uidvalidity lock timeout handling would be a good idea. diff --git a/src/sync.c b/src/sync.c index e5c532a..06b6b85 100644 --- a/src/sync.c +++ b/src/sync.c @@ -175,6 +175,7 @@ typedef struct { driver_t *drv[2]; const char *orig_name[2]; message_t *new_msgs[2]; + int_array_alloc_t trashed_msgs[2]; int state[2], ref_count, nsrecs, ret, lfd, existing, replayed; int new_pending[2], flags_pending[2], trash_pending[2]; int maxuid[2]; /* highest UID that was already propagated */ @@ -799,7 +800,7 @@ load_state( sync_vars_t *svars ) } if ((c = buf[0]) == '#' ? (t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) : - c == '(' || c == ')' || c == '{' || c == '}' || c == '!' ? + c == '(' || c == ')' || c == '{' || c == '}' || c == '[' || c == ']' || c == '!' ? (sscanf( buf + 2, "%d", &t1 ) != 1) : c == '+' || c == '&' || c == '-' || c == '|' || c == '/' || c == '\\' ? (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) : @@ -816,6 +817,10 @@ load_state( sync_vars_t *svars ) svars->newuid[M] = t1; else if (c == '}') svars->newuid[S] = t1; + else if (c == '[') + *int_array_append( &svars->trashed_msgs[M] ) = t1; + else if (c == ']') + *int_array_append( &svars->trashed_msgs[S] ) = t1; else if (c == '!') svars->smaxxuid = t1; else if (c == '|') { @@ -1920,6 +1925,11 @@ flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t ) } } +typedef struct { + void *aux; + message_t *msg; +} trash_vars_t; + static void msg_trashed( int sts, void *aux ); static void msg_rtrashed( int sts, int uid, copy_vars_t *vars ); @@ -1927,6 +1937,7 @@ static void msgs_flags_set( sync_vars_t *svars, int t ) { message_t *tmsg; + trash_vars_t *tv; copy_vars_t *cv; if (!(svars->state[t] & ST_SENT_FLAGS) || svars->flags_pending[t]) @@ -1938,14 +1949,18 @@ msgs_flags_set( sync_vars_t *svars, int t ) (svars->ctx[t]->conf->trash || (svars->ctx[1-t]->conf->trash && svars->ctx[1-t]->conf->trash_remote_new))) { debug( "trashing in %s\n", str_ms[t] ); for (tmsg = svars->ctx[t]->msgs; tmsg; tmsg = tmsg->next) - if ((tmsg->flags & F_DELETED) && (t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) { + if ((tmsg->flags & F_DELETED) && !find_int_array( svars->trashed_msgs[t].array, tmsg->uid ) && + (t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) { if (svars->ctx[t]->conf->trash) { if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) { debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid ); trash_total[t]++; stats(); svars->trash_pending[t]++; - svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, AUX ); + tv = nfmalloc( sizeof(*tv) ); + tv->aux = AUX; + tv->msg = tmsg; + svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, tv ); if (check_cancel( svars )) goto out; } else @@ -1982,13 +1997,17 @@ msgs_flags_set( sync_vars_t *svars, int t ) static void msg_trashed( int sts, void *aux ) { + trash_vars_t *vars = (trash_vars_t *)aux; DECL_SVARS; if (sts == DRV_MSG_BAD) sts = DRV_BOX_BAD; - if (check_ret( sts, aux )) + if (check_ret( sts, vars->aux )) return; - INIT_SVARS(aux); + INIT_SVARS(vars->aux); + debug( " -> trashed %s %d\n", str_ms[t], vars->msg->uid ); + Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid ); + free( vars ); trash_done[t]++; stats(); svars->trash_pending[t]--; @@ -2008,8 +2027,10 @@ msg_rtrashed( int sts, int uid ATTR_UNUSED, copy_vars_t *vars ) free( vars ); return; } - free( vars ); t ^= 1; + debug( " -> remote trashed %s %d\n", str_ms[t], vars->msg->uid ); + Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid ); + free( vars ); trash_done[t]++; stats(); svars->trash_pending[t]--; @@ -2105,6 +2126,8 @@ sync_bail( sync_vars_t *svars ) { sync_rec_t *srec, *nsrec; + free( svars->trashed_msgs[M].array.data ); + free( svars->trashed_msgs[S].array.data ); for (srec = svars->srecs; srec; srec = nsrec) { nsrec = srec->next; free( srec );