fix socket_write() recursion
the synchronous writing to the socket would have typically invoked the write callback, which would flush further commands, thus recursing. we take the easy way out and make it fully asynchronous, i.e., no data is sent before (re-)entering the event loop. this also has the effect that socket_write() cannot fail any more, and any errors will be reported asynchronously. this is consistent with socket_read(), and produces cleaner code. this introduces a marginal performance regression: the maildir driver is synchronous, so all messages (which fit into memory) will be read before any data is sent. this is not considered relevant.
This commit is contained in:
parent
2f7e60a3ed
commit
6c08f568d0
|
@ -266,7 +266,7 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response )
|
|||
free( cmd );
|
||||
}
|
||||
|
||||
static int
|
||||
static void
|
||||
send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
|
||||
{
|
||||
int bufl, litplus, iovcnt = 1;
|
||||
|
@ -312,19 +312,13 @@ send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
|
|||
iov[2].takeOwn = KeepOwn;
|
||||
iovcnt = 3;
|
||||
}
|
||||
if (socket_write( &ctx->conn, iov, iovcnt ) < 0)
|
||||
goto bail;
|
||||
socket_write( &ctx->conn, iov, iovcnt );
|
||||
if (cmd->param.to_trash && ctx->trashnc == TrashUnknown)
|
||||
ctx->trashnc = TrashChecking;
|
||||
cmd->next = 0;
|
||||
*ctx->in_progress_append = cmd;
|
||||
ctx->in_progress_append = &cmd->next;
|
||||
ctx->num_in_progress++;
|
||||
return 0;
|
||||
|
||||
bail:
|
||||
done_imap_cmd( ctx, cmd, RESP_CANCEL );
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -346,11 +340,10 @@ flush_imap_cmds( imap_store_t *ctx )
|
|||
{
|
||||
struct imap_cmd *cmd;
|
||||
|
||||
while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
|
||||
if ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) {
|
||||
if (!(ctx->pending = cmd->next))
|
||||
ctx->pending_append = &ctx->pending;
|
||||
if (send_imap_cmd( ctx, cmd ) < 0)
|
||||
return -1;
|
||||
send_imap_cmd( ctx, cmd );
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -379,7 +372,7 @@ cancel_submitted_imap_cmds( imap_store_t *ctx )
|
|||
}
|
||||
}
|
||||
|
||||
static int
|
||||
static void
|
||||
submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
|
||||
{
|
||||
assert( ctx );
|
||||
|
@ -396,10 +389,9 @@ submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd )
|
|||
*ctx->pending_append = cmd;
|
||||
ctx->pending_append = &cmd->next;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
send_imap_cmd( ctx, cmd );
|
||||
}
|
||||
|
||||
return send_imap_cmd( ctx, cmd );
|
||||
}
|
||||
|
||||
/* Minimal printf() replacement that supports an %\s format sequence to print backslash-escaped
|
||||
|
@ -484,7 +476,7 @@ imap_vprintf( const char *fmt, va_list ap )
|
|||
}
|
||||
}
|
||||
|
||||
static int
|
||||
static void
|
||||
imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
|
||||
void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response ),
|
||||
const char *fmt, ... )
|
||||
|
@ -497,7 +489,7 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp,
|
|||
va_start( ap, fmt );
|
||||
cmdp->cmd = imap_vprintf( fmt, ap );
|
||||
va_end( ap );
|
||||
return submit_imap_cmd( ctx, cmdp );
|
||||
submit_imap_cmd( ctx, cmdp );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -1332,8 +1324,7 @@ imap_socket_read( void *aux )
|
|||
iov[1].buf = "\r\n";
|
||||
iov[1].len = 2;
|
||||
iov[1].takeOwn = KeepOwn;
|
||||
if (socket_write( &ctx->conn, iov, 2 ) < 0)
|
||||
return;
|
||||
socket_write( &ctx->conn, iov, 2 );
|
||||
} else if (cmdp->param.cont) {
|
||||
if (cmdp->param.cont( ctx, cmdp, cmd ))
|
||||
return;
|
||||
|
@ -1369,9 +1360,8 @@ imap_socket_read( void *aux )
|
|||
cmd2->orig_cmd = cmdp;
|
||||
cmd2->gen.param.high_prio = 1;
|
||||
p = strchr( cmdp->cmd, '"' );
|
||||
if (imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
|
||||
"CREATE %.*s", imap_strchr( p + 1, '"' ) - p + 1, p ) < 0)
|
||||
return;
|
||||
imap_exec( ctx, &cmd2->gen, get_cmd_result_p2,
|
||||
"CREATE %.*s", imap_strchr( p + 1, '"' ) - p + 1, p );
|
||||
continue;
|
||||
}
|
||||
resp = RESP_NO;
|
||||
|
@ -1402,8 +1392,7 @@ imap_socket_read( void *aux )
|
|||
return;
|
||||
}
|
||||
}
|
||||
if (flush_imap_cmds( ctx ) < 0)
|
||||
return;
|
||||
flush_imap_cmds( ctx );
|
||||
}
|
||||
imap_invoke_bad_callback( ctx );
|
||||
}
|
||||
|
@ -1921,7 +1910,8 @@ do_sasl_auth( imap_store_t *ctx, struct imap_cmd *cmdp ATTR_UNUSED, const char *
|
|||
iov[iovcnt].len = 2;
|
||||
iov[iovcnt].takeOwn = KeepOwn;
|
||||
iovcnt++;
|
||||
return socket_write( &ctx->conn, iov, iovcnt );
|
||||
socket_write( &ctx->conn, iov, iovcnt );
|
||||
return 0;
|
||||
|
||||
bail:
|
||||
imap_open_store_bail( ctx, FAIL_FINAL );
|
||||
|
@ -2281,7 +2271,7 @@ imap_prepare_load_box( store_t *gctx, int opts )
|
|||
gctx->opts = opts;
|
||||
}
|
||||
|
||||
static int imap_submit_load( imap_store_t *, const char *, int, struct imap_cmd_refcounted_state * );
|
||||
static void imap_submit_load( imap_store_t *, const char *, int, struct imap_cmd_refcounted_state * );
|
||||
|
||||
static void
|
||||
imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int nexcs,
|
||||
|
@ -2308,16 +2298,14 @@ imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int
|
|||
if (i != j)
|
||||
bl += sprintf( buf + bl, ":%d", excs[i] );
|
||||
}
|
||||
if (imap_submit_load( ctx, buf, 0, sts ) < 0)
|
||||
goto done;
|
||||
imap_submit_load( ctx, buf, 0, sts );
|
||||
}
|
||||
if (maxuid == INT_MAX)
|
||||
maxuid = ctx->gen.uidnext ? ctx->gen.uidnext - 1 : 1000000000;
|
||||
if (maxuid >= minuid) {
|
||||
if ((ctx->gen.opts & OPEN_FIND) && minuid < newuid) {
|
||||
sprintf( buf, "%d:%d", minuid, newuid - 1 );
|
||||
if (imap_submit_load( ctx, buf, 0, sts ) < 0)
|
||||
goto done;
|
||||
imap_submit_load( ctx, buf, 0, sts );
|
||||
if (newuid > maxuid)
|
||||
goto done;
|
||||
sprintf( buf, "%d:%d", newuid, maxuid );
|
||||
|
@ -2332,14 +2320,14 @@ imap_load_box( store_t *gctx, int minuid, int maxuid, int newuid, int *excs, int
|
|||
}
|
||||
}
|
||||
|
||||
static int
|
||||
static void
|
||||
imap_submit_load( imap_store_t *ctx, const char *buf, int tuids, struct imap_cmd_refcounted_state *sts )
|
||||
{
|
||||
return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"UID FETCH %s (UID%s%s%s)", buf,
|
||||
(ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
|
||||
(ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "",
|
||||
tuids ? " BODY.PEEK[HEADER.FIELDS (X-TUID)]" : "");
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"UID FETCH %s (UID%s%s%s)", buf,
|
||||
(ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "",
|
||||
(ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "",
|
||||
tuids ? " BODY.PEEK[HEADER.FIELDS (X-TUID)]" : "");
|
||||
}
|
||||
|
||||
/******************* imap_fetch_msg *******************/
|
||||
|
@ -2396,15 +2384,15 @@ imap_make_flags( int flags, char *buf )
|
|||
return d;
|
||||
}
|
||||
|
||||
static int
|
||||
static void
|
||||
imap_flags_helper( imap_store_t *ctx, int uid, char what, int flags,
|
||||
struct imap_cmd_refcounted_state *sts )
|
||||
{
|
||||
char buf[256];
|
||||
|
||||
buf[imap_make_flags( flags, buf )] = 0;
|
||||
return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
|
||||
"UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2,
|
||||
"UID STORE %d %cFLAGS.SILENT %s", uid, what, buf );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -2422,8 +2410,10 @@ imap_set_msg_flags( store_t *gctx, message_t *msg, int uid, int add, int del,
|
|||
}
|
||||
if (add || del) {
|
||||
struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
|
||||
if ((add && imap_flags_helper( ctx, uid, '+', add, sts ) < 0) ||
|
||||
(del && imap_flags_helper( ctx, uid, '-', del, sts ) < 0)) {}
|
||||
if (add)
|
||||
imap_flags_helper( ctx, uid, '+', add, sts );
|
||||
if (del)
|
||||
imap_flags_helper( ctx, uid, '-', del, sts );
|
||||
imap_refcounted_done( sts );
|
||||
} else {
|
||||
cb( DRV_OK, aux );
|
||||
|
@ -2474,9 +2464,8 @@ imap_close_box( store_t *gctx,
|
|||
}
|
||||
if (!bl)
|
||||
break;
|
||||
if (imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"UID EXPUNGE %s", buf ) < 0)
|
||||
break;
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"UID EXPUNGE %s", buf );
|
||||
}
|
||||
imap_refcounted_done( sts );
|
||||
} else {
|
||||
|
@ -2617,13 +2606,12 @@ imap_list_store( store_t *gctx, int flags,
|
|||
imap_store_t *ctx = (imap_store_t *)gctx;
|
||||
struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux );
|
||||
|
||||
if (((flags & LIST_PATH) && (!(flags & LIST_INBOX) || !is_inbox( ctx, ctx->prefix, -1 )) &&
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"LIST \"\" \"%\\s*\"", ctx->prefix ) < 0) ||
|
||||
((flags & LIST_INBOX) && (!(flags & LIST_PATH) || *ctx->prefix) &&
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"LIST \"\" INBOX*" ) < 0))
|
||||
{}
|
||||
if ((flags & LIST_PATH) && (!(flags & LIST_INBOX) || !is_inbox( ctx, ctx->prefix, -1 )))
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"LIST \"\" \"%\\s*\"", ctx->prefix );
|
||||
if ((flags & LIST_INBOX) && (!(flags & LIST_PATH) || *ctx->prefix))
|
||||
imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_refcounted_done_box,
|
||||
"LIST \"\" INBOX*" );
|
||||
imap_refcounted_done( sts );
|
||||
}
|
||||
|
||||
|
|
30
src/socket.c
30
src/socket.c
|
@ -755,6 +755,7 @@ do_queued_write( conn_t *conn )
|
|||
return -1;
|
||||
if (n != len) {
|
||||
conn->write_offset += n;
|
||||
conn->writing = 1;
|
||||
return 0;
|
||||
}
|
||||
conn->write_offset = 0;
|
||||
|
@ -764,6 +765,7 @@ do_queued_write( conn_t *conn )
|
|||
if (conn->ssl && SSL_pending( conn->ssl ))
|
||||
conf_wakeup( &conn->ssl_fake, 0 );
|
||||
#endif
|
||||
conn->writing = 0;
|
||||
return conn->write_callback( conn->callback_aux );
|
||||
}
|
||||
|
||||
|
@ -787,6 +789,8 @@ do_flush( conn_t *conn )
|
|||
#ifdef HAVE_LIBZ
|
||||
if (conn->out_z) {
|
||||
int buf_avail = conn->append_avail;
|
||||
if (!conn->z_written)
|
||||
return;
|
||||
do {
|
||||
if (!bc) {
|
||||
buf_avail = WRITE_CHUNK_SIZE;
|
||||
|
@ -812,6 +816,7 @@ do_flush( conn_t *conn )
|
|||
} while (!conn->out_z->avail_out);
|
||||
conn->append_buf = bc;
|
||||
conn->append_avail = buf_avail;
|
||||
conn->z_written = 0;
|
||||
} else
|
||||
#endif
|
||||
if (bc) {
|
||||
|
@ -823,15 +828,15 @@ do_flush( conn_t *conn )
|
|||
}
|
||||
}
|
||||
|
||||
int
|
||||
void
|
||||
socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
|
||||
{
|
||||
int i, buf_avail, len, offset = 0, total = 0;
|
||||
buff_chunk_t *bc, *exwb = conn->write_buf;
|
||||
buff_chunk_t *bc;
|
||||
|
||||
for (i = 0; i < iovcnt; i++)
|
||||
total += iov[i].len;
|
||||
if (total >= WRITE_CHUNK_SIZE && pending_wakeup( &conn->fd_fake )) {
|
||||
if (total >= WRITE_CHUNK_SIZE) {
|
||||
/* If the new data is too big, queue the pending buffer to avoid latency. */
|
||||
do_flush( conn );
|
||||
}
|
||||
|
@ -870,6 +875,7 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
|
|||
bc->len = (char *)conn->out_z->next_out - bc->data;
|
||||
buf_avail = conn->out_z->avail_out;
|
||||
len -= conn->out_z->avail_in;
|
||||
conn->z_written = 1;
|
||||
} else
|
||||
#endif
|
||||
{
|
||||
|
@ -898,17 +904,7 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt )
|
|||
#ifdef HAVE_LIBZ
|
||||
conn->append_avail = buf_avail;
|
||||
#endif
|
||||
/* Queue the pending write once the main loop goes idle. */
|
||||
conf_wakeup( &conn->fd_fake,
|
||||
#ifdef HAVE_LIBZ
|
||||
/* Always give zlib a chance to flush its internal buffer. */
|
||||
conn->out_z ||
|
||||
#endif
|
||||
bc ? 0 : -1 );
|
||||
/* If no writes were queued before, ensure that flushing commences. */
|
||||
if (!exwb)
|
||||
return do_queued_write( conn );
|
||||
return 0;
|
||||
conf_wakeup( &conn->fd_fake, 0 );
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -963,10 +959,10 @@ socket_fake_cb( void *aux )
|
|||
{
|
||||
conn_t *conn = (conn_t *)aux;
|
||||
|
||||
buff_chunk_t *exwb = conn->write_buf;
|
||||
/* Ensure that a pending write gets queued. */
|
||||
do_flush( conn );
|
||||
/* If no writes were queued before, ensure that flushing commences. */
|
||||
if (!exwb)
|
||||
/* If no writes are ongoing, start writing now. */
|
||||
if (!conn->writing)
|
||||
do_queued_write( conn );
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,7 @@ typedef struct {
|
|||
#ifdef HAVE_LIBZ
|
||||
z_streamp in_z, out_z;
|
||||
wakeup_t z_fake;
|
||||
int z_written;
|
||||
#endif
|
||||
|
||||
void (*bad_callback)( void *aux ); /* async fail while sending or listening */
|
||||
|
@ -100,6 +101,7 @@ typedef struct {
|
|||
/* writing */
|
||||
buff_chunk_t *append_buf; /* accumulating buffer */
|
||||
buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */
|
||||
int writing;
|
||||
#ifdef HAVE_LIBZ
|
||||
int append_avail; /* space left in accumulating buffer */
|
||||
#endif
|
||||
|
@ -145,6 +147,6 @@ typedef struct conn_iovec {
|
|||
int len;
|
||||
ownership_t takeOwn;
|
||||
} conn_iovec_t;
|
||||
int socket_write( conn_t *sock, conn_iovec_t *iov, int iovcnt );
|
||||
void socket_write( conn_t *sock, conn_iovec_t *iov, int iovcnt );
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue
Block a user