# # Event ports. used by o/mero among other things # Events are strings posted by a single write to "/post". # To listen for events, create a file and write a regex(2) on it. # any event posted is matched against the regex, and served as a # reply to read (offset ignored) if it matches. implement Ports; include "sys.m"; sys: Sys; Dir, pctl, NEWPGRP, DMDIR, open, DMEXCL, OREAD, FD, OWRITE, ORCLOSE, FORKFD, ORDWR, FORKNS, NEWFD, MREPL, MBEFORE, MAFTER, MCREATE, pipe, mount, fprint, sprint, create, pwrite, read, QTDIR, QTFILE, fildes, Qid: import sys; include "draw.m"; include "styx.m"; styx: Styx; Rmsg, Tmsg: import styx; include "error.m"; err: Error; checkload, stderr, panic, kill, error: import err; include "styxservers.m"; styxs: Styxservers; Styxserver, readbytes, readstr, Eexists, Enotfound, Navigator, Fid: import styxs; nametree: Nametree; Tree: import nametree; include "daytime.m"; daytime: Daytime; now: import daytime; include "arg.m"; arg: Arg; usage: import arg; include "tbl.m"; tbl: Tbl; Table: import tbl; include "string.m"; str: String; splitr: import str; include "env.m"; env: Env; getenv: import env; include "regex.m"; regex: Regex; Re: import regex; Ports: module { init: fn(nil: ref Draw->Context, argv: list of string); }; Nevs: con 128; # max nb. of buffered events in queue. We drop events when full. Tmout: con 60; # after so many seconds of dropping events the file is removed. File: adt { path: big; name: string; evq: array of string; evhd: int; evtl: int; re: Re; req: ref Tmsg.Read; orclose: int; multi: int; atime: int; new: fn(q: big, name: string, n: int, orclose: int): ref File; read: fn(f: self ref File, m: ref Tmsg.Read); post: fn(f: self ref File, s: string): int; abort: fn(f: self ref File); flush: fn(tag: int); }; files: ref Table[ref File]; # indexed by qid. Qroot, Qpost, Qrecv: con big iota; qgen:= Qrecv; debug := 0; user: string; nevs := Nevs; unsent: ref File; srv: ref Styxserver; File.new(q: big, name: string, n: int, orclose: int): ref File { return ref File(q, name, array[n] of string, 0, 0, nil, nil, orclose, 0, now()); } File.abort(f: self ref File) { if (f.req != nil) srv.reply(ref Rmsg.Error(f.req.tag, "file was removed")); f.req = nil; } File.flush(tag: int) { for (i := 0; i < len files.items; i++) for (l := files.items[i]; l != nil; l = tl l){ (nil, f) := hd l; if (f.req != nil && f.req.tag == tag){ srv.reply(ref Rmsg.Error(f.req.tag, "flushed")); f.req = nil; return; } } } nxt(i: int): int { return (i+1) % nevs; } File.read(f: self ref File, m: ref Tmsg.Read) { f.atime = now(); if (f.req != nil){ srv.reply(ref Rmsg.Error(m.tag, "concurrent read")); return; } m.offset = big 0; if (f.evq[f.evhd] != nil){ data := ""; tot := 0; do { tot += len array of byte f.evq[f.evhd]; if (tot > m.count) break; data += f.evq[f.evhd]; f.evq[f.evhd] = nil; f.evhd = nxt(f.evhd); } while (f.multi && f.evq[f.evhd] != nil); srv.reply(readstr(m, data)); } else f.req = m; } File.post(f: self ref File, ev: string): int { if (f.re == nil) # not programmed return 0; r := regex->execute(f.re, ev); if (r == nil) return 0; # no match if (f.req != nil){ srv.reply(readstr(f.req, ev)); f.req = nil; } else { f.evq[f.evtl] = ev; f.evtl = nxt(f.evtl); if (f.evtl == f.evhd){ f.evhd = nxt(f.evhd); # event lost if (now() - f.atime > Tmout) return -1; } } return 1; } readall(fname: string) : string { fd := open(fname, OREAD); if (fd == nil) return "none"; max : con int 1024; data := array[max] of byte; tot := nr := 0; do { nr = read(fd, data[tot:], len data - tot); if (nr > 0) tot += nr; } while(nr > 0 && tot < len data); if (tot == 0) return "none"; return string data[0:tot]; } newdir(name: string, perm: int, qid: big): Dir { d := sys->zerodir; d.name = name; d.uid = user; d.gid = user; d.qid.path = qid; if (perm & DMDIR) d.qid.qtype = QTDIR; else d.qid.qtype = QTFILE; d.mode = perm; return d; } fsreq(srv: ref Styxserver, tree: ref Tree, req: ref Tmsg) : ref Rmsg { pick m := req { Create => (fid, mode, d, e) := srv.cancreate(m); if (e != nil) return ref Rmsg.Error(m.tag, e); if (mode&DMDIR) return ref Rmsg.Error(m.tag, "can't create directories"); if (d.name == "post") return ref Rmsg.Error(m.tag, Eexists); f := File.new(++qgen, d.name, nevs, (mode&ORCLOSE)); d.qid = Qid(qgen, 0, 0); d.atime = d.mtime = now(); d.mode |= DMEXCL; e = tree.create(Qroot, *d); if (e != nil) return ref Rmsg.Error(m.tag, e); fid.open(mode, d.qid); files.add(int fid.path, f); if (d.name == "unsent"){ unsent = f; (f.re, nil) = regex->compile(".*", 0); } return ref Rmsg.Create(m.tag, d.qid, srv.iounit()); Remove => (fid, nil, e) := srv.canremove(m); srv.delfid(fid); if (e != nil) return ref Rmsg.Error(m.tag, e); if (fid.path == Qpost) return ref Rmsg.Error(m.tag, "permission denied"); e = tree.remove(fid.path); if (e != nil) return ref Rmsg.Error(m.tag, e); f := files.del(int fid.path); if (f != nil){ f.abort(); if (f == unsent) unsent = nil; } return ref Rmsg.Remove(m.tag); Read => (fid, e) := srv.canread(m); if (e != nil) return ref Rmsg.Error(m.tag, e); if (fid.qtype&QTDIR){ srv.default(req); return nil; } if (fid.path < Qrecv) panic("reading from the wrong file"); f := files.find(int fid.path); if (f == nil) return ref Rmsg.Error(m.tag, Enotfound); f.read(m); Write => (fid, e) := srv.canwrite(m); if (e != nil) return ref Rmsg.Error(m.tag, e); if (fid.path == Qpost){ msg := string m.data; oldl : list of ref File; posted := 0; for (i := 0; i < len files.items; i++) for(l := files.items[i]; l != nil; l = tl l){ f := (hd l).t1; if (f != unsent){ pc := f.post(msg); if (pc < 0) oldl = f :: oldl; posted |= pc; } } if (!posted && unsent != nil) unsent.post(msg); for(; oldl != nil; oldl = tl oldl){ f := hd oldl; tree.remove(f.path); files.del(int f.path); } } else { f := files.find(int fid.path); if (f == nil) return ref Rmsg.Error(m.tag, Enotfound); s := string m.data; if (s == "multi" || s == "multi\n") f.multi = 1; else { (f.re, e) = regex->compile(string m.data, 0); if (e != nil) return ref Rmsg.Error(m.tag, e); } } return ref Rmsg.Write(m.tag, len m.data); Clunk => fid := srv.getfid(m.fid); if (fid == nil) return ref Rmsg.Error(m.tag, "bad fid"); if (fid.path >= Qpost){ f := files.find(int fid.path); if (f != nil && f.orclose){ f.abort(); tree.remove(fid.path); files.del(int fid.path); } } srv.delfid(fid); return ref Rmsg.Clunk(m.tag); Flush => File.flush(m.oldtag); return ref Rmsg.Flush(m.tag); * => return nil; } } fs(pidc: chan of int, fd: ref FD) { styx->init(); styxs->init(styx); user = getenv("user"); if (user == nil) user = readall("/dev/user"); if (pidc != nil) pidc <-= pctl(FORKNS|NEWPGRP|NEWFD, list of {0,1,2,fd.fd}); else pctl(NEWPGRP, nil); stderr = fildes(2); # lost by pctl (tree, navc) := nametree->start(); nav := Navigator.new(navc); (reqc, s) := Styxserver.new(fd, nav, Qroot); srv = s; tree.create(Qroot, newdir(".", DMDIR|8r775, Qroot)); tree.create(Qroot, newdir("post", 8r220, Qpost)); nullfile: ref File; files = Table[ref File].new(103, nullfile); for (;;) { req := <-reqc; if (req == nil) break; rep := fsreq(srv, tree, req); if (rep == nil) { if (tagof(req) != tagof(Tmsg.Read)) # read replies are async (events) srv.default(req); } else srv.reply(rep); } tree.quit(); kill(pctl(0, nil),"killgrp"); # be sure to quit } init(nil: ref Draw->Context, args: list of string) { sys = load Sys Sys->PATH; err = load Error Error->PATH; err->init(); str = checkload(load String String->PATH, String->PATH); styx = checkload(load Styx Styx->PATH, Styx->PATH); styxs = checkload(load Styxservers Styxservers->PATH, Styxservers->PATH); nametree = checkload(load Nametree Nametree->PATH, Nametree->PATH); nametree->init(); daytime = checkload(load Daytime Daytime->PATH, Daytime->PATH); tbl = checkload(load Tbl Tbl->PATH, Tbl->PATH); env = checkload(load Env Env->PATH, Env->PATH); daytime = checkload(load Daytime Daytime->PATH, Daytime->PATH); regex = checkload(load Regex Regex->PATH, Regex->PATH); arg = checkload(load Arg Arg->PATH, Arg->PATH); arg->init(args); arg->setusage("o/ports [-abcd] [-q n] [-m mnt]"); mnt: string; flag := MREPL|MCREATE; while((opt := arg->opt()) != 0) { case opt{ 'b' => flag = MBEFORE; 'a' => flag = MAFTER; 'c' => flag |= MCREATE; 'm' => mnt = arg->earg(); 'd' => debug = 1; styxs->traceset(1); 'q' => nevs = int arg->earg(); nevs ++; if (nevs < 3) nevs = 3; if (nevs > 10000) nevs = 10000; * => usage(); } } args = arg->argv(); if (len args != 0) usage(); if (mnt == nil) fs(nil, fildes(0)); else { pfds := array[2] of ref FD; if (pipe(pfds) < 0) error(sprint("o/ports: pipe: %r")); pidc := chan of int; spawn fs(pidc, pfds[0]); <-pidc; if (mount(pfds[1], nil, mnt, flag, nil) < 0) error(sprint("o/ports: mount: %r")); pfds[0] = nil; } }