implement CopyServer;

#
# CopyServer: a Styx file server that performs file copies on behalf of
# its clients.
#
# The server exports a directory containing a write-only "ctl" file.
# Writing a request of the form
#
#	copy [mntopt addr] srcfile srcoff [mntopt addr] dstfile dstoff nofbytes iounit delay ctlfname
#
# to "ctl" starts a copy thread and creates a per-copy control file named
# ctlfname.  The write request is not answered until the copy terminates,
# so the client stays blocked for the duration of the copy.  Reading the
# per-copy control file yields the number of bytes copied so far (sampled
# when the file is opened); writing "kill" to it stops the copy.
#

include "sys.m";
include "draw.m";
include "string.m";
include "styx.m";
include "styxservers.m";
include "sh.m";

CopyServer: module
{
	init: fn(ctxt: ref Draw->Context, argv: list of string);
};

sys: Sys;
str: String;
styx: Styx;
styxservers: Styxservers;
nametree: Nametree;
sh: Sh;

Styxserver, Navigator: import styxservers;
Tree: import nametree;
Tmsg, Rmsg: import styx;

SRCMNT: con "cpsrv_src";	# dirname prefix for temp source mount
DSTMNT: con "cpsrv_dst";	# dirname prefix for temp dest mount

RUNNING, EOF, DONE, RWERR, KILL, ABORT: con iota;	# state codes

CopyOpDesc: adt {
	# request arguments
	srcmntopt: string;	# source mount option
	srcaddr: string;	# source dial address
	srcfname: string;	# source file pathname
	srcoff: big;		# source read offset
	dstmntopt: string;	# dest mount option
	dstaddr: string;	# dest dial address
	dstfname: string;	# dest file pathname
	dstoff: big;		# dest write offset
	nofbytes: big;		# nof bytes to copy from source to dest (0: copy until EOF)
	iounit: int;		# data buffer size to use for copying
	delay: int;		# artificial delay between r/w ops
	ctlfname: string;	# name of control file for this copy op
	# additional state
	state: int;		# current state of copy op
	bytecnt: big;		# nof bytes copied so far
	srcmntdir: string;	# source mount dir
	srcfd: ref Sys->FD;	# source file descriptor for reading
	dstmntdir: string;	# dest mount dir
	dstfd: ref Sys->FD;	# dest file descriptor for writing
	qid: big;		# qid of the copy op control file in the server tree
	mtag: int;		# tag of the write request that started the copy op
	reply: ref Rmsg;	# reply message to this request, for normal completion
};

# BUG:
# the r/w synchronization for the bytecnt field
# between the copy thread and the main server thread is dirty
# see BUG comments in routines handleTmsg and copyThread

cqid := big 0;			# seed for generating unique qids for the server file tree
Qroot, Qctrl: big;		# root and control file qids
nfy: chan of ref CopyOpDesc;	# notification copy threads -> main server thread
cpopsl: list of ref CopyOpDesc;	# list of ongoing copy ops
debug: int;			# print msgs sent/received and other state info

# aux routines

# print msg on standard error, but only when debugging is enabled
dprint(msg: string)
{
	if (debug)
		sys->fprint(sys->fildes(2), "%s\n", msg);
}

# build the directory entry for a file or directory of the server tree;
# the qid type is derived from the DMDIR bit of perm
buildDir(name: string, perm: int, qid: big): Sys->Dir
{
	d := sys->zerodir;
	d.name = name;
	d.qid.path = qid;
	d.uid = "*";
	d.gid = "*";
	if (perm & Sys->DMDIR)
		d.qid.qtype = Sys->QTDIR;
	else
		d.qid.qtype = Sys->QTFILE;
	d.mode = perm;
	return(d);
}

# copy op management stuff; simple and non-optimized

# debug-print the control file names of all ongoing copy ops
printCopyOps()
{
	if (!debug)
		return;		# do not build the string when nobody will see it
	s := "ongoing copy ops: ";
	for (cl := cpopsl; cl != nil; cl = tl cl)
		s = s + sys->sprint("%s ", (hd cl).ctlfname);
	s = s + "~";
	dprint(s);
}

initCopyOps()
{
	cpopsl = nil;
}

addCopyOp(cpop: ref CopyOpDesc)
{
	cpopsl = cpop :: cpopsl;
	printCopyOps();
}

