Skip to content

Commit

Permalink
vere: fix epoc lifecycle error handling, enforce invariants (#602)
Browse files Browse the repository at this point in the history
This PR changes the chop command, removing log rollover so as to be
effective even when (nearly) out of disk space. It also includes
refactoring of the epoc lifecycle operations to centralize the
enforcement of their invariants, and improves error handling throughout.
  • Loading branch information
pkova authored Feb 8, 2024
2 parents dc5eb5e + 1294a60 commit 486c5c7
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 152 deletions.
215 changes: 145 additions & 70 deletions pkg/vere/disk.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,11 @@ u3_disk_walk_init(u3_disk* log_u,
eve_d,
c3_min(max_d, log_u->dun_d));

if ( c3n == wok_u->liv_o ) {
c3_free(wok_u);
return 0;
}

return wok_u;
}

Expand Down Expand Up @@ -1163,10 +1168,10 @@ u3_disk_epoc_zero(u3_disk* log_u)
return c3n;
}

/* u3_disk_epoc_roll: epoch rollover.
/* _disk_epoc_roll: epoch rollover.
*/
c3_o
u3_disk_epoc_roll(u3_disk* log_u, c3_d epo_d)
static c3_o
_disk_epoc_roll(u3_disk* log_u, c3_d epo_d)
{
u3_assert(epo_d);

Expand Down Expand Up @@ -1255,6 +1260,8 @@ u3_disk_epoc_roll(u3_disk* log_u, c3_d epo_d)

close(epo_i);

fprintf(stderr, "disk: created epoch %" PRIc3_d "\r\n", epo_d);

// load new epoch directory and set it in log_u
log_u->epo_d = epo_d;
log_u->ver_w = U3D_VERLAT;
Expand All @@ -1272,10 +1279,10 @@ u3_disk_epoc_roll(u3_disk* log_u, c3_d epo_d)
return c3n;
}

