soft-limit peak memory usage

propagating many messages from a fast store (typically maildir or a
local IMAP server) to a slow asynchronous store could cause gigabytes of
data being buffered. avoid this by throttling fetches if the target
context reports memory usage above a configurable limit.

REFMAIL: 9737edb14457c71af4ed156c1be0ae59@mpcjanssen.nl
This commit is contained in:
Oswald Buddenhagen 2015-02-15 18:13:05 +01:00
parent 4b31522fdf
commit a8b26dc4ac
10 changed files with 87 additions and 18 deletions

View File

@ -76,6 +76,8 @@ extern int Pid;
extern char Hostname[256];
extern const char *Home;
extern int BufferLimit;
/* util.c */
void ATTR_PRINTFLIKE(1, 2) debug( const char *, ... );

View File

@ -482,6 +482,14 @@ load_config( const char *where, int pseudo )
}
}
}
else if (!strcasecmp( "BufferLimit", cfile.cmd ))
{
BufferLimit = parse_size( &cfile );
if (BufferLimit <= 0) {
error( "%s:%d: BufferLimit must be positive\n", cfile.file, cfile.line );
cfile.err = 1;
}
}
else if (!getopt_helper( &cfile, &gcops, &global_conf ))
{
error( "%s:%d: unknown section keyword '%s'\n",

View File

@ -246,6 +246,9 @@ struct driver {
/* Commit any pending set_msg_flags() commands. */
void (*commit_cmds)( store_t *ctx );
/* Get approximate amount of memory occupied by the driver. */
int (*memory_usage)( store_t *ctx );
};
void free_generic_messages( message_t * );

View File

@ -111,6 +111,7 @@ typedef struct imap_store {
int nexttag, num_in_progress;
struct imap_cmd *pending, **pending_append;
struct imap_cmd *in_progress, **in_progress_append;
int buffer_mem; /* memory currently occupied by buffers in the queue */
/* Used during sequential operations like connect */
enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } greeting;
@ -256,7 +257,10 @@ static void
done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
{
cmd->param.done( ctx, cmd, response );
free( cmd->param.data );
if (cmd->param.data) {
free( cmd->param.data );
ctx->buffer_mem -= cmd->param.data_len;
}
free( cmd->cmd );
free( cmd );
}
@ -299,6 +303,7 @@ send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
iov[1].len = cmd->param.data_len;
iov[1].takeOwn = GiveOwn;
cmd->param.data = 0;
ctx->buffer_mem -= cmd->param.data_len;
iov[2].buf = "\r\n";
iov[2].len = 2;
iov[2].takeOwn = KeepOwn;
@ -1317,6 +1322,7 @@ imap_socket_read( void *aux )
iov[0].len = cmdp->param.data_len;
iov[0].takeOwn = GiveOwn;
cmdp->param.data = 0;
ctx->buffer_mem -= cmdp->param.data_len;
iov[1].buf = "\r\n";
iov[1].len = 2;
iov[1].takeOwn = KeepOwn;
@ -2507,6 +2513,7 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int to_trash,
flagstr[d] = 0;
INIT_IMAP_CMD(imap_cmd_out_uid, cmd, cb, aux)
ctx->buffer_mem += data->len;
cmd->gen.param.data_len = data->len;
cmd->gen.param.data = data->data;
cmd->out_uid = -2;
@ -2621,6 +2628,16 @@ imap_commit_cmds( store_t *gctx )
(void)gctx;
}
/******************* imap_memory_usage *******************/
static int
imap_memory_usage( store_t *gctx )
{
imap_store_t *ctx = (imap_store_t *)gctx;
return ctx->buffer_mem + ctx->conn.buffer_mem;
}
/******************* imap_parse_store *******************/
imap_server_conf_t *servers, **serverapp = &servers;
@ -2899,4 +2916,5 @@ struct driver imap_driver = {
imap_close_box,
imap_cancel_cmds,
imap_commit_cmds,
imap_memory_usage,
};

View File

@ -1605,6 +1605,12 @@ maildir_commit_cmds( store_t *gctx )
(void) gctx;
}
static int
maildir_memory_usage( store_t *gctx ATTR_UNUSED )
{
return 0;
}
static int
maildir_parse_store( conffile_t *cfg, store_conf_t **storep )
{
@ -1672,4 +1678,5 @@ struct driver maildir_driver = {
maildir_close_box,
maildir_cancel_cmds,
maildir_commit_cmds,
maildir_memory_usage,
};

View File

@ -43,6 +43,8 @@ int Pid; /* for maildir and imap */
char Hostname[256]; /* for maildir */
const char *Home; /* for config */
int BufferLimit = 10 * 1024 * 1024;
static void
version( void )
{

View File

@ -583,6 +583,14 @@ This option is meaningless for \fBSyncState\fR if the latter is \fB*\fR,
obviously. However, it also determines the default of \fBInfoDelimiter\fR.
(Global default: \fI;\fR on Windows, \fI:\fR everywhere else)
..
.TP
\fBBufferLimit\fR \fIsize\fR[\fBk\fR|\fBm\fR][\fBb\fR]
The per-Channel, per-direction instantaneous memory usage above which
\fBmbsync\fR will refrain from using more memory. Note that this is no
absolute limit, as even a single message can consume more memory than
this.
(Default: \fI10M\fR)
..
.SH RECOMMENDATIONS
Make sure your IMAP server does not auto-expunge deleted messages - it is
slow, and semantically somewhat questionable. Specifically, Gmail needs to

View File

@ -737,6 +737,7 @@ dispose_chunk( conn_t *conn )
buff_chunk_t *bc = conn->write_buf;
if (!(conn->write_buf = bc->next))
conn->write_buf_append = &conn->write_buf;
conn->buffer_mem -= bc->len;
free( bc );
}
@ -770,6 +771,7 @@ static void
do_append( conn_t *conn, buff_chunk_t *bc )
{
bc->next = 0;
conn->buffer_mem += bc->len;
*conn->write_buf_append = bc;
conn->write_buf_append = &bc->next;
}

View File

@ -104,6 +104,7 @@ typedef struct {
int append_avail; /* space left in accumulating buffer */
#endif
int write_offset; /* offset into buffer head */
int buffer_mem; /* memory currently occupied by buffers in the queue */
/* reading */
int offset; /* start of filled bytes in buffer */

View File

@ -154,6 +154,7 @@ typedef struct {
store_t *ctx[2];
driver_t *drv[2];
const char *orig_name[2];
message_t *new_msgs[2];
int state[2], ref_count, nsrecs, ret, lfd, existing, replayed;
int new_total[2], new_done[2];
int flags_total[2], flags_done[2];
@ -207,6 +208,7 @@ static int check_cancel( sync_vars_t *svars );
#define ST_CLOSING (1<<12)
#define ST_CONFIRMED (1<<13)
#define ST_PRESENT (1<<14)
#define ST_SENDING_NEW (1<<15)
static void
@ -1336,7 +1338,6 @@ box_loaded( int sts, void *aux )
sync_rec_t *srec;
sync_rec_map_t *srecmap;
message_t *tmsg;
copy_vars_t *cv;
flag_vars_t *fv;
int uid, no[2], del[2], alive, todel, t1, t2;
int sflags, nflags, aflags, dflags, nex;
@ -1724,21 +1725,7 @@ box_loaded( int sts, void *aux )
for (t = 0; t < 2; t++) {
svars->newuid[t] = svars->ctx[t]->uidnext;
Fprintf( svars->jfp, "%c %d\n", "{}"[t], svars->newuid[t] );
for (tmsg = svars->ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) {
if ((srec = tmsg->srec) && srec->tuid[0]) {
svars->new_total[t]++;
stats( svars );
cv = nfmalloc( sizeof(*cv) );
cv->cb = msg_copied;
cv->aux = AUX;
cv->srec = srec;
cv->msg = tmsg;
copy_msg( cv );
if (check_cancel( svars ))
goto out;
}
}
svars->state[t] |= ST_SENT_NEW;
svars->new_msgs[t] = svars->ctx[1-t]->msgs;
msgs_copied( svars, t );
if (check_cancel( svars ))
goto out;
@ -1809,11 +1796,42 @@ static void sync_close( sync_vars_t *svars, int t );
static void
msgs_copied( sync_vars_t *svars, int t )
{
if (!(svars->state[t] & ST_SENT_NEW) || svars->new_done[t] < svars->new_total[t])
message_t *tmsg;
sync_rec_t *srec;
copy_vars_t *cv;
if (svars->state[t] & ST_SENDING_NEW)
return;
sync_ref( svars );
if (!(svars->state[t] & ST_SENT_NEW)) {
for (tmsg = svars->new_msgs[t]; tmsg; tmsg = tmsg->next) {
if ((srec = tmsg->srec) && srec->tuid[0]) {
if (svars->drv[t]->memory_usage( svars->ctx[t] ) >= BufferLimit) {
svars->new_msgs[t] = tmsg;
goto out;
}
svars->new_total[t]++;
stats( svars );
svars->state[t] |= ST_SENDING_NEW;
cv = nfmalloc( sizeof(*cv) );
cv->cb = msg_copied;
cv->aux = AUX;
cv->srec = srec;
cv->msg = tmsg;
copy_msg( cv );
svars->state[t] &= ~ST_SENDING_NEW;
if (check_cancel( svars ))
goto out;
}
}
svars->state[t] |= ST_SENT_NEW;
}
if (svars->new_done[t] < svars->new_total[t])
goto out;
Fprintf( svars->jfp, "%c %d\n", ")("[t], svars->maxuid[1-t] );
sync_close( svars, 1-t );
if (check_cancel( svars ))