#
# Translation from styx to op for ofs.
# Either use the Cache or Op to operate on remote files.
#
# Semantics:
#	dir (+initial data) fetched/checked upon walk.
#	directories fully read.
#	dir hierarchies leading to files/dirs are created (faked) on demand.
#	files read on demand (after data read on first get).
#	creates deferred until first write, except for "mkdirs".
#	writethrough:
#		data pushed for write at 0; and partially filled puts.
#	delayed write:
#		data pushed with full puts.
#
# All files with open fids are alive in the cache.
# Op's fds correspond to cached files. This means that a clunk is
# likely to clunk an op fd. Read and Write modes would use different fds.
#
# A coherency lag of N ms can be set. In this case,
# files checked out not more than N ms ago are considered
# up to date. For directories, this is tricky, because we must
# still read directories for which we got just metadata, and not data;
# in particular, directories invented during a walk.
#
# Each close of a file being written signals an end-of-put (clunk)
# in the server. However, this does not make any distinction between different
# client processes updating the same file in the server (which is reasonable, because
# in that case there would be races in any case).
#
implement Stop;

include "sys.m";
	sys: Sys;
	fprint, sprint, pctl, millisec, DMDIR, write, nulldir, tokenize, fildes,
	QTDIR, FD, Dir, Qid: import sys;
include "names.m";
	names: Names;
	cleanname: import names;
include "string.m";
	str: String;
	splitstrl: import str;
include "styx.m";
	styx: Styx;
	unpackdir: import styx;
include "styxservers.m";
	Enotfound, Eexists: import Styxservers;
include "op.m";
	OSTAT, NOFD, MAXDATA, OCREATE, ODATA, OMORE, Tmsg, Rmsg: import Op;
include "opmux.m";
	opmux: Opmux;
include "error.m";
	err: Error;
	stderr, panic: import err;
include "stop.m";
include "ofstree.m";
	ofstree: Ofstree;
	Cfile: import ofstree;

# To put a limit on the number of processes that we might spawn
# mainly because of Tputs for huge files.
Nprocs: con 30;

# One unit of work for an auxiliary (op-speaking) process.
# fun is called with the Xop itself; the remaining fields are its arguments.
# n and o are generic int/big slots (count, flag or old tag; offset).
# rc is the channel on which the reply for the original request is sent.
Xop: adt {
	fun:	ref fn(x: Xop);
	tag:	int;
	f:	ref Cfile;
	path:	string;
	d:	ref Dir;
	data:	array of byte;
	n:	int;
	o:	big;
	rc:	chan of ref Crep;
};

cfsreqc: chan of (ref Creq, chan of ref Crep);	# requests to cfsproc
xcfsreqc: chan of Xop;			# work for xcfsproc
xcfsendc: chan of int;
donec: chan of ref Cfile;			# files that stopped being busy
coherencylag := 0;
maxwaiting := 0;
maxheld := 0;

# Load the required modules, record the op mux and the coherency lag,
# initialize the cache tree, and start the cache and dispatcher processes.
# Returns nil on success or an error string.
init(s: Styx, m: Opmux, dir: string, lag: int): string
{
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	names = load Names Names->PATH;
	str = load String String->PATH;
	opmux = m;
	ofstree = load Ofstree Ofstree->PATH;
	if (sys == nil || names == nil || str == nil || opmux == nil || ofstree == nil)
		return "load failed in cache";
	styx = s;
	coherencylag = lag;
	cfsreqc = chan of (ref Creq, chan of ref Crep);
	xcfsreqc = chan[5] of Xop;
	xcfsendc= chan of int;
	donec = chan[5] of ref Cfile;
	e := ofstree->init(sys, str, styx, err, names, dir);
	ofstree->debug = debug;
	if (e != nil)
		return e;
	spawn cfsproc();
	spawn xcfsproc();
	return nil;
}

# Ask cfsproc to stop: a nil request makes its main loop terminate.
term()
{
	cfsreqc <-= (nil, nil);
}

# Printable form of a request, for debugging.
Creq.text(r: self ref Creq): string
{
	s := sprint("t%d q%bx", r.tag, r.qid);
	pick rr := r {
	Dump =>
		s += " dump";
	Remove =>
		s += " remove";
	Stat =>
		s += " stat";
	Sync =>
		s += " sync";
	Validate =>
		s += " validate [" + rr.path + "]";
	Walk1 =>
		s += " walk1 [" + rr.name + "]";
	Readdir =>
		s += sprint(" readdir c%d o%bd", rr.cnt, rr.off);
	Pread =>
		s += sprint(" pread c%d o%bd", rr.cnt, rr.off);
	Pwrite =>
		s += sprint(" pwrite [%3.3s...] c%d o%bd", string rr.data, len rr.data, rr.off);
	Wstat =>
		s += " wstat";
	Create =>
		s += " create [" + rr.d.name + "]";
	Flush =>
		s += sprint(" flush %d", rr.oldtag);
	}
	return s;
}

