wrap message storing into transactions. nice side effect: drivers don't

need to deal with line end conversion any move.
This commit is contained in:
Oswald Buddenhagen 2006-02-03 21:33:43 +00:00
parent 19128f1587
commit 850addecd5
6 changed files with 349 additions and 176 deletions

7
TODO
View File

@ -11,13 +11,6 @@ fix maildir_{open_store,list} to handle partial names (last char not slash).
add a way to automatically create and sync subfolders. add a way to automatically create and sync subfolders.
could store TUID even when UIDPLUS is supported. would avoid duplicated
messages after abort before new UID arrives.
decouple TUID search from append. that's a prerequisite for usable
MULTIAPPEND, and is generally good for async. should be way faster, too,
as it saves repeated mailbox rescans with single-file formats.
use MULTIAPPEND and FETCH with multiple messages. use MULTIAPPEND and FETCH with multiple messages.
create dummies describing MIME structure of messages bigger than MaxSize. create dummies describing MIME structure of messages bigger than MaxSize.

View File

@ -758,7 +758,7 @@ parse_fetch( imap_t *imap, char *cmd ) /* move this down */
if (is_atom( tmp )) if (is_atom( tmp ))
size = atoi( tmp->val ); size = atoi( tmp->val );
else else
fprintf( stderr, "IMAP error: unable to parse SIZE\n" ); fprintf( stderr, "IMAP error: unable to parse RFC822.SIZE\n" );
} else if (!strcmp( "BODY[]", tmp->val )) { } else if (!strcmp( "BODY[]", tmp->val )) {
tmp = tmp->next; tmp = tmp->next;
if (is_atom( tmp )) { if (is_atom( tmp )) {
@ -782,7 +782,6 @@ parse_fetch( imap_t *imap, char *cmd ) /* move this down */
msgdata = (msg_data_t *)cmdp->cb.ctx; msgdata = (msg_data_t *)cmdp->cb.ctx;
msgdata->data = body; msgdata->data = body;
msgdata->len = size; msgdata->len = size;
msgdata->crlf = 1;
if (status & M_FLAGS) if (status & M_FLAGS)
msgdata->flags = mask; msgdata->flags = mask;
} else if (uid) { /* ignore async flag updates for now */ } else if (uid) { /* ignore async flag updates for now */
@ -866,10 +865,14 @@ parse_search( imap_t *imap, char *cmd )
struct imap_cmd *cmdp; struct imap_cmd *cmdp;
int uid; int uid;
arg = next_arg( &cmd ); if (!(arg = next_arg( &cmd )))
if (!arg || !(uid = atoi( arg ))) { uid = -1;
else if (!(uid = atoi( arg ))) {
fprintf( stderr, "IMAP error: malformed SEARCH response\n" ); fprintf( stderr, "IMAP error: malformed SEARCH response\n" );
return; return;
} else if (next_arg( &cmd )) {
warn( "IMAP warning: SEARCH returns multiple matches\n" );
uid = -1; /* to avoid havoc */
} }
/* Find the first command that expects a UID - this is guaranteed /* Find the first command that expects a UID - this is guaranteed
@ -1546,89 +1549,15 @@ imap_trash_msg( store_t *gctx, message_t *msg )
msg->uid, ctx->prefix, gctx->conf->trash ); msg->uid, ctx->prefix, gctx->conf->trash );
} }
#define TUIDL 8
static int static int
imap_store_msg( store_t *gctx, msg_data_t *data, int *uid ) imap_store_msg( store_t *gctx, msg_data_t *data, int *uid )
{ {
imap_store_t *ctx = (imap_store_t *)gctx; imap_store_t *ctx = (imap_store_t *)gctx;
imap_t *imap = ctx->imap; imap_t *imap = ctx->imap;
struct imap_cmd_cb cb; struct imap_cmd_cb cb;
char *fmap, *buf;
const char *prefix, *box; const char *prefix, *box;
int ret, i, j, d, len, extra, nocr; int ret, d;
int start, sbreak = 0, ebreak = 0; char flagstr[128];
char flagstr[128], tuid[TUIDL * 2 + 1];
memset( &cb, 0, sizeof(cb) );
fmap = data->data;
len = data->len;
nocr = !data->crlf;
extra = 0, i = 0;
if (!CAP(UIDPLUS) && uid) {
nloop:
start = i;
while (i < len)
if (fmap[i++] == '\n') {
extra += nocr;
if (i - 2 + nocr == start) {
sbreak = ebreak = i - 2 + nocr;
goto mktid;
}
if (!memcmp( fmap + start, "X-TUID: ", 8 )) {
extra -= (ebreak = i) - (sbreak = start) + nocr;
goto mktid;
}
goto nloop;
}
/* invalid message */
free( fmap );
return DRV_MSG_BAD;
mktid:
for (j = 0; j < TUIDL; j++)
sprintf( tuid + j * 2, "%02x", arc4_getbyte() );
extra += 8 + TUIDL * 2 + 2;
}
if (nocr)
for (; i < len; i++)
if (fmap[i] == '\n')
extra++;
cb.dlen = len + extra;
buf = cb.data = nfmalloc( cb.dlen );
i = 0;
if (!CAP(UIDPLUS) && uid) {
if (nocr) {
for (; i < sbreak; i++)
if (fmap[i] == '\n') {
*buf++ = '\r';
*buf++ = '\n';
} else
*buf++ = fmap[i];
} else {
memcpy( buf, fmap, sbreak );
buf += sbreak;
}
memcpy( buf, "X-TUID: ", 8 );
buf += 8;
memcpy( buf, tuid, TUIDL * 2 );
buf += TUIDL * 2;
*buf++ = '\r';
*buf++ = '\n';
i = ebreak;
}
if (nocr) {
for (; i < len; i++)
if (fmap[i] == '\n') {
*buf++ = '\r';
*buf++ = '\n';
} else
*buf++ = fmap[i];
} else
memcpy( buf, fmap + i, len - i );
free( fmap );
d = 0; d = 0;
if (data->flags) { if (data->flags) {
@ -1637,6 +1566,9 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int *uid )
} }
flagstr[d] = 0; flagstr[d] = 0;
memset( &cb, 0, sizeof(cb) );
cb.dlen = data->len;
cb.data = data->data;
if (!uid) { if (!uid) {
box = gctx->conf->trash; box = gctx->conf->trash;
prefix = ctx->prefix; prefix = ctx->prefix;
@ -1649,6 +1581,7 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int *uid )
cb.create = (gctx->opts & OPEN_CREATE) != 0; cb.create = (gctx->opts & OPEN_CREATE) != 0;
/*if (ctx->currentnc) /*if (ctx->currentnc)
imap->caps = imap->rcaps & ~(1 << LITERALPLUS);*/ imap->caps = imap->rcaps & ~(1 << LITERALPLUS);*/
*uid = -2;
} }
cb.ctx = uid; cb.ctx = uid;
ret = imap_exec_m( ctx, &cb, "APPEND \"%s%s\" %s", prefix, box, flagstr ); ret = imap_exec_m( ctx, &cb, "APPEND \"%s%s\" %s", prefix, box, flagstr );
@ -1662,13 +1595,23 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int *uid )
gctx->count++; gctx->count++;
} }
if (CAP(UIDPLUS) || !uid)
return DRV_OK; return DRV_OK;
}
/* Didn't receive an APPENDUID */ static int
imap_find_msg( store_t *gctx, const char *tuid, int *uid )
{
imap_store_t *ctx = (imap_store_t *)gctx;
struct imap_cmd_cb cb;
int ret;
memset( &cb, 0, sizeof(cb) );
cb.ctx = uid;
cb.uid = -1; /* we're looking for a UID */ cb.uid = -1; /* we're looking for a UID */
cb.data = 0; /* reset; ctx still set */ *uid = -1; /* in case we get no SEARCH response at all */
return imap_exec_m( ctx, &cb, "UID SEARCH HEADER X-TUID %s", tuid ); if ((ret = imap_exec_m( ctx, &cb, "UID SEARCH HEADER X-TUID %." stringify(TUIDL) "s", tuid )) != DRV_OK)
return ret;
return *uid < 0 ? DRV_MSG_BAD : DRV_OK;
} }
static int static int
@ -1813,6 +1756,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep, int *err )
} }
struct driver imap_driver = { struct driver imap_driver = {
DRV_CRLF,
imap_parse_store, imap_parse_store,
imap_open_store, imap_open_store,
imap_close_store, imap_close_store,
@ -1822,6 +1766,7 @@ struct driver imap_driver = {
imap_select, imap_select,
imap_fetch_msg, imap_fetch_msg,
imap_store_msg, imap_store_msg,
imap_find_msg,
imap_set_flags, imap_set_flags,
imap_trash_msg, imap_trash_msg,
imap_check, imap_check,

View File

@ -57,6 +57,7 @@ typedef struct maildir_store_conf {
typedef struct maildir_message { typedef struct maildir_message {
message_t gen; message_t gen;
char *base; char *base;
char tuid[TUIDL];
} maildir_message_t; } maildir_message_t;
typedef struct maildir_store { typedef struct maildir_store {
@ -182,6 +183,7 @@ typedef struct {
char *base; char *base;
int size; int size;
unsigned uid:31, recent:1; unsigned uid:31, recent:1;
char tuid[TUIDL];
} msg_t; } msg_t;
typedef struct { typedef struct {
@ -468,6 +470,7 @@ static int
maildir_scan( maildir_store_t *ctx, msglist_t *msglist ) maildir_scan( maildir_store_t *ctx, msglist_t *msglist )
{ {
DIR *d; DIR *d;
FILE *f;
struct dirent *e; struct dirent *e;
const char *u, *ru; const char *u, *ru;
#ifdef USE_DB #ifdef USE_DB
@ -570,6 +573,7 @@ maildir_scan( maildir_store_t *ctx, msglist_t *msglist )
entry->uid = uid; entry->uid = uid;
entry->recent = i; entry->recent = i;
entry->size = 0; entry->size = 0;
entry->tuid[0] = 0;
} }
} }
closedir( d ); closedir( d );
@ -615,7 +619,7 @@ maildir_scan( maildir_store_t *ctx, msglist_t *msglist )
goto again; goto again;
} }
uid = entry->uid; uid = entry->uid;
if (ctx->gen.opts & OPEN_SIZE) if (ctx->gen.opts & (OPEN_SIZE|OPEN_FIND))
nfsnprintf( buf + bl, sizeof(buf) - bl, "%s/%s", subdirs[entry->recent], entry->base ); nfsnprintf( buf + bl, sizeof(buf) - bl, "%s/%s", subdirs[entry->recent], entry->base );
#ifdef USE_DB #ifdef USE_DB
} else if (ctx->db) { } else if (ctx->db) {
@ -624,7 +628,7 @@ maildir_scan( maildir_store_t *ctx, msglist_t *msglist )
return ret; return ret;
} }
entry->uid = uid; entry->uid = uid;
if (ctx->gen.opts & OPEN_SIZE) if (ctx->gen.opts & (OPEN_SIZE|OPEN_FIND))
nfsnprintf( buf + bl, sizeof(buf) - bl, "%s/%s", subdirs[entry->recent], entry->base ); nfsnprintf( buf + bl, sizeof(buf) - bl, "%s/%s", subdirs[entry->recent], entry->base );
#endif /* USE_DB */ #endif /* USE_DB */
} else { } else {
@ -645,6 +649,7 @@ maildir_scan( maildir_store_t *ctx, msglist_t *msglist )
memcpy( nbuf, buf, bl + 4 ); memcpy( nbuf, buf, bl + 4 );
nfsnprintf( nbuf + bl + 4, sizeof(nbuf) - bl - 4, "%s", entry->base ); nfsnprintf( nbuf + bl + 4, sizeof(nbuf) - bl - 4, "%s", entry->base );
if (rename( nbuf, buf )) { if (rename( nbuf, buf )) {
notok:
if (errno != ENOENT) { if (errno != ENOENT) {
perror( buf ); perror( buf );
maildir_uidval_unlock( ctx ); maildir_uidval_unlock( ctx );
@ -659,12 +664,23 @@ maildir_scan( maildir_store_t *ctx, msglist_t *msglist )
memcpy( entry->base, buf + bl + 4, fnl ); memcpy( entry->base, buf + bl + 4, fnl );
} }
if (ctx->gen.opts & OPEN_SIZE) { if (ctx->gen.opts & OPEN_SIZE) {
if (stat( buf, &st )) { if (stat( buf, &st ))
maildir_free_scan( msglist ); goto notok;
goto again;
}
entry->size = st.st_size; entry->size = st.st_size;
} }
if (ctx->gen.opts & OPEN_FIND) {
if (!(f = fopen( buf, "r" )))
goto notok;
while (fgets( nbuf, sizeof(nbuf), f )) {
if (!nbuf[0] || nbuf[0] == '\n')
break;
if (!memcmp( nbuf, "X-TUID: ", 8 ) && nbuf[8 + TUIDL] == '\n') {
memcpy( entry->tuid, nbuf + 8, TUIDL );
break;
}
}
fclose( f );
}
} }
ctx->uvok = 1; ctx->uvok = 1;
} }
@ -681,6 +697,7 @@ maildir_init_msg( maildir_store_t *ctx, maildir_message_t *msg, msg_t *entry )
msg->base = entry->base; msg->base = entry->base;
entry->base = 0; /* prevent deletion */ entry->base = 0; /* prevent deletion */
msg->gen.size = entry->size; msg->gen.size = entry->size;
strncpy( msg->tuid, entry->tuid, TUIDL );
if (entry->recent) if (entry->recent)
msg->gen.status |= M_RECENT; msg->gen.status |= M_RECENT;
if (ctx->gen.opts & OPEN_FLAGS) { if (ctx->gen.opts & OPEN_FLAGS) {
@ -902,7 +919,6 @@ maildir_fetch_msg( store_t *gctx, message_t *gmsg, msg_data_t *data )
return ret; return ret;
} }
fstat( fd, &st ); fstat( fd, &st );
data->crlf = 0;
data->len = st.st_size; data->len = st.st_size;
data->data = nfmalloc( data->len ); data->data = nfmalloc( data->len );
if (read( fd, data->data, data->len ) != data->len) { if (read( fd, data->data, data->len ) != data->len) {
@ -981,7 +997,6 @@ maildir_store_msg( store_t *gctx, msg_data_t *data, int *uid )
return DRV_BOX_BAD; return DRV_BOX_BAD;
} }
} }
strip_cr( data );
ret = write( fd, data->data, data->len ); ret = write( fd, data->data, data->len );
free( data->data ); free( data->data );
if (ret != data->len) { if (ret != data->len) {
@ -1003,6 +1018,20 @@ maildir_store_msg( store_t *gctx, msg_data_t *data, int *uid )
return DRV_OK; return DRV_OK;
} }
static int
maildir_find_msg( store_t *gctx, const char *tuid, int *uid )
{
message_t *msg;
/* using a hash table might turn out to be more appropriate ... */
for (msg = gctx->msgs; msg; msg = msg->next)
if (!(msg->status & M_DEAD) && !memcmp( ((maildir_message_t *)msg)->tuid, tuid, TUIDL )) {
*uid = msg->uid;
return DRV_OK;
}
return DRV_MSG_BAD;
}
static int static int
maildir_set_flags( store_t *gctx, message_t *gmsg, int uid, int add, int del ) maildir_set_flags( store_t *gctx, message_t *gmsg, int uid, int add, int del )
{ {
@ -1189,6 +1218,7 @@ maildir_parse_store( conffile_t *cfg, store_conf_t **storep, int *err )
} }
struct driver maildir_driver = { struct driver maildir_driver = {
0,
maildir_parse_store, maildir_parse_store,
maildir_open_store, maildir_open_store,
maildir_close_store, maildir_close_store,
@ -1198,6 +1228,7 @@ struct driver maildir_driver = {
maildir_select, maildir_select,
maildir_fetch_msg, maildir_fetch_msg,
maildir_store_msg, maildir_store_msg,
maildir_find_msg,
maildir_set_flags, maildir_set_flags,
maildir_trash_msg, maildir_trash_msg,
maildir_check, maildir_check,

View File

@ -31,6 +31,9 @@
#define as(ar) (sizeof(ar)/sizeof(ar[0])) #define as(ar) (sizeof(ar)/sizeof(ar[0]))
#define __stringify(x) #x
#define stringify(x) __stringify(x)
#if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ > 4) #if __GNUC__ > 2 || (__GNUC__ == 2 && __GNUC_MINOR__ > 4)
# define ATTR_UNUSED __attribute__((unused)) # define ATTR_UNUSED __attribute__((unused))
# define ATTR_NORETURN __attribute__((noreturn)) # define ATTR_NORETURN __attribute__((noreturn))
@ -137,6 +140,7 @@ typedef struct message {
#define OPEN_EXPUNGE (1<<5) #define OPEN_EXPUNGE (1<<5)
#define OPEN_SETFLAGS (1<<6) #define OPEN_SETFLAGS (1<<6)
#define OPEN_APPEND (1<<7) #define OPEN_APPEND (1<<7)
#define OPEN_FIND (1<<8)
typedef struct store { typedef struct store {
store_conf_t *conf; /* foreign */ store_conf_t *conf; /* foreign */
@ -146,7 +150,7 @@ typedef struct store {
char *path; /* own */ char *path; /* own */
message_t *msgs; /* own */ message_t *msgs; /* own */
int uidvalidity; int uidvalidity;
unsigned char opts; /* maybe preset? */ unsigned opts; /* maybe preset? */
/* note that the following do _not_ reflect stats from msgs, but mailbox totals */ /* note that the following do _not_ reflect stats from msgs, but mailbox totals */
int count; /* # of messages */ int count; /* # of messages */
int recent; /* # of recent messages - don't trust this beyond the initial read */ int recent; /* # of recent messages - don't trust this beyond the initial read */
@ -156,7 +160,6 @@ typedef struct {
char *data; char *data;
int len; int len;
unsigned char flags; unsigned char flags;
unsigned char crlf:1;
} msg_data_t; } msg_data_t;
#define DRV_OK 0 #define DRV_OK 0
@ -164,7 +167,12 @@ typedef struct {
#define DRV_BOX_BAD -2 #define DRV_BOX_BAD -2
#define DRV_STORE_BAD -3 #define DRV_STORE_BAD -3
#define DRV_CRLF 1
#define TUIDL 12
struct driver { struct driver {
int flags;
int (*parse_store)( conffile_t *cfg, store_conf_t **storep, int *err ); int (*parse_store)( conffile_t *cfg, store_conf_t **storep, int *err );
store_t *(*open_store)( store_conf_t *conf, store_t *oldctx ); store_t *(*open_store)( store_conf_t *conf, store_t *oldctx );
void (*close_store)( store_t *ctx ); void (*close_store)( store_t *ctx );
@ -174,6 +182,7 @@ struct driver {
int (*select)( store_t *ctx, int minuid, int maxuid, int *excs, int nexcs ); int (*select)( store_t *ctx, int minuid, int maxuid, int *excs, int nexcs );
int (*fetch_msg)( store_t *ctx, message_t *msg, msg_data_t *data ); int (*fetch_msg)( store_t *ctx, message_t *msg, msg_data_t *data );
int (*store_msg)( store_t *ctx, msg_data_t *data, int *uid ); /* if uid is null, store to trash */ int (*store_msg)( store_t *ctx, msg_data_t *data, int *uid ); /* if uid is null, store to trash */
int (*find_msg)( store_t *ctx, const char *tuid, int *uid );
int (*set_flags)( store_t *ctx, message_t *msg, int uid, int add, int del ); /* msg can be null, therefore uid as a fallback */ int (*set_flags)( store_t *ctx, message_t *msg, int uid, int add, int del ); /* msg can be null, therefore uid as a fallback */
int (*trash_msg)( store_t *ctx, message_t *msg ); /* This may expunge the original message immediately, but it needn't to */ int (*trash_msg)( store_t *ctx, message_t *msg ); /* This may expunge the original message immediately, but it needn't to */
int (*check)( store_t *ctx ); /* IMAP-style: flush */ int (*check)( store_t *ctx ); /* IMAP-style: flush */
@ -210,8 +219,6 @@ void free_string_list( string_list_t *list );
void free_generic_messages( message_t * ); void free_generic_messages( message_t * );
void strip_cr( msg_data_t *msgdata );
void *nfmalloc( size_t sz ); void *nfmalloc( size_t sz );
void *nfcalloc( size_t sz ); void *nfcalloc( size_t sz );
void *nfrealloc( void *mem, size_t sz ); void *nfrealloc( void *mem, size_t sz );
@ -233,6 +240,7 @@ unsigned char arc4_getbyte( void );
#define SYNC_OK 0 #define SYNC_OK 0
#define SYNC_FAIL 1 #define SYNC_FAIL 1
#define SYNC_BAD(ms) (2+(ms)) #define SYNC_BAD(ms) (2+(ms))
#define SYNC_NOGOOD 4 /* internal */
int sync_boxes( store_t *ctx[], const char *names[], channel_conf_t * ); int sync_boxes( store_t *ctx[], const char *names[], channel_conf_t * );

View File

@ -94,10 +94,11 @@ typedef struct sync_rec {
int uid[2]; int uid[2];
message_t *msg[2]; message_t *msg[2];
unsigned char status, flags, aflags[2], dflags[2]; unsigned char status, flags, aflags[2], dflags[2];
char tuid[TUIDL];
} sync_rec_t; } sync_rec_t;
static void static int
findmsgs( sync_rec_t *srecs, store_t *ctx[], int t ) findmsgs( sync_rec_t *srecs, store_t *ctx[], int t, FILE *jfp )
{ {
sync_rec_t *srec, *nsrec = 0; sync_rec_t *srec, *nsrec = 0;
message_t *msg; message_t *msg;
@ -105,6 +106,46 @@ findmsgs( sync_rec_t *srecs, store_t *ctx[], int t )
int uid; int uid;
char fbuf[16]; /* enlarge when support for keywords is added */ char fbuf[16]; /* enlarge when support for keywords is added */
if (jfp) {
/*
* Alternatively, the TUIDs could be fetched into the messages and
* looked up here. This would make the search faster (probably) and
* save roundtrips. On the downside, quite some additional data would
* have to be fetched for every message and the IMAP driver would be
* more complicated. This is a corner case anyway, so why bother.
*/
debug( "finding previously copied messages\n" );
for (srec = srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD)
continue;
if (srec->uid[t] == -2 && srec->tuid[0]) {
debug( " pair(%d,%d): lookup %s, TUID %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], str_ms[t], srec->tuid );
switch (ctx[t]->conf->driver->find_msg( ctx[t], srec->tuid, &uid )) {
case DRV_STORE_BAD: return SYNC_BAD(t);
case DRV_OK:
debug( " -> new UID %d\n", uid );
Fprintf( jfp, "%c %d %d %d\n", "<>"[t], srec->uid[M], srec->uid[S], uid );
srec->uid[t] = uid;
srec->tuid[0] = 0;
break;
default:
debug( " -> TUID lost\n" );
Fprintf( jfp, "& %d %d\n", srec->uid[M], srec->uid[S] );
srec->flags = 0;
srec->tuid[0] = 0;
break;
}
}
}
}
/*
* Mapping msg -> srec (this variant) is dog slow for new messages.
* Mapping srec -> msg is dog slow for deleted messages.
* One solution would be using binary search on an index array.
* msgs are already sorted by UID, srecs would have to be sorted by uid[t].
*/
debug( "matching messages against sync records\n" );
for (msg = ctx[t]->msgs; msg; msg = msg->next) { for (msg = ctx[t]->msgs; msg; msg = msg->next) {
uid = msg->uid; uid = msg->uid;
if (DFlags & DEBUG) { if (DFlags & DEBUG) {
@ -136,8 +177,122 @@ findmsgs( sync_rec_t *srecs, store_t *ctx[], int t )
nsrec = srec->next; nsrec = srec->next;
debug( "pairs %5d %s\n", srec->uid[1-t], diag ); debug( "pairs %5d %s\n", srec->uid[1-t], diag );
} }
return SYNC_OK;
} }
static int
copy_msg( store_t *ctx[], int t, message_t *tmsg, const char *tuid, int *uid )
{
msg_data_t msgdata;
char *fmap, *buf;
int i, len, extra, cra, crd, scr, tcr;
int start, sbreak = 0, ebreak = 0;
char c;
msgdata.flags = tmsg->flags;
switch (ctx[1-t]->conf->driver->fetch_msg( ctx[1-t], tmsg, &msgdata )) {
case DRV_STORE_BAD: return SYNC_BAD(1-t);
case DRV_BOX_BAD: return SYNC_FAIL;
case DRV_MSG_BAD: return SYNC_NOGOOD;
}
tmsg->flags = msgdata.flags;
scr = (ctx[1-t]->conf->driver->flags / DRV_CRLF) & 1;
tcr = (ctx[t]->conf->driver->flags / DRV_CRLF) & 1;
if (tuid || scr != tcr) {
fmap = msgdata.data;
len = msgdata.len;
cra = crd = 0;
if (scr > tcr)
crd = -1;
else if (scr < tcr)
crd = 1;
extra = 0, i = 0;
if (tuid) {
extra += 8 + TUIDL + 1 + tcr;
nloop:
start = i;
while (i < len) {
c = fmap[i++];
if (c == '\r')
extra += crd;
else if (c == '\n') {
extra += cra;
if (i - 2 + !scr == start) {
sbreak = ebreak = i - 2 + !scr; // precalc this!
goto oke;
}
if (!memcmp( fmap + start, "X-TUID: ", 8 )) {
extra -= (ebreak = i) - (sbreak = start);
goto oke;
}
goto nloop;
}
}
/* invalid message */
free( fmap );
return SYNC_NOGOOD;
}
oke:
if (cra || crd)
for (; i < len; i++) {
c = fmap[i++];
if (c == '\r')
extra += crd;
else if (c == '\n')
extra += cra;
}
msgdata.len = len + extra;
buf = msgdata.data = nfmalloc( msgdata.len );
i = 0;
if (tuid) {
if (cra) {
for (; i < sbreak; i++) {
if (fmap[i] == '\n')
*buf++ = '\r';
*buf++ = fmap[i];
}
} else if (crd) {
for (; i < sbreak; i++)
if (fmap[i] != '\r')
*buf++ = fmap[i];
} else {
memcpy( buf, fmap, sbreak );
buf += sbreak;
}
memcpy( buf, "X-TUID: ", 8 );
buf += 8;
memcpy( buf, tuid, TUIDL );
buf += TUIDL;
if (tcr)
*buf++ = '\r';
*buf++ = '\n';
i = ebreak;
}
if (cra) {
for (; i < len; i++) {
if (fmap[i] == '\n')
*buf++ = '\r';
*buf++ = fmap[i];
}
} else if (crd) {
for (; i < len; i++)
if (fmap[i] != '\r')
*buf++ = fmap[i];
} else
memcpy( buf, fmap + i, len - i );
free( fmap );
}
switch (ctx[t]->conf->driver->store_msg( ctx[t], &msgdata, uid )) {
case DRV_STORE_BAD: return SYNC_BAD(t);
case DRV_OK: return SYNC_OK;
default: return SYNC_FAIL;
}
}
/* cases: /* cases:
a) both non-null a) both non-null
@ -179,7 +334,7 @@ clean_strdup( const char *s )
return cs; return cs;
} }
#define JOURNAL_VERSION "1" #define JOURNAL_VERSION "2"
int int
sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan ) sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
@ -195,7 +350,6 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
int t1, t2, t3, t, uid, nmsgs; int t1, t2, t3, t, uid, nmsgs;
int lfd, ret, line, sline, todel, *mexcs, nmexcs, rmexcs; int lfd, ret, line, sline, todel, *mexcs, nmexcs, rmexcs;
unsigned char nflags, sflags, aflags, dflags; unsigned char nflags, sflags, aflags, dflags;
msg_data_t msgdata;
struct stat st; struct stat st;
struct flock lck; struct flock lck;
char fbuf[16]; /* enlarge when support for keywords is added */ char fbuf[16]; /* enlarge when support for keywords is added */
@ -301,6 +455,7 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
srec->flags = parse_flags( s ); srec->flags = parse_flags( s );
debug( " entry (%d,%d,%u,%s)\n", srec->uid[M], srec->uid[S], srec->flags, srec->status & S_EXPIRED ? "X" : "" ); debug( " entry (%d,%d,%u,%s)\n", srec->uid[M], srec->uid[S], srec->flags, srec->status & S_EXPIRED ? "X" : "" );
srec->msg[M] = srec->msg[S] = 0; srec->msg[M] = srec->msg[S] = 0;
srec->tuid[0] = 0;
srec->next = 0; srec->next = 0;
*srecadd = srec; *srecadd = srec;
srecadd = &srec->next; srecadd = &srec->next;
@ -339,9 +494,11 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
ret = SYNC_FAIL; ret = SYNC_FAIL;
goto bail; goto bail;
} }
if (buf[0] == '(' || buf[0] == ')' ? if (buf[0] == '#' ?
(t3 = 0, (sscanf( buf + 2, "%d %d %n", &t1, &t2, &t3 ) < 2) || !t3 || (t - t3 != TUIDL + 3)) :
buf[0] == '(' || buf[0] == ')' ?
(sscanf( buf + 2, "%d", &t1 ) != 1) : (sscanf( buf + 2, "%d", &t1 ) != 1) :
buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ? buf[0] == '+' || buf[0] == '&' || buf[0] == '-' || buf[0] == '|' || buf[0] == '/' || buf[0] == '\\' ?
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) : (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
(sscanf( buf + 2, "%d %d %d", &t1, &t2, &t3 ) != 3)) (sscanf( buf + 2, "%d %d %d", &t1, &t2, &t3 ) != 3))
{ {
@ -361,10 +518,11 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
srec = nfmalloc( sizeof(*srec) ); srec = nfmalloc( sizeof(*srec) );
srec->uid[M] = t1; srec->uid[M] = t1;
srec->uid[S] = t2; srec->uid[S] = t2;
srec->flags = t3; debug( " new entry(%d,%d)\n", t1, t2 );
debug( " new entry(%d,%d,%u)\n", t1, t2, t3 );
srec->msg[M] = srec->msg[S] = 0; srec->msg[M] = srec->msg[S] = 0;
srec->status = 0; srec->status = 0;
srec->flags = 0;
srec->tuid[0] = 0;
srec->next = 0; srec->next = 0;
*srecadd = srec; *srecadd = srec;
srecadd = &srec->next; srecadd = &srec->next;
@ -386,13 +544,24 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
debug( "killed\n" ); debug( "killed\n" );
srec->status = S_DEAD; srec->status = S_DEAD;
break; break;
case '#':
debug( "TUID now %." stringify(TUIDL) "s\n", buf + t3 + 2 );
memcpy( srec->tuid, buf + t3 + 2, TUIDL );
break;
case '&':
debug( "TUID %." stringify(TUIDL) "s lost\n", srec->tuid );
srec->flags = 0;
srec->tuid[0] = 0;
break;
case '<': case '<':
debug( "master now %d\n", t3 ); debug( "master now %d\n", t3 );
srec->uid[M] = t3; srec->uid[M] = t3;
srec->tuid[0] = 0;
break; break;
case '>': case '>':
debug( "slave now %d\n", t3 ); debug( "slave now %d\n", t3 );
srec->uid[S] = t3; srec->uid[S] = t3;
srec->tuid[0] = 0;
break; break;
case '*': case '*':
debug( "flags now %d\n", t3 ); debug( "flags now %d\n", t3 );
@ -476,10 +645,17 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
if ((chan->ops[S] & (OP_NEW|OP_RENEW)) && chan->max_messages) if ((chan->ops[S] & (OP_NEW|OP_RENEW)) && chan->max_messages)
opts[S] |= OPEN_OLD|OPEN_NEW|OPEN_FLAGS; opts[S] |= OPEN_OLD|OPEN_NEW|OPEN_FLAGS;
if (line) if (line)
for (srec = recs; srec; srec = srec->next) for (srec = recs; srec; srec = srec->next) {
if (!(srec->status & S_DEAD) && ((mvBit(srec->status, S_EXPIRE, S_EXPIRED) ^ srec->status) & S_EXPIRED)) { if (srec->status & S_DEAD)
continue;
if ((mvBit(srec->status, S_EXPIRE, S_EXPIRED) ^ srec->status) & S_EXPIRED)
opts[S] |= OPEN_OLD|OPEN_FLAGS; opts[S] |= OPEN_OLD|OPEN_FLAGS;
break; if (srec->tuid[0]) {
if (srec->uid[M] == -2)
opts[M] |= OPEN_OLD|OPEN_FIND;
else if (srec->uid[S] == -2)
opts[S] |= OPEN_OLD|OPEN_FIND;
}
} }
driver[M]->prepare_opts( ctx[M], opts[M] ); driver[M]->prepare_opts( ctx[M], opts[M] );
driver[S]->prepare_opts( ctx[S], opts[S] ); driver[S]->prepare_opts( ctx[S], opts[S] );
@ -499,14 +675,12 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
case DRV_STORE_BAD: ret = SYNC_BAD(S); goto bail; case DRV_STORE_BAD: ret = SYNC_BAD(S); goto bail;
case DRV_BOX_BAD: ret = SYNC_FAIL; goto bail; case DRV_BOX_BAD: ret = SYNC_FAIL; goto bail;
} }
info( "%d messages, %d recent\n", ctx[S]->count, ctx[S]->recent );
findmsgs( recs, ctx, S );
if (suidval && suidval != ctx[S]->uidvalidity) { if (suidval && suidval != ctx[S]->uidvalidity) {
fprintf( stderr, "Error: UIDVALIDITY of slave changed\n" ); fprintf( stderr, "Error: UIDVALIDITY of slave changed\n" );
ret = SYNC_FAIL; ret = SYNC_FAIL;
goto bail; goto bail;
} }
info( "%d messages, %d recent\n", ctx[S]->count, ctx[S]->recent );
s = strrchr( dname, '/' ); s = strrchr( dname, '/' );
*s = 0; *s = 0;
@ -533,6 +707,9 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
if (!line) if (!line)
Fprintf( jfp, JOURNAL_VERSION "\n" ); Fprintf( jfp, JOURNAL_VERSION "\n" );
if ((ret = findmsgs( recs, ctx, S, line ? jfp : 0 )) != SYNC_OK)
goto finish;
mexcs = 0; mexcs = 0;
nmexcs = rmexcs = 0; nmexcs = rmexcs = 0;
minwuid = INT_MAX; minwuid = INT_MAX;
@ -607,14 +784,15 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
case DRV_STORE_BAD: ret = SYNC_BAD(M); goto finish; case DRV_STORE_BAD: ret = SYNC_BAD(M); goto finish;
case DRV_BOX_BAD: ret = SYNC_FAIL; goto finish; case DRV_BOX_BAD: ret = SYNC_FAIL; goto finish;
} }
info( "%d messages, %d recent\n", ctx[M]->count, ctx[M]->recent );
findmsgs( recs, ctx, M );
if (muidval && muidval != ctx[M]->uidvalidity) { if (muidval && muidval != ctx[M]->uidvalidity) {
fprintf( stderr, "Error: UIDVALIDITY of master changed\n" ); fprintf( stderr, "Error: UIDVALIDITY of master changed\n" );
ret = SYNC_FAIL; ret = SYNC_FAIL;
goto finish; goto finish;
} }
info( "%d messages, %d recent\n", ctx[M]->count, ctx[M]->recent );
if ((ret = findmsgs( recs, ctx, M, line ? jfp : 0 )) != SYNC_OK)
goto finish;
if (!muidval || !suidval) { if (!muidval || !suidval) {
muidval = ctx[M]->uidvalidity; muidval = ctx[M]->uidvalidity;
@ -628,66 +806,103 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
osrecadd = srecadd; osrecadd = srecadd;
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
for (nmsgs = 0, tmsg = ctx[1-t]->msgs; tmsg; tmsg = tmsg->next) for (nmsgs = 0, tmsg = ctx[1-t]->msgs; tmsg; tmsg = tmsg->next)
if (tmsg->srec ? tmsg->srec->uid[t] < 0 && (chan->ops[t] & OP_RENEW) : (chan->ops[t] & OP_NEW)) { if (tmsg->srec ? tmsg->srec->uid[t] < 0 && (tmsg->srec->uid[t] == -1 ? (chan->ops[t] & OP_RENEW) : (chan->ops[t] & OP_NEW)) : (chan->ops[t] & OP_NEW)) {
debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] ); debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] );
if ((chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) if ((chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED))
debug( " not %sing - would be expunged anyway\n", str_hl[t] ); debug( " -> not %sing - would be expunged anyway\n", str_hl[t] );
else { else {
if (tmsg->srec) {
srec = tmsg->srec;
srec->status |= S_DONE;
debug( " -> pair(%d,%d) exists\n", srec->uid[M], srec->uid[S] );
} else {
srec = nfmalloc( sizeof(*srec) );
srec->next = 0;
*srecadd = srec;
srecadd = &srec->next;
srec->status = S_DONE;
srec->flags = 0;
srec->tuid[0] = 0;
srec->uid[1-t] = tmsg->uid;
srec->uid[t] = -2;
Fprintf( jfp, "+ %d %d\n", srec->uid[M], srec->uid[S] );
debug( " -> pair(%d,%d) created\n", srec->uid[M], srec->uid[S] );
}
if ((tmsg->flags & F_FLAGGED) || !chan->stores[t]->max_size || tmsg->size <= chan->stores[t]->max_size) { if ((tmsg->flags & F_FLAGGED) || !chan->stores[t]->max_size || tmsg->size <= chan->stores[t]->max_size) {
debug( " %sing it\n", str_hl[t] );
if (!nmsgs) if (!nmsgs)
info( t ? "Pulling new messages..." : "Pushing new messages..." ); info( t ? "Pulling new messages..." : "Pushing new messages..." );
else else
infoc( '.' ); infoc( '.' );
nmsgs++; nmsgs++;
msgdata.flags = tmsg->flags; if (tmsg->flags) {
switch (driver[1-t]->fetch_msg( ctx[1-t], tmsg, &msgdata )) { srec->flags = tmsg->flags;
case DRV_STORE_BAD: return SYNC_BAD(1-t); Fprintf( jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags );
case DRV_BOX_BAD: return SYNC_FAIL; debug( " -> updated flags to %u\n", tmsg->flags );
case DRV_MSG_BAD: /* ok */ continue;
} }
tmsg->flags = msgdata.flags; for (t1 = 0; t1 < TUIDL; t1++) {
switch (driver[t]->store_msg( ctx[t], &msgdata, &uid )) { t2 = arc4_getbyte() & 0x3f;
case DRV_STORE_BAD: return SYNC_BAD(t); srec->tuid[t1] = t2 < 26 ? t2 + 'A' : t2 < 52 ? t2 + 'a' - 26 : t2 < 62 ? t2 + '0' - 52 : t2 == 62 ? '+' : '/';
default: return SYNC_FAIL; }
case DRV_OK: break; Fprintf( jfp, "# %d %d %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], srec->tuid );
debug( " -> %sing message, TUID %." stringify(TUIDL) "s\n", str_hl[t], srec->tuid );
switch ((ret = copy_msg( ctx, t, tmsg, srec->tuid, &uid ))) {
case SYNC_OK: break;
case SYNC_NOGOOD:
/* The error is either transient or the message is gone. */
debug( " -> killing (%d,%d)\n", srec->uid[M], srec->uid[S] );
srec->status = S_DEAD;
Fprintf( jfp, "- %d %d\n", srec->uid[M], srec->uid[S] );
continue;
default: goto finish;
} }
} else { } else {
if (tmsg->srec) { if (tmsg->srec) {
debug( " -> not %sing - still too big\n", str_hl[t] ); debug( " -> not %sing - still too big\n", str_hl[t] );
continue; continue;
} }
debug( " not %sing - too big\n", str_hl[t] ); debug( " -> not %sing - too big\n", str_hl[t] );
uid = -1; uid = -1;
} }
if (tmsg->srec) { if (srec->uid[t] != uid) {
srec = tmsg->srec; debug( " -> new UID %d\n", uid );
Fprintf( jfp, "%c %d %d %d\n", "<>"[t], srec->uid[M], srec->uid[S], uid ); Fprintf( jfp, "%c %d %d %d\n", "<>"[t], srec->uid[M], srec->uid[S], uid );
} else {
srec = nfmalloc( sizeof(*srec) );
srec->next = 0;
*srecadd = srec;
srecadd = &srec->next;
srec->uid[1-t] = tmsg->uid;
}
srec->uid[t] = uid; srec->uid[t] = uid;
srec->flags = tmsg->flags; srec->tuid[0] = 0;
srec->status = S_DONE; }
if (tmsg->srec) if (!tmsg->srec) {
Fprintf( jfp, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags );
else {
tmsg->srec = srec; tmsg->srec = srec;
if (maxuid[1-t] < tmsg->uid) { if (maxuid[1-t] < tmsg->uid) {
maxuid[1-t] = tmsg->uid; maxuid[1-t] = tmsg->uid;
Fprintf( jfp, "%c %d\n", ")("[t], tmsg->uid ); Fprintf( jfp, "%c %d\n", ")("[t], tmsg->uid );
} }
Fprintf( jfp, "+ %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags );
} }
} }
} }
if (nmsgs) if (nmsgs)
info( " %d messages\n", nmsgs ); info( " %d messages\n", nmsgs );
} }
debug( "finding just copied messages\n" );
for (srec = recs; srec; srec = srec->next) {
if (srec->status & S_DEAD)
continue;
if (srec->tuid[0]) {
t = (srec->uid[M] == -2) ? M : S;
debug( " pair(%d,%d): lookup %s, TUID %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], str_ms[t], srec->tuid );
switch (driver[t]->find_msg( ctx[t], srec->tuid, &uid )) {
case DRV_STORE_BAD: ret = SYNC_BAD(t); goto finish;
case DRV_OK:
debug( " -> new UID %d\n", uid );
break;
default:
warn( "Warning: cannot find newly stored message %." stringify(TUIDL) "s on %s.\n", srec->tuid, str_ms[t] );
uid = 0;
break;
}
Fprintf( jfp, "%c %d %d %d\n", "<>"[t], srec->uid[M], srec->uid[S], uid );
srec->uid[t] = uid;
srec->tuid[0] = 0;
}
}
debug( "synchronizing old entries\n" ); debug( "synchronizing old entries\n" );
for (srec = recs; srec != *osrecadd; srec = srec->next) { for (srec = recs; srec != *osrecadd; srec = srec->next) {
@ -882,16 +1097,11 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
if (!tmsg->srec || tmsg->srec->uid[1-t] < 0) { if (!tmsg->srec || tmsg->srec->uid[1-t] < 0) {
if (!ctx[1-t]->conf->max_size || tmsg->size <= ctx[1-t]->conf->max_size) { if (!ctx[1-t]->conf->max_size || tmsg->size <= ctx[1-t]->conf->max_size) {
debug( " remote trashing message %d\n", tmsg->uid ); debug( " remote trashing message %d\n", tmsg->uid );
msgdata.flags = tmsg->flags; switch ((ret = copy_msg( ctx, 1 - t, tmsg, 0, 0 ))) {
switch (driver[t]->fetch_msg( ctx[t], tmsg, &msgdata )) { case SYNC_OK: break;
case DRV_OK: break; case SYNC_NOGOOD: ret = SYNC_FAIL; goto nexex;
case DRV_STORE_BAD: ret = SYNC_BAD(t); goto finish; case SYNC_FAIL: goto nexex;
default: ret = SYNC_FAIL; goto nexex; default: goto finish;
}
switch (driver[1-t]->store_msg( ctx[1-t], &msgdata, 0 )) {
case DRV_OK: break;
case DRV_STORE_BAD: ret = SYNC_BAD(1-t); goto finish;
default: ret = SYNC_FAIL; goto nexex;
} }
} else } else
debug( " not remote trashing message %d - too big\n", tmsg->uid ); debug( " not remote trashing message %d - too big\n", tmsg->uid );

View File

@ -145,20 +145,6 @@ free_generic_messages( message_t *msgs )
} }
} }
void
strip_cr( msg_data_t *msgdata )
{
int i, o;
if (msgdata->crlf) {
for (i = o = 0; i < msgdata->len; i++)
if (msgdata->data[i] != '\r')
msgdata->data[o++] = msgdata->data[i];
msgdata->len = o;
msgdata->crlf = 0;
}
}
#ifndef HAVE_VASPRINTF #ifndef HAVE_VASPRINTF
static int static int
vasprintf( char **strp, const char *fmt, va_list ap ) vasprintf( char **strp, const char *fmt, va_list ap )