## diffname gnot/stream.c 1990/03091 ## diff -e /dev/null /n/bootesdump/1990/03091/sys/src/9/68020/stream.c 0a #include "u.h" #include "lib.h" #include "mem.h" #include "dat.h" #include "fns.h" #include "io.h" #include "errno.h" #include "devtab.h" static void stputq(Queue*, Block*); Qinfo procinfo = { stputq, nullput, 0, 0, "process" } ; /*extern Qinfo noetherinfo; */ static Qinfo *lds[] = { /* &noetherinfo, */ 0 }; enum { Nclass=4, }; /* * All stream structures are ialloc'd at boot time */ Stream *slist; Queue *qlist; Block *blist; static Lock garbagelock; /* * The block classes. There are Nclass block sizes, each with its own free list. * All are ialloced at qinit() time. */ typedef struct { int size; Queue; } Bclass; Bclass bclass[Nclass]={ { 0 }, { 64 }, { 512 }, { 4096 }, }; /* * Allocate streams, queues, and blocks. Allocate n block classes with * 1/2(m+1) to class m < n-1 * 1/2(n-1) to class n-1 */ void streaminit(void) { int class, i, n; Block *bp; Bclass *bcp; slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0); qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0); blist = (Block *)ialloc(conf.nblock * sizeof(Block), 0); bp = blist; n = conf.nblock; for(class = 0; class < Nclass; class++){ if(class < Nclass-1) n = n/2; bcp = &bclass[class]; for(i = 0; i < n; i++) { if(bcp->size) bp->base = (uchar *)ialloc(bcp->size, 0); bp->lim = bp->base + bcp->size; bp->flags = class; freeb(bp); bp++; } } } /* * allocate a block */ static int isblock(void *arg) { Bclass *bcp; bcp = (Bclass *)arg; return bcp->first!=0; } Block * allocb(ulong size) { Block *bp; Bclass *bcp; int i; /* * map size to class */ for(bcp=bclass; bcp->sizefirst == 0){ unlock(bcp); print("waiting for blocks\n"); sleep(&bcp->r, isblock, (void *)bcp); lock(bcp); } bp = bcp->first; bcp->first = bp->next; if(bcp->first == 0) bcp->last = 0; unlock(bcp); /* * return an empty block */ bp->rptr = bp->wptr = bp->base; bp->next = 0; bp->type = M_DATA; bp->flags &= S_CLASS; return bp; } /* * Free a block. Poison its pointers so that someone trying to access * it after freeing will cause a dump. */ void freeb(Block *bp) { Bclass *bcp; bcp = &bclass[bp->flags & S_CLASS]; bp->rptr = bp->wptr = 0; lock(bcp); if(bcp->first) bcp->last->next = bp; else bcp->first = bp; bcp->last = bp; bp->next = 0; wakeup(&bcp->r); unlock(bcp); } /* * allocate a pair of queues. flavor them with the requested put routines. * the `QINUSE' flag on the read side is the only one used. */ static Queue * allocq(Qinfo *qi) { Queue *q, *wq; for(q=qlist; q<&qlist[conf.nqueue]; q++, q++) { if(q->flag == 0){ if(canlock(q)){ if(q->flag == 0) break; unlock(q); } } } if(q == &qlist[conf.nqueue]){ print("no more queues\n"); error(0, Enoqueue); } q->flag = QINUSE; q->r.p = 0; q->info = qi; q->put = qi->iput; wq = q->other = q + 1; wq->r.p = 0; wq->info = qi; wq->put = qi->oput; wq->other = q; unlock(q); return q; } /* * free a queue */ static void freeq(Queue *q) { Block *bp; q = RD(q); while(bp = getq(q)) freeb(bp); q = WR(q); while(bp = getq(q)) freeb(bp); RD(q)->flag = 0; } /* * push a queue onto a stream referenced by the proc side write q */ Queue * pushq(Stream* s, Qinfo *qi) { Queue *q; Queue *nq; q = RD(s->procq); /* * make the new queue */ nq = allocq(qi); /* * push */ RD(nq)->next = q; RD(WR(q)->next)->next = RD(nq); WR(nq)->next = WR(q)->next; WR(q)->next = WR(nq); if(qi->open) (*qi->open)(RD(nq), s); return WR(nq)->next; } /* * pop off the top line discipline */ static void popq(Stream *s) { Queue *q; if(s->procq->next == WR(s->devq)) error(0, Ebadld); q = s->procq->next; if(q->info->close) (*q->info->close)(RD(q)); s->procq->next = q->next; RD(q->next)->next = RD(s->procq); freeq(q); } /* * add a block (or list of blocks) to the end of a queue. return true * if one of the blocks contained a delimiter. */ int putq(Queue *q, Block *bp) { int delim; delim = 0; lock(q); if(q->first) q->last->next = bp; else q->first = bp; q->len += bp->wptr - bp->rptr; delim = bp->flags & S_DELIM; while(bp->next) { bp = bp->next; q->len += bp->wptr - bp->rptr; delim |= bp->flags & S_DELIM; } q->last = bp; if(q->len >= Streamhi) q->flag |= QHIWAT; unlock(q); return delim; } int putb(Blist *q, Block *bp) { int delim; delim = 0; if(q->first) q->last->next = bp; else q->first = bp; q->len += bp->wptr - bp->rptr; delim = bp->flags & S_DELIM; while(bp->next) { bp = bp->next; q->len += bp->wptr - bp->rptr; delim |= bp->flags & S_DELIM; } q->last = bp; bp->next = 0; return delim; } /* * add a block to the start of a queue */ static void putbq(Blist *q, Block *bp) { lock(q); if(q->first) bp->next = q->first; else q->last = bp; q->first = bp; q->len += bp->wptr - bp->rptr; unlock(q); } /* * remove the first block from a queue */ Block * getq(Queue *q) { Block *bp; lock(q); bp = q->first; if(bp) { q->first = bp->next; if(q->first == 0) q->last = 0; q->len -= bp->wptr - bp->rptr; if((q->flag&QHIWAT) && q->len < Streamhi/2){ wakeup(&q->other->next->other->r); q->flag &= ~QHIWAT; } bp->next = 0; } unlock(q); return bp; } Block * getb(Blist *q) { Block *bp; bp = q->first; if(bp) { q->first = bp->next; if(q->first == 0) q->last = 0; q->len -= bp->wptr - bp->rptr; bp->next = 0; } return bp; } /* * put a block into the bit bucket */ void nullput(Queue *q, Block *bp) { freeb(bp); error(0, Ehungup); } /* * find the info structure for line discipline 'name' */ static Qinfo * qinfofind(char *name) { Qinfo **qip; if(name == 0) error(0, Ebadld); for(qip = lds; *qip; qip++) if(strcmp((*qip)->name, name)==0) return *qip; error(0, Ebadld); } /* * send a hangup up a stream */ static void hangup(Stream *s) { Block *bp; bp = allocb(0); bp->type = M_HANGUP; (*s->devq->put)(s->devq, bp); } /* * parse a string and return a pointer to the second element if the * first matches name. bp->rptr will be updated to point to the * second element. * * return 0 if no match. * * it is assumed that the block data is null terminated. streamwrite * guarantees this. */ int streamparse(char *name, Block *bp) { int len; len = strlen(name); if(bp->wptr - bp->rptr < len) return 0; if(strncmp(name, (char *)bp->rptr, len)==0){ if(bp->rptr[len] == ' ') bp->rptr += len+1; else if(bp->rptr[len]) return 0; else bp->rptr += len; return 1; } return 0; } /* * the per stream directory structure */ Dirtab streamdir[]={ "data", Sdataqid, 0, 0600, "ctl", Sctlqid, 0, 0600, }; /* * A stream device consists of the contents of streamdir plus * any directory supplied by the actual device. * * values of s: * 0 to ntab-1 apply to the auxiliary directory. * ntab to ntab+Shighqid-Slowqid+1 apply to streamdir. */ int streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp) { Proc *p; char buf[NAMELEN]; if(s < ntab) tab = &tab[s]; else if(s < ntab + Shighqid - Slowqid + 1) tab = &streamdir[s - ntab]; else return -1; devdir(c, STREAMQID(STREAMID(c->qid),tab->qid), tab->name, tab->length, tab->perm, dp); return 1; } /* * create a new stream */ Stream * streamnew(Chan *c, Qinfo *qi) { Stream *s; Queue *q; /* * find a free stream struct */ for(s = slist; s < &slist[conf.nstream]; s++) { if(s->inuse == 0){ if(canlock(s)){ if(s->inuse == 0) break; unlock(s); } } } if(s == &slist[conf.nstream]){ print("no more streams\n"); error(0, Enostream); } if(waserror()){ unlock(s); streamclose(c); nexterror(); } /* * marry a stream and a channel */ if(c){ c->stream = s; s->type = c->type; s->dev = c->dev; s->id = STREAMID(c->qid); } else s->type = -1; /* * hang a device and process q off the stream */ s->inuse = 1; s->tag[0] = 0; q = allocq(&procinfo); s->procq = WR(q); q = allocq(qi); s->devq = RD(q); WR(s->procq)->next = WR(s->devq); RD(s->procq)->next = 0; RD(s->devq)->next = RD(s->procq); WR(s->devq)->next = 0; if(qi->open) (*qi->open)(RD(s->devq), s); c->flag |= COPEN; unlock(s); poperror(); return s; } /* * (Re)open a stream. If this is the first open, create a stream. */ void streamopen(Chan *c, Qinfo *qi) { Stream *s; Queue *q; /* * if the stream already exists, just up the reference count. */ for(s = slist; s < &slist[conf.nstream]; s++) { if(s->inuse && s->type == c->type && s->dev == c->dev && s->id == STREAMID(c->qid)){ lock(s); if(s->inuse && s->type == c->type && s->dev == c->dev && s->id == STREAMID(c->qid)){ s->inuse++; c->stream = s; unlock(s); return; } unlock(s); } } /* * create a new stream */ streamnew(c, qi); } /* * On the last close of a stream, for each queue on the * stream release its blocks and call its close routine. */ void streamclose(Chan *c) { Queue *q, *nq; Block *bp; Stream *s = c->stream; /* * if not open, ignore it */ if(!(c->flag & COPEN)) return; /* * decrement the reference cound */ lock(s); if(s->inuse != 1){ s->inuse--; unlock(c->stream); return; } /* * descend the stream closing the queues */ for(q = s->procq; q; q = q->next){ if(q->info->close) (*q->info->close)(q->other); if(q == s->devq->other) break; } /* * ascend the stream freeing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; freeq(q); } s->id = s->dev = s->type = 0; s->inuse--; unlock(s); } /* * put a block to be read into the queue. wakeup any waiting reader */ void stputq(Queue *q, Block *bp) { int i; if(bp->type == M_HANGUP){ freeb(bp); q->flag |= QHUNGUP; q->other->flag |= QHUNGUP; } else { lock(q); if(q->first) q->last->next = bp; else q->first = bp; q->last = bp; q->len += bp->wptr - bp->rptr; if(q->len >= Streamhi) q->flag |= QHIWAT; unlock(q); } wakeup(&q->r); } /* * read a string. update the offset accordingly. */ long stringread(Chan *c, uchar *buf, long n, char *str) { long i; i = strlen(str); i -= c->offset; if(ioffset, str, n); c->offset += n; return n; } /* * return true if there is an output buffer available */ static int isinput(void *x) { return ((Queue *)x)->first != 0; } /* * read until we fill the buffer or until a DELIM is encountered */ long streamread(Chan *c, void *vbuf, long n) { Block *bp; Stream *s; Queue *q; long rv = 0; int left, i, x; uchar *buf = vbuf; char num[32]; s = c->stream; switch(STREAMTYPE(c->qid)){ case Sdataqid: break; case Sctlqid: sprint(num, "%d", s->id); return stringread(c, buf, n, num); default: if(CHDIR & c->qid) return devdirread(c, vbuf, n, 0, 0, streamgen); else panic("streamread"); } /* * one reader at a time */ qlock(&s->rdlock); if(waserror()){ qunlock(&s->rdlock); nexterror(); } /* * sleep till data is available */ q = RD(s->procq); left = n; while(left){ bp = getq(q); if(bp == 0){ if(q->flag & QHUNGUP) break; sleep(&q->r, &isinput, (void *)q); continue; } i = bp->wptr - bp->rptr; if(i <= left){ memcpy(buf, bp->rptr, i); left -= i; buf += i; if(bp->flags & S_DELIM){ freeb(bp); break; } else freeb(bp); } else { memcpy(buf, bp->rptr, left); bp->rptr += left; putbq(q, bp); left = 0; } }; qunlock(&s->rdlock); poperror(); return n - left; } /* * Handle a ctl request. Streamwide requests are: * * hangup -- send an M_HANGUP up the stream * push ldname -- push the line discipline named ldname * pop -- pop a line discipline * * This routing is entrered with s->wrlock'ed and must unlock. */ static long streamctlwrite(Stream *s, void *a, long n) { Qinfo *qi; Block *bp; /* * package */ bp = allocb(n+1); memcpy(bp->wptr, a, n); bp->wptr[n] = 0; bp->wptr += n + 1; /* * check for standard requests */ if(streamparse("hangup", bp)){ hangup(s); freeb(bp); } else if(streamparse("push", bp)){ qi = qinfofind((char *)bp->rptr); pushq(s, qi); freeb(bp); } else if(streamparse("pop", bp)){ popq(s); freeb(bp); } else { bp->type = M_CTL; bp->flags |= S_DELIM; PUTNEXT(s->procq, bp); } return n; } /* * wait till there's room in the next stream */ static int notfull(void *arg) { Queue *q; q = (Queue *)arg; return q->len < Streamhi; } void flowctl(Queue *q) { if(q->next->len >= Streamhi) sleep(&q->r, notfull, q->next); } /* * send the request as a single delimited block */ long streamwrite(Chan *c, void *a, long n) { Stream *s; Block *bp; Queue *q; long rem; int i; /* * one writer at a time */ s = c->stream; qlock(&s->wrlock); if(waserror()){ qunlock(&s->wrlock); nexterror(); } /* * decode the qid */ switch(STREAMTYPE(c->qid)){ case Sdataqid: break; case Sctlqid: n = streamctlwrite(s, a, n); qunlock(&s->wrlock); poperror(); return n; default: panic("bad stream qid\n"); } /* * No writes allowed on hungup channels */ q = s->procq; if(q->other->flag & QHUNGUP) error(0, Ehungup); if(GLOBAL(a) || n==0){ /* * `a' is global to the whole system, just create a * pointer to it and pass it on. */ flowctl(q); bp = allocb(0); bp->rptr = bp->base = (uchar *)a; bp->wptr = bp->lim = (uchar *)a+n; bp->flags |= S_DELIM; bp->type = M_DATA; PUTNEXT(q, bp); } else { /* * `a' is in the user's address space, copy it into * system buffers and pass the buffers on. */ for(rem = n; ; rem -= i) { flowctl(q); bp = allocb(rem); i = bp->lim - bp->wptr; if(i >= rem){ memcpy(bp->wptr, a, rem); bp->flags |= S_DELIM; bp->wptr += rem; bp->type = M_DATA; PUTNEXT(q, bp); break; } else { memcpy(bp->wptr, a, i); bp->wptr += i; bp->type = M_DATA; PUTNEXT(q, bp); a = ((char*)a) + i; } } } qunlock(&s->wrlock); poperror(); return n; } . ## diffname gnot/stream.c 1990/0312 ## diff -e /n/bootesdump/1990/03091/sys/src/9/68020/stream.c /n/bootesdump/1990/0312/sys/src/9/68020/stream.c 888a } /* * like andrew's getmfields but no hidden state */ int getfields(char *lp, /* to be parsed */ char **fields, /* where to put pointers */ int n, /* number of pointers */ char sep /* separator */ ) { int i; for(i=0; lp && *lp && ioffset, n); . 632d 630a q->len += BLEN(bp); while(bp->next) { bp = bp->next; q->len += BLEN(bp); } . 624a wakeup(&q->other->r); . 418c if(BLEN(bp) < len) . 363a * make sure the first block has n bytes */ Block * pullup(Block *bp, int n) { Block *nbp; int i; /* * this should almost always be true, the rest it * just for to avoid every caller checking. */ if(BLEN(bp) >= n) return bp; /* * if not enough room in the first block, * add another to the front of the list. if(bp->lim - bp->rptr < n){ nbp = allocb(n); nbp->next = bp; bp = nbp; } /* * copy bytes from the trailing blocks into the first */ n -= BLEN(bp); while(nbp = bp->next){ i = BLEN(nbp); if(i > n) { memcpy(bp->wptr, nbp->rptr, n); bp->wptr += n; nbp->rptr += n; return bp; } else { memcpy(bp->wptr, nbp->rptr, i); bp->wptr += i; bp->next = nbp->next; nbp->next = 0; freeb(nbp); } } freeb(bp); return 0; } /* * grow the front of a list of blocks by n bytes */ Block * prepend(Block *bp, int n) { Block *nbp; if(bp->base && (bp->rptr - bp->base)>=n){ /* * room for channel number in first block of message */ bp->rptr -= n; return bp; } else { /* * make new block, put message number at end */ nbp = allocb(2); nbp->next = bp; nbp->wptr = nbp->lim; nbp->rptr = nbp->wptr - n; return nbp; } } /* . 357c q->len -= BLEN(bp); . 346a /* * remove the first block from a list of blocks */ . 337c q->len -= BLEN(bp); . 324c * remove the first block from a queue . 319c q->len += BLEN(bp); . 310c void . 303d 299c q->len += BLEN(bp); . 295c q->len += BLEN(bp); . 276c q->len += BLEN(bp); . 272c q->len += BLEN(bp); . 149a wakeup(&bcp->r); . 147,148d 145a tries = 0; while(bp->next){ if(++tries > 10){ dumpstack(); panic("freeb"); } bp = bp->next; } . 141a bp->rptr = bp->wptr = 0; . 140d 137a int tries; . 131,132c * Free a block (or list of blocks). Poison its pointers so that * someone trying to access it after freeing will cause a dump. . 111a qunlock(bcp); . 110a qlock(bcp); . 37c Blist; QLock; /* qlock for sleepers on r */ Rendez r; /* sleep here waiting for blocks */ . 15c &dkmuxinfo, &urpinfo, . 13a /* * line disciplines that can be pushed * * WARNING: this table should be the result of configuration */ extern Qinfo noetherinfo; extern Qinfo dkmuxinfo; extern Qinfo urpinfo; . 11,12c Qinfo procinfo = { stputq, nullput, 0, 0, "process" }; . 9a /* * process end line discipline */ . ## diffname gnot/stream.c 1990/0321 ## diff -e /n/bootesdump/1990/0312/sys/src/9/68020/stream.c /n/bootesdump/1990/0321/sys/src/9/68020/stream.c 742c if(delim) wakeup(&q->r); . 739a delim = 1; } . 738c if(q->len >= Streamhi){ . 735a delim |= bp->flags & S_DELIM; . 732a delim = bp->flags & S_DELIM; . 726a delim = 0; . 725a delim = 1; . 719c int delim; . 172c if(bcp->r.p) wakeup(&bcp->r); . 91d 54,55c { 68 }, { 260 }, . 33a for(i=0; ir, isblock, (void *)bcp, 250); . 127c if(loop++ > 10) panic("waiting for blocks\n"); . 114d 112c int loop=0; . ## diffname gnot/stream.c 1990/03292 ## diff -e /n/bootesdump/1990/0322/sys/src/9/68020/stream.c /n/bootesdump/1990/03292/sys/src/9/68020/stream.c 684c if(!c->stream) . 630d ## diffname gnot/stream.c 1990/0331 ## diff -e /n/bootesdump/1990/03292/sys/src/9/68020/stream.c /n/bootesdump/1990/0331/sys/src/9/68020/stream.c 828,829c if(q->flag & QHUNGUP){ if(s->hread++ < 3) break; else error(0, Ehungup); } . 616a s->hread = 0; . 127a } . 126c if(loop++ > 10){ dumpqueues(); dumpstack(); . 64a * Dump all block information of how many blocks are in which queues */ void dumpqueues(void) { Queue *q; int count; Block *bp; for(q = qlist; q < qlist + conf.nqueue; q++, q++){ if(!(q->flag & QINUSE)) continue; for(count = 0, bp = q->first; bp; bp = bp->next) count++; print("%s %ux RD count %d len %d", q->info->name, q, count, q->len); for(count = 0, bp = WR(q)->first; bp; bp = bp->next) count++; print(" WR count %d len %d\n", count, WR(q)->len); } } /* . 61c { 1024 }, . ## diffname gnot/stream.c 1990/0403 ## diff -e /n/bootesdump/1990/0331/sys/src/9/68020/stream.c /n/bootesdump/1990/0403/sys/src/9/68020/stream.c 1012c FLOWCTL(q); . 999c FLOWCTL(q); . 946,947c sleep(&q->r, notfull, q->next); . 941c return !QFULL(q->next); . 772c if(q->len >= Streamhi || q->nb >= Streambhi){ . 768a q->nb++; . 764a q->nb++; . 390c q->nb--; if((q->flag&QHIWAT) && q->len < Streamhi/2 && q->nb < Streambhi){ . 333c if(q->len >= Streamhi || q->nb >= Streambhi) . 329a q->nb++; . 325a q->nb++; . 238a wq->len = wq->nb = 0; . 232a q->len = q->nb = 0; . 150,151c print("waiting for blocks\n"); . 148c if(loop++ == 10){ . ## diffname gnot/stream.c 1990/0406 ## diff -e /n/bootesdump/1990/0403/sys/src/9/68020/stream.c /n/bootesdump/1990/0406/sys/src/9/68020/stream.c 944,947c return !QFULL((Queue *)arg); . 394c if((q->flag&QHIWAT) && q->lennbnb++; . ## diffname gnot/stream.c 1990/05313 ## diff -e /n/bootesdump/1990/0406/sys/src/9/68020/stream.c /n/bootesdump/1990/05313/sys/src/9/68020/stream.c 182a if((bp->flags&S_CLASS) >= Nclass) panic("freeb class"); . 83a print("\n"); for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){ lock(bcp); for(count = 0, bp = bcp->first; bp; count++, bp = bp->next) ; unlock(bcp); print("%d blocks of size %d\n", count, bcp->size); } print("\n"); . 82c print(" W c %d l %d f %ux\n", count, WR(q)->len, WR(q)->flag); . 79c print("%s %ux R c %d l %d f %ux", q->info->name, q, count, q->len, q->flag); . 73a print("\n"); . 72a Bclass *bcp; . 61c { 4096 }, . ## diffname gnot/stream.c 1990/0620 ## diff -e /n/bootesdump/1990/05313/sys/src/9/68020/stream.c /n/bootesdump/1990/0620/sys/src/9/68020/stream.c 757,758c /* * leave it and free it */ streamexit(s, 1); . 755c flushq(q); . 751c * ascend the stream flushing the queues . 749a . 735,736c if(s->opens != 1){ s->opens--; . 732c * decrement the reference count . 714a * Enter a stream. Increment the reference count so it can't disappear * under foot. */ int streamenter(Stream *s) { lock(s); if(s->opens == 0){ unlock(s); return -1; } s->inuse++; unlock(s); return 0; } /* * Decrement the reference count on a stream. If the count is * zero, free the stream. */ void streamexit(Stream *s, int locked) { Queue *q; Queue *nq; if(!locked) lock(s); s->inuse--; if(s->inuse != 0){ if(!locked) unlock(s); return; } /* * ascend the stream freeing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; freeq(q); } s->id = s->dev = s->type = 0; if(!locked) unlock(s); } /* . 699a s->opens++; . 662d 660a s->opens = 1; . 260a * flush a queue */ static void flushq(Queue *q) { Block *bp; q = RD(q); while(bp = getq(q)) freeb(bp); q = WR(q); while(bp = getq(q)) freeb(bp); } /* . ## diffname gnot/stream.c 1990/0621 ## diff -e /n/bootesdump/1990/0620/sys/src/9/68020/stream.c /n/bootesdump/1990/0621/sys/src/9/68020/stream.c 805,822c s->opens--; . 800,803c if(s->opens == 1){ /* * descend the stream closing the queues */ for(q = s->procq; q; q = q->next){ if(q->info->close) (*q->info->close)(q->other); /* this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; } /* * ascend the stream flushing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; flushq(q); } . 766,774c s->inuse--; . 760,764c if(s->inuse == 1){ /* * ascend the stream freeing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; freeq(q); } s->id = s->dev = s->type = 0; . 534,535c if(bp->type == M_HANGUP) freeb(bp); else { freeb(bp); error(0, Ehungup); } . ## diffname gnot/stream.c 1990/0702 ## diff -e /n/bootesdump/1990/0621/sys/src/9/68020/stream.c /n/bootesdump/1990/0702/sys/src/9/68020/stream.c 248a wq->flag = QINUSE; . 85c print(" W c %d l %d f %ux r %ux\n", count, WR(q)->len, WR(q)->flag, &(WR(q)->r)); . 81,82c print("%s %ux R c %d l %d f %ux r %ux", q->info->name, q, count, q->len, q->flag, &(q->r)); . ## diffname gnot/stream.c 1990/0703 ## diff -e /n/bootesdump/1990/0702/sys/src/9/68020/stream.c /n/bootesdump/1990/0703/sys/src/9/68020/stream.c 86a dumpblocks(q, 'R'); dumpblocks(q, 'W'); . 83,85c print(" W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag, . 79,81c print("%s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb, . 67a dumpblocks(Queue *q, char c) { Block *bp; uchar *cp; lock(q); for(bp = q->first; bp; bp = bp->next){ print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)); for(cp = bp->rptr; cpwptr && cprptr+10; cp++) print(" %uo", *cp); print("\n"); } unlock(q); } void . ## diffname gnot/stream.c 1990/0704 ## diff -e /n/bootesdump/1990/0703/sys/src/9/68020/stream.c /n/bootesdump/1990/0704/sys/src/9/68020/stream.c 100c dumpblocks(WR(q), 'W'); . ## diffname gnot/stream.c 1990/0801 ## diff -e /n/bootesdump/1990/0704/sys/src/9/68020/stream.c /n/bootesdump/1990/0801/sys/src/9/68020/stream.c 1157a } /* * stat a stream. the length is the number of bytes up to the * first delimiter. */ void streamstat(Chan *c, char *db, char *name) { Dir dir; Stream *s; Queue *q; Block *bp; long n; s = c->stream; if(s == 0) panic("streamstat"); q = RD(s->procq); lock(q); for(n=0, bp=q->first; bp; bp = bp->next){ n += BLEN(bp); if(bp->flags&S_DELIM) break; } unlock(q); devdir(c, c->qid, name, n, 0, &dir); convD2M(&dir, db); . 8a #include "fcall.h" . ## diffname gnot/stream.c 1990/08272 ## diff -e /n/bootesdump/1990/0801/sys/src/9/68020/stream.c /n/bootesdump/1990/08272/sys/src/9/68020/stream.c 988a if(left<0 || left>n){ print("streamread returns %d for a %d read\n", n-left, n); panic("streamread"); } . ## diffname gnot/stream.c 1990/0905 ## diff -e /n/bootesdump/1990/08272/sys/src/9/68020/stream.c /n/bootesdump/1990/0905/sys/src/9/68020/stream.c 234a * pad a block to the front with n bytes */ Block * padb(Block *bp, int n) { Block *nbp; if(bp->base && bp->rptr-bp->base>=n){ bp->rptr -= n; return bp; } else { nbp = allocb(n); nbp->wptr = nbp->lim; nbp->rptr = nbp->wptr - n; nbp->next = nbp; return nbp; } } /* . ## diffname gnot/stream.c 1990/09051 ## diff -e /n/bootesdump/1990/0905/sys/src/9/68020/stream.c /n/bootesdump/1990/09051/sys/src/9/68020/stream.c 249c nbp->next = bp; . ## diffname gnot/stream.c 1990/0911 ## diff -e /n/bootesdump/1990/09051/sys/src/9/68020/stream.c /n/bootesdump/1990/0911/sys/src/9/68020/stream.c 1212a } /* * Dump all block information of how many blocks are in which queues */ void dumpblocks(Queue *q, char c) { Block *bp; uchar *cp; lock(q); for(bp = q->first; bp; bp = bp->next){ print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)); for(cp = bp->rptr; cpwptr && cprptr+10; cp++) print(" %uo", *cp); print("\n"); } unlock(q); } void dumpqueues(void) { Queue *q; int count; Block *bp; Bclass *bcp; print("\n"); for(q = qlist; q < qlist + conf.nqueue; q++, q++){ if(!(q->flag & QINUSE)) continue; print("%s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb, q->len, q->flag, &(q->r)); print(" W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag, &(WR(q)->r)); dumpblocks(q, 'R'); dumpblocks(WR(q), 'W'); } print("\n"); for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){ lock(bcp); for(count = 0, bp = bcp->first; bp; count++, bp = bp->next) ; unlock(bcp); print("%d blocks of size %d\n", count, bcp->size); } print("\n"); . 1009,1012d 589,591c for(qi = lds; qi; qi = qi->next) if(strcmp(qi->name, name)==0) return qi; . 585c Qinfo *qi; . 176,179d 162d 146a * make known a stream module and call its initialization routine, if * it has one. */ void newqinfo(Qinfo *qi) { qi->next = lds; lds = qi; if(qi->reset) (*qi->reset)(); } /* . 143a /* * make stream modules available */ streaminit0(); . 125a /* * allocate blocks, queues, and streams */ . 74,113d 65,72c #include "stream.h" . 27,39d 25c static Qinfo *lds; . 13d ## diffname gnot/stream.c 1990/0914 ## diff -e /n/bootesdump/1990/0911/sys/src/9/68020/stream.c /n/bootesdump/1990/0914/sys/src/9/68020/stream.c 1160d 1151,1158c n = 0; else { q = RD(s->procq); lock(q); for(n=0, bp=q->first; bp; bp = bp->next){ n += BLEN(bp); if(bp->flags&S_DELIM) break; } unlock(q); . 1107,1108c out: /* qunlock(&s->wrlock); poperror(); /**/ . 1056,1058c goto out; . 1046a */ . 1040,1041d 1037a s = c->stream; . ## diffname gnot/stream.c 1990/0925 ## diff -e /n/bootesdump/1990/0914/sys/src/9/68020/stream.c /n/bootesdump/1990/0925/sys/src/9/68020/stream.c 1107,1108d 1041,1049d 130c * look for a free block . ## diffname gnot/stream.c 1990/0928 ## diff -e /n/bootesdump/1990/0925/sys/src/9/68020/stream.c /n/bootesdump/1990/0928/sys/src/9/68020/stream.c 1097d 1060c if(!docopy && GLOBAL(a)){ . 1043,1051c if(STREAMTYPE(c->qid) != Sdataqid) return streamctlwrite(c, a, n); . 981a if(STREAMTYPE(c->qid) != Sctlqid) panic("streamctlwrite %lux", c->qid); s = c->stream; . 980a Stream *s; . 977c streamctlwrite(Chan *c, void *a, long n) . 919a s = c->stream; . 903,915c if(STREAMTYPE(c->qid) != Sdataqid) return streamctlread(c, vbuf, n); . 901d 898,899c int left, i; . 880a * return the stream id */ long streamctlread(Chan *c, void *vbuf, long n) { uchar *buf = vbuf; char num[32]; Stream *s; s = c->stream; if(STREAMTYPE(c->qid) == Sctlqid){ sprint(num, "%d", s->id); return stringread(c, buf, n, num); } else { if(CHDIR & c->qid) return devdirread(c, vbuf, n, 0, 0, streamgen); else panic("streamctlread"); } } /* . ## diffname gnot/stream.c 1990/0930 ## diff -e /n/bootesdump/1990/0928/sys/src/9/68020/stream.c /n/bootesdump/1990/0930/sys/src/9/68020/stream.c 908c Queue *q; q = (Queue *)x; return (q->flag&QHUNGUP) || q->first!=0; . ## diffname gnot/stream.c 1990/1009 ## diff -e /n/bootesdump/1990/0930/sys/src/9/68020/stream.c /n/bootesdump/1990/1009/sys/src/9/68020/stream.c 1177c print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' '); . 819a void streamclose(Chan *c) { /* * if no stream, ignore it */ if(!c->stream) return; streamclose1(c->stream); } . 783,788d 780d 776c streamclose1(Stream *s) . 724c c->stream = streamnew(c->type, c->dev, STREAMID(c->qid), qi, 0); . 702c * if the stream already exists, just increment the reference counts. . 673c if(noopen) s->opens = 0; else s->opens = 1; . 661,667c s->type = type; s->dev = dev; s->id = id; . 659c * identify the stream . 654c streamclose1(s); . 631c streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen) . 628c * create a new stream, if noopen is non-zero, don't increment the open count . 244a wq->ptr = 0; . 237a q->ptr = 0; . ## diffname gnot/stream.c 1990/1011 ## diff -e /n/bootesdump/1990/1009/sys/src/9/68020/stream.c /n/bootesdump/1990/1011/sys/src/9/68020/stream.c 787,795c if(!waserror()){ /* * descend the stream closing the queues */ for(q = s->procq; q; q = q->next){ if(q->info->close) (*q->info->close)(q->other); /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; } poperror(); . ## diffname gnot/stream.c 1990/1018 ## diff -e /n/bootesdump/1990/1011/sys/src/9/68020/stream.c /n/bootesdump/1990/1018/sys/src/9/68020/stream.c 1050a qunlock(&q->rlock); . 1049a qlock(&q->rlock); . ## diffname gnot/stream.c 1990/1024 ## diff -e /n/bootesdump/1990/1018/sys/src/9/68020/stream.c /n/bootesdump/1990/1024/sys/src/9/68020/stream.c 793a RD(q)->put = nullput; WR(q)->put = nullput; . 769a return rv; . 767a rv = s->inuse; . 753a int rv; . 749c int . ## diffname gnot/stream.c 1990/1101 ## diff -e /n/bootesdump/1990/1024/sys/src/9/68020/stream.c /n/bootesdump/1990/1101/sys/src/9/68020/stream.c 797d ## diffname gnot/stream.c 1990/1104 ## diff -e /n/bootesdump/1990/1101/sys/src/9/68020/stream.c /n/bootesdump/1990/1104/sys/src/9/68020/stream.c 758a if(s->opens != 0) print("streamexit %d %s\n", s->opens, s->devq->info->name); . 754a char *name; . ## diffname gnot/stream.c 1990/11062 ## diff -e /n/bootesdump/1990/1104/sys/src/9/68020/stream.c /n/bootesdump/1990/11062/sys/src/9/68020/stream.c 135a if(waserror()){ qunlock(bcp); nexterror(); } . ## diffname gnot/stream.c 1990/1113 ## diff -e /n/bootesdump/1990/11062/sys/src/9/68020/stream.c /n/bootesdump/1990/1113/sys/src/9/68020/stream.c 1065a poperror(); . 1063a if(waserror()){ qunlock(&q->rlock); nexterror(); } . 141a poperror(); . ## diffname gnot/stream.c 1990/11151 ## diff -e /n/bootesdump/1990/1113/sys/src/9/68020/stream.c /n/bootesdump/1990/11151/sys/src/9/68020/stream.c 1009,1010d 831c qunlock(s); . 817,823c /* * ascend the stream flushing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; flushq(q); . 815a } . 814a /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; . 807,812d 799,803c /* * descend the stream closing the queues */ for(q = s->procq; q; q = q->next){ if(!waserror()){ . 797c qlock(s); . 780c qunlock(s); . 766c panic("streamexit %d %s\n", s->opens, s->devq->info->name); . 763c qlock(s); . 746c qunlock(s); . 742c qunlock(s); . 740c qlock(s); . 723c qunlock(s); . 720c qunlock(s); . 713c qlock(s); . 693c qunlock(s); . 660c qunlock(s); . 651c qunlock(s); . 648c if(canqlock(s)){ . 19c Qinfo procinfo = { stputq, nullput, 0, 0, "process" }; . ## diffname gnot/stream.c 1990/11161 ## diff -e /n/bootesdump/1990/11151/sys/src/9/68020/stream.c /n/bootesdump/1990/11161/sys/src/9/68020/stream.c 1015a * * This routing is entrered with s->wrlock'ed and must unlock. . 825,830c /* * ascend the stream flushing the queues */ for(q = s->devq; q; q = nq){ nq = q->next; flushq(q); } . 823d 815c WR(q)->put = nullput; . 813c poperror(); . ## diffname gnot/stream.c 1990/11211 ## diff -e /n/bootesdump/1990/11161/sys/src/9/68020/stream.c /n/bootesdump/1990/11211/sys/src/9/68020/stream.c 1106c error(Ehungup); . 1098c if(STREAMTYPE(c->qid.path) != Sdataqid) . 1026c if(STREAMTYPE(c->qid.path) != Sctlqid) . 981c error(Ehungup); . 956c if(STREAMTYPE(c->qid.path) != Sdataqid) . 925c if(CHDIR & c->qid.path) . 921c if(STREAMTYPE(c->qid.path) == Sctlqid){ . 848c return streamclose1(c->stream); . 840c int . 838a return rv; . 832c rv = --(s->opens); . 799a int rv; . 795c int . 737c c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0); . 723c && s->id == STREAMID(c->qid.path)){ . 719c && s->id == STREAMID(c->qid.path)){ . 664c error(Enostream); . 636c devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, tab->name, tab->length, . 611,612c "data", {Sdataqid}, 0, 0600, "ctl", {Sctlqid}, 0, 0600, . 561c error(Ebadld); . 557c error(Ebadld); . 544c error(Ehungup); . 338c error(Ebadld); . 242c error(Enoqueue); . ## diffname gnot/stream.c 1990/1127 ## diff -e /n/bootesdump/1990/11211/sys/src/9/68020/stream.c /n/bootesdump/1990/1127/sys/src/9/68020/stream.c 140a if(newblock(bcp) == 0) continue; . 113a * upgrade a block 0 block to another class (called with bcp qlocked) */ newblock(Bclass *bcp) { Page *page; int n; Block *bp; uchar *cp; if(bcp->made > bcp->lim) return; if(bcp == bclass){ /* * create some level zero blocks and return */ page = newpage(1, 0, 0); page->va = VA(kmap(page)); n = BY2PG/sizeof(Block); bp = (Block *)(page->va); while(n-- > 0){ bp->flags = 0; bp->base = bp->lim = bp->rptr = bp->wptr = 0; if(bcp->first) bcp->last->next = bp; else bcp->first = bp; bcp->last = bp; bcp->made++; bp++; } } else { /* * create a page worth of new blocks */ page = newpage(1, 0, 0); page->va = VA(kmap(page)); n = BY2PG/bcp->size; cp = (uchar *)(page->va); while(n-- > 0){ /* * upgrade a level 0 block */ bp = allocb(0); qlock(bclass); bclass->made--; bcp->made++; bp->flags = bcp - bclass; qunlock(bclass); /* * tack on the data area */ bp->base = bp->rptr = bp->wptr = cp; cp += bcp->size; bp->lim = cp; if(bcp->first) bcp->last->next = bp; else bcp->first = bp; bcp->last = bp; } } return; } /* . 84,91c bcp->lim = n; bcp->made = 0; . 77,78c /* * set limits on blocks */ . 73c * allocate queues, streams . 69d 46a int lim; int made; . 38d ## diffname gnot/stream.c 1990/1128 ## diff -e /n/bootesdump/1990/1127/sys/src/9/68020/stream.c /n/bootesdump/1990/1128/sys/src/9/68020/stream.c 1117a freeb(bp); } else if(streamparse("look", bp)){ qlook(s, (char *)bp->rptr); . 1083a * look ldname -- look for a line discipline . 1078a * look for an instance of the line discipline `name' on * the stream `s' */ void qlook(Stream *s, char *name) { Queue *q; for(q = s->procq; q; q = q->next){ if(strcmp(q->info->name, name) == 0) return; /* * this may be 2 streams joined device end to device end */ if(q == s->devq->other) break; } errors("not found"); } /* . ## diffname gnot/stream.c 1990/1210 # deleted ## diff -e /n/bootesdump/1990/1128/sys/src/9/68020/stream.c /n/bootesdump/1990/1210/sys/src/9/68020/stream.c 1,1344d