fix socket_write() recursion

the synchronous writing to the socket would have typically invoked the
write callback, which would flush further commands, thus recursing.

we take the easy way out and make it fully asynchronous, i.e., no data
is sent before (re-)entering the event loop.

this also has the effect that socket_write() cannot fail any more, and
any errors will be reported asynchronously. this is consistent with
socket_read(), and produces cleaner code.

this introduces a marginal performance regression: the maildir driver is
synchronous, so all messages (which fit into memory) will be read before
any data is sent. this is not considered relevant.
This commit is contained in:
Oswald Buddenhagen 2015-05-09 19:17:41 +02:00
parent 2f7e60a3ed
commit 6c08f568d0
3 changed files with 55 additions and 69 deletions

View File

@ -266,7 +266,7 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
free( cmd );
}
static int
static void
send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
{
int bufl, litplus, iovcnt = 1;
@ -312,19 +312,13 @@ send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
iov[2].takeOwn = KeepOwn;
iovcnt = 3;
}
if (socket_write( &ctx->conn, iov, iovcnt ) < 0)
goto bail;
socket_write( &ctx->conn, iov, iovcnt );
if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
ctx->trashnc = TrashChecking;
cmd->next = 0;
*ctx->in_progress_append = cmd;
ctx->in_progress_append = &cmd->next;
ctx->num_in_progress++;
return 0;
bail:
done_imap_cmd( ctx, cmd, RESP_CANCEL );
return -1;
}
static int
@ -346,11 +340,10 @@ flush_imap_cmds( imap_store_t *ctx )
{
struct imap_cmd *cmd;
while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
if ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
if (!(ctx->pending = cmd->next))
ctx->pending_append = &ctx->pending;
if (send_imap_cmd( ctx, cmd ) < 0)
return -1;
send_imap_cmd( ctx, cmd );
}
return 0;
}
@ -379,7 +372,7 @@ cancel_submitted_imap_cmds( imap_store_t *ctx )
}
}
static int
static void
submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
{
assert( ctx );
@ -396,10 +389,9 @@ submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
*ctx->pending_append = cmd;
ctx->pending_append = &cmd->next;
}
return 0;
} else {
send_imap_cmd( ctx, cmd );
}
return send_imap_cmd( ctx, cmd );
}
/* Minimal printf() replacement that supports an %\s format sequence to print backslash-escaped
@ -484,7 +476,7 @@ imap_vprintf( const char *fmt, va_list ap )
}
}
static int
static void
imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response ),
const char *fmt, ... )
@ -497,7 +489,7 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
va_start( ap, fmt );
cmdp->cmd = imap_vprintf( fmt, ap );
va_end( ap );
return submit_imap_cmd( ctx, cmdp );
submit_imap_cmd( ctx, cmdp );
}
static void
@ -1332,8 +1324,7 @@ imap_socket_read( void *aux )
iov[1].buf = "\r\n";
iov[1].len = 2;
iov[1].takeOwn = KeepOwn;
if (socket_write( &ctx->conn, iov, 2 ) < 0)
return;
socket_write( &ctx->conn, iov, 2 );
} else if (cmdp->param.cont) {
if (cmdp->param.cont( ctx, cmdp, cmd ))
return;
@ -1369,9 +1360,8 @@ imap_socket_read( void *aux )
cmd2->orig_cmd = cmdp;
cmd2->gen.param.high_prio = 1;
p = strchr( cmdp->cmd, '"' );
if (imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
"CREATE %.*s", imap_strchr( p + 1, '"' ) - p + 1, p ) < 0)
return;
imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
"CREATE %.*s", imap_strchr( p + 1, '"' ) - p + 1, p );
continue;
}
resp = RESP_NO;
@ -1402,8 +1392,7 @@ imap_socket_read( void *aux )
return;
}
}
if (flush_imap_cmds( ctx ) < 0)
return;
flush_imap_cmds( ctx );
}
imap_invoke_bad_callback( ctx );
}
@ -1921,7 +1910,8 @@ do_sasl_auth( imap_store_t *ctx, struct imap_cmd *cmdp ATTR_UNUSED, const char *
iov[iovcnt].len = 2;
iov[iovcnt].takeOwn = KeepOwn;
iovcnt++;
return socket_write( &ctx->conn, iov, iovcnt );
socket_write( &ctx->conn, iov, iovcnt );
return 0;
bail:
imap_open_store_bail( ctx, FAIL_FINAL );
@ -2281,7 +2271,7 @@ imap_prepare_load_box( store_t *gctx, int opts )
gctx->opts = opts;
}
static int imap_submit_load( imap_store_t *, const char *, int, struct imap_cmd_refcounted_state * );
static void imap_submit_load( imap_store_t *, const char *, int, struct imap_cmd_refcounted_state * );
static void
imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int nexcs,
@ -2308,16 +2298,14 @@ imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int
if (i != j)
bl += sprintf( buf + bl, ":%d", excs[i] );
}
if (imap_submit_load( ctx, buf, 0, sts ) < 0)
goto done;
imap_submit_load( ctx, buf, 0, sts );
}
if (maxuid == INT_MAX)
maxuid = ctx->gen.uidnext ? ctx->gen.uidnext - 1 : 1000000000;
if (maxuid >= minuid) {
if ((ctx->gen.opts & OPEN_FIND) && minuid < newuid) {
sprintf( buf, "%d:%d", minuid, newuid - 1 );
if (imap_submit_load( ctx, buf, 0, sts ) < 0)
goto done;
imap_submit_load( ctx, buf, 0, sts );
if (newuid > maxuid)
goto done;
sprintf( buf, "%d:%d", newuid, maxuid );
@ -2332,10 +2320,10 @@ imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int
}
}
static int
static void
imap_submit_load( imap_store_t *ctx, const char *buf, int tuids, struct imap_cmd_refcounted_state *sts )
{
return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
"UID FETCH %s (UID%s%s%s)", buf,
(ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
(ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "",
@ -2396,14 +2384,14 @@ imap_make_flags( int flags, char *buf )
return d;
}
static int
static void
imap_flags_helper( imap_store_t *ctx, int uid, char what, int flags,
struct imap_cmd_refcounted_state *sts )
{
char buf[256];
buf[imap_make_flags( flags, buf )] = 0;
return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
"UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
}
@ -2422,8 +2410,10 @@ imap_set_msg_flags( store_t *gctx, message_t *msg, int uid, int add, int del,
}
if (add || del) {
struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
if ((add && imap_flags_helper( ctx, uid, '+', add, sts ) < 0) ||
(del && imap_flags_helper( ctx, uid, '-', del, sts ) < 0)) {}
if (add)
imap_flags_helper( ctx, uid, '+', add, sts );
if (del)
imap_flags_helper( ctx, uid, '-', del, sts );
imap_refcounted_done( sts );
} else {
cb( DRV_OK, aux );
@ -2474,9 +2464,8 @@ imap_close_box( store_t *gctx,
}
if (!bl)
break;
if (imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
"UID EXPUNGE %s", buf ) < 0)
break;
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
"UID EXPUNGE %s", buf );
}
imap_refcounted_done( sts );
} else {
@ -2617,13 +2606,12 @@ imap_list_store( store_t *gctx, int flags,
imap_store_t *ctx = (imap_store_t *)gctx;
struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
if (((flags & LIST_PATH) && (!(flags & LIST_INBOX) || !is_inbox( ctx, ctx->prefix, -1 )) &&
if ((flags & LIST_PATH) && (!(flags & LIST_INBOX) || !is_inbox( ctx, ctx->prefix, -1 )))
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
"LIST \"\" \"%\\s*\"", ctx->prefix ) < 0) ||
((flags & LIST_INBOX) && (!(flags & LIST_PATH) || *ctx->prefix) &&
"LIST \"\" \"%\\s*\"", ctx->prefix );
if ((flags & LIST_INBOX) && (!(flags & LIST_PATH) || *ctx->prefix))
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
"LIST \"\" INBOX*" ) < 0))
{}
"LIST \"\" INBOX*" );
imap_refcounted_done( sts );
}

View File

@ -755,6 +755,7 @@ do_queued_write( conn_t *conn )
return -1;
if (n != len) {
conn->write_offset += n;
conn->writing = 1;
return 0;
}
conn->write_offset = 0;
@ -764,6 +765,7 @@ do_queued_write( conn_t *conn )
if (conn->ssl && SSL_pending( conn->ssl ))
conf_wakeup( &conn->ssl_fake, 0 );
#endif
conn->writing = 0;
return conn->write_callback( conn->callback_aux );
}
@ -787,6 +789,8 @@ do_flush( conn_t *conn )
#ifdef HAVE_LIBZ
if (conn->out_z) {
int buf_avail = conn->append_avail;
if (!conn->z_written)
return;
do {
if (!bc) {
buf_avail = WRITE_CHUNK_SIZE;
@ -812,6 +816,7 @@ do_flush( conn_t *conn )
} while (!conn->out_z->avail_out);
conn->append_buf = bc;
conn->append_avail = buf_avail;
conn->z_written = 0;
} else
#endif
if (bc) {
@ -823,15 +828,15 @@ do_flush( conn_t *conn )
}
}
int
void
socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
{
int i, buf_avail, len, offset = 0, total = 0;
buff_chunk_t *bc, *exwb = conn->write_buf;
buff_chunk_t *bc;
for (i = 0; i < iovcnt; i++)
total += iov[i].len;
if (total >= WRITE_CHUNK_SIZE && pending_wakeup( &conn->fd_fake )) {
if (total >= WRITE_CHUNK_SIZE) {
/* If the new data is too big, queue the pending buffer to avoid latency. */
do_flush( conn );
}
@ -870,6 +875,7 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
bc->len = (char *)conn->out_z->next_out - bc->data;
buf_avail = conn->out_z->avail_out;
len -= conn->out_z->avail_in;
conn->z_written = 1;
} else
#endif
{
@ -898,17 +904,7 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
#ifdef HAVE_LIBZ
conn->append_avail = buf_avail;
#endif
/* Queue the pending write once the main loop goes idle. */
conf_wakeup( &conn->fd_fake,
#ifdef HAVE_LIBZ
/* Always give zlib a chance to flush its internal buffer. */
conn->out_z ||
#endif
bc ? 0 : -1 );
/* If no writes were queued before, ensure that flushing commences. */
if (!exwb)
return do_queued_write( conn );
return 0;
conf_wakeup( &conn->fd_fake, 0 );
}
static void
@ -963,10 +959,10 @@ socket_fake_cb( void *aux )
{
conn_t *conn = (conn_t *)aux;
buff_chunk_t *exwb = conn->write_buf;
/* Ensure that a pending write gets queued. */
do_flush( conn );
/* If no writes were queued before, ensure that flushing commences. */
if (!exwb)
/* If no writes are ongoing, start writing now. */
if (!conn->writing)
do_queued_write( conn );
}

View File

@ -83,6 +83,7 @@ typedef struct {
#ifdef HAVE_LIBZ
z_streamp in_z, out_z;
wakeup_t z_fake;
int z_written;
#endif
void (*bad_callback)( void *aux ); /* async fail while sending or listening */
@ -100,6 +101,7 @@ typedef struct {
/* writing */
buff_chunk_t *append_buf; /* accumulating buffer */
buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
int writing;
#ifdef HAVE_LIBZ
int append_avail; /* space left in accumulating buffer */
#endif
@ -145,6 +147,6 @@ typedef struct conn_iovec {
int len;
ownership_t takeOwn;
} conn_iovec_t;
int socket_write( conn_t *sock, conn_iovec_t *iov, int iovcnt );
void socket_write( conn_t *sock, conn_iovec_t *iov, int iovcnt );
#endif