#include "stdinc.h" #include "dat.h" #include "fns.h" typedef struct ASum ASum; struct ASum { Arena *arena; ASum *next; }; static void sealarena(Arena *arena); static int okarena(Arena *arena); static int loadarena(Arena *arena); static CIBlock *getcib(Arena *arena, int clump, int writing, CIBlock *rock); static void putcib(Arena *arena, CIBlock *cib); static void sumproc(void *); static QLock sumlock; static Rendez sumwait; static ASum *sumq; static ASum *sumqtail; static uchar zero[8192]; int arenasumsleeptime; int initarenasum(void) { needzeroscore(); /* OS X */ sumwait.l = &sumlock; if(vtproc(sumproc, nil) < 0){ seterr(EOk, "can't start arena checksum slave: %r"); return -1; } return 0; } /* * make an Arena, and initialize it based upon the disk header and trailer. */ Arena* initarena(Part *part, u64int base, u64int size, u32int blocksize) { Arena *arena; arena = MKZ(Arena); arena->part = part; arena->blocksize = blocksize; arena->clumpmax = arena->blocksize / ClumpInfoSize; arena->base = base + blocksize; arena->size = size - 2 * blocksize; if(loadarena(arena) < 0){ seterr(ECorrupt, "arena header or trailer corrupted"); freearena(arena); return nil; } if(okarena(arena) < 0){ freearena(arena); return nil; } if(arena->diskstats.sealed && scorecmp(zeroscore, arena->score)==0) backsumarena(arena); return arena; } void freearena(Arena *arena) { if(arena == nil) return; free(arena); } Arena* newarena(Part *part, u32int vers, char *name, u64int base, u64int size, u32int blocksize) { int bsize; Arena *arena; if(nameok(name) < 0){ seterr(EOk, "illegal arena name", name); return nil; } arena = MKZ(Arena); arena->part = part; arena->version = vers; if(vers == ArenaVersion4) arena->clumpmagic = _ClumpMagic; else{ do arena->clumpmagic = fastrand(); while(arena->clumpmagic==_ClumpMagic || arena->clumpmagic==0); } arena->blocksize = blocksize; arena->clumpmax = arena->blocksize / ClumpInfoSize; arena->base = base + blocksize; arena->size = size - 2 * blocksize; namecp(arena->name, name); bsize = sizeof zero; if(bsize > arena->blocksize) bsize = arena->blocksize; if(wbarena(arena)<0 || wbarenahead(arena)<0 || writepart(arena->part, arena->base, zero, bsize)<0){ freearena(arena); return nil; } return arena; } int readclumpinfo(Arena *arena, int clump, ClumpInfo *ci) { CIBlock *cib, r; cib = getcib(arena, clump, 0, &r); if(cib == nil) return -1; unpackclumpinfo(ci, &cib->data->data[cib->offset]); putcib(arena, cib); return 0; } int readclumpinfos(Arena *arena, int clump, ClumpInfo *cis, int n) { CIBlock *cib, r; int i; for(i = 0; i < n; i++){ cib = getcib(arena, clump + i, 0, &r); if(cib == nil) break; unpackclumpinfo(&cis[i], &cib->data->data[cib->offset]); putcib(arena, cib); } return i; } /* * write directory information for one clump * must be called the arena locked */ int writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci) { CIBlock *cib, r; cib = getcib(arena, clump, 1, &r); if(cib == nil) return -1; dirtydblock(cib->data, DirtyArenaCib); packclumpinfo(ci, &cib->data->data[cib->offset]); putcib(arena, cib); return 0; } u64int arenadirsize(Arena *arena, u32int clumps) { return ((clumps / arena->clumpmax) + 1) * arena->blocksize; } /* * read a clump of data * n is a hint of the size of the data, not including the header * make sure it won't run off the end, then return the number of bytes actually read */ u32int readarena(Arena *arena, u64int aa, u8int *buf, long n) { DBlock *b; u64int a; u32int blocksize, off, m; long nn; if(n == 0) return -1; qlock(&arena->lock); a = arena->size - arenadirsize(arena, arena->memstats.clumps); qunlock(&arena->lock); if(aa >= a){ seterr(EOk, "reading beyond arena clump storage: clumps=%d aa=%lld a=%lld -1 clumps=%lld\n", arena->memstats.clumps, aa, a, arena->size - arenadirsize(arena, arena->memstats.clumps - 1)); return -1; } if(aa + n > a) n = a - aa; blocksize = arena->blocksize; a = arena->base + aa; off = a & (blocksize - 1); a -= off; nn = 0; for(;;){ b = getdblock(arena->part, a, OREAD); if(b == nil) return -1; m = blocksize - off; if(m > n - nn) m = n - nn; memmove(&buf[nn], &b->data[off], m); putdblock(b); nn += m; if(nn == n) break; off = 0; a += blocksize; } return n; } /* * write some data to the clump section at a given offset * used to fix up corrupted arenas. */ u32int writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n) { DBlock *b; u64int a; u32int blocksize, off, m; long nn; int ok; if(n == 0) return -1; qlock(&arena->lock); a = arena->size - arenadirsize(arena, arena->memstats.clumps); if(aa >= a || aa + n > a){ qunlock(&arena->lock); seterr(EOk, "writing beyond arena clump storage"); return -1; } blocksize = arena->blocksize; a = arena->base + aa; off = a & (blocksize - 1); a -= off; nn = 0; for(;;){ b = getdblock(arena->part, a, off != 0 || off + n < blocksize ? ORDWR : OWRITE); if(b == nil){ qunlock(&arena->lock); return -1; } dirtydblock(b, DirtyArena); m = blocksize - off; if(m > n - nn) m = n - nn; memmove(&b->data[off], &clbuf[nn], m); ok = 0; putdblock(b); if(ok < 0){ qunlock(&arena->lock); return -1; } nn += m; if(nn == n) break; off = 0; a += blocksize; } qunlock(&arena->lock); return n; } /* * allocate space for the clump and write it, * updating the arena directory ZZZ question: should this distinguish between an arena filling up and real errors writing the clump? */ u64int writeaclump(Arena *arena, Clump *c, u8int *clbuf, u64int start, u64int *pa) { DBlock *b; u64int a, aa; u32int clump, n, nn, m, off, blocksize; int ok; AState as; n = c->info.size + ClumpSize + U32Size; qlock(&arena->lock); aa = arena->memstats.used; if(arena->memstats.sealed || aa + n + U32Size + arenadirsize(arena, arena->memstats.clumps + 1) > arena->size){ if(!arena->memstats.sealed){ logerr(EOk, "seal memstats %s", arena->name); arena->memstats.sealed = 1; as.arena = arena; as.aa = start+aa; as.stats = arena->memstats; setdcachestate(&as); } qunlock(&arena->lock); return TWID64; } if(packclump(c, &clbuf[0], arena->clumpmagic) < 0){ qunlock(&arena->lock); return TWID64; } /* * write the data out one block at a time */ blocksize = arena->blocksize; a = arena->base + aa; off = a & (blocksize - 1); a -= off; nn = 0; for(;;){ b = getdblock(arena->part, a, off != 0 ? ORDWR : OWRITE); if(b == nil){ qunlock(&arena->lock); return TWID64; } dirtydblock(b, DirtyArena); m = blocksize - off; if(m > n - nn) m = n - nn; memmove(&b->data[off], &clbuf[nn], m); ok = 0; putdblock(b); if(ok < 0){ qunlock(&arena->lock); return TWID64; } nn += m; if(nn == n) break; off = 0; a += blocksize; } arena->memstats.used += c->info.size + ClumpSize; arena->memstats.uncsize += c->info.uncsize; if(c->info.size < c->info.uncsize) arena->memstats.cclumps++; clump = arena->memstats.clumps++; if(arena->memstats.clumps == 0) sysfatal("clumps wrapped"); arena->wtime = now(); if(arena->ctime == 0) arena->ctime = arena->wtime; writeclumpinfo(arena, clump, &c->info); wbarena(arena); /* set up for call to setdcachestate */ as.arena = arena; as.aa = start+arena->memstats.used; as.stats = arena->memstats; /* update this before calling setdcachestate so it cannot be behind dcache.diskstate */ *pa = start+aa; setdcachestate(&as); qunlock(&arena->lock); return aa; } int atailcmp(ATailStats *a, ATailStats *b) { /* good test */ if(a->used < b->used) return -1; if(a->used > b->used) return 1; /* suspect tests - why order this way? (no one cares) */ if(a->clumps < b->clumps) return -1; if(a->clumps > b->clumps) return 1; if(a->cclumps < b->cclumps) return -1; if(a->cclumps > b->cclumps) return 1; if(a->uncsize < b->uncsize) return -1; if(a->uncsize > b->uncsize) return 1; if(a->sealed < b->sealed) return -1; if(a->sealed > b->sealed) return 1; /* everything matches */ return 0; } void setatailstate(AState *as) { int i, j, osealed; Arena *a; Index *ix; trace(0, "setatailstate %s 0x%llux clumps %d", as->arena->name, as->aa, as->stats.clumps); /* * Look up as->arena to find index. */ ix = mainindex; for(i=0; inarenas; i++) if(ix->arenas[i] == as->arena) break; if(i==ix->narenas || as->aa < ix->amap[i].start || as->aa >= ix->amap[i].stop || as->arena != ix->arenas[i]){ fprint(2, "funny settailstate 0x%llux\n", as->aa); return; } for(j=0; j<=i; j++){ a = ix->arenas[j]; if(atailcmp(&a->diskstats, &a->memstats) == 0) continue; qlock(&a->lock); osealed = a->diskstats.sealed; if(j == i) a->diskstats = as->stats; else a->diskstats = a->memstats; wbarena(a); if(a->diskstats.sealed != osealed && !a->inqueue) sealarena(a); qunlock(&a->lock); } } /* * once sealed, an arena never has any data added to it. * it should only be changed to fix errors. * this also syncs the clump directory. */ static void sealarena(Arena *arena) { arena->inqueue = 1; backsumarena(arena); } void backsumarena(Arena *arena) { ASum *as; if(sumwait.l == nil) return; as = MK(ASum); if(as == nil) return; qlock(&sumlock); as->arena = arena; as->next = nil; if(sumq) sumqtail->next = as; else sumq = as; sumqtail = as; rwakeup(&sumwait); qunlock(&sumlock); } static void sumproc(void *unused) { ASum *as; Arena *arena; USED(unused); for(;;){ qlock(&sumlock); while(sumq == nil) rsleep(&sumwait); as = sumq; sumq = as->next; qunlock(&sumlock); arena = as->arena; free(as); sumarena(arena); } } void sumarena(Arena *arena) { ZBlock *b; DigestState s; u64int a, e; u32int bs; int t; u8int score[VtScoreSize]; bs = MaxIoSize; if(bs < arena->blocksize) bs = arena->blocksize; /* * read & sum all blocks except the last one */ memset(&s, 0, sizeof s); b = alloczblock(bs, 0, arena->part->blocksize); e = arena->base + arena->size; for(a = arena->base - arena->blocksize; a + arena->blocksize <= e; a += bs){ disksched(); while((t=arenasumsleeptime) == SleepForever){ sleep(1000); disksched(); } sleep(t); if(a + bs > e) bs = arena->blocksize; if(readpart(arena->part, a, b->data, bs) < 0) goto ReadErr; addstat(StatSumRead, 1); addstat(StatSumReadBytes, bs); sha1(b->data, bs, nil, &s); } /* * the last one is special, since it may already have the checksum included */ bs = arena->blocksize; if(readpart(arena->part, e, b->data, bs) < 0){ ReadErr: logerr(EOk, "sumarena can't sum %s, read at %lld failed: %r", arena->name, a); freezblock(b); return; } addstat(StatSumRead, 1); addstat(StatSumReadBytes, bs); sha1(b->data, bs-VtScoreSize, nil, &s); sha1(zeroscore, VtScoreSize, nil, &s); sha1(nil, 0, score, &s); /* * check for no checksum or the same * * the writepart is okay because we flushed the dcache in sealarena */ if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0){ if(scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0) logerr(EOk, "overwriting mismatched checksums for arena=%s, found=%V calculated=%V", arena->name, &b->data[bs - VtScoreSize], score); scorecp(&b->data[bs - VtScoreSize], score); if(writepart(arena->part, e, b->data, bs) < 0) logerr(EOk, "sumarena can't write sum for %s: %r", arena->name); } freezblock(b); qlock(&arena->lock); scorecp(arena->score, score); qunlock(&arena->lock); } /* * write the arena trailer block to the partition */ int wbarena(Arena *arena) { DBlock *b; int bad; if((b = getdblock(arena->part, arena->base + arena->size, OWRITE)) == nil){ logerr(EAdmin, "can't write arena trailer: %r"); return -1; } dirtydblock(b, DirtyArenaTrailer); bad = okarena(arena)<0 || packarena(arena, b->data)<0; putdblock(b); if(bad) return -1; return 0; } int wbarenahead(Arena *arena) { ZBlock *b; ArenaHead head; int bad; namecp(head.name, arena->name); head.version = arena->version; head.size = arena->size + 2 * arena->blocksize; head.blocksize = arena->blocksize; head.clumpmagic = arena->clumpmagic; b = alloczblock(arena->blocksize, 1, arena->part->blocksize); if(b == nil){ logerr(EAdmin, "can't write arena header: %r"); /* ZZZ add error message? */ return -1; } /* * this writepart is okay because it only happens * during initialization. */ bad = packarenahead(&head, b->data)<0 || writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0 || flushpart(arena->part)<0; freezblock(b); if(bad) return -1; return 0; } /* * read the arena header and trailer blocks from disk */ static int loadarena(Arena *arena) { ArenaHead head; ZBlock *b; b = alloczblock(arena->blocksize, 0, arena->part->blocksize); if(b == nil) return -1; if(readpart(arena->part, arena->base + arena->size, b->data, arena->blocksize) < 0){ freezblock(b); return -1; } if(unpackarena(arena, b->data) < 0){ freezblock(b); return -1; } if(arena->version != ArenaVersion4 && arena->version != ArenaVersion5){ seterr(EAdmin, "unknown arena version %d", arena->version); freezblock(b); return -1; } scorecp(arena->score, &b->data[arena->blocksize - VtScoreSize]); if(readpart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize) < 0){ logerr(EAdmin, "can't read arena header: %r"); freezblock(b); return 0; } if(unpackarenahead(&head, b->data) < 0) logerr(ECorrupt, "corrupted arena header: %r"); else if(namecmp(arena->name, head.name)!=0 || arena->clumpmagic != head.clumpmagic || arena->version != head.version || arena->blocksize != head.blocksize || arena->size + 2 * arena->blocksize != head.size){ if(namecmp(arena->name, head.name)!=0) logerr(ECorrupt, "arena tail name %s head %s", arena->name, head.name); else if(arena->clumpmagic != head.clumpmagic) logerr(ECorrupt, "arena tail clumpmagic 0x%lux head 0x%lux", (ulong)arena->clumpmagic, (ulong)head.clumpmagic); else if(arena->version != head.version) logerr(ECorrupt, "arena tail version %d head version %d", arena->version, head.version); else if(arena->blocksize != head.blocksize) logerr(ECorrupt, "arena tail block size %d head %d", arena->blocksize, head.blocksize); else if(arena->size+2*arena->blocksize != head.size) logerr(ECorrupt, "arena tail size %lud head %lud", (ulong)arena->size+2*arena->blocksize, head.size); else logerr(ECorrupt, "arena header inconsistent with arena data"); } freezblock(b); return 0; } static int okarena(Arena *arena) { u64int dsize; int ok; ok = 0; dsize = arenadirsize(arena, arena->diskstats.clumps); if(arena->diskstats.used + dsize > arena->size){ seterr(ECorrupt, "arena %s used > size", arena->name); ok = -1; } if(arena->diskstats.cclumps > arena->diskstats.clumps) logerr(ECorrupt, "arena %s has more compressed clumps than total clumps", arena->name); /* * This need not be true if some of the disk is corrupted. * if(arena->diskstats.uncsize + arena->diskstats.clumps * ClumpSize + arena->blocksize < arena->diskstats.used) logerr(ECorrupt, "arena %s uncompressed size inconsistent with used space %lld %d %lld", arena->name, arena->diskstats.uncsize, arena->diskstats.clumps, arena->diskstats.used); */ if(arena->ctime > arena->wtime) logerr(ECorrupt, "arena %s creation time after last write time", arena->name); return ok; } static CIBlock* getcib(Arena *arena, int clump, int writing, CIBlock *rock) { int mode; CIBlock *cib; u32int block, off; if(clump >= arena->memstats.clumps){ seterr(EOk, "clump directory access out of range"); return nil; } block = clump / arena->clumpmax; off = (clump - block * arena->clumpmax) * ClumpInfoSize; cib = rock; cib->block = block; cib->offset = off; if(writing){ if(off == 0 && clump == arena->memstats.clumps-1) mode = OWRITE; else mode = ORDWR; }else mode = OREAD; cib->data = getdblock(arena->part, arena->base + arena->size - (block + 1) * arena->blocksize, mode); if(cib->data == nil) return nil; return cib; } static void putcib(Arena *arena, CIBlock *cib) { USED(arena); putdblock(cib->data); cib->data = nil; }