# Printable form of a reply, for debugging.
Crep.text(r: self ref Crep): string
{
	s := "";
	pick rr := r {
	Remove =>
		s += " remove";
	Stat =>
		s += " stat";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Sync =>
		s += " sync";
	Validate =>
		s += " validate";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Walk1 =>
		s += " walk1";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Readdir =>
		s += sprint(" readdir n%d", len rr.sons);
	Pread =>
		s += sprint(" pread [%3.3s...] c%d", string rr.data, len rr.data);
	Pwrite =>
		s += sprint(" pwrite c%d", rr.count);
	Wstat =>
		s += " wstat";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Create =>
		s += " create";
		if (rr.d != nil)
			s += sprint(" [%s] q%bx:%d", rr.d.name, rr.d.qid.path, rr.d.qid.vers);
	Flush =>
		s += sprint(" flush");
	}
	if (r.err != nil)
		s += " errstr=" + r.err;
	return s;
}

# Free list of reply channels, so that they can be reused across rpcs.
crepcs: list of chan of ref Crep;

# Take a reply channel from the free list, or make a new one.
getcrepc(): chan of ref Crep
{
	if (crepcs == nil)
		return chan of ref Crep;
	else {
		c := hd crepcs;
		crepcs = tl crepcs;
		return c;
	}
}

# Give a reply channel back to the free list.
putcrepc(c: chan of ref Crep)
{
	crepcs = c::crepcs;
}

# Send a request to cfsproc and wait for its reply.
# The request must actually be sent and the reply received here:
# every wrapper below relies on the returned Crep.
cacherpc(r: ref Creq) : ref Crep
{
	c := getcrepc();
	cfsreqc <-= (r, c);
	rep := <-c;
	putcrepc(c);
	if (debug)
		fprint(stderr, " %d\tt%d %s\n", millisec(), r.tag, rep.text());
	return rep;
}

# Ask the cache to dump its state to stderr.
dump()
{
	cacherpc(ref Creq.Dump(0, big 0));
}

# Validate (check out) name, relative to the file with the given qid.
validate(tag: int, qid: big, name: string): (ref Dir, string)
{
	r := cacherpc(ref Creq.Validate(tag, qid, name));
	pick rr := r {
	Validate =>
		return (rr.d, rr.err);
	}
	panic("validatebug");
	return (nil, nil);
}

# The wrappers below share one pattern: issue the request and, if the
# cache answers "invalid" (file not known to the cache), validate the
# file and retry once.

create(tag: int, qid: big, d: ref Sys->Dir): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Create(tag, qid, ref *d));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Create(tag, qid, ref *d));
	}
	pick rr := r {
	Create =>
		return (rr.d, rr.err);
	}
	panic("createbug");
	return (nil, nil);
}

remove(tag: int, qid: big) : string
{
	r := cacherpc(ref Creq.Remove(tag, qid));
	pick rr := r {
	Remove =>
		return rr.err;
	}
	panic("removebug");
	return nil;
}

walk1(tag: int, qid: big, elem: string): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Walk1(tag, qid, elem));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, elem));
		r = cacherpc(ref Creq.Walk1(tag, qid, elem));
	}
	pick rr := r {
	Walk1 =>
		return (rr.d, rr.err);
	}
	panic("walk1bug");
	return (nil, nil);
}

readdir(tag: int, qid: big, cnt: int, off: int): (list of Sys->Dir, string)
{
	r := cacherpc(ref Creq.Readdir(tag, qid, cnt, big off));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Readdir(tag, qid, cnt, big off));
	}
	pick rr := r {
	Readdir =>
		return (rr.sons, rr.err);
	}
	panic("readdirbug");
	return (nil, nil);
}

pread(tag: int, qid: big, cnt: int, off: big): (array of byte, string)
{
	r := cacherpc(ref Creq.Pread(tag, qid, cnt, off));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Pread(tag, qid, cnt, off));
	}
	pick rr := r {
	Pread =>
		return (rr.data, rr.err);
	}
	panic("preadbug");
	return (nil, nil);
}

