# Op to styx file server implement Ofs; include "sys.m"; sys: Sys; MREPL, FD, MAFTER, DMDIR, QTDIR, fprint, pctl, fildes, MBEFORE, MCREATE, create, OREAD, open, read, ORDWR, nulldir, dial, sprint, pipe, stat, mount, werrstr, OTRUNC: import sys; include "draw.m"; include "styx.m"; styx: Styx; Rmsg, Tmsg: import styx; include "ofsstyx.m"; styxs: Styxservers; Styxserver, Fid, Ebadfid, Navigator, Navop: import styxs; Enotdir, Enotfound: import styxs; include "arg.m"; arg: Arg; usage: import arg; include "keyring.m"; include "security.m"; include "env.m"; env: Env; getenv: import env; include "string.m"; str: String; splitstrl: import str; include "netutil.m"; util: Netutil; Client, netmkaddr, authfd : import util; include "op.m"; op: Op; include "opmux.m"; opmux: Opmux; include "stop.m"; stop: Stop; include "error.m"; err: Error; checkload, kill, stderr, error: import err; include "ofsnotify.m"; notify: Ofsnotify; Ofs: module { init: fn(nil: ref Draw->Context, argv: list of string); }; Stats: adt { nwalk, nopen, ncreate, nread, nwrite: int; nclunk, nstat, nremove, nwstat: int; dump: fn(s: self ref Stats); }; # Process model: # requestproc accepts one request after another from either the styxserver # or the navigator, and spawns one process per request (either # fsreq or navreq). Thus, we process concurrent requests. # # In the stop cache, a single process synchronizes access to the tree and takes # one request after another. However, it spawns other processes to speak # Op. Cached requests are replied by the central cfsproc process. Other # requests spawn a process to issue the Op RPC(s) concurrently. # Such Op speaker processes are spawned by xcfsproc(), to control the max # number of concurrent requests that we might place on the remote server. # # replyproc accepts one reply after another to be sent to the client debug := 0; doauth := 1; dostats := 0; stats : ref Stats; user := "none"; Stats.dump(s: self ref Stats) { tot := s.nwalk + s.nopen + s.ncreate + s.nread + s.nwrite; tot += s.nclunk + s.nstat + s.nremove + s.nwstat; tot += 2; # attach/version fprint(stderr,"styx:\n"); fprint(stderr,"\t%s\t%d\n", "walk", s.nwalk); fprint(stderr,"\t%s\t%d\n", "open", s.nopen); fprint(stderr,"\t%s\t%d\n", "create", s.ncreate); fprint(stderr,"\t%s\t%d\n", "read", s.nread); fprint(stderr,"\t%s\t%d\n", "write", s.nwrite); fprint(stderr,"\t%s\t%d\n", "clunk", s.nclunk); fprint(stderr,"\t%s\t%d\n", "stat", s.nstat); fprint(stderr,"\t%s\t%d\n", "remove", s.nremove); fprint(stderr,"\t%s\t%d\n", "wstat", s.nwstat); fprint(stderr,"\ttotal\t%d\n", tot); } navreq(m: ref Navop) { qid := m.path; pick n := m { Stat => n.reply <-= stop->stat(m.tag, qid); Walk => if (n.name == nil || n.name == ".") n.reply <-= stop->validate(m.tag, qid, ""); else if (n.name[0] == '/'){ # see ./ofsstyxservers.b # Got "/a/b/c" because we are about to process a Twalk # for {a, b, c} for qid. # Here is when we check that the cache entry is valid. stats.nwalk++; (nil, sub) := splitstrl(n.name, "!!DUMP"); if (sub != nil){ # stop->dump(); stats.dump(); opmux->dump(); } n.reply <-= stop->validate(m.tag, qid, n.name); } else{ # Then we get one-by-one walks n.reply <-= stop->walk1(m.tag, qid, n.name); } Readdir => # BUG: This asks stop to read a directory a lot of times, # even for a single directory. Fixing this requires changes to # Styxserver.read (label Dread in ofsstyx.b). # Indeed, the real BUG is that the navigator should go. # We can adjust ofsstyx.b to ask us what we know we have # in stop.b (dl, e) := stop->readdir(m.tag, qid, n.count, n.offset); if (e != nil) n.reply <-= (nil, e); else { for (; dl != nil; dl = tl dl) n.reply <-= (ref hd dl, nil); n.reply <-= (nil, nil); } } } ofsreq(srv: ref Styxserver, req: ref Tmsg) { if (req != nil) pick m := req { Readerror => fprint(stderr, "ofs: read error: %s\n", m.error); Attach => # Tattach wont Twalk "/", and stop validates at Twalk (nil, e) := stop->validate(m.tag, big 0, ""); if (e != nil) srv.reply(ref Rmsg.Error(m.tag, e)); else srv.attach(m); Open => stats.nopen++; umode := m.mode; # save OTRUNC fid := srv.open(m); if (fid != nil) if ((umode & OTRUNC) != 0){ d := ref nulldir; d.length = big 0; stop->wstat(m.tag, fid.path, d); # this truncates. } Read => stats.nread++; (fid, e) := srv.canread(m); if(e != nil){ srv.reply(ref Rmsg.Error(m.tag, e)); return; } if(fid.qtype & QTDIR){ srv.default(m); return; } (d, re) := stop->pread(m.tag, fid.path, m.count, m.offset); if (re != nil) srv.reply(ref Rmsg.Error(m.tag, re)); else srv.reply(ref Rmsg.Read(m.tag, d)); Write => stats.nwrite++; (fid, e) := srv.canwrite(m); if(e != nil){ srv.reply(ref Rmsg.Error(m.tag, e)); return; } we : string; cnt : int; if (len m.data > 0) (cnt, we) = stop->pwrite(m.tag, fid.path, m.data, m.offset); if (we != nil) srv.reply(ref Rmsg.Error(m.tag, we)); else srv.reply(ref Rmsg.Write(m.tag, cnt)); Flush => # BUG: we accept concurrent requests. # a flush should cancel them, as said in flush(5). stop->flush(m.tag, m.oldtag); srv.reply(ref Rmsg.Flush(m.tag)); Clunk => stats.nclunk++; fid := srv.getfid(m.fid); if(fid == nil) { srv.reply(ref Rmsg.Error(m.tag, Ebadfid)); return; } srv.delfid(fid); e := stop->sync(m.tag, fid.path); if (e != nil) srv.reply(ref Rmsg.Error(m.tag, e)); else srv.reply(ref Rmsg.Clunk(m.tag)); Create => stats.ncreate++; (fid, mode, d, e) := srv.cancreate(m); if (e != nil){ srv.reply(ref Rmsg.Error(m.tag, e)); return; } (d, e) = stop->create(m.tag, fid.path, d); if (e != nil){ srv.reply(ref Rmsg.Error(m.tag, e)); return; } fid.open(mode, d.qid); srv.reply(ref Rmsg.Create(m.tag, d.qid, 8*1024)); Remove => stats.nremove++; (fid, nil, e) := srv.canremove(m); if(e != nil) { srv.reply(ref Rmsg.Error(m.tag, e)); return; } e = stop->remove(m.tag, fid.path); if (e != nil){ srv.reply(ref Rmsg.Error(m.tag, e)); return; } srv.delfid(fid); srv.reply(ref Rmsg.Remove(m.tag)); Wstat => stats.nwstat++; fid := srv.getfid(m.fid); if(fid == nil) { srv.reply(ref Rmsg.Error(m.tag, Ebadfid)); return; } (nil, e) := stop->wstat(m.tag, fid.path, ref m.stat); if (e != nil) srv.reply(ref Rmsg.Error(m.tag, e)); else srv.reply(ref Rmsg.Wstat(m.tag)); * => srv.default(m); } } terminate(srv: ref Styxserver, navc: chan of ref Navop, optoo: int) { if (dostats){ stats.dump(); opmux->dump(); } if (notify != nil) notify->gone(); navc <-= nil; stop->term(); if (optoo) opmux->term(); srv.replychan <-= nil; kill(pctl(0,nil), "killgrp"); # kill tmsgreader and any other exit; } fsrequestproc(srv: ref Styxserver, reqc: chan of ref Tmsg, navc: chan of ref Navop, endc: chan of string) { if (debug) fprint(stderr, "echo killgrp >/prog/%d/ctl\n", pctl(0,nil)); if (notify != nil) notify->arrived(); for(;;){ # BUG: should cache fs and nav processes. alt { <- endc => terminate(srv, navc, 0); r := <-reqc => if (r == nil) terminate(srv, navc, 1); ofsreq(srv, r); } } } navrequestproc(navc: chan of ref Navop) { for(;;){ n := <-navc; if (n == nil) exit; navreq(n); } } replyproc(srv: ref Styxserver) { for(;;){ r := <- srv.replychan; if (r == nil) break; nw := srv.replydirect(r); if (nw < 0){ opmux->term(); stop->term(); break; } } } attach(path: string) : string { rc := opmux->rpc(ref (Op->Tmsg).Attach(0, "nemo", path)); r := <- rc; pick rr := r { Attach => return nil; Error => return rr.ename; * => return "can't attach"; } } dorecover := 0; recoveraddr: string; recoveralg: string; recoverkfile: string; recoverpath: string; recover(): ref FD { addr := recoveraddr; alg := recoveralg; kfile := recoverkfile; opfd, opcfd: ref FD; while(dorecover){ if (addr[0] == '/') opfd = open(addr, ORDWR); if (opfd == nil){ addr = netmkaddr(addr, "tcp", "op"); (rc, c) := dial(addr, nil); if (rc < 0) continue; opfd = c.dfd; opcfd = c.cfd; c.dfd = c.cfd = nil; } if (doauth){ (afd, ae) := authfd(opfd, Client, alg, kfile, addr); opfd = afd; afd = nil; if (debug && ae != nil) fprint(stderr, "ofs: authenticated: %s\n", ae); if (opfd == nil) continue; user = ae; } e := attach(recoverpath); if (e == nil) return opfd; } return nil; } service(pidc: chan of int, sfd, cfd: ref FD, cdir: string, path: string, lag: int) { pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWPGRP|Sys->NEWFD, list of {0,1,2,sfd.fd,cfd.fd}); stats = ref Stats(0, 0, 0, 0, 0, 0, 0, 0, 0); op->init(); endc := chan[1] of string; opmux->init(cfd, op, endc); e := attach(path); if (e != nil){ sfd = cfd = nil; error("ofs: can't attach"); } else if (debug) fprint(stderr, "attached as %s\n", user); styx->init(); styxs->init(styx); navc := chan of ref Navop; nav := Navigator.new(navc); (reqc, srv) := Styxserver.new(sfd, nav, big 0); srv.replychan = chan[10] of ref Styx->Rmsg; spawn replyproc(srv); if (stop->init(styx, opmux, cdir, lag) != nil){ fprint(stderr, "should not happen: cache init failed\n"); raise("fail:stop"); } spawn navrequestproc(navc); spawn fsrequestproc(srv, reqc, navc, endc); if (debug) fprint(stderr, "ps | grep ' %d ' \n", pctl(0,nil)); } oimport(fd: ref FD): string { data := array[10] of byte; nr := read(fd, data, 9); if (nr <= 0){ if (debug)fprint(stderr, "ofs: import: %r\n"); return nil; } dr := int (string data); data = array[dr] of byte; nr = read(fd, data, dr); if (nr != dr){ if (debug)fprint(stderr, "ofs: import: short read\n"); return nil; } return string data; } init(nil: ref Draw->Context, args: list of string) { sys = load Sys Sys->PATH; err = load Error Error->PATH; err->init(); styx = checkload(load Styx Styx->PATH, Styx->PATH); styxs = checkload(load Styxservers "/dis/o/ofsstyx.dis", "/dis/o/ofsstyx.dis"); env = checkload(load Env Env->PATH, Env->PATH); str = checkload(load String String->PATH, String->PATH); op = checkload(load Op Op->PATH, Op->PATH); opmux = checkload(load Opmux Opmux->PATH, Opmux->PATH); stop = checkload(load Stop Stop->PATH, Stop->PATH); util = checkload(load Netutil Netutil->PATH, Netutil->PATH); arg = checkload(load Arg Arg->PATH, Arg->PATH); mnt : string; cdir, alg, kfile : string; path := "/"; arg->init(args); lag := 1000; arg->setusage("ofs [-Adr] [-C alg] [-k keyfile] [-c dir] [-m mnt] [-l n] addr [path]"); while((opt := arg->opt()) != 0) { case opt{ 'A' => doauth = 0; 'C' => alg = arg->earg(); 'k' => kfile = arg->earg(); 'c' => cdir = arg->earg(); 'd' => debug++; if (debug > 1) opmux->debug = 1; if (debug > 2) stop->debug = 1; styxs->traceset(1); 'v' => dostats = 1; 'l' => lag = int arg->earg(); 'm' => mnt = arg->earg(); 'r' => dorecover = 1; * => usage(); } } addr : string; args = arg->argv(); case len args { 2 => addr = hd args; path = hd tl args; 1 => addr = hd args; * => usage(); } opfd, opcfd : ref FD; if (addr[0] == '/') opfd = open(addr, ORDWR); if (opfd == nil){ addr = netmkaddr(addr, "tcp", "op"); (rc, c) := dial(addr, nil); if (rc < 0) error(sprint("%s: %r\n", addr)); opfd = c.dfd; opcfd = c.cfd; c.dfd = c.cfd = nil; } if (doauth){ (afd, ae) := authfd(opfd, Client, alg, kfile, addr); opfd = afd; afd = nil; if (debug && ae != nil) fprint(stderr, "ofs: authenticated: %s\n", ae); if (opfd == nil) error(sprint("ofs: fail: %s: %r\n", addr)); user = ae; } pidc := chan[1] of int; if (mnt == nil) service(pidc, fildes(0), opfd, cdir, path, lag); else { if (mnt == "auto"){ sname := oimport(opfd); if (sname == nil) error("ofs: fail: import\n"); mnt = "/devs/" + sname; fprint(stderr, "ofs: importing %s\n", sname); create(mnt, OREAD, DMDIR|8r0555); # in case it does not exist notify = load Ofsnotify Ofsnotify->PATH; if (notify != nil) notify->init(sname); } else if (dorecover){ recoveraddr = addr; recoveralg = alg; recoverkfile= kfile; recoverpath = path; opmux->recoverfn = recover; } pfds := array[2] of ref FD; if (pipe(pfds) < 0) error(sprint("ofs: pipe: %r")); spawn service(pidc, pfds[0], opfd, cdir, path, lag); <- pidc; pfds[0] = nil; opfd = nil; if (mount(pfds[1], nil, mnt, MREPL|MCREATE, nil) < 0) error(sprint("ofs: mount: %r")); pfds[0] = nil; } }