implement Opmux;

include "sys.m";
	sys: Sys;
	write, millisec, pctl, fprint, fildes, QTDIR, FD: import sys;
include "op.m";
	op: Op;
	Tmsg, Rmsg, ODATA, OSTAT, OMORE: import op;
include "error.m";
	err: Error;
	stderr, kill: import err;
include "opmux.m";

# One outstanding request: the T-message sent and the channel
# on which the reply (or replies) must be delivered.
Opcall: adt {
	req:	ref Op->Tmsg;
	repc:	chan of ref Op->Rmsg;
};

# Request counters kept by the mux process.
# nconc is the largest number of outstanding requests seen so far.
Stats: adt {
	nput, nget, nremove, nflush, nconc, nrecover: int;
	dump:	fn(s: self ref Stats);
};

#
# Tags must be assigned by clients, so that the mux knows
# how to flush a Tag
#
opc : chan of ref Opcall;
oprdprocpid: int;
stats: ref Stats;

# Load the needed modules, reset the statistics and start the
# reader and mux processes on ofd.  The mux sends a string on
# endc right before it exits.
init(ofd: ref Sys->FD, o: Op, endc: chan of string)
{
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();
	op = o;
	stats = ref Stats(0, 0, 0, 0, 0, 0);
	opc = chan of ref Opcall;
	oprc:= chan of ref Rmsg;
	spawn oprdproc(ofd, oprc);
	<-oprc;		# wait until the reader has recorded its pid
	spawn opmuxproc(ofd, opc, oprc, endc);
}

# Ask the mux process to shut down (a nil call means hangup).
term()
{
	opc <-= nil;
}

# Restart the reader on the new descriptor ofd and resend every
# outstanding request.  Requests that cannot be written get an
# "i/o error" reply and are dropped; the ones still outstanding
# are returned (in reverse order with respect to reqs).
recover(ofd: ref FD, oprc: chan of ref Rmsg, reqs: list of ref Opcall): list of ref Opcall
{
	kill(oprdprocpid, "kill");
	spawn oprdproc(ofd, oprc);
	<-oprc;		# wait until the new reader has recorded its pid
	l: list of ref Opcall;
	for(; reqs != nil; reqs = tl reqs){
		m := hd reqs;
		tmsg := m.req.pack();
		nw := write(ofd, tmsg, len tmsg);
		if (nw != len tmsg){
			fprint(stderr, "ofs: recover: failed: %s\n", m.req.text());
			m.repc <-= ref Rmsg.Error(m.req.tag, "i/o error");
		} else
			l = m :: l;
	}
	stats.nrecover++;
	return l;
}

# Print the counters on stderr.  The total only adds up
# put, get and remove requests.
Stats.dump(s: self ref Stats)
{
	tot := s.nput + s.nget + s.nremove;
	fprint(stderr,"op:\n");
	fprint(stderr,"\t%s\t%d\n", "put", s.nput);
	fprint(stderr,"\t%s\t%d\n", "get", s.nget);
	fprint(stderr,"\t%s\t%d\n", "remove", s.nremove);
	fprint(stderr, "\t%s\t%d\n", "flush", s.nflush);
	fprint(stderr, "\t%s\t%d\n", "recover", s.nrecover);
	fprint(stderr, "\tconc.\t%d\n", s.nconc);
	fprint(stderr,"\ttotal\t%d\n", tot);
}

# Reader process: records its pid in oprdprocpid, sends a nil on
# oprc as a start-up handshake, and then forwards every message
# read from ofd to oprc.  It stops after forwarding a nil message
# or a Readerror.
oprdproc(ofd: ref FD, oprc: chan of ref Rmsg)
{
	oprdprocpid = pctl(0, nil);
	oprc <-= nil;
	if (debug)
		fprint(stderr, "opmuxproc\n");
	for(;;){
		m := op->Rmsg.read(ofd, 0);
		if (debug){
			if (m == nil)
				fprint(stderr, "oprdproc: eof\n");
		}
		oprc <-= m;
		if (m == nil || tagof(m) == tagof(Rmsg.Readerror))
			break;
	}
	if (debug)
		fprint(stderr, "oprdproc: exit\n");
}

# Mux process.  Takes calls from opcc, writes them to ofd and keeps
# them in reqs until muxreply says they are done; takes replies
# from oprc and hands them to the caller that owns the tag.
# On an i/o failure it tries recoverfn (if any) to get a new
# descriptor; otherwise all outstanding requests are aborted, the
# reader is killed, the reason is sent on endc, and the process exits.
opmuxproc(ofd: ref FD, opcc: chan of ref Opcall, oprc: chan of ref Rmsg, endc: chan of string)
{
	reqs: list of ref Opcall;	# outstanding ones.
	broken := 0;
	for(;;){
		alt {
		m := <- opcc =>
			now := millisec();
			if (debug && reqs != nil)
				rdump(reqs);
			if (m == nil){
				# term() was called.
				if (debug)
					fprint(stderr, "opmux: hangup: eof\n");
				abortall(reqs);
				kill(oprdprocpid, "kill");
				endc <-= "hangup: eof";
				exit;
			}
			if (debug)
				fprint(stderr, "\n%d\t<-op- %s\n", now, m.req.text());
			if (broken){
				r := ref Rmsg.Error(m.req.tag, "i/o error");
				if (debug)
					fprint(stderr, "\n%d\t-op-> %s\n", now, r.text());
				m.repc <-= r;
				continue;
			}
			pick x := m.req {
			Put =>
				stats.nput++;
			Get =>
				stats.nget++;
			Remove =>
				stats.nremove++;
			Flush =>
				stats.nflush++;
			}
			tmsg := m.req.pack();
			nw := write(ofd, tmsg, len tmsg);
			# m is queued before checking the write so that
			# recover() resends it along with the others.
			reqs = m :: reqs;
			if (len reqs > stats.nconc)
				stats.nconc = len reqs;
			if (nw != len tmsg){
				fprint(stderr, "opmux: hangup: write: %r\n");
				if (recoverfn != nil && (ofd = recoverfn()) != nil){
					reqs = recover(ofd, oprc, reqs);
					continue;
				}
				# m is already in reqs: abortall replies to it.
				# Replying here as well would send it two errors.
				abortall(reqs);
				kill(oprdprocpid, "kill");
				endc <-= "hangup: write";
				exit;
			}
		rmsg := <- oprc =>
			m: ref Opcall;
			last: int;
			if (rmsg == nil || tagof(rmsg) == tagof(Rmsg.Readerror)){
				fprint(stderr, "opmux: hangup: read\n");
				# rmsg may be nil here (eof); it must not be
				# handed to tagof or text() in that case.
				# A non-nil rmsg in this branch is a Readerror.
				if (debug && rmsg != nil)
					fprint(stderr, "%s\n", rmsg.text());
				if (recoverfn != nil && (ofd = recoverfn()) != nil){
					reqs = recover(ofd, oprc, reqs);
					continue;
				}
				abortall(reqs);
				kill(oprdprocpid, "kill");
				endc <-= "hangup: read";
				broken = 1;
				exit;
			}
			(m, reqs, last) = muxreply(reqs, rmsg);
			if (m == nil || m.repc == nil){
				fprint(stderr, "nil reply o no reply chan\n");
				continue;
			}
			if (debug)
				fprint(stderr, "%d\t-op-> %s\n\n", millisec(), rmsg.text());
			m.repc <-= rmsg;
		}
	}
	kill(oprdprocpid, "kill");
}

# Print the statistics on stderr.
dump()
{
	stats.dump();
}

# Print the statistics followed by the text of each request in reqs.
rdump(reqs: list of ref Opcall)
{
	stats.dump();
	while(reqs != nil){
		fprint(stderr, "\t- %s\n", (hd reqs).req.text());
		reqs = tl reqs;
	}
}

# Match the reply rmsg against the outstanding requests.
# Returns the call that owns rmsg.tag (nil if none), the list of
# requests that remain outstanding, and whether the matching call
# is done (0 only for a Get expecting further replies).
# When rmsg is an Rflush, the request named by the oldtag of the
# matching Tflush gets a "flushed" error reply and is dropped.
muxreply(reqs: list of ref Opcall, rmsg: ref Rmsg): (ref Opcall, list of ref Opcall, int)
{
	nreqs : list of ref Opcall;
	call: ref Opcall;
	done := 1;
	flushed := ~0;		# ~0: no tag is being flushed

	pick fmsg := rmsg {
	Flush =>
		# Look for the Tflush with this tag to learn its oldtag.
		# The search goes on while no oldtag has been found.
		for(l := reqs; l != nil && flushed == ~0; l = tl l){
			m := hd l;
			if (m.req.tag == rmsg.tag)
				pick freq := m.req {
				Flush =>
					flushed = freq.oldtag;
				}
		}
	}
	for(; reqs != nil; reqs = tl reqs){
		m := hd reqs;
		if (m.req.tag == flushed){
			m.repc <-= ref Rmsg.Error(m.req.tag, "flushed");
		} else if (m.req.tag != rmsg.tag)
			nreqs = m :: nreqs;
		else {
			call = m;
			pick req := m.req {
			Get =>
				# nmsgs == 0 means any number of replies;
				# otherwise it counts the replies still expected.
				if (req.nmsgs == 0 || --req.nmsgs > 0){
					pick rm := rmsg {
					Get =>
						if ((rm.mode&OSTAT) && (rm.stat.qid.qtype&QTDIR) && (req.mode&ODATA))
							req.nmsgs = 0; # dirs accept any number of Rget.
						if (rm.mode&OMORE)
							done = 0;
					}
				}
			}
			if (!done)
				nreqs = m :: nreqs;
		}
	}
	if (call == nil)
		fprint(stderr, "opmux: no request for %s\n", rmsg.text());
	else if (tagof(rmsg) != tagof(Rmsg.Error) && rmsg.mtype() != call.req.mtype()+1){
		fprint(stderr, "opmux: type mismatch:\n");
		fprint(stderr, "\tcall: %s\n\treply:%s\n", call.req.text(), rmsg.text());
	}
	return (call, nreqs, done);
}

# Reply with an "i/o error" to every request in reqs.
abortall(reqs: list of ref Opcall)
{
	if (debug)
		fprint(stderr, "opmux: aborting\n");
	while(reqs != nil){
		m := hd reqs;
		m.repc <-= ref Rmsg.Error(m.req.tag, "i/o error");
		reqs = tl reqs;
	}
}

# Hand the request t to the mux process and return the channel
# (with room for one buffered message) where replies will arrive.
rpc(t: ref Op->Tmsg) : chan of ref Op->Rmsg
{
	rc := chan[1] of ref Op->Rmsg;
	if (opc == nil)
		fprint(stderr, "opmux: nil opc");
	opc <-= ref Opcall(t, rc);
	return rc;
}