/* u3_disk_epoc_kill: delete an epoch.
/* _disk_epoc_kill: delete an epoch.
*/
c3_o
u3_disk_epoc_kill(u3_disk* log_u, c3_d epo_d)
static c3_o
_disk_epoc_kill(u3_disk* log_u, c3_d epo_d)
{
// get epoch directory
c3_c epo_c[8193];
Expand Down Expand Up @@ -1315,20 +1322,26 @@ u3_disk_epoc_last(u3_disk* log_u, c3_d* lat_d)
u3_dire* die_u = u3_foil_folder(log_u->com_u->pax_c);
u3_dent* den_u = die_u->dil_u;
c3_o ret_o = c3n;

*lat_d = 0;
c3_d epo_d = 0;
c3_d val_d;
c3_i car_i;

while ( den_u ) {
c3_d epo_d = 0;
if ( 1 == sscanf(den_u->nam_c, "0i%" PRIc3_d, &epo_d) ) {
if ( (1 == sscanf(den_u->nam_c, "0i%" SCNu64 "%n", &val_d, &car_i))
&& (0 < car_i)
&& ('\0' == *(den_u->nam_c + car_i)) )
{
ret_o = c3y; // NB: returns yes if the directory merely exists
*lat_d = c3_max(epo_d, *lat_d); // update the latest epoch number
epo_d = c3_max(epo_d, val_d);
}

den_u = den_u->nex_u;
}

u3_dire_free(die_u);

*lat_d = epo_d;

return ret_o;
}

Expand All @@ -1340,12 +1353,17 @@ u3_disk_epoc_list(u3_disk* log_u, c3_d* sot_d)
u3_dire* ned_u = u3_foil_folder(log_u->com_u->pax_c);
u3_dent* den_u = ned_u->dil_u;
c3_z len_z = 0;
c3_i car_i;
c3_d val_d;

while ( den_u ) { // count epochs
c3_d tmp_d;
if ( 1 == sscanf(den_u->nam_c, "0i%" PRIc3_d, &tmp_d) ) {
if ( (1 == sscanf(den_u->nam_c, "0i%" SCNu64 "%n", &val_d, &car_i))
&& (0 < car_i)
&& ('\0' == *(den_u->nam_c + car_i)) )
{
len_z++;
}

den_u = den_u->nex_u;
}

Expand All @@ -1358,9 +1376,13 @@ u3_disk_epoc_list(u3_disk* log_u, c3_d* sot_d)
den_u = ned_u->dil_u;

while ( den_u ) {
if ( 1 == sscanf(den_u->nam_c, "0i%" PRIc3_d, (sot_d + len_z)) ) {
len_z++;
if ( (1 == sscanf(den_u->nam_c, "0i%" SCNu64 "%n", &val_d, &car_i))
&& (0 < car_i)
&& ('\0' == *(den_u->nam_c + car_i)) )
{
sot_d[len_z++] = val_d;
}

den_u = den_u->nex_u;
}

Expand All @@ -1387,52 +1409,27 @@ static c3_o
_disk_migrate(u3_disk* log_u, c3_d eve_d)
{
/* migration steps:
* 0. detect whether we need to migrate or not
* a. if it's a fresh boot via u3_Host.ops_u.nuu -> skip migration
* b. if log/data.mdb is readable and is not v3 -> execute migration
* if not -> skip migration (returns yes)
* 1. set log/data.mdb to version 2 (migration in progress)
* 2. initialize epoch 0i0
* a. creates epoch directory
* b. creates epoch version file
* c. creates binary version file
* d. creates hard links to data.mdb and lock.mdb in 0i0/
* e. deletes backup snapshot
* 3. create hard links to data.mdb and lock.mdb in 0i0/
* 4. use scratch space to initialize new log/data.db in log/tmp
* 5. save old metadata to new db in scratch space
* 6. clobber old log/data.mdb with new log/tmp/data.mdb
* 7. open epoch lmdb and set it in log_u
* 1. initialize epoch 0i0 (see u3_disk_epoc_zero)
* 2. create hard links to data.mdb and lock.mdb in 0i0/
* 3. use scratch space to initialize new log/data.db in log/tmp
* 4. save old metadata to new db in scratch space
* 5. clobber old log/data.mdb with new log/tmp/data.mdb
* 6. open epoch lmdb and set it in log_u
*/

// NB: requires that log_u->mdb_u is initialized to log/data.mdb
// XX: put old log in separate pointer (old_u?)?

// get metadata from old log
c3_d who_d[2];
c3_o fak_o;
c3_w lif_w;
c3_d who_d[2];
c3_o fak_o;
c3_w lif_w;

if ( c3y != u3_disk_read_meta(log_u->mdb_u, 0, who_d, &fak_o, &lif_w) ) {
fprintf(stderr, "disk: failed to read metadata\r\n");
return c3n;
}

// get first/last event numbers from old log
c3_d fir_d, las_d;
if ( c3n == u3_lmdb_gulf(log_u->mdb_u, &fir_d, &las_d) ) {
fprintf(stderr, "disk: failed to get first/last event numbers\r\n");
return c3n;
}

// set version to 2 (migration in progress)
{
c3_w ver_w = U3D_VER2;
if ( c3n == _disk_save_meta(log_u->mdb_u, "version", 4, (c3_y*)&ver_w) ) {
fprintf(stderr, "disk: failed to set version to 2\r\n");
return c3n;
}
}

// finish with old log
u3_lmdb_exit(log_u->mdb_u);
log_u->mdb_u = 0;
Expand All @@ -1447,15 +1444,6 @@ _disk_migrate(u3_disk* log_u, c3_d eve_d)

fprintf(stderr, "disk: migrating disk to v%d format\r\n", U3D_VERLAT);

// ensure there's a current snapshot
if ( eve_d != las_d ) {
fprintf(stderr, "disk: snapshot is out of date, please "
"start/shutdown your pier gracefully first\r\n");
fprintf(stderr, "disk: eve_d (%" PRIc3_d ") != las_d (%" PRIc3_d ")\r\n",
eve_d, las_d);
return c3n;
}

// initialize first epoch "0i0"
if ( c3n == u3_disk_epoc_zero(log_u) ) {
fprintf(stderr, "disk: failed to initialize first epoch\r\n");
Expand Down Expand Up @@ -1551,8 +1539,6 @@ _disk_migrate(u3_disk* log_u, c3_d eve_d)
return c3n;
}

// XX: maybe rollover

// success
fprintf(stderr, "disk: migrated disk to v%d format\r\n", U3D_VERLAT);

Expand Down Expand Up @@ -1584,32 +1570,50 @@ _disk_vere_diff(u3_disk* log_u)
void
u3_disk_kindly(u3_disk* log_u, c3_d eve_d)
{
// ensure there's a current snapshot
//
if ( eve_d != log_u->dun_d ) {
fprintf(stderr, "disk: snapshot (event %" PRIc3_d ") is out of date\r\n"
" (latest event is %" PRIc3_d "\r\n",
eve_d, log_u->dun_d);
// XX need better instructions
//
fprintf(stderr, "start/shutdown your pier gracefully first\r\n");
exit(1);
}

switch ( log_u->ver_w ) {
case U3D_VER1:
case U3D_VER1: {
// set version to 2 (migration in progress)
c3_w ver_w = U3D_VER2;
if ( c3n == _disk_save_meta(log_u->mdb_u, "version", 4, (c3_y*)&ver_w) ) {
fprintf(stderr, "disk: failed to set version to 2\r\n");
exit(1);
}
} // fallthru

case U3D_VER2: {
if ( c3n == _disk_migrate(log_u, eve_d) ) {
fprintf(stderr, "disk: failed to migrate event log\r\n");
exit(1);
}

if ( c3n == u3_disk_epoc_roll(log_u, log_u->dun_d) ) {
if ( c3n == _disk_epoc_roll(log_u, eve_d) ) {
fprintf(stderr, "disk: failed to initialize epoch\r\n");
exit(1);
}
break;
}
} break;

case U3D_VER3: {
if ( (0 == log_u->epo_d) ||
(c3y == _disk_vere_diff(log_u)) )
{
if ( c3n == u3_disk_epoc_roll(log_u, log_u->dun_d) ) {
if ( c3n == _disk_epoc_roll(log_u, eve_d) ) {
fprintf(stderr, "disk: failed to initialize epoch\r\n");
exit(1);
}
}
break;
}
} break;

default: {
fprintf(stderr, "disk: unknown disk version: %d\r\n", log_u->ver_w);
Expand All @@ -1618,6 +1622,69 @@ u3_disk_kindly(u3_disk* log_u, c3_d eve_d)
}
}

/* u3_disk_chop(): delete all but the latest 2 epocs.
*/
void
u3_disk_chop(u3_disk* log_u, c3_d eve_d)
{
c3_z len_z = u3_disk_epoc_list(log_u, 0);
c3_d* sot_d = c3_malloc(len_z * sizeof(c3_d));
u3_disk_epoc_list(log_u, sot_d);

if ( len_z <= 2 ) {
fprintf(stderr, "chop: nothing to do, have a great day\r\n");
exit(0); // enjoy
}

// delete all but the last two epochs
//
// XX parameterize the number of epochs to chop
//
for ( c3_z i_z = 2; i_z < len_z; i_z++ ) {
fprintf(stderr, "chop: deleting epoch 0i%" PRIu64 "\r\n",
sot_d[i_z]);
if ( c3y != _disk_epoc_kill(log_u, sot_d[i_z]) ) {
fprintf(stderr, "chop: failed to delete epoch 0i%" PRIu64 "\r\n", sot_d[i_z]);
exit(1);
}
}

// cleanup
c3_free(sot_d);

// success
fprintf(stderr, "chop: event log truncation complete\r\n");
}

/* u3_disk_roll(): rollover to a new epoc.
*/
void
u3_disk_roll(u3_disk* log_u, c3_d eve_d)
{
// XX get fir_d from log_u
c3_d fir_d, las_d;

if ( c3n == u3_lmdb_gulf(log_u->mdb_u, &fir_d, &las_d) ) {
fprintf(stderr, "roll: failed to read first/last event numbers\r\n");
exit(1);
}

if ( fir_d == las_d ) {
fprintf(stderr, "roll: latest epoch is empty\r\n");
exit(0);
}

if ( (eve_d != las_d) || (eve_d != log_u->dun_d) ) {
fprintf(stderr, "roll: shenanigans!\r\n");
exit(1);
}

else if ( c3n == _disk_epoc_roll(log_u, eve_d) ) {
fprintf(stderr, "roll: failed to create new epoch\r\n");
exit(1);
}
}

typedef enum {
_epoc_good = 0, // load successfully
_epoc_gone = 1, // version missing, total failure
Expand All @@ -1636,6 +1703,8 @@ _disk_epoc_load(u3_disk* log_u, c3_d lat_d)
{
c3_c ver_c[8];
c3_w ver_w;
c3_i car_i;

if ( c3n == _disk_epoc_meta(log_u, lat_d, "epoc",
sizeof(ver_c) - 1, ver_c) )
{
Expand All @@ -1645,7 +1714,10 @@ _disk_epoc_load(u3_disk* log_u, c3_d lat_d)
return _epoc_gone;
}

if ( 1 != sscanf(ver_c, "%d", &ver_w) ) {
if ( (1 != sscanf(ver_c, "%" SCNu32 "%n", &ver_w, &car_i))
&& (0 < car_i)
&& ('\0' == *(ver_c + car_i)) )
{
fprintf(stderr, "disk: failed to parse epoch version: '%s'\r\n", ver_c);
return _epoc_fail;
}
Expand Down Expand Up @@ -1690,6 +1762,9 @@ _disk_epoc_load(u3_disk* log_u, c3_d lat_d)
}

// initialize dun_d/sen_d values
//
// XX save [fir_d] in struct, check on play
//
log_u->dun_d = ( 0 != las_d ) ? las_d : lat_d;
log_u->sen_d = log_u->dun_d;

Expand Down Expand Up @@ -1937,7 +2012,7 @@ u3_disk_init(c3_c* pax_c, u3_disk_cb cb_u)
"falling back to previous at 0i%" PRIc3_d "\r\n",
lat_d, sot_d[1]);

u3_disk_epoc_kill(log_u, lat_d);
_disk_epoc_kill(log_u, lat_d);

lat_d = sot_d[1];
try_o = c3y;
Expand Down
Loading

0 comments on commit 486c5c7

Please sign in to comment.