pwrite(tag: int, qid: big, data: array of byte, off: big) : (int, string)
{
	r := cacherpc(ref Creq.Pwrite(tag, qid, data, off));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Pwrite(tag, qid, data, off));
	}
	pick rr := r {
	Pwrite =>
		return (rr.count, rr.err);
	}
	panic("pwritebug");
	return (0, nil);
}

stat(tag: int, qid: big): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Stat(tag, qid));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Stat(tag, qid));
	}
	pick rr := r {
	Stat =>
		return (rr.d, rr.err);
	}
	panic("statbug");
	return (nil, nil);
}

wstat(tag: int, qid: big, d: ref Sys->Dir): (ref Sys->Dir, string)
{
	r := cacherpc(ref Creq.Wstat(tag, qid, ref *d));
	if (r.err == "invalid"){
		cacherpc(ref Creq.Validate(tag, qid, ""));
		r = cacherpc(ref Creq.Wstat(tag, qid, ref *d));
	}
	pick rr := r {
	Wstat =>
		return (rr.d, rr.err);
	}
	panic("wstatbug");
	return (nil, nil);
}

# Flush the request with tag old.
# The reply to a Flush request is a Crep.Flush (see xopflush/opflush),
# so that is the variant that must be picked here.
flush(tag: int, old: int): string
{
	r := cacherpc(ref Creq.Flush(tag, big 0, old));
	pick rr := r {
	Flush =>
		return rr.err;
	}
	panic("flushbug");
	return nil;
}

sync(tag: int, qid: big): string
{
	r := cacherpc(ref Creq.Sync(tag, qid));
	pick rr := r {
	Sync =>
		return rr.err;
	}
	panic("syncbug");
	return nil;
}

# Build the (cleaned) path for name n within directory d.
opfname(d, n: string) : string
{
	path: string;
	if (n == "" || n == "/")
		path = d;
	else if (d == "/")
		path = n;
	else
		path = d + "/" + n;
	path = cleanname(path);
	return path;
}

# Requests held because the file they need is busy.
Wlist: type list of (ref Cfile, ref Creq, chan of ref Crep);

# Extract from l the first entry waiting for rf.
# Returns the remaining entries (in reverse order), and the request
# and reply channel found (both nil if no entry waits for rf).
waitingop(l: Wlist, rf: ref Cfile): (Wlist, ref Creq, chan of ref Crep)
{
	rr: ref Creq;
	rrc: chan of ref Crep;
	rl : Wlist;
	for(; l != nil; l = tl l){
		(f, r, rc) := hd l;
		if (f == rf && rr == nil)
			(rr, rrc) = (r, rc);
		else
			rl = (f, r, rc) :: rl;
	}
	return (rl, rr, rrc);
}

# Print the list of held requests, for debugging.
dumpwaiting(l: Wlist)
{
	fprint(stderr, "\tcache: waiting:\n");
	for(; l != nil; l = tl l){
		(f, r, nil) := hd l;
		fprint(stderr, "\t\t%s: %s\n", f.d.name, r.text());
	}
}

# Main cache mux
# Requests are either being processed here, or spawned in auxiliary
# processes speaking op (and going through the opmux),
# or waiting for other requests to clear a file from being busy
cfsproc()
{
	waiting: Wlist;

	rootd := ref nulldir;
	rootd.name = "/";
	rootd.uid = rootd.gid = rootd.muid = "sys";
	rootd.qid = Qid(big 0, 0, QTDIR);
	root := Cfile.create(nil, rootd);
	if (root == nil)
		panic("cache: can't create root");
	if (debug)
		fprint(stderr, "cache: new: %s\n", root.text());
	readyfile: ref Cfile;
	taggen := 0;
Loop:
	for (;;){
		r: ref Creq;
		rc: chan of ref Crep;
		r = nil;
		rc = nil;

		# Attend first pending ops on a file that was busy.
		if (len waiting > maxwaiting)
			maxwaiting = len waiting;
		# Only while the file is still idle: if the previous waiting op
		# made it busy again, dequeuing another op for it would just
		# requeue it, forever, without ever draining donec.
		if (readyfile != nil && !readyfile.busy){
			(waiting, r, rc) = waitingop(waiting, readyfile);
			if (r != nil && debug)
				fprint(stderr, "\tcache: readyop: %s\n", r.text());
		}
		if (debug)
			dumpwaiting(waiting);
		if (r == nil){
			readyfile = nil;
			alt {
			(r, rc) = <-cfsreqc =>
				;
			f := <-donec =>
				f.busy = 0;
				readyfile = f;
				if (debug)
					fprint(stderr, "\tcache: %s ready\n", f.d.name);
				continue Loop;
			}
		}
		if (r == nil){
			# nil request: sent by term()
			if (debug)
				fprint(stderr, "mcache: stop\n");
			break;
		}
		f := Cfile.find(r.qid);
		if (f != nil && f.d == nil)
			panic(sprint("cache: nil dir for file 16r%bx", r.qid));
		pick rr := r {
		Flush =>
			xcfsreqc <-= Xop(xopflush, taggen++,nil,nil,nil,nil,rr.oldtag,big 0,rc);
		Validate =>
			if (f == nil){
				rc <-= ref Crep.Validate("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}

			# 1. Create a fake dir hierarchy for all but the last component.
			(nels, els) := tokenize(rr.path, "/");
			wf := f;
			for (i := 0 ; i < nels -1; i++){
				# find or create a fake entry
				(wf, nil) = wf.walkorcreate(hd els, nil);
				if (wf != nil && wf.busy){
					if (debug)
						fprint(stderr, "\tcache: busy\n");
					waiting = (wf, r, rc) :: waiting;
					continue Loop;
				}
				els = tl els;
			}
			f = wf;

			# 2. try to validate the last component.
			# els could be nil (e.g., for "/"), or for a pure clone
			sf: ref Cfile;
			if (els == nil)
				sf = f;
			else {
				if (f == nil)
					panic("mcache validate bug");
				sf = f.walk(hd els);
			}
			# sf is either nil, or points to the file (and maybe sf == f)
			if (sf != nil && sf.busy){
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (sf, r, rc) :: waiting;
				continue;
			}
			now := millisec();
			# This optimization does not seem to work in some cases
			if (0)
			if (sf == nil && f != sf && f.dirreaded && now - f.time < coherencylag){
				rc <-= ref Crep.Validate("file does not exist", nil);
				continue;
			}
			if (sf != nil && now - sf.time < coherencylag){
				rc <-= ref Crep.Validate(nil, ref *sf.d);
				continue;
			}

			# don't know. Ask the caller for help. Put the file on hold.
			isnew := 0;
			if (sf == nil){
				# BUG: This is a problem. The cache should create the file
				# only when necessary to keep cached data. Otherwise, we
				# create the file here as a directory and it might later be a file,
				# Things are fixed when the cache finds out the problem, but
				# it would be better to make it right from the start.
				(sf, isnew) = f.walkorcreate(hd els, nil);
			}
			if (sf.busy){
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (sf, r, rc) :: waiting;
				continue;
			}
			sf.busy = 1;
			path := sf.getpath();
			xcfsreqc <-= Xop(xgetandvalidate, taggen++, sf, path, nil, nil, isnew, big 0, rc);
		Create =>
			if (f == nil){
				rc <-= ref Crep.Create("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			(f,nil) = f.walkorcreate(rr.d.name, rr.d);
			if (f.busy){
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			f.data = array[0] of byte;
			f.created = f.dirtyd = 1;
			# directories are created in the server right now;
			# for files, the create is deferred (created/dirtyd are set).
			if (f.d.qid.qtype&QTDIR){
				f.busy = 1;
				path := f.getpath();
				xcfsreqc <-= Xop(xputandcreate, taggen++, f, path, rr.d, nil, 0, big 0, rc);
			} else
				rc <-= ref Crep.Create(nil, ref *f.d);
		Remove =>
			if (f == nil){
				rc <-= ref Crep.Remove("invalid");
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			f.busy = 1;
			path := f.getpath();
			xcfsreqc <-= Xop(xdelandremove, taggen++, f, path, nil, nil, 0, big 0, rc);
		Walk1 =>
			if (f == nil){
				rc <-= ref Crep.Walk1("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			f = f.walk(rr.name);
			if (f != nil && f.busy){
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			if (f == nil){
				# if this error ever happens, try to (re)validate this file
				rc <-= ref Crep.Walk1("invalid", nil);
			} else
				rc <-= ref Crep.Walk1(nil, ref *f.d);
		Readdir =>
			if (f == nil){
				rc <-= ref Crep.Readdir("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			rc <-= ref Crep.Readdir(nil, f.children(rr.cnt, int rr.off));
		Pread =>
			if (f == nil){
				rc <-= ref Crep.Pread("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			# we cache prefixes only, and don't believe that files
			# reporting zero length are indeed empty.
			if (rr.off >= f.d.length && f.d.length > big 0)
				rc <-= ref Crep.Pread(nil, array[0] of byte);
			else if (rr.off < big len f.data){
				n := int rr.off + rr.cnt;
				if (n > len f.data)
					n = len f.data;
				rc <-= ref Crep.Pread(nil, f.data[int rr.off:n]);
			} else if (!(f.d.qid.qtype&QTDIR) && (cdata := f.pread(rr.cnt, rr.off)) != nil)
				rc <-= ref Crep.Pread(nil, cdata);
			else {
				path := f.getpath();
				f.busy = 1;
				xcfsreqc <-= Xop(xgetandread, taggen++, f, path, nil, nil, rr.cnt, rr.off, rc);
			}
		Pwrite =>
			# This is write-through (perhaps asynchronously)
			if (f == nil){
				rc <-= ref Crep.Pwrite("invalid", -1);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			# update the cached prefix, if the write is contiguous with it
			if (rr.off <= big len f.data){
				if (int rr.off + len rr.data > MAXCACHED){
					f.data = f.data[0:int rr.off];
				} else {
					off := int rr.off;
					ndata := array[off + len rr.data] of byte;
					ndata[0:] = f.data[0:off];
					ndata[off:] = rr.data;
					f.data = ndata;
					if (f.d.length < big len f.data)
						f.d.length = big len f.data;
				}
			}
			f.pwrite(rr.data, rr.off);
			path := f.getpath();
			f.busy = 1;
			# keep this condition in sync with async in putandwrite()
			l := len f.d.name;
			if ((rr.off != big 0 && (l > 4 && (f.d.name[l-4:l] == ".dis" || f.d.name[l-4:l] == ".sbl"))) || (len rr.data == 8192 && !f.created && rr.off != big 0))
				rc <-= ref Crep.Pwrite(nil, len rr.data);
			xcfsreqc <-= Xop(xputandwrite, taggen++, f, path, nil, rr.data, 0, rr.off, rc);
		Stat =>
			if (f == nil){
				rc <-= ref Crep.Stat("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			rc <-= ref Crep.Stat(nil, ref *f.d);
		Wstat =>
			if (f == nil){
				rc <-= ref Crep.Wstat("invalid", nil);
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			truncate := (f.d.length != big 0 && rr.d.length == big 0);
			if (truncate){
				f.data = array[0] of byte;
				f.created = 1;	# to truncate it
			}
			f.dirtyd = 1;
			f.busy = 1;
			path := f.getpath();
			xcfsreqc <-= Xop(xputandwstat, taggen++, f, path, rr.d, nil, 0, big 0, rc);
		Sync =>
			if (f == nil){
				rc <-= ref Crep.Sync("invalid");
				continue;
			} else if (f.busy) {
				if (debug)
					fprint(stderr, "\tcache: busy\n");
				waiting = (f, r, rc) :: waiting;
				continue;
			}
			path := f.getpath();
			f.busy = 1;
			xcfsreqc <-= Xop(xputandsync, taggen++, f, path, nil, nil, 0, big 0, rc);
		Dump =>
			root.dump(0, "cfs tab:\n");
			fprint(stderr, "cache: %d max busy files, %d max held ops\n", maxwaiting, maxheld);
			rc <-= ref Crep.Dump(nil);
		}
	}
}

# BUG: cache xfuncs and put a reasonably high limit to the number of processes.
# Dispatcher for Xops: hands each one to an idle xproc, spawning up to
# Nprocs of them, and holds the Xop in pending when none is available.
# Exits upon receiving an Xop with a nil fun.
xcfsproc()
{
	pending: list of Xop;
	procs: list of chan of Xop;	# channels of idle xprocs
	nprocs := 0;
	idlec := chan of chan of Xop;
	for(;;){
		alt {
		x := <-xcfsreqc =>
			if (x.fun== nil)
				exit;
			xc: chan of Xop;
			if (procs != nil){
				xc = hd procs;
				procs = tl procs;
			} else if (nprocs < Nprocs) {
				nprocs++;
				xc = chan of Xop;
				spawn xproc(xc, idlec);
			} else
				xc = nil;
			if (xc != nil)
				xc <-= x;
			else {
				pending = x :: pending;
				if (len pending > maxheld)
					maxheld = len pending;
				if (debug)
					fprint(stderr, "o/ofs: %d processes, op held\n", nprocs);
			}
		xc := <-idlec =>
			if (pending != nil){
				x := hd pending;
				pending = tl pending;
				if (debug)
					fprint(stderr, "o/ofs: held op released\n");
				xc <-= x;
			} else
				procs = xc::procs;
		}
	}
}

# Worker: run each Xop received on xc, then report xc as idle.
xproc(xc: chan of Xop, idlec: chan of chan of Xop)
{
	for(;;){
		x := <-xc;
		x.fun(x);
		idlec <-= xc;
	}
}

# Flush handler.
# opflush(tag, n, rc); BUG: pretend it's flushed.
# A reply must still be sent, or the caller of flush() would block forever.
xopflush(x: Xop)
{
	x.rc <-= ref Crep.Flush(nil);
}

# Send a Tflush to the server, wait for its reply and answer on rc.
opflush(tag, oldtag: int, rc: chan of ref Crep)
{
	repc := opmux->rpc(ref Tmsg.Flush(2*tag, 2*oldtag));
	<-repc;
	rc <-= ref Crep.Flush(nil);
}

# BUG: Doing a ls -ld on a file that has NOT read permission will make Tget fail
# because we ask for data as well, and oxport would report a permission denied while trying to
# open the file for reading. This must be changed.
# If OSTAT|ODATA is asked to oxport,
# and only stat could be retrieved, only stat should be reported back, with just OSTAT in the
# reply. Of course, this means that there are problems reading file data, and the client may issue
# a Tget(ODATA) should it want to learn the cause (or probably assume just a permission denied).

# Issue a Tget for path and collect the replies.
# When data is requested, as many messages as needed to reach MAXCACHED
# (at most MAXNMSGS) are asked for, and cnt is raised to MAXDATA.
# Returns (op fd, stat or nil, data read, error string).
get(tag: int, path: string, fd: int, mode: int, cnt: int, off: big): (int, ref Dir, array of byte, string)
{
	data := array[0] of byte;
	d : ref Dir;
	nm := 1;
	if (mode&ODATA){
		while(nm < MAXNMSGS && off + big (Op->MAXDATA*nm) < big MAXCACHED)
			nm++;
		if (nm > 1 || off < big MAXCACHED)
			cnt = MAXDATA;
	}
	req := ref Tmsg.Get(tag, path, fd, mode, nm, off, cnt);
	repc := opmux->rpc(req);
	nmsgs := nm;
	d = nil;
	for (;;){
		rep := <- repc;
		pick r := rep {
		Error =>
			return (Op->NOFD, nil, nil, r.ename);
		Get =>
			if ((r.mode&OSTAT) != 0){
				d = ref r.stat;
				# directories are fully read: with nmsgs at 0 the count
				# below never reaches 0, and only OMORE ends the loop.
				if ((d.qid.qtype&QTDIR) != 0)
					nmsgs = 0;
			}
			if ((r.mode &ODATA) != 0){
				ndata := array[len data + len r.data] of byte;
				ndata[0:] = data;
				ndata[len data:] = r.data;
				data = ndata;
			}
			if (--nmsgs == 0 || (r.mode&OMORE) == 0)	# last message
				return(r.fd, d, data, nil);
		* =>
			panic("get0bug");
		}
	}
	panic("get0bug");
	return (Op->NOFD, nil, nil, "bug");
}

xgetandvalidate(x: Xop)
{
	getandvalidate(x.tag, x.f, x.path, x.n, x.rc);
}

# Get stat and initial data for f from the server, update the cached file,
# reply with a Crep.Validate on rc, and report f as no longer busy.
# isnew is set when f was just created in the cache for this validation.
getandvalidate(tag: int, f: ref Cfile, path: string, isnew: int, rc: chan of ref Crep)
{
	(fd, d, data, e) := get(tag, path, f.oprdfd, OSTAT|ODATA|OMORE, MAXDATA, big 0);
	f.oprdfd = fd;
	if (e != nil || d == nil){
		# Never reply with both a nil error and a nil Dir:
		# the caller of validate() could not tell that it failed.
		if (e == nil)
			e = "no stat in reply";
		f.remove();
		rc <-= ref Crep.Validate(e, nil);
	} else {
		if (isnew)
			f.serverqid = d.qid;
		else {
			# file changed in the server: drop the cached prefix,
			# unless it holds data for a create still to be sent.
			if (f.serverqid.path != d.qid.path || f.serverqid.vers != d.qid.vers)
				if (!f.created)
					f.data = array[0] of byte;
			f.serverqid = d.qid;
		}
		f.wstat(d);
		if (d.qid.qtype&QTDIR){
			f.dirreaded = 1;
			f.updatedirdata(data);
		} else {
			f.pwrite(data, big 0);
			if (!f.dirtyd && !f.created)
				f.data = data;
		}
		f.d.qid.vers = d.qid.vers;
		f.time = millisec();
		rc <-= ref Crep.Validate(nil, ref *f.d);
	}
	donec <-= f;
}
xgetandread(x: Xop)
{
	getandread(x.tag, x.f, x.path, x.n, x.o, x.rc);
}

# Read cnt bytes at off from the server for f, extend the cached prefix
# when the data is contiguous with it, reply with a Crep.Pread on rc,
# and report f as no longer busy.
getandread(tag: int, f: ref Cfile, path: string, cnt: int, off: big, rc: chan of ref Crep)
{
	(fd, nil, data, e) := get(tag, path, f.oprdfd, ODATA|OMORE, cnt, off);
	f.oprdfd = fd;
	if (e != nil){
		# fsremove(fstab, f.d.qid.path); should?
		rc <-= ref Crep.Pread(e, nil);
	} else {
		# files with zero-length are probably streams: do not cache.
		if (f.d.length != big 0){
			f.pwrite(data, off);
			odata := f.data;
			o := int off;
			if (o <= len odata){
				if (o + len data > MAXCACHED)
					f.data = odata[0:o];
				else {
					ndata := array[o + len data] of byte;
					ndata[0:] = odata[0:o];
					ndata[o:] = data;
					f.data = ndata;
					if (f.d.length < big len f.data)
						f.d.length = big len f.data;
				}
			}
		}
		f.time = millisec();
		rc <-= ref Crep.Pread(nil, data);
	}
	donec <-= f;
}

# BUG: Too many args; put does it all.
# Issue the Tputs needed to send d (stat) and/or data at off for path.
# mkit asks for OCREATE and more for OMORE. Data is sent in pieces of at
# most MAXDATA bytes; OSTAT and OCREATE are set only in the first message.
# Returns (op fd, qid version, mtime, error string).
put(tag: int, path: string, opfd: int, d: ref Dir, data: array of byte, off: big, mkit, more: int) : (int, int, int, string)
{
	if (d == nil && data == nil && opfd == Op->NOFD)
		return (NOFD, 0, 0, nil);
	flag := 0;
	if (d != nil)
		flag |= OSTAT;
	# d may be nil here (data-only put): do not dereference it.
	isdir := d != nil && (d.qid.qtype&QTDIR) != 0;
	if (data != nil && (!mkit || !isdir))
		flag |= ODATA;
	if (mkit)
		flag |= OCREATE;
	if (more)
		flag |= OMORE;
	if (data == nil)
		data = array[0] of byte;
	# the message always carries a Dir; use a null one when there is none.
	if (d == nil)
		d = ref nulldir;
	tot := 0;
	rfd := NOFD;
	mtime := 0;
	for(;;){
		nw := len data[tot:];
		if (nw > MAXDATA)
			nw = MAXDATA;
		rc := opmux->rpc(ref Tmsg.Put(tag, path, opfd, flag, *d, off, data[tot:tot+nw]));
		vers := 0;
		r := <- rc;
		pick rr := r {
		Put =>
			if (rr.count != nw)
				return (NOFD, 0, 0, "short write");
			vers = rr.qid.vers;
			rfd = rr.fd;
			mtime = rr.mtime;
		Error =>
			return (NOFD, 0, 0, rr.ename);
		* =>
			panic("putbug");
		}
		tot += nw;
		if (tot == len data)
			return (rfd, vers, mtime, nil);
		flag &= ~(OSTAT|OCREATE);
	}
	panic("putbug");
	return (NOFD, 0, 0, "putbug");
}

xputandcreate(x: Xop)
{
	putandcreate(x.tag, x.f, x.path, x.d, x.rc);
}

# Create f in the server, update the cached file, reply with a
# Crep.Create on rc, and report f as no longer busy.
putandcreate(tag: int, f: ref Cfile, path: string, d: ref Dir, rc: chan of ref Crep)
{
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, f.d, nil, big 0, 1, 1);
	f.opwrfd = fd;
	if (e != nil){
		f.remove();
		rc <-= ref Crep.Create(e, nil);
	} else {
		f.created = f.dirtyd = 0;
		f.wstat(d);
		if (vers != 0 || mtime != 0){
			f.serverqid.vers = vers;
			f.d.qid.vers = vers;
			f.d.mtime = mtime;
		}
		f.time = millisec();
		rc <-= ref Crep.Create(nil, ref *f.d);
	}
	donec <-= f;
}

xputandwrite(x: Xop)
{
	putandwrite(x.tag, x.f, x.path, x.data, x.o, x.rc);
}

# Write data at off to the server for f, sending also the stat and the
# create when they are pending. For async writes cfsproc already replied
# to the client, so no reply is sent here and f is released before the put.
putandwrite(tag: int, f: ref Cfile, path: string, data: array of byte, off: big, rc: chan of ref Crep)
{
	# keep these conditions in sync with the Pwrite case in cfsproc()
	async := (len data == 8192 && !f.created && off != big 0); # (off % big (4*8192)) != big 0);
	l := len f.d.name;
	if (off != big 0 && (l > 4 && (f.d.name[l-4:l] == ".dis" || f.d.name[l-4:l] == ".sbl")))
		async = 1;
	d: ref Dir;
	if (f.dirtyd)
		d = f.d;
	f.dirtyd = 0;
	mkit := f.created;
	f.created = 0;
	if (async)
		donec <-= f;
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, d, data, off, mkit, 1);
	f.opwrfd = fd;
	if (e != nil){
		f.remove();
		if (!async)
			rc <-= ref Crep.Pwrite(e, len data);
		else
			fprint(stderr, "\n*** ofs: delayed write error ***\n\n");
	} else {
		if (!async){
			f.serverqid.vers = vers;
			f.d.qid.vers = vers;
			f.d.mtime= mtime;
			f.time = millisec();
			rc <-= ref Crep.Pwrite(nil, len data);
		} else {
			# Race, but safer this way.
			f.d.qid.vers = vers;
			if (mtime)
				f.d.mtime = mtime;
		}
	}
	if (!async)
		donec <-= f;
}

xputandwstat(x: Xop)
{
	putandwstat(x.tag, x.f, x.path, x.d, x.rc);
}

# Send the stat d for f to the server (creating the file if a create is
# pending), update the cached file, reply with a Crep.Wstat on rc, and
# report f as no longer busy.
putandwstat(tag: int, f: ref Cfile, path: string, d: ref Dir, rc: chan of ref Crep)
{
	mkit := f.created;
	f.created = 0;
	f.dirtyd = 0;
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, d, nil, big 0, mkit, 1);
	f.opwrfd = fd;
	if (e != nil){
		f.remove();
		rc <-= ref Crep.Wstat(e, nil);
	} else {
		f.serverqid.vers = vers;
		f.wstat(d);
		f.d.mtime = mtime;
		f.time = millisec();
		rc <-= ref Crep.Wstat(nil, ref *f.d);
	}
	donec <-= f;
}

xputandsync(x: Xop)
{
	putandsync(x.tag, x.f, x.path, x.rc);
}

# End the put for f (a put without OMORE) and, if there is an op fd
# for reading, issue a final get on it as well.
# The Crep.Sync reply is sent before talking to the server.
putandsync(tag: int, f: ref Cfile, path: string, rc: chan of ref Crep)
{
	mkit := f.created;
	f.created = 0;
	d: ref Dir;
	if (f.dirtyd)
		d = ref *f.d;
	f.dirtyd = 0;
	f.oldname = nil;
	f.fsfd = nil;
	rc <-= ref Crep.Sync(nil);
	(fd, vers, mtime, e) := put(tag, path, f.opwrfd, d, nil, big 0, mkit, 0);
	f.opwrfd = fd;
	if (f.oprdfd != NOFD){
		(fd, nil, nil, nil) = get(tag, path, f.oprdfd, OSTAT, 0, big 0);
		f.oprdfd = fd;
	}
	if (e != nil)
		f.remove();
	else {
		f.serverqid.vers = vers;
		if (vers != 0)
			f.d.qid.vers = vers;
		if (mtime != 0)
			f.d.mtime = mtime;
		f.time = millisec();
	}
	donec <-= f;
}

xdelandremove(x: Xop)
{
	delandremove(x.tag, x.f, x.path, x.rc);
}

# Remove path in the server and, on success, f from the cache;
# reply with a Crep.Remove on rc, issue a final get/put on the op fds
# still held for f, and report f as no longer busy.
delandremove(tag: int, f: ref Cfile, path: string, rc: chan of ref Crep)
{
	oprc := opmux->rpc(ref Tmsg.Remove(2*tag, path));
	opr := <-oprc;
	e: string;
	pick oprr := opr {
	Remove =>
		f.remove();
	Error =>
		e = oprr.ename;
	}
	rc <-= ref Crep.Remove(e);
	if (f.oprdfd != NOFD){
		(fd, nil, nil, nil) := get(tag, path, f.oprdfd, OSTAT, 0, big 0);
		f.oprdfd = fd;
	}
	if (f.opwrfd != NOFD){
		(fd, nil, nil, nil) := put(tag, path, f.opwrfd, nil, nil, big 0, 0, 0);
		f.opwrfd = fd;
	}
	donec <-= f;
}