# remove cpop from the list of ongoing copy ops
# (the order of the remaining entries is reversed, which is harmless)
rmvCopyOp(cpop: ref CopyOpDesc)
{
	remaining: list of ref CopyOpDesc;
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if (hd cl != cpop)
			remaining = hd cl :: remaining;
	cpopsl = remaining;
	printCopyOps();
}

# returns 1 if an ongoing copy op uses the given control file name, else 0
fndCopyOpByName(ctlfname: string): int
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if ((hd cl).ctlfname == ctlfname)
			return(1);
	return(0);
}

# returns the ongoing copy op whose control file has the given qid, or nil
fndCopyOpByQid(qid: big): ref CopyOpDesc
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if ((hd cl).qid == qid)
			return(hd cl);
	return(nil);
}

# returns the ongoing copy op that was started by the request with
# the given tag, or nil
fndCopyOpByTag(mtag: int): ref CopyOpDesc
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		if ((hd cl).mtag == mtag)
			return(hd cl);
	return(nil);
}

# cmd execution via shell

# run cmdline (split at blanks, tabs and newlines) via the Sh module and
# wait for it to terminate.
# returns nil if the command wrote nothing to its standard error,
# otherwise (up to 256 bytes of) what it wrote; failures to set up or
# wait for the command are reported as a non-nil string as well
runcmd(ctxt: ref Draw->Context, cmdline: string): string
{
	sh = load Sh Sh->PATH;
	if (sh == nil)
		return(sys->sprint("could not load Sh module: %r"));
	(n, args) := sys->tokenize(cmdline, " \t\n");
	if (n == 0)
		return sys->sprint("empty command line string\n");

	# open our wait file before spawning, so the exit status cannot be missed
	waitfd := sys->open("/prog/" + string sys->pctl(0, nil) + "/wait", Sys->OREAD);
	if (waitfd == nil)
		return(sys->sprint("could not open wait file: %r"));
	fds := array[2] of ref Sys->FD;
	if (sys->pipe(fds) < 0)
		return(sys->sprint("could not create pipe: %r"));

	c := chan of int;
	spawn doruncmd(ctxt, args, fds[1], c);
	pid := <-c;

	# skip wait records of other children until the one for pid shows up
	pidstr := sys->sprint("%d ", pid);
	buf := array [256] of byte;
	for (;;) {
		n = sys->read(waitfd, buf, len buf);
		if (n <= 0)
			return(sys->sprint("could not wait for command: %r"));
		if (str->prefix(pidstr, string buf[0:n]))
			break;
	}

	# only read the pipe if something was written, so we never block on it
	(res, d) := sys->fstat(fds[0]);
	if ((res < 0) || (d.length == big 0))
		return(nil);
	n = sys->read(fds[0], buf, len buf);
	if (n < 0)
		return(sys->sprint("could not read command diagnostics: %r"));
	if (n == 0)
		return(nil);
	return(string buf[0:n]);
}

# thread body for runcmd: redirects standard error to errfd, reports its
# pid on c and runs the command given by args
doruncmd(ctxt: ref Draw->Context, args: list of string, errfd: ref Sys->FD, c: chan of int)
{
	pid := sys->pctl(Sys->FORKFD, nil);
	sys->dup(errfd.fd, 2);
	c <-= pid;
	sh->run(ctxt, args);
}

# mount and unmount stuff; for convenience implemented using shell commands

# undo the mount made by mount_open; nothing to do if no mount option was given
unmount(opt, mnt: string)
{
	if (opt != nil) {
		runcmd(nil, "unmount " + mnt);
		runcmd(nil, "rm " + mnt);
	}
}

# optionally mount addr on a fresh directory mntdir (as selected by mntopt),
# then open (mode OREAD) or create (mode OWRITE) file fname and seek to off.
# returns (nil, fd) on success and (error, nil) on failure; on failure
# any mount made here has been undone again
mount_open(mntopt, addr, mntdir, fname: string, off: big, mode: int): (string, ref Sys->FD)
{
	fpath := fname;
	if (fpath[0] != '/')
		fpath = "/" + fpath;
	if (mntopt != nil) {
		err := runcmd(nil, "mkdir " + mntdir);
		if (err != nil)
			return((err, nil));
		authopt := "";
		if (str->in('A', mntopt))
			authopt = "-A";
		if (str->in('o', mntopt))
			err = runcmd(nil, "o/ofs" + " -m " + mntdir + " " + authopt + " " + addr + " /");
		else if (str->in('s', mntopt))
			err = runcmd(nil, "mount" + " " + authopt + " " + addr + " " + mntdir);
		else {
			sys->fprint(sys->fildes(2), "copy server fatal: illegal option: %s", mntopt);
			raise "fatal:mntopt";
		}
		if (err != nil) {
			runcmd(nil, "rm " + mntdir);
			return((err, nil));
		}
		fpath = mntdir + fpath;
	}
	fd: ref Sys->FD;
	if (mode == Sys->OREAD)
		fd = sys->open(fpath, mode);
	else if (mode == Sys->OWRITE)
		fd = sys->create(fpath, mode, 8r777);
		#BUG:
		# ownership and access rights when creating a file?
		# NOTE(review): create rewrites an existing file, so writing at a
		# non-zero offset into an existing dest file presumably loses its
		# previous contents -- confirm that this is intended
	if (fd == nil) {
		err := sys->sprint("could not open %s: %r", fname);
		unmount(mntopt, mntdir);
		return((err, nil));
	}
	if (sys->seek(fd, off, Sys->SEEKSTART) < big 0) {
		err := sys->sprint("could not seek %s: %r", fname);
		fd = nil;	# drop the file before the mount goes away
		unmount(mntopt, mntdir);
		return((err, nil));
	}
	return((nil, fd));
}

# cmd parsing stuff

# returns non-zero if opt is one of the supported mount options
isValidMntOption(opt: string): int
{
	return((opt == "-s") || (opt == "-sA") || (opt == "-o") || (opt == "-oA"));
}

# returns (ok, value); ok is non-zero if num is a non-negative decimal int
isValidInt(num: string): (int, int)
{
	(i, s) := str->toint(num, 10);
	return(((s == nil) && (i >= 0), i));
}

# returns (ok, value); ok is non-zero if num is a non-negative decimal big
isValidBig(num: string): (int, big)
{
	(b, s) := str->tobig(num, 10);
	return(((s == nil) && (b >= big 0), b));
}

# returns non-zero if name can be used for a per-copy control file:
# it must be a plain file name that is not taken by the server itself
isValidCtlName(name: string): int
{
	if ((name == nil) || (name == ".") || (name == "..") || (name == "ctl"))
		return(0);
	for (i := 0; i < len name; i++)
		if (name[i] == '/')
			return(0);
	return(1);
}

# parse a file specification "[mntopt addr] fname offset" from the front
# of cmdlist.
# returns (error, mntopt, addr, fname, offset, remaining tokens);
# error is nil on success
parseFileSpec(cmdlist: list of string): (string, string, string, string, big, list of string)
{
	if (cmdlist == nil)
		return(("no file name", nil, nil, nil, big 0, nil));
	fname := hd cmdlist;
	cmdlist = tl cmdlist;
	opt, addr: string;
	if (fname[0] == '-') {
		# a leading option means: mount addr first, then access the file there
		opt = fname;
		if (!isValidMntOption(opt))
			return(("invalid mount option", nil, nil, nil, big 0, nil));
		if (cmdlist == nil)
			return(("no dial address", nil, nil, nil, big 0, nil));
		addr = hd cmdlist;
		cmdlist = tl cmdlist;
		if (cmdlist == nil)
			return(("no file name", nil, nil, nil, big 0, nil));
		fname = hd cmdlist;
		cmdlist = tl cmdlist;
	}
	if (cmdlist == nil)
		return(("no offset", nil, nil, nil, big 0, nil));
	(ok, off) := isValidBig(hd cmdlist);
	cmdlist = tl cmdlist;
	if (!ok)
		return(("invalid offset", nil, nil, nil, big 0, nil));
	return((nil, opt, addr, fname, off, cmdlist));
}

# parse a complete copy request.
# returns (nil, copy op descriptor) on success and (error, nil) otherwise
parseCmd(cmdline: string): (string, ref CopyOpDesc)
{
	dprint(sys->sprint("request: %s", cmdline));
	(nil, cmdlist) := sys->tokenize(cmdline, " \n");
	if (cmdlist == nil)
		return(("empty cmd string", nil));
	cmd := hd cmdlist;
	cmdlist = tl cmdlist;
	if (cmd != "copy")
		return(("copy expected", nil));

	cpop := ref CopyOpDesc(nil, nil, nil, big 0, nil, nil, nil, big 0, big 0, 0, 0, nil,
		0, big 0, nil, nil, nil, nil, big 0, 0, nil);
	err: string;
	(err, cpop.srcmntopt, cpop.srcaddr, cpop.srcfname, cpop.srcoff, cmdlist) = parseFileSpec(cmdlist);
	if (err != nil)
		return(("source: " + err, nil));
	(err, cpop.dstmntopt, cpop.dstaddr, cpop.dstfname, cpop.dstoff, cmdlist) = parseFileSpec(cmdlist);
	if (err != nil)
		return(("dest: " + err, nil));

	if (len cmdlist != 4)
		return(("nof bytes, iounit, delay and ctlfname expected", nil));
	ok: int;
	(ok, cpop.nofbytes) = isValidBig(hd cmdlist);
	cmdlist = tl cmdlist;
	if (!ok)
		return(("invalid nofbytes", nil));
	(ok, cpop.iounit) = isValidInt(hd cmdlist);
	cmdlist = tl cmdlist;
	# a zero-sized buffer would make the first read return 0 bytes,
	# which the copy thread would take for EOF
	if (!ok || (cpop.iounit == 0))
		return(("invalid iounit", nil));
	(ok, cpop.delay) = isValidInt(hd cmdlist);
	cmdlist = tl cmdlist;
	if (!ok)
		return(("invalid delay", nil));
	cpop.ctlfname = hd cmdlist;
	return((nil, cpop));
}

# copy thread stuff

# parse the copy request in msg, open source and dest, create the
# per-copy control file and start the copy thread.
# returns nil on success (the reply to msg is then sent when the copy
# terminates) and an error string otherwise
startCopyThread(filetree: ref Tree, msg: ref Tmsg.Write): string
{
	(err, cpop) := parseCmd(string msg.data);
	if (err != nil)
		return(err);
	if ((cpop.ctlfname == "ctl") || fndCopyOpByName(cpop.ctlfname))
		return("ctl file name already in use");
	if (!isValidCtlName(cpop.ctlfname))
		return("invalid ctl file name");

	cpop.srcmntdir = SRCMNT + "_" + cpop.ctlfname;
	(err, cpop.srcfd) = mount_open(cpop.srcmntopt, cpop.srcaddr, cpop.srcmntdir,
		cpop.srcfname, cpop.srcoff, Sys->OREAD);
	if (err != nil)
		return(err);
	cpop.dstmntdir = DSTMNT + "_" + cpop.ctlfname;
	(err, cpop.dstfd) = mount_open(cpop.dstmntopt, cpop.dstaddr, cpop.dstmntdir,
		cpop.dstfname, cpop.dstoff, Sys->OWRITE);
	if (err != nil) {
		cpop.srcfd = nil;	# drop the source file before its mount goes away
		unmount(cpop.srcmntopt, cpop.srcmntdir);
		return(err);
	}

	cpop.state = RUNNING;
	cpop.bytecnt = big 0;
	cpop.qid = cqid++;
	cpop.mtag = msg.tag;
	cpop.reply = ref Rmsg.Write(msg.tag, len msg.data);
	filetree.create(Qroot, buildDir(cpop.ctlfname, 8r666, cpop.qid));
	#BUG:
	# ownership and access rights when creating a control file?
	addCopyOp(cpop);
	spawn copyThread(cpop);
	return(nil);
}

# record the way the copy thread terminated on its own, together with the
# reply to send (nil: keep the current reply).
# a KILL or ABORT already requested by the main server thread is left
# untouched, so that e.g. a flushed request is never answered
finishCopy(cpop: ref CopyOpDesc, state: int, reply: ref Rmsg)
{
	if (cpop.state != RUNNING)
		return;
	if (reply != nil)
		cpop.reply = reply;
	cpop.state = state;
}

# body of a copy thread: copy from cpop.srcfd to cpop.dstfd in chunks of at
# most cpop.iounit bytes until cpop.nofbytes bytes are copied (if non-zero),
# EOF is reached, an r/w error occurs, or the main server thread changes
# cpop.state; finally notify the main server thread via nfy
copyThread(cpop: ref CopyOpDesc)
{
	cnt := big 0;
	data := array [cpop.iounit] of byte;
	dprint(sys->sprint("copyThread started: %s", cpop.ctlfname));

	while (cpop.state == RUNNING) {
		# never read beyond the requested nof bytes
		rcnt := len data;
		if ((cpop.nofbytes > big 0) && (big rcnt > cpop.nofbytes - cnt))
			rcnt = int (cpop.nofbytes - cnt);

		n1 := sys->read(cpop.srcfd, data, rcnt);
		if (n1 < 0) {
			finishCopy(cpop, RWERR, ref Rmsg.Error(cpop.mtag, sys->sprint("read error: %r")));
			break;
		}
		if (n1 == 0) {
			finishCopy(cpop, EOF, nil);
			break;
		}
		n2 := sys->write(cpop.dstfd, data, n1);
		if (n2 != n1) {
			finishCopy(cpop, RWERR, ref Rmsg.Error(cpop.mtag, sys->sprint("write error: %r")));
			break;
		}

		cnt = cnt + big n1;
		cpop.bytecnt = cnt;
		# BUG:
		# we change the value of bytecnt in "a single shot"
		# however, this does not guarantee read atomicity
		# so the main server thread may read invalid data
		# see BUG comment in function handleTmsg
		if (cnt == cpop.nofbytes) {
			finishCopy(cpop, DONE, nil);
			break;
		}
		sys->sleep(cpop.delay);
	}

	if (cpop.state == KILL)
		cpop.reply = ref Rmsg.Error(cpop.mtag, sys->sprint("killed after %bd bytes", cnt));
	nfy <-= cpop;		# termination signal to main server thread
	dprint(sys->sprint("copyThread stopped: %s", cpop.ctlfname));
}

# clean up after a terminated copy thread: release the files, undo the
# mounts, remove the control file and answer the request that started the
# copy op (unless that request was aborted)
gcCopyOp(srv: ref Styxserver, filetree: ref Tree, cpop: ref CopyOpDesc)
{
	termcodes := array [ABORT+1] of {"RUNNING", "EOF", "DONE", "RWERROR", "KILLED", "ABORTED"};
	dprint(sys->sprint("term signal from %s: %s", cpop.ctlfname, termcodes[cpop.state]));

	# the copy thread is done with the files; drop them before unmounting
	cpop.srcfd = nil;
	cpop.dstfd = nil;
	unmount(cpop.srcmntopt, cpop.srcmntdir);
	unmount(cpop.dstmntopt, cpop.dstmntdir);
	filetree.remove(cpop.qid);
	rmvCopyOp(cpop);

	# send reply msg to unblock client
	if (cpop.state != ABORT)
		srv.reply(cpop.reply);
}

# ask the copy thread of cpop to terminate, if not done yet; cmd is KILL
# or ABORT.  an ABORT is never replaced by a KILL, because the request of
# an aborted copy op must not be answered anymore
killCopyThread(cpop: ref CopyOpDesc, cmd: int)
{
	if (cpop.state != ABORT)
		cpop.state = cmd;
}

# terminate all copy threads and clean up after each of them
killAllCopyThreads(srv: ref Styxserver, filetree: ref Tree, cmd: int)
{
	for (cl := cpopsl; cl != nil; cl = tl cl)
		killCopyThread(hd cl, cmd);
	while (cpopsl != nil) {
		cpop := <-nfy;
		gcCopyOp(srv, filetree, cpop);
	}
}

# main server thread stuff

# process one request message; a nil message signals the end of the
# connection and shuts the server down
handleTmsg(srv: ref Styxserver, filetree: ref Tree, msg: ref Tmsg)
{
	if (msg == nil) {
		killAllCopyThreads(srv, filetree, ABORT);
		dprint("stopping copy server");
		srv.default(msg);	# kills this process
		sys->fprint(sys->fildes(2), "copy server fatal: should be dead at this point");
		raise "fatal:noterm";
	}

	pick m := msg {
	Open => {
		fid := srv.open(m);
		if ((fid != nil) && (fid.path != Qctrl) && (fid.path != Qroot)) {
			cpop := fndCopyOpByQid(fid.path);
			if (cpop != nil)
				# cache for subsequent reads on this fid
				fid.data = array of byte sys->sprint("%bd", cpop.bytecnt);
				# BUG:
				# reading bytecnt is not necessarily atomic
				# see BUG comment in function copyThread
		}
	}
	Write => {
		(fid, err) := srv.canwrite(m);
		if (err != nil)
			srv.reply(ref Rmsg.Error(m.tag, err));
		else if (fid.path == Qctrl) {
			err = startCopyThread(filetree, m);
			if (err != nil)
				srv.reply(ref Rmsg.Error(m.tag, err));
			# if all went well, we do not reply to block the client
		} else {
			(n, cmd) := sys->tokenize(string m.data, " \n");
			if ((n != 1) || (hd cmd != "kill"))
				srv.reply(ref Rmsg.Error(m.tag, "usage: kill"));
			else {
				cpop := fndCopyOpByQid(fid.path);
				if (cpop != nil)
					killCopyThread(cpop, KILL);
				# killing a terminated copy op succeeds as well
				srv.reply(ref Rmsg.Write(m.tag, len m.data));
			}
		}
	}
	Read => {
		(fid, err) := srv.canread(m);
		if (err != nil)
			srv.reply(ref Rmsg.Error(m.tag, err));
		else if ((fid.path == Qctrl) || (fid.path == Qroot))
			srv.default(msg);
		else
			srv.reply(styxservers->readbytes(m, fid.data));
	}
	Flush => {
		srv.default(msg);
		# a flushed copy request must not be answered: abort its copy op
		cpop := fndCopyOpByTag(m.oldtag);
		if (cpop != nil)
			killCopyThread(cpop, ABORT);
	}
	* => {
		srv.default(msg);
	}
	}
}

# server thread: set up the file tree, report the own pid on c and then
# serve requests arriving on fd as well as termination signals of copy
# threads
serve(fd: ref Sys->FD, c: chan of int)
{
	pid := sys->pctl(Sys->FORKNS|Sys->NEWFD|Sys->NEWPGRP, list of {1, 2, fd.fd});
	styx->init();
	styxservers->init(styx);
	nametree->init();
	styxservers->traceset(debug);

	Qroot = cqid++;
	Qctrl = cqid++;
	(filetree, filetreeop) := nametree->start();
	filetree.create(Qroot, buildDir(".", 8r555|Sys->DMDIR, Qroot));
	filetree.create(Qroot, buildDir("ctl", 8r222, Qctrl));
	(tchan, srv) := Styxserver.new(fd, Navigator.new(filetreeop), Qroot);

	initCopyOps();
	nfy = chan of ref CopyOpDesc;
	dprint("copy server started");
	c <-= pid;

	for (;;) {
		alt {
		tmsg := <-tchan =>
			handleTmsg(srv, filetree, tmsg);
		cpop := <-nfy =>
			gcCopyOp(srv, filetree, cpop);
		}
	}
}

# usage: copyserver [-d] mnt
# start the server thread and mount the server on directory mnt;
# -d enables debug output
init(nil: ref Draw->Context, args: list of string)
{
	dirname: string;

	sys = load Sys Sys->PATH;
	str = load String String->PATH;
	if (str == nil) {
		sys->print("could not load %s: %r\n", String->PATH);
		exit;
	}

	if ((len args < 2) || (len args > 3)) {
		sys->print("usage: copyserver [-d] mnt\n");
		exit;
	}
	if (len args == 2) {
		debug = 0;
		dirname = hd tl args;
	} else if (hd tl args == "-d") {
		debug = 1;
		dirname = hd tl tl args;
	} else {
		sys->print("usage: copyserver [-d] mnt\n");
		exit;
	}

	styx = load Styx Styx->PATH;
	if (styx == nil) {
		sys->print("could not load %s: %r\n", Styx->PATH);
		exit;
	}
	styxservers = load Styxservers Styxservers->PATH;
	if (styxservers == nil) {
		sys->print("could not load %s: %r\n", Styxservers->PATH);
		exit;
	}
	nametree = load Nametree Nametree->PATH;
	if (nametree == nil) {
		sys->print("could not load %s: %r\n", Nametree->PATH);
		exit;
	}

	fds := array[2] of ref Sys->FD;
	if (sys->pipe(fds) < 0) {
		sys->print("could not create pipe: %r\n");
		exit;
	}
	c := chan of int;
	spawn serve(fds[0], c);
	<-c;		# wait until the server thread is ready
	fds[0] = nil;

	res := sys->mount(fds[1], nil, dirname, Sys->MREPL, nil);
	if (res < 0) {
		sys->print("could not mount: %r\n");
		exit;
	}
	sys->print("server mounted on %s\n", dirname);
	fds[1] = nil;
}