From a8b26dc4ac4b7031be3688b2f3631881ebfd907a Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Sun, 15 Feb 2015 18:13:05 +0100 Subject: [PATCH] soft-limit peak memory usage propagating many messages from a fast store (typically maildir or a local IMAP server) to a slow asynchronous store could cause gigabytes of data to be buffered. avoid this by throttling fetches if the target context reports memory usage above a configurable limit. REFMAIL: 9737edb14457c71af4ed156c1be0ae59@mpcjanssen.nl --- src/common.h | 2 ++ src/config.c | 8 ++++++++ src/driver.h | 3 +++ src/drv_imap.c | 20 +++++++++++++++++- src/drv_maildir.c | 7 +++++++ src/main.c | 2 ++ src/mbsync.1 | 8 ++++++++ src/socket.c | 2 ++ src/socket.h | 1 + src/sync.c | 52 +++++++++++++++++++++++++++++++---------------- 10 files changed, 87 insertions(+), 18 deletions(-) diff --git a/src/common.h b/src/common.h index 06c2e3e..ed278b4 100644 --- a/src/common.h +++ b/src/common.h @@ -76,6 +76,8 @@ extern int Pid; extern char Hostname[256]; extern const char *Home; +extern int BufferLimit; + /* util.c */ void ATTR_PRINTFLIKE(1, 2) debug( const char *, ... ); diff --git a/src/config.c b/src/config.c index 1c181ce..20f09d9 100644 --- a/src/config.c +++ b/src/config.c @@ -482,6 +482,14 @@ load_config( const char *where, int pseudo ) } } } + else if (!strcasecmp( "BufferLimit", cfile.cmd )) + { + BufferLimit = parse_size( &cfile ); + if (BufferLimit <= 0) { + error( "%s:%d: BufferLimit must be positive\n", cfile.file, cfile.line ); + cfile.err = 1; + } + } else if (!getopt_helper( &cfile, &gcops, &global_conf )) { error( "%s:%d: unknown section keyword '%s'\n", diff --git a/src/driver.h b/src/driver.h index e79a096..3aa30a9 100644 --- a/src/driver.h +++ b/src/driver.h @@ -246,6 +246,9 @@ struct driver { /* Commit any pending set_msg_flags() commands. */ void (*commit_cmds)( store_t *ctx ); + + /* Get approximate amount of memory occupied by the driver.
*/ + int (*memory_usage)( store_t *ctx ); }; void free_generic_messages( message_t * ); diff --git a/src/drv_imap.c b/src/drv_imap.c index 7146d26..2620f49 100644 --- a/src/drv_imap.c +++ b/src/drv_imap.c @@ -111,6 +111,7 @@ typedef struct imap_store { int nexttag, num_in_progress; struct imap_cmd *pending, **pending_append; struct imap_cmd *in_progress, **in_progress_append; + int buffer_mem; /* memory currently occupied by buffers in the queue */ /* Used during sequential operations like connect */ enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } greeting; @@ -256,7 +257,10 @@ static void done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response ) { cmd->param.done( ctx, cmd, response ); - free( cmd->param.data ); + if (cmd->param.data) { + free( cmd->param.data ); + ctx->buffer_mem -= cmd->param.data_len; + } free( cmd->cmd ); free( cmd ); } @@ -299,6 +303,7 @@ send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd ) iov[1].len = cmd->param.data_len; iov[1].takeOwn = GiveOwn; cmd->param.data = 0; + ctx->buffer_mem -= cmd->param.data_len; iov[2].buf = "\r\n"; iov[2].len = 2; iov[2].takeOwn = KeepOwn; @@ -1317,6 +1322,7 @@ imap_socket_read( void *aux ) iov[0].len = cmdp->param.data_len; iov[0].takeOwn = GiveOwn; cmdp->param.data = 0; + ctx->buffer_mem -= cmdp->param.data_len; iov[1].buf = "\r\n"; iov[1].len = 2; iov[1].takeOwn = KeepOwn; @@ -2507,6 +2513,7 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int to_trash, flagstr[d] = 0; INIT_IMAP_CMD(imap_cmd_out_uid, cmd, cb, aux) + ctx->buffer_mem += data->len; cmd->gen.param.data_len = data->len; cmd->gen.param.data = data->data; cmd->out_uid = -2; @@ -2621,6 +2628,16 @@ imap_commit_cmds( store_t *gctx ) (void)gctx; } +/******************* imap_memory_usage *******************/ + +static int +imap_memory_usage( store_t *gctx ) +{ + imap_store_t *ctx = (imap_store_t *)gctx; + + return ctx->buffer_mem + ctx->conn.buffer_mem; +} + /******************* imap_parse_store 
*******************/ imap_server_conf_t *servers, **serverapp = &servers; @@ -2899,4 +2916,5 @@ struct driver imap_driver = { imap_close_box, imap_cancel_cmds, imap_commit_cmds, + imap_memory_usage, }; diff --git a/src/drv_maildir.c b/src/drv_maildir.c index 27073eb..29d4c2e 100644 --- a/src/drv_maildir.c +++ b/src/drv_maildir.c @@ -1605,6 +1605,12 @@ maildir_commit_cmds( store_t *gctx ) (void) gctx; } +static int +maildir_memory_usage( store_t *gctx ATTR_UNUSED ) +{ + return 0; +} + static int maildir_parse_store( conffile_t *cfg, store_conf_t **storep ) { @@ -1672,4 +1678,5 @@ struct driver maildir_driver = { maildir_close_box, maildir_cancel_cmds, maildir_commit_cmds, + maildir_memory_usage, }; diff --git a/src/main.c b/src/main.c index beb6059..f587e2d 100644 --- a/src/main.c +++ b/src/main.c @@ -43,6 +43,8 @@ int Pid; /* for maildir and imap */ char Hostname[256]; /* for maildir */ const char *Home; /* for config */ +int BufferLimit = 10 * 1024 * 1024; + static void version( void ) { diff --git a/src/mbsync.1 b/src/mbsync.1 index e5ad1fb..02e89ad 100644 --- a/src/mbsync.1 +++ b/src/mbsync.1 @@ -583,6 +583,14 @@ This option is meaningless for \fBSyncState\fR if the latter is \fB*\fR, obviously. However, it also determines the default of \fBInfoDelimiter\fR. (Global default: \fI;\fR on Windows, \fI:\fR everywhere else) .. +.TP +\fBBufferLimit\fR \fIsize\fR[\fBk\fR|\fBm\fR][\fBb\fR] +The per-Channel, per-direction instantaneous memory usage above which +\fBmbsync\fR will refrain from using more memory. Note that this is no +absolute limit, as even a single message can consume more memory than +this. +(Default: \fI10M\fR) +.. .SH RECOMMENDATIONS Make sure your IMAP server does not auto-expunge deleted messages - it is slow, and semantically somewhat questionable. 
Specifically, Gmail needs to diff --git a/src/socket.c b/src/socket.c index b7de54b..5cde674 100644 --- a/src/socket.c +++ b/src/socket.c @@ -737,6 +737,7 @@ dispose_chunk( conn_t *conn ) buff_chunk_t *bc = conn->write_buf; if (!(conn->write_buf = bc->next)) conn->write_buf_append = &conn->write_buf; + conn->buffer_mem -= bc->len; free( bc ); } @@ -770,6 +771,7 @@ static void do_append( conn_t *conn, buff_chunk_t *bc ) { bc->next = 0; + conn->buffer_mem += bc->len; *conn->write_buf_append = bc; conn->write_buf_append = &bc->next; } diff --git a/src/socket.h b/src/socket.h index a420e49..7ea6086 100644 --- a/src/socket.h +++ b/src/socket.h @@ -104,6 +104,7 @@ typedef struct { int append_avail; /* space left in accumulating buffer */ #endif int write_offset; /* offset into buffer head */ + int buffer_mem; /* memory currently occupied by buffers in the queue */ /* reading */ int offset; /* start of filled bytes in buffer */ diff --git a/src/sync.c b/src/sync.c index 736f9ca..24d520d 100644 --- a/src/sync.c +++ b/src/sync.c @@ -154,6 +154,7 @@ typedef struct { store_t *ctx[2]; driver_t *drv[2]; const char *orig_name[2]; + message_t *new_msgs[2]; int state[2], ref_count, nsrecs, ret, lfd, existing, replayed; int new_total[2], new_done[2]; int flags_total[2], flags_done[2]; @@ -207,6 +208,7 @@ static int check_cancel( sync_vars_t *svars ); #define ST_CLOSING (1<<12) #define ST_CONFIRMED (1<<13) #define ST_PRESENT (1<<14) +#define ST_SENDING_NEW (1<<15) static void @@ -1336,7 +1338,6 @@ box_loaded( int sts, void *aux ) sync_rec_t *srec; sync_rec_map_t *srecmap; message_t *tmsg; - copy_vars_t *cv; flag_vars_t *fv; int uid, no[2], del[2], alive, todel, t1, t2; int sflags, nflags, aflags, dflags, nex; @@ -1724,21 +1725,7 @@ box_loaded( int sts, void *aux ) for (t = 0; t < 2; t++) { svars->newuid[t] = svars->ctx[t]->uidnext; Fprintf( svars->jfp, "%c %d\n", "{}"[t], svars->newuid[t] ); - for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) { - if ((srec = tmsg->srec) && 
srec->tuid[0]) { - svars->new_total[t]++; - stats( svars ); - cv = nfmalloc( sizeof(*cv) ); - cv->cb = msg_copied; - cv->aux = AUX; - cv->srec = srec; - cv->msg = tmsg; - copy_msg( cv ); - if (check_cancel( svars )) - goto out; - } - } - svars->state[t] |= ST_SENT_NEW; + svars->new_msgs[t] = svars->ctx[1-t]->msgs; msgs_copied( svars, t ); if (check_cancel( svars )) goto out; @@ -1809,11 +1796,42 @@ static void sync_close( sync_vars_t *svars, int t ); static void msgs_copied( sync_vars_t *svars, int t ) { - if (!(svars->state[t] & ST_SENT_NEW) || svars->new_done[t] < svars->new_total[t]) + message_t *tmsg; + sync_rec_t *srec; + copy_vars_t *cv; + + if (svars->state[t] & ST_SENDING_NEW) return; sync_ref( svars ); + if (!(svars->state[t] & ST_SENT_NEW)) { + for (tmsg = svars->new_msgs[t]; tmsg; tmsg = tmsg->next) { + if ((srec = tmsg->srec) && srec->tuid[0]) { + if (svars->drv[t]->memory_usage( svars->ctx[t] ) >= BufferLimit) { + svars->new_msgs[t] = tmsg; + goto out; + } + svars->new_total[t]++; + stats( svars ); + svars->state[t] |= ST_SENDING_NEW; + cv = nfmalloc( sizeof(*cv) ); + cv->cb = msg_copied; + cv->aux = AUX; + cv->srec = srec; + cv->msg = tmsg; + copy_msg( cv ); + svars->state[t] &= ~ST_SENDING_NEW; + if (check_cancel( svars )) + goto out; + } + } + svars->state[t] |= ST_SENT_NEW; + } + + if (svars->new_done[t] < svars->new_total[t]) + goto out; + Fprintf( svars->jfp, "%c %d\n", ")("[t], svars->maxuid[1-t] ); sync_close( svars, 1-t ); if (check_cancel( svars ))