#include #include #include #include "linuxsys.h" #include "linux.h" typedef struct Buf Buf; typedef struct Bufq Bufq; typedef struct Buffd Buffd; struct Buf { Buf *next; uchar *bp; uchar *ep; uchar data[]; }; struct Bufq { Buf *head; Buf **tail; }; struct Buffd { QLock lock; int fd; int pid; int shutdown; int eof; int error; Bufq rq; Rendez queuewait; Rendez readwait; Rendez killwait; }; enum { MAXREADSIZE = 1024*8, MAXQUEUESIZE = 1024*64, }; static int queuesize(Bufq *q) { Buf *b; int n; n = 0; for(b=q->head; b; b=b->next) n += (b->ep - b->bp); return n; } static void flushqueue(Bufq *q) { Buf *b, *x; for(b = q->head; b; b = x){ x = b->next; free(b); } q->head = nil; q->tail = &q->head; } static void killbuffd(Buffd *bfd) { if(bfd->fd != -1){ bfd->fd = -1; if(bfd->pid != -1){ int pid; pid = bfd->pid; rwakeup(&bfd->queuewait); rwakeup(&bfd->readwait); if(!postnote(PNPROC, pid, "rocktimer")) rsleep(&bfd->killwait); } } } static void destroybuffdtag(void *tag) { Buffd *bfd; bfd = *fdtagp(tag); *fdtagp(tag) = nil; qlock(&bfd->lock); killbuffd(bfd); flushqueue(&bfd->rq); qunlock(&bfd->lock); free(bfd); } static void forkbuffdtag(void *tag) { Buffd *bfd; bfd = *fdtagp(tag); *fdtagp(tag) = nil; flushqueue(&bfd->rq); free(bfd); unlinkfdtag(tag); } static int readprocnote(void *, char *msg) { if(threadp->pid != 0) return 0; if(strcmp(msg, "rocktimer")==0) return 1; return 0; } static void readproc(void *aux) { Buffd *bfd; Buf *b; int n; int z; int fd; bfd = aux; z = 0; b = nil; qlock(&bfd->lock); fd = bfd->fd; if(bfd->fd < 0) goto die; qunlock(&bfd->lock); for(;;){ if(b==nil){ b = malloc(sizeof(*b) + MAXREADSIZE); if(b == nil){ qlock(&bfd->lock); bfd->error = -ENOMEM; goto die; } } n = read(fd, b->data, MAXREADSIZE); qlock(&bfd->lock); if(bfd->fd < 0) goto die; if(n == 0){ if(++z <= 3){ qunlock(&bfd->lock); continue; } bfd->eof = 1; epollevent(fd, POLLIN, 0); goto die; } z = 0; if(n < 0){ int e; switch(e = mkerror()){ case -ESHUTDOWN: bfd->eof = 1; break; default: bfd->error = e; } epollevent(fd, POLLIN, 0); goto die; } if(realloc(b, sizeof(*b) + n) == nil){ bfd->error = -ENOMEM; goto die; } b->ep = b->bp = b->data; b->ep += n; b->next = nil; *bfd->rq.tail = b; bfd->rq.tail = &b->next; b = nil; epollevent(fd, POLLIN, 0); rwakeup(&bfd->readwait); if(queuesize(&bfd->rq) > MAXQUEUESIZE) rsleep(&bfd->queuewait); if(bfd->fd < 0) goto die; qunlock(&bfd->lock); } die: bfd->pid = -1; rwakeup(&bfd->killwait); rwakeup(&bfd->readwait); qunlock(&bfd->lock); if(b) free(b); } static void buffdproc(void *aux) { Buffd *bfd; bfd = aux; threadp->pid = 0; atnotify(readprocnote, 1); readproc(bfd); } void buffd(int fd) { void *tag; Buffd *bfd; if((tag = openfdtag(fd, TAG_BUFFD, 1))==nil) return; if(*fdtagp(tag) != nil){ closefdtag(tag); return; } bfd = malloc(sizeof(*bfd)); memset(bfd, 0, sizeof(*bfd)); qlock(&bfd->lock); bfd->fd = fd; bfd->pid = -1; bfd->shutdown = 0; bfd->eof = 0; bfd->error = 0; bfd->rq.head = nil; bfd->rq.tail = &bfd->rq.head; bfd->queuewait.l = &bfd->lock; bfd->readwait.l = &bfd->lock; bfd->killwait.l = &bfd->lock; bfd->pid = createxproc(buffdproc, bfd, RFMEM|RFPROC, 16 * 1024); *fdtagp(tag) = bfd; atdestroyfdtag(tag, destroybuffdtag); atforkfdtag(tag, forkbuffdtag); qunlock(&bfd->lock); closefdtag(tag); } void buffdpoll(int fd) { void *tag; Buffd *bfd; ulong e; if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil) return; if((bfd = *fdtagp(tag)) == nil) return; qlock(&bfd->lock); closefdtag(tag); e = 0; if(!((bfd->shutdown&(1<shutdown&(1<error || bfd->eof || bfd->rq.head) e |= POLLIN; if(bfd->error) e |= POLLRDHUP; epollevent(fd, e, 0); qunlock(&bfd->lock); } int buffdionread(int fd) { void *tag; int n; Buffd *bfd; tag = openfdtag(fd, TAG_BUFFD, 0); if(tag == nil) return -1; bfd = (Buffd*)*fdtagp(tag); qlock(&bfd->lock); closefdtag(tag); if((bfd->shutdown&(1<shutdown&(1<rq); } qunlock(&bfd->lock); return n; } void shutdownbuffd(int fd, int how) { void *tag; Buffd *bfd; tag = openfdtag(fd, TAG_BUFFD, 0); if(tag == nil) return; bfd = (Buffd*)*fdtagp(tag); qlock(&bfd->lock); closefdtag(tag); bfd->shutdown = (1<rq); qunlock(&bfd->lock); } int peekbuffd(int fd, void *data, int len, int noblock) { int t; void *tag; Buffd *bfd; Buf *b; if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil) return -EBADF; if((bfd = *fdtagp(tag)) == nil) return -EBADF; t = 0; qlock(&bfd->lock); closefdtag(tag); if((bfd->shutdown&(1<shutdown&(1<rq.head; (t < len) && b; b=b->next){ int x; x = b->ep - b->bp; if(x > len-t) x = len-t; memcpy(((uchar*)data) + t, b->bp, x); t += x; } if(t > 0){ break; } if(bfd->eof){ t = 0; break; } if(bfd->error || (bfd->fd < 0)){ t = bfd->error ? bfd->error : -1; break; } if(noblock){ t = -EAGAIN; break; } rsleep(&bfd->readwait); } out: qunlock(&bfd->lock); return t; } int readbuffd(int fd, void *data, int len, int noblock) { int t; void *tag; Buffd *bfd; Buf *b; if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil) return -EBADF; if((bfd = *fdtagp(tag)) == nil) return -EBADF; t = 0; qlock(&bfd->lock); closefdtag(tag); if((bfd->shutdown&(1<shutdown&(1<rq.head) && (t < len)){ int x; x = b->ep - b->bp; if(x > len-t) x = len-t; memcpy(((uchar*)data) + t, b->bp, x); t += x; b->bp += x; if(b->bp == b->ep){ if((bfd->rq.head = b->next) == nil){ bfd->rq.tail = &bfd->rq.head; } free(b); } } if(t > 0){ rwakeup(&bfd->queuewait); break; } if(bfd->eof){ t = 0; break; } if(bfd->error || (bfd->fd < 0)){ t = bfd->error ? bfd->error : -1; break; } if(noblock){ t = -EAGAIN; break; } rsleep(&bfd->readwait); } if(bfd->rq.head == nil) epollevent(fd, 0, POLLIN); out: qunlock(&bfd->lock); return t; } int writebuffd(int fd, void *data, int len, int noblock) { int ret; Buffd *bfd; void *tag; USED(noblock); tag = openfdtag(fd, TAG_BUFFD, 0); if(tag == nil) return -EBADF; bfd = (Buffd*)*fdtagp(tag); qlock(&bfd->lock); closefdtag(tag); if((bfd->shutdown&(1<shutdown&(1<lock); return ret; }