make the sync entry search in the journal replay wrap around at the end

of the list. the "always forward" assumption is violated in some cases.
This commit is contained in:
Oswald Buddenhagen 2006-02-02 10:44:19 +00:00
parent 5e01034aee
commit 8728dfdf21

View File

@ -309,8 +309,8 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
if ((jfp = fopen( jname, "r" ))) { if ((jfp = fopen( jname, "r" ))) {
if (!stat( nname, &st )) { if (!stat( nname, &st )) {
debug( "recovering journal ...\n" ); debug( "recovering journal ...\n" );
srec = 0;
line = 0; line = 0;
srec = recs;
while (fgets( buf, sizeof(buf), jfp )) { while (fgets( buf, sizeof(buf), jfp )) {
line++; line++;
if (!(t = strlen( buf )) || buf[t - 1] != '\n') { if (!(t = strlen( buf )) || buf[t - 1] != '\n') {
@ -319,80 +319,79 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
ret = SYNC_FAIL; ret = SYNC_FAIL;
goto bail; goto bail;
} }
if (buf[0] == '^') if (buf[0] == '(' || buf[0] == ')' ?
srec = recs; (sscanf( buf + 2, "%d", &t1 ) != 1) :
else { buf[0] == '-' || buf[0] == '|' ?
if (buf[0] == '(' || buf[0] == ')' ? (sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) :
(sscanf( buf + 2, "%d", &t1 ) != 1) : (sscanf( buf + 2, "%d %d %d", &t1, &t2, &t3 ) != 3))
buf[0] == '-' || buf[0] == '|' ? {
(sscanf( buf + 2, "%d %d", &t1, &t2 ) != 2) : fprintf( stderr, "Error: malformed journal entry at %s:%d\n", jname, line );
(sscanf( buf + 2, "%d %d %d", &t1, &t2, &t3 ) != 3)) fclose( jfp );
{ ret = SYNC_FAIL;
fprintf( stderr, "Error: malformed journal entry at %s:%d\n", jname, line ); goto bail;
fclose( jfp ); }
ret = SYNC_FAIL; if (buf[0] == '(')
goto bail; maxuid[M] = t1;
} else if (buf[0] == ')')
if (buf[0] == '(') maxuid[S] = t1;
maxuid[M] = t1; else if (buf[0] == '|') {
else if (buf[0] == ')') muidval = t1;
maxuid[S] = t1; suidval = t2;
else if (buf[0] == '|') { } else if (buf[0] == '+') {
muidval = t1; srec = nfmalloc( sizeof(*srec) );
suidval = t2; srec->uid[M] = t1;
} else if (buf[0] == '+') { srec->uid[S] = t2;
srec = nfmalloc( sizeof(*srec) ); srec->flags = t3;
srec->uid[M] = t1; debug( " new entry(%d,%d,%u)\n", t1, t2, t3 );
srec->uid[S] = t2; srec->msg[M] = srec->msg[S] = 0;
srec->status = 0;
srec->next = 0;
*srecadd = srec;
srecadd = &srec->next;
} else {
for (nsrec = srec; srec; srec = srec->next)
if (srec->uid[M] == t1 && srec->uid[S] == t2)
goto syncfnd;
for (srec = recs; srec != nsrec; srec = srec->next)
if (srec->uid[M] == t1 && srec->uid[S] == t2)
goto syncfnd;
fprintf( stderr, "Error: journal entry at %s:%d refers to non-existing sync state entry\n", jname, line );
fclose( jfp );
ret = SYNC_FAIL;
goto bail;
syncfnd:
debug( " entry(%d,%d,%u) ", srec->uid[M], srec->uid[S], srec->flags );
switch (buf[0]) {
case '-':
debug( "killed\n" );
srec->status = S_DEAD;
break;
case '<':
debug( "master now %d\n", t3 );
srec->uid[M] = t3;
break;
case '>':
debug( "slave now %d\n", t3 );
srec->uid[S] = t3;
break;
case '*':
debug( "flags now %d\n", t3 );
srec->flags = t3; srec->flags = t3;
debug( " new entry(%d,%d,%u)\n", t1, t2, t3 ); break;
srec->msg[M] = srec->msg[S] = 0; case '~':
srec->status = 0; debug( "expired now %d\n", t3 );
srec->next = 0; if (t3) {
*srecadd = srec; if (smaxxuid < t2)
srecadd = &srec->next; smaxxuid = t2;
} else { srec->status |= S_EXPIRED;
for (; srec; srec = srec->next) } else
if (srec->uid[M] == t1 && srec->uid[S] == t2) srec->status &= ~S_EXPIRED;
goto syncfnd; break;
fprintf( stderr, "Error: journal entry at %s:%d refers to non-existing sync state entry\n", jname, line ); default:
fprintf( stderr, "Error: unrecognized journal entry at %s:%d\n", jname, line );
fclose( jfp ); fclose( jfp );
ret = SYNC_FAIL; ret = SYNC_FAIL;
goto bail; goto bail;
syncfnd:
debug( " entry(%d,%d,%u) ", srec->uid[M], srec->uid[S], srec->flags );
switch (buf[0]) {
case '-':
debug( "killed\n" );
srec->status = S_DEAD;
break;
case '<':
debug( "master now %d\n", t3 );
srec->uid[M] = t3;
break;
case '>':
debug( "slave now %d\n", t3 );
srec->uid[S] = t3;
break;
case '*':
debug( "flags now %d\n", t3 );
srec->flags = t3;
break;
case '~':
debug( "expired now %d\n", t3 );
if (t3) {
if (smaxxuid < t2)
smaxxuid = t2;
srec->status |= S_EXPIRED;
} else
srec->status &= ~S_EXPIRED;
break;
default:
fprintf( stderr, "Error: unrecognized journal entry at %s:%d\n", jname, line );
fclose( jfp );
ret = SYNC_FAIL;
goto bail;
}
} }
} }
} }
@ -511,7 +510,6 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
minwuid = srec->uid[M]; minwuid = srec->uid[M];
} }
debug( " min non-orphaned master uid is %d\n", minwuid ); debug( " min non-orphaned master uid is %d\n", minwuid );
Fprintf( jfp, "^\n" ); /* if any S_EXP_S */
for (srec = recs; srec; srec = srec->next) { for (srec = recs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;
@ -650,7 +648,6 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
} }
debug( "synchronizing old entries\n" ); debug( "synchronizing old entries\n" );
Fprintf( jfp, "^\n" );
for (srec = recs; srec != *osrecadd; srec = srec->next) { for (srec = recs; srec != *osrecadd; srec = srec->next) {
if (srec->status & (S_DEAD|S_DONE)) if (srec->status & (S_DEAD|S_DONE))
continue; continue;
@ -778,7 +775,6 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
} }
} }
if (delt) { if (delt) {
Fprintf( jfp, "^\n" );
for (srec = recs; srec; srec = srec->next) { for (srec = recs; srec; srec = srec->next) {
if (srec->status & (S_DEAD|S_EXPIRED)) if (srec->status & (S_DEAD|S_EXPIRED))
continue; continue;
@ -868,7 +864,6 @@ sync_boxes( store_t *ctx[], const char *names[], channel_conf_t *chan )
debug( " min non-orphaned master uid is %d\n", minwuid ); debug( " min non-orphaned master uid is %d\n", minwuid );
} }
Fprintf( jfp, "^\n" );
for (srec = recs; srec; srec = srec->next) { for (srec = recs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;