implement Pgcfs; # The Dbfs code I started with as a template said : # Copyright © 1999 Vita Nuova Limited. All rights reserved. # Revisions copyright © 2002 Vita Nuova Holdings Limited. All rights reserved. # Ok well, I raise you Postgresql catalogue tables as the datastore matt@proweb.co.uk is this GPL now ? Tis a bit confusing. I'm sure you don't mind. #pgcfs && cat /mnt/db/1 mk && ./pgcfs && ls -l /mnt/db/proc/ | grep www # mk && ./pgcfs && ls /mnt/db/user/www/proc debug : con 0; include "sys.m"; sys: Sys; Qid: import Sys; include "draw.m"; include "arg.m"; include "styx.m"; styx: Styx; Tmsg, Rmsg: import styx; include "styxservers.m"; styxservers: Styxservers; Fid, Styxserver, Navigator, Navop: import styxservers; Enotfound, Eperm, Ebadarg: import styxservers; include "bufio.m"; bufio: Bufio; Iobuf: import bufio; include "keyring.m"; keyring: Keyring; include "pgbase.m"; pgbase :PgBase; Connection : import pgbase; include "pg_catalogue.m"; pgcatalogue : PgCatalogue; Catalogue : import pgcatalogue; # XXX Record: adt { id: int; # file number in directory x: int; # index in file dirty: int; # modified but not written vers: int; # version data: array of byte; new: fn(x: array of byte): ref Record; print: fn(r: self ref Record, fd: ref Sys->FD); qid: fn(r: self ref Record): Sys->Qid; }; # XXX Database: adt { name: string; file: ref Iobuf; records: array of ref Record; dirty: int; vers: int; nextid: int; findrec: fn(db: self ref Database, id: int): ref Record; }; Pgcfs: module { init: fn(nil: ref Draw->Context, nil: list of string); }; Qdir, Qnew, Qdata, Qproc, Qprocdir, Quserdir, Quser, Qsysid, Quserprocdir, Quserproc: con iota; clockfd: ref Sys->FD; stderr: ref Sys->FD; database: ref Database; err_chan : chan of string; user: string; Eremoved: con "file removed"; pg_cat : ref Catalogue; usage() { sys->fprint(stderr, "Usage: dbfs [-a|-b|-ac|-bc] [-D] file mountpoint\n"); raise "fail:usage"; } nomod(s: string) { sys->fprint(stderr, "dbfs: can't load %s: %r\n", s); raise "fail:load"; } open_file(file : string, empty : int) : ref Iobuf { df := bufio->open(file, Sys->OREAD); if(df == nil && empty){ (rc, nil) := sys->stat(file); if(rc < 0) df = bufio->create(file, Sys->OREAD, 8r600); } if(df == nil){ sys->fprint(stderr, "dbfs: can't open %s: %r\n", file); raise "fail:open"; } return df; } create_pipes() : array of ref Sys->FD { fds := array[2] of ref Sys->FD; if(sys->pipe(fds) < 0){ sys->fprint(stderr, "dbfs: can't create pipe: %r\n"); raise "fail:pipe"; } return fds; } init(nil: ref Draw->Context, args: list of string) { sys = load Sys Sys->PATH; sys->pctl(Sys->FORKFD|Sys->NEWPGRP, nil); stderr = sys->fildes(2); styx = load Styx Styx->PATH; if(styx == nil) nomod(Styx->PATH); styx->init(); styxservers = load Styxservers Styxservers->PATH; if(styxservers == nil) nomod(Styxservers->PATH); styxservers->init(styx); bufio = load Bufio Bufio->PATH; if(bufio == nil) nomod(Bufio->PATH); arg := load Arg Arg->PATH; if(arg == nil) nomod(Arg->PATH); arg->init(args); err_chan = chan of string; spawn logger(); pgcatalogue = load PgCatalogue "pg_catalogue.dis"; conn := pgcatalogue->new_connection("127.0.0.1", "5432", "www", "", "study", nil, nil); pg_cat = pgcatalogue->new_catalogue(conn); if(pg_cat == nil) raise "catalogue creation failed"; pg_cat.err_chan = err_chan; pg_cat.sync(); flags := Sys->MREPL; flags |= Sys->MCREATE; mountpt := "/mnt/db"; sys->pctl(Sys->FORKFD, nil); user = rf("/dev/user"); if(user == nil) user = "inferno"; fds := create_pipes(); navops := chan of ref Navop; spawn navigator(navops); (tchan, srv) := Styxserver.new(fds[0], Navigator.new(navops), big Qdir); fds[0] = nil; pidc := chan of int; spawn serveloop(tchan, srv, pidc, navops); <-pidc; if(sys->mount(fds[1], nil, mountpt, flags, nil) < 0) { sys->fprint(stderr, "dbfs: mount failed: %r\n"); raise "fail:mount"; } } logger() { txt : string; while((txt = <- err_chan) != nil) { if(debug) sys->print("%s\n", txt); } if(debug) sys->print("Closing\n"); } rf(f: string): string { fd := sys->open(f, Sys->OREAD); if(fd == nil) return nil; b := array[Sys->NAMEMAX] of byte; n := sys->read(fd, b, len b); if(n < 0) return nil; return string b[0:n]; } #XXX dbread(db: ref Database): (ref Database, string) { db.file.seek(big 0, Sys->SEEKSTART); rl: list of ref Record; n := 0; for(;;){ (r, err) := getrec(db); if(err != nil) return (nil, err); # could press on without it, or make it the `file' contents if(r == nil) break; rl = r :: rl; n++; } db.nextid = n; db.records = array[n] of ref Record; for(; rl != nil; rl = tl rl){ r := hd rl; n--; r.id = n; r.x = n; db.records[n] = r; } return (db, nil); } #XXX getrec(db: ref Database): (ref Record, string) { r := ref Record(-1, -1, 0, 0, nil); data := ""; for(;;){ s := db.file.gets('\n'); if(s == nil){ if(data == nil) return (nil, nil); # BUG: distinguish i/o error from EOF? break; } if(s[len s - 1] != '\n') # return (nil, "file missing newline"); # possibly truncated s += "\n"; if(s == "\n") break; data += s; } r.data = array of byte data; return (r, nil); } #XXX dbsync(db: ref Database): int { if(db.dirty){ db.file = bufio->create(db.name, Sys->OWRITE, 8r666); if(db.file == nil) return -1; for(i := 0; i < len db.records; i++){ r := db.records[i]; if(r != nil && r.data != nil){ if(db.file.write(r.data, len r.data) != len r.data) return -1; db.file.putc('\n'); } } if(db.file.flush()) return -1; db.file = nil; db.dirty = 0; } return 0; } dbprint(db: ref Database) { stdout := sys->fildes(1); for(i := 0; i < len db.records; i++){ db.records[i].print(stdout); sys->print("\n"); } } Database.findrec(db: self ref Database, id: int): ref Record { for(i:=0; iFD) { if(r.data != nil) sys->write(fd, r.data, len r.data); } Record.qid(r: self ref Record): Sys->Qid { return Sys->Qid(QPATH(r.x, Qdata), r.vers, Sys->QTFILE); } serveloop(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop) { pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, 1::2::srv.fd.fd::nil); Serve: while((gm := <-tchan) != nil){ pick m := gm { Readerror => sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error); break Serve; Open => c := srv.getfid(m.fid); if(c == nil || TYPE(c.path) != Qnew){ srv.open(m); # default action break; } if(c.uname != user) { srv.reply(ref Rmsg.Error(m.tag, Eperm)); break; } mode := styxservers->openmode(m.mode); if(mode < 0) { srv.reply(ref Rmsg.Error(m.tag, Ebadarg)); break; } # generate new file, change Fid's qid to match r := Record.new(array[0] of byte); qid := r.qid(); c.open(mode, qid); srv.reply(ref Rmsg.Open(m.tag, qid, srv.iounit())); Read => (c, err) := srv.canread(m); if(c == nil){ srv.reply(ref Rmsg.Error(m.tag, err)); break; } if(c.qtype & Sys->QTDIR){ srv.read(m); # does readdir break; } r := database.records[FILENO(c.path)]; if(r == nil) srv.reply(ref Rmsg.Error(m.tag, Eremoved)); else srv.reply(styxservers->readbytes(m, r.data)); Write => (c, merr) := srv.canwrite(m); if(c == nil){ srv.reply(ref Rmsg.Error(m.tag, merr)); break; } (value, err) := data2rec(m.data); if(err != nil){ srv.reply(ref Rmsg.Error(m.tag, err)); break; } fno := FILENO(c.path); r := database.records[fno]; if(r == nil){ srv.reply(ref Rmsg.Error(m.tag, Eremoved)); break; } r.data = value; r.vers++; database.dirty++; if(dbsync(database) == 0) srv.reply(ref Rmsg.Write(m.tag, len m.data)); else srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r"))); Clunk => # a transaction-oriented dbfs could delay updating the record until clunk srv.clunk(m); Remove => c := srv.getfid(m.fid); if(c == nil || c.qtype & Sys->QTDIR || TYPE(c.path) != Qdata){ # let it diagnose all the errors srv.remove(m); break; } r := database.records[FILENO(c.path)]; if(r != nil) r.data = nil; database.dirty++; srv.delfid(c); if(dbsync(database) == 0) srv.reply(ref Rmsg.Remove(m.tag)); else srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r"))); Wstat => srv.default(gm); # TO DO? * => srv.default(gm); } } navops <-= nil; # shut down navigator } dirslot(n: int): int { for(i := 0; i < len database.records; i++){ r := database.records[i]; if(r != nil && r.data != nil){ if(n == 0) return i; n--; } } return -1; } # # a record is (.+\n)*, without final empty line # data2rec(data: array of byte): (array of byte, string) { s: string; for(b := data; len b > 0;){ (b, s) = getline(b); if(s == nil || s[len s - 1] != '\n' || s == "\n") return (nil, "partial or malformed record"); # possibly truncated } return (data, nil); } getline(b: array of byte): (array of byte, string) { n := len b; for(i := 0; i < n; i++){ (ch, l, nil) := sys->byte2char(b, i); i += l; if(l == 0 || ch == '\n') break; } return (b[i:], string b[0:i]); } serveloopX(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop) { pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, 1::2::srv.fd.fd::nil); Serve: while((gm := <-tchan) != nil){ err_chan <- = "Serve"; pick m := gm { Readerror => err_chan <- = "Readerror"; sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error); break Serve; Open => err_chan <- = "Open"; c := srv.getfid(m.fid); if(c == nil){ srv.open(m); # default action err_chan <- = "Open"; break; } err_chan <- = "Opened"; Read => err_chan <- = "Read"; (c, err) := srv.canread(m); if(c == nil){ srv.reply(ref Rmsg.Error(m.tag, err)); break; } if(c.qtype & Sys->QTDIR){ srv.read(m); # does readdir break; } case TYPE(c.path) { Qproc => err_chan <- = "Qproc"; pid := FILENO(c.path); if(pid < len pg_cat.procs) { srv.reply(styxservers->readbytes(m, array of byte pg_cat.proc_sql(pid))); } else { srv.reply(ref Rmsg.Error(m.tag, Eremoved)); } * => err_chan <- = "*"; srv.reply(nil); } Write => err_chan <- = "Write"; (c, merr) := srv.canwrite(m); if(c == nil){ srv.reply(ref Rmsg.Error(m.tag, merr)); break; } case TYPE(c.path) { Qproc => err_chan <- = "Qproc"; pid := FILENO(c.path); if(pid < len pg_cat.procs) { srv.reply(ref Rmsg.Write(m.tag, len m.data)); } else { srv.reply(ref Rmsg.Error(m.tag, Eremoved)); } * => err_chan <- = "*"; srv.reply(nil); } Clunk => err_chan <- = "Clunk"; # a transaction-oriented dbfs could delay updating the record until clunk srv.clunk(m); Remove => err_chan <- = "Remove"; c := srv.getfid(m.fid); if(c == nil || c.qtype & Sys->QTDIR){ srv.remove(m); break; } case TYPE(c.path) { Qproc => err_chan <- = "Qproc"; pid := FILENO(c.path); if(pg_cat.drop_proc(pid)) { srv.reply(ref Rmsg.Remove(m.tag)); } else { srv.reply(ref Rmsg.Error(m.tag, "Drop failed")); } * => err_chan <- = "*"; srv.reply(nil); } Wstat => err_chan <- = "Wstat"; srv.default(gm); # TO DO? * => err_chan <- = "Default"; srv.default(gm); } err_chan <- = "Served"; } err_chan <- = nil; navops <-= nil; # shut down navigator pgcatalogue->disconnect(pg_cat.conn); } dir(qid: Sys->Qid, name: string, length: big, uid: string, perm, mtime: int): ref Sys->Dir { d := ref sys->zerodir; d.qid = qid; if(qid.qtype & Sys->QTDIR) perm |= Sys->DMDIR; d.mode = perm; d.name = name; d.uid = uid; d.gid = uid; d.length = length; d.mtime = mtime; return d; } proc_dirname(id : int) : string { p := pg_cat.procs[id]; dirname := p.name + "+"; for(i := 0; i < p.nargs; i++) { if(p.argnames != nil && i < len p.argnames) dirname += p.argnames[i] + "-"; if(p.argtypes != nil && i < len p.argtypes) dirname += pg_cat.type_name(p.argtypes[i]); if(i < p.nargs - 1) dirname += ","; } return dirname; } dirgen(p: big): (ref Sys->Dir, string) { case TYPE(p) { Qdir => return (dir(Qid(QPATH(0, Qdir), 0, Sys->QTDIR), "/", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil); Qprocdir => return (dir(Qid(QPATH(0, Qprocdir), 0, Sys->QTDIR), "proc", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil); Qproc => id := FILENO(p); src := pg_cat.proc_sql(id); return (dir(Qid(QPATH(id, Qproc), 0, Sys->QTFILE), proc_dirname(id), big len src, pg_cat.user_name(pg_cat.procs[id].owner), 8r444, pg_cat.last_sync), nil); Quserprocdir => sysid := FILENO(p); return (dir(Qid(QPATH(sysid, Quserprocdir), 0, Sys->QTDIR), "proc", big 0, pg_cat.user_name(sysid), 8r555, pg_cat.last_sync), nil); Quserdir => return (dir(Qid(QPATH(0, Quserdir), 0, Sys->QTDIR), "user", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil); Quser => sysid := FILENO(p); return (dir(Qid(QPATH(sysid, Quser), 0, Sys->QTDIR), pg_cat.user_name(sysid), big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil); Qsysid => sysid := FILENO(p); return (dir(Qid(QPATH(sysid, Quser), 0, Sys->QTFILE), "sysid", big len string sysid, pg_cat.users[0].name, 8r444, pg_cat.last_sync), nil); * => err_chan <- = "dirgen *"; return (nil, Enotfound); } } navigator(navops: chan of ref Navop) { while((m := <-navops) != nil){ err_chan <-= "Navops"; pick n := m { Stat => err_chan <-= "Stat"; n.reply <-= dirgen(n.path); Walk => err_chan <-= "Walk to " + n.name; case TYPE(n.path) { Qdir => err_chan <-= " From Qdir"; case n.name { ".." => ; # nop "proc" => n.path = QPATH(0, Qprocdir); n.reply <-= dirgen(n.path); "user" => n.path = QPATH(0, Quserdir); n.reply <-= dirgen(n.path); * => n.reply <-= (nil, Enotfound); } Qprocdir => err_chan <-= " From Qprocdir"; case n.name { ".." => n.path = QPATH(0, Qdir); n.reply <-= dirgen(n.path); * => (numbits, bits) := sys->tokenize(n.name, "+"); err_chan <- = "numbits " + string numbits + " bits[0] " + hd bits; (numargs, args) := sys->tokenize(hd tl bits, ","); err_chan <- = "numargs " + string numargs; if(numargs > 0) err_chan <- = "args[0] " + hd args; n.reply <-= (nil, Enotfound); } Quserdir => err_chan <-= " From Quserdir"; case n.name { ".." => n.path = QPATH(0, Qdir); n.reply <-= dirgen(n.path); * => (nil, sysid) := pg_cat.user_sysid(n.name); if(sysid > 0) { n.path = QPATH(sysid, Quser); n.reply <-= dirgen(n.path); } else { n.reply <-= (nil, Enotfound); } } Quser => err_chan <-= " From Quser"; case n.name { ".." => n.path = QPATH(0, Quserdir); n.reply <-= dirgen(n.path); "sysid" => n.path = QPATH(FILENO(n.path), Qsysid); n.reply <-= dirgen(n.path); "proc" => n.path = QPATH(FILENO(n.path), Quserprocdir); n.reply <-= dirgen(n.path); * => n.reply <-= (nil, Enotfound); } * => err_chan <-= " From *"; n.reply <-= (nil, "not a directory"); } Readdir => err_chan <-= "Readdir"; i := n.offset; case TYPE(m.path) { Qprocdir => err_chan <-= "Qprocdir"; if(pg_cat.procs == nil) pg_cat.fill_procs(); if(pg_cat.procs != nil) { for(; --n.count >= 0 && i < len pg_cat.procs; i++) { n.reply <-= dirgen(QPATH(i, Qproc)); # n² but the file will be small } } Quserprocdir => if(pg_cat.procs == nil) pg_cat.fill_procs(); if(pg_cat.procs != nil) { sysid := FILENO(m.path); offset := i; for(k := 0 ; n.count > 0 && k < len pg_cat.procs; k++) { if(pg_cat.procs[k].owner == sysid) { if(offset == 0) { n.reply <-= dirgen(QPATH(k, Qproc)); # n² but the file will be small n.count--; } else { offset--; } } } } Quserdir => err_chan <-= "Quserdir"; if(pg_cat.users == nil) pg_cat.fill_users(); if(pg_cat.users != nil) { for(; --n.count >= 0 && i < len pg_cat.users; i++) { n.reply <-= dirgen(QPATH(pg_cat.users[i].sysid, Quser)); } } Quser => err_chan <-= "Quser"; if(i == 0 && n.count > 0) { n.reply <-= dirgen(QPATH(0,Qsysid)); i++; --n.count; } if(i == 1 && n.count-- > 0) n.reply <-= dirgen(QPATH(0,Qprocdir)); Qdir => err_chan <-= "Qdir"; if(i == 0 && n.count > 0) { n.reply <-= dirgen(QPATH(0,Qprocdir)); i++; --n.count; } if(i == 1 && n.count-- > 0) n.reply <-= dirgen(QPATH(0,Quserdir)); * => err_chan <-= "Not Dir"; n.reply <-= (nil, "not a directory"); } n.reply <-= (nil, nil); } err_chan <-= "Navigated"; } } QPATH(w, q: int): big { return big ((w<<8)|q); } TYPE(path: big): int { return int path & 16rFF; } FILENO(path: big) : int { return (int path >> 8) & 16rFFFFFF; }