wrap message trashing into simple transactions
trashing many messages at once inevitably overtaxes m$ exchange, and the connection breaks. without any progress tracking, it would restart from scratch each time, which would lead to a) it never finishing and b) many copies of the messages in the trash. full transactions as we do for "proper" syncing would be over the top, as it's not *that* bad if some messages get duplicated in the trash. so we record only the messages for which trashing completed, thus allowing some overlap between the attempts.
This commit is contained in:
parent
5b0c8cfa60
commit
2bba9b903c
3
TODO
3
TODO
|
@ -1,8 +1,5 @@
|
||||||
f{,data}sync() usage could be optimized by batching the calls.
|
f{,data}sync() usage could be optimized by batching the calls.
|
||||||
|
|
||||||
add some marker about message being already [remotely] trashed.
|
|
||||||
real transactions would be certainly not particularly useful ...
|
|
||||||
|
|
||||||
make SSL (connect) timeouts produce a bit more than "Unidentified socket error".
|
make SSL (connect) timeouts produce a bit more than "Unidentified socket error".
|
||||||
|
|
||||||
uidvalidity lock timeout handling would be a good idea.
|
uidvalidity lock timeout handling would be a good idea.
|
||||||
|
|
35
src/sync.c
35
src/sync.c
|
@ -175,6 +175,7 @@ typedef struct {
|
||||||
driver_t *drv[2];
|
driver_t *drv[2];
|
||||||
const char *orig_name[2];
|
const char *orig_name[2];
|
||||||
message_t *new_msgs[2];
|
message_t *new_msgs[2];
|
||||||
|
int_array_alloc_t trashed_msgs[2];
|
||||||
int state[2], ref_count, nsrecs, ret, lfd, existing, replayed;
|
int state[2], ref_count, nsrecs, ret, lfd, existing, replayed;
|
||||||
int new_pending[2], flags_pending[2], trash_pending[2];
|
int new_pending[2], flags_pending[2], trash_pending[2];
|
||||||
int maxuid[2]; /* highest UID that was already propagated */
|
int maxuid[2]; /* highest UID that was already propagated */
|
||||||
|
@ -799,7 +800,7 @@ load_state( sync_vars_t *svars )
|
||||||
}
|
}
|
||||||
if ((c = buf[0]) == '#' ?
|
if ((c = buf[0]) == '#' ?
|
||||||
(t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
|
(t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
|
||||||
c == '(' || c == ')' || c == '{' || c == '}' || c == '!' ?
|
c == '(' || c == ')' || c == '{' || c == '}' || c == '[' || c == ']' || c == '!' ?
|
||||||
(sscanf( buf + 2, "%d", &t1 ) != 1) :
|
(sscanf( buf + 2, "%d", &t1 ) != 1) :
|
||||||
c == '+' || c == '&' || c == '-' || c == '|' || c == '/' || c == '\\' ?
|
c == '+' || c == '&' || c == '-' || c == '|' || c == '/' || c == '\\' ?
|
||||||
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
|
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
|
||||||
|
@ -816,6 +817,10 @@ load_state( sync_vars_t *svars )
|
||||||
svars->newuid[M] = t1;
|
svars->newuid[M] = t1;
|
||||||
else if (c == '}')
|
else if (c == '}')
|
||||||
svars->newuid[S] = t1;
|
svars->newuid[S] = t1;
|
||||||
|
else if (c == '[')
|
||||||
|
*int_array_append( &svars->trashed_msgs[M] ) = t1;
|
||||||
|
else if (c == ']')
|
||||||
|
*int_array_append( &svars->trashed_msgs[S] ) = t1;
|
||||||
else if (c == '!')
|
else if (c == '!')
|
||||||
svars->smaxxuid = t1;
|
svars->smaxxuid = t1;
|
||||||
else if (c == '|') {
|
else if (c == '|') {
|
||||||
|
@ -1920,6 +1925,11 @@ flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t )
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
void *aux;
|
||||||
|
message_t *msg;
|
||||||
|
} trash_vars_t;
|
||||||
|
|
||||||
static void msg_trashed( int sts, void *aux );
|
static void msg_trashed( int sts, void *aux );
|
||||||
static void msg_rtrashed( int sts, int uid, copy_vars_t *vars );
|
static void msg_rtrashed( int sts, int uid, copy_vars_t *vars );
|
||||||
|
|
||||||
|
@ -1927,6 +1937,7 @@ static void
|
||||||
msgs_flags_set( sync_vars_t *svars, int t )
|
msgs_flags_set( sync_vars_t *svars, int t )
|
||||||
{
|
{
|
||||||
message_t *tmsg;
|
message_t *tmsg;
|
||||||
|
trash_vars_t *tv;
|
||||||
copy_vars_t *cv;
|
copy_vars_t *cv;
|
||||||
|
|
||||||
if (!(svars->state[t] & ST_SENT_FLAGS) || svars->flags_pending[t])
|
if (!(svars->state[t] & ST_SENT_FLAGS) || svars->flags_pending[t])
|
||||||
|
@ -1938,14 +1949,18 @@ msgs_flags_set( sync_vars_t *svars, int t )
|
||||||
(svars->ctx[t]->conf->trash || (svars->ctx[1-t]->conf->trash && svars->ctx[1-t]->conf->trash_remote_new))) {
|
(svars->ctx[t]->conf->trash || (svars->ctx[1-t]->conf->trash && svars->ctx[1-t]->conf->trash_remote_new))) {
|
||||||
debug( "trashing in %s\n", str_ms[t] );
|
debug( "trashing in %s\n", str_ms[t] );
|
||||||
for (tmsg = svars->ctx[t]->msgs; tmsg; tmsg = tmsg->next)
|
for (tmsg = svars->ctx[t]->msgs; tmsg; tmsg = tmsg->next)
|
||||||
if ((tmsg->flags & F_DELETED) && (t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) {
|
if ((tmsg->flags & F_DELETED) && !find_int_array( svars->trashed_msgs[t].array, tmsg->uid ) &&
|
||||||
|
(t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) {
|
||||||
if (svars->ctx[t]->conf->trash) {
|
if (svars->ctx[t]->conf->trash) {
|
||||||
if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) {
|
if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) {
|
||||||
debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid );
|
debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid );
|
||||||
trash_total[t]++;
|
trash_total[t]++;
|
||||||
stats();
|
stats();
|
||||||
svars->trash_pending[t]++;
|
svars->trash_pending[t]++;
|
||||||
svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, AUX );
|
tv = nfmalloc( sizeof(*tv) );
|
||||||
|
tv->aux = AUX;
|
||||||
|
tv->msg = tmsg;
|
||||||
|
svars->drv[t]->trash_msg( svars->ctx[t], tmsg, msg_trashed, tv );
|
||||||
if (check_cancel( svars ))
|
if (check_cancel( svars ))
|
||||||
goto out;
|
goto out;
|
||||||
} else
|
} else
|
||||||
|
@ -1982,13 +1997,17 @@ msgs_flags_set( sync_vars_t *svars, int t )
|
||||||
static void
|
static void
|
||||||
msg_trashed( int sts, void *aux )
|
msg_trashed( int sts, void *aux )
|
||||||
{
|
{
|
||||||
|
trash_vars_t *vars = (trash_vars_t *)aux;
|
||||||
DECL_SVARS;
|
DECL_SVARS;
|
||||||
|
|
||||||
if (sts == DRV_MSG_BAD)
|
if (sts == DRV_MSG_BAD)
|
||||||
sts = DRV_BOX_BAD;
|
sts = DRV_BOX_BAD;
|
||||||
if (check_ret( sts, aux ))
|
if (check_ret( sts, vars->aux ))
|
||||||
return;
|
return;
|
||||||
INIT_SVARS(aux);
|
INIT_SVARS(vars->aux);
|
||||||
|
debug( " -> trashed %s %d\n", str_ms[t], vars->msg->uid );
|
||||||
|
Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid );
|
||||||
|
free( vars );
|
||||||
trash_done[t]++;
|
trash_done[t]++;
|
||||||
stats();
|
stats();
|
||||||
svars->trash_pending[t]--;
|
svars->trash_pending[t]--;
|
||||||
|
@ -2008,8 +2027,10 @@ msg_rtrashed( int sts, int uid ATTR_UNUSED, copy_vars_t *vars )
|
||||||
free( vars );
|
free( vars );
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
free( vars );
|
|
||||||
t ^= 1;
|
t ^= 1;
|
||||||
|
debug( " -> remote trashed %s %d\n", str_ms[t], vars->msg->uid );
|
||||||
|
Fprintf( svars->jfp, "%c %d\n", "[]"[t], vars->msg->uid );
|
||||||
|
free( vars );
|
||||||
trash_done[t]++;
|
trash_done[t]++;
|
||||||
stats();
|
stats();
|
||||||
svars->trash_pending[t]--;
|
svars->trash_pending[t]--;
|
||||||
|
@ -2105,6 +2126,8 @@ sync_bail( sync_vars_t *svars )
|
||||||
{
|
{
|
||||||
sync_rec_t *srec, *nsrec;
|
sync_rec_t *srec, *nsrec;
|
||||||
|
|
||||||
|
free( svars->trashed_msgs[M].array.data );
|
||||||
|
free( svars->trashed_msgs[S].array.data );
|
||||||
for (srec = svars->srecs; srec; srec = nsrec) {
|
for (srec = svars->srecs; srec; srec = nsrec) {
|
||||||
nsrec = srec->next;
|
nsrec = srec->next;
|
||||||
free( srec );
|
free( srec );
|
||||||
|
|
Loading…
Reference in New Issue
Block a user