#include "stdinc.h" #include "dat.h" #include "fns.h" #include "error.h" static void diskThread(void *a); enum { /* * disable measurement since it gets alignment faults on BG * and the guts used to be commented out. */ Timing = 0, /* flag */ QueueSize = 100, /* maximum block to queue */ }; struct Disk { VtLock *lk; int ref; int fd; Header h; VtRendez *flow; VtRendez *starve; VtRendez *flush; VtRendez *die; int nqueue; Block *cur; /* block to do on current scan */ Block *next; /* blocks to do next scan */ }; /* keep in sync with Part* enum in dat.h */ static char *partname[] = { [PartError] "error", [PartSuper] "super", [PartLabel] "label", [PartData] "data", [PartVenti] "venti", }; Disk * diskAlloc(int fd) { u8int buf[HeaderSize]; Header h; Disk *disk; if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ vtSetError("short read: %r"); vtOSError(); return nil; } if(!headerUnpack(&h, buf)){ vtSetError("bad disk header"); return nil; } disk = vtMemAllocZ(sizeof(Disk)); disk->lk = vtLockAlloc(); disk->starve = vtRendezAlloc(disk->lk); disk->flow = vtRendezAlloc(disk->lk); disk->flush = vtRendezAlloc(disk->lk); disk->fd = fd; disk->h = h; disk->ref = 2; vtThread(diskThread, disk); return disk; } void diskFree(Disk *disk) { diskFlush(disk); /* kill slave */ vtLock(disk->lk); disk->die = vtRendezAlloc(disk->lk); vtWakeup(disk->starve); while(disk->ref > 1) vtSleep(disk->die); vtUnlock(disk->lk); vtRendezFree(disk->flow); vtRendezFree(disk->starve); vtRendezFree(disk->die); vtLockFree(disk->lk); close(disk->fd); vtMemFree(disk); } static u32int partStart(Disk *disk, int part) { switch(part){ default: assert(0); case PartSuper: return disk->h.super; case PartLabel: return disk->h.label; case PartData: return disk->h.data; } } static u32int partEnd(Disk *disk, int part) { switch(part){ default: assert(0); case PartSuper: return disk->h.super+1; case PartLabel: return disk->h.data; case PartData: return disk->h.end; } } int diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) { ulong start, end; u64int offset; int n, nn; start = partStart(disk, part); end = partEnd(disk, part); if(addr >= end-start){ vtSetError(EBadAddr); return 0; } offset = ((u64int)(addr + start))*disk->h.blockSize; n = disk->h.blockSize; while(n > 0){ nn = pread(disk->fd, buf, n, offset); if(nn < 0){ vtOSError(); return 0; } if(nn == 0){ vtSetError("eof reading disk"); return 0; } n -= nn; offset += nn; buf += nn; } return 1; } int diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) { ulong start, end; u64int offset; int n; start = partStart(disk, part); end = partEnd(disk, part); if(addr >= end - start){ vtSetError(EBadAddr); return 0; } offset = ((u64int)(addr + start))*disk->h.blockSize; n = pwrite(disk->fd, buf, disk->h.blockSize, offset); if(n < 0){ vtOSError(); return 0; } if(n < disk->h.blockSize) { vtSetError("short write"); return 0; } return 1; } static void diskQueue(Disk *disk, Block *b) { Block **bp, *bb; vtLock(disk->lk); while(disk->nqueue >= QueueSize) vtSleep(disk->flow); if(disk->cur == nil || b->addr > disk->cur->addr) bp = &disk->cur; else bp = &disk->next; for(bb=*bp; bb; bb=*bp){ if(b->addr < bb->addr) break; bp = &bb->ionext; } b->ionext = bb; *bp = b; if(disk->nqueue == 0) vtWakeup(disk->starve); disk->nqueue++; vtUnlock(disk->lk); } void diskRead(Disk *disk, Block *b) { assert(b->iostate == BioEmpty || b->iostate == BioLabel); blockSetIOState(b, BioReading); diskQueue(disk, b); } void diskWrite(Disk *disk, Block *b) { assert(b->nlock == 1); assert(b->iostate == BioDirty); blockSetIOState(b, BioWriting); diskQueue(disk, b); } void diskWriteAndWait(Disk *disk, Block *b) { int nlock; /* * If b->nlock > 1, the block is aliased within * a single thread. That thread is us. * DiskWrite does some funny stuff with VtLock * and blockPut that basically assumes b->nlock==1. * We humor diskWrite by temporarily setting * nlock to 1. This needs to be revisited. */ nlock = b->nlock; if(nlock > 1) b->nlock = 1; diskWrite(disk, b); while(b->iostate != BioClean) vtSleep(b->ioready); b->nlock = nlock; } int diskBlockSize(Disk *disk) { return disk->h.blockSize; /* immuttable */ } int diskFlush(Disk *disk) { Dir dir; vtLock(disk->lk); while(disk->nqueue > 0) vtSleep(disk->flush); vtUnlock(disk->lk); /* there really should be a cleaner interface to flush an fd */ nulldir(&dir); if(dirfwstat(disk->fd, &dir) < 0){ vtOSError(); return 0; } return 1; } u32int diskSize(Disk *disk, int part) { return partEnd(disk, part) - partStart(disk, part); } static uintptr mypc(int x) { return getcallerpc(&x); } static char * disk2file(Disk *disk) { static char buf[256]; if (fd2path(disk->fd, buf, sizeof buf) < 0) strncpy(buf, "GOK", sizeof buf); return buf; } static void diskThread(void *a) { Disk *disk = a; Block *b; uchar *buf, *p; double t; int nio; vtThreadSetName("disk"); //fprint(2, "diskThread %d\n", getpid()); buf = vtMemAlloc(disk->h.blockSize); vtLock(disk->lk); if (Timing) { nio = 0; t = -nsec(); } for(;;){ while(disk->nqueue == 0){ if (Timing) { t += nsec(); if(nio >= 10000){ fprint(2, "disk: io=%d at %.3fms\n", nio, t*1e-6/nio); nio = 0; t = 0; } } if(disk->die != nil) goto Done; vtSleep(disk->starve); if (Timing) t -= nsec(); } assert(disk->cur != nil || disk->next != nil); if(disk->cur == nil){ disk->cur = disk->next; disk->next = nil; } b = disk->cur; disk->cur = b->ionext; vtUnlock(disk->lk); /* * no one should hold onto blocking in the * reading or writing state, so this lock should * not cause deadlock. */ if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr); bwatchLock(b); vtLock(b->lk); b->pc = mypc(0); assert(b->nlock == 1); switch(b->iostate){ default: abort(); case BioReading: if(!diskReadRaw(disk, b->part, b->addr, b->data)){ fprint(2, "fossil: diskReadRaw failed: %s: " "score %V: part=%s block %ud: %r\n", disk2file(disk), b->score, partname[b->part], b->addr); blockSetIOState(b, BioReadError); }else blockSetIOState(b, BioClean); break; case BioWriting: p = blockRollback(b, buf); /* NB: ctime result ends with a newline */ if(!diskWriteRaw(disk, b->part, b->addr, p)){ fprint(2, "fossil: diskWriteRaw failed: %s: " "score %V: date %s part=%s block %ud: %r\n", disk2file(disk), b->score, ctime(time(0)), partname[b->part], b->addr); break; } if(p != buf) blockSetIOState(b, BioClean); else blockSetIOState(b, BioDirty); break; } blockPut(b); /* remove extra reference, unlock */ vtLock(disk->lk); disk->nqueue--; if(disk->nqueue == QueueSize-1) vtWakeup(disk->flow); if(disk->nqueue == 0) vtWakeup(disk->flush); if(Timing) nio++; } Done: //fprint(2, "diskThread done\n"); disk->ref--; vtWakeup(disk->die); vtUnlock(disk->lk); vtMemFree(buf); }