#include #include #include "dat.h" #include "fns.h" /* Each hubsrv manages a Hub */ /* Each Hub can serve up to MAXFDS simultaneous input and output clients */ /* The typical minimal usage is a Hub for each of the 3 conventional standard i/o file descriptors */ /* forks readers and writers, then waits and unlocks writers when a message arrives */ void hubsrv(Hub *h) { int tempid; print("-Hub%s started-", h->name); /* lock output to wait for first input */ qlock(&h->entwrlck); for(int i=0; i < MAXFDS; i++){ qlock(&h->wroblck[i]); } for(int i=0; i < h->infdcount; i++){ if((tempid=rfork(RFPROC|RFMEM|RFNOWAIT|RFNOTEG)) == 0){ if(verbosity == UP){ print("-%s rdproc forked %d-",h->name, i); } rdproc(h, i, h->infds[i]); } h->infdpids[i] = tempid; } for(int i=0; i < h->outfdcount; i++){ if((tempid=rfork(RFPROC|RFMEM|RFNOWAIT|RFNOTEG)) == 0){ if(verbosity == UP){ print("-%s wrproc forked %d-",h->name, i); } wrproc(h, i, h->outfds[i]); } h->outfdpids[i] = tempid; } while(h->hubstate == GO){ if(h->iblck.locked == 0){ qlock(&h->entwrlck); } for(int i=0; i< h->outfdcount; i++){ if((h->outq[i] != h->inqcount) && (h->wroblck[i].locked == 1)){ if(paranoia == UP){ ketchup = UP; } qunlock(&h->wroblck[i]); } sleep(h->sleeptime); sleep(h->sleeptime); } } print("!hubsrv %s ended!\n", h->name); return; } /* reader process attached to given hub and fd */ /* reads on infd and puts messages into bucket and advances the pointer */ /* monitors both bucket and pointer queue for overflows and resets as needed */ void rdproc(Hub *h, int id, int infd) { int rn; char buf[8192]; int buffuse; buffuse = 16384; /* we are paranoid about buffer overflows, these are insurance bytes */ while(h->infdstat[id] != STOP){ if(paranoia == UP){ while(ketchup == UP){ sleep(h->sleeptime); } } rn = read(infd, buf, 8192); if(rn == 0){ print("\n-%s rd%d 0 length read-\n", h->name, id); sleep(100); continue; } if(rn < 0){ print("\n\n-%s rdp%d read error!\n\n", h->name, id); sysfatal("READ ERROR!\n"); } /* This is the main critical region of every hub entered immediately after a successful read */ /* The iblck protects all the core bucket and queue and pointer information */ /* writer sync is loose unless paranoia is active, but bucket data integrity is always enforced */ qlock(&h->iblck); h->msgptr[h->inqcount] = h->inbuckp; h->inqsize[(h->inqcount)] = rn; memcpy(h->inbuckp, buf, rn); h->inbuckp += rn; buffuse += rn; h->inqcount++; if(verbosity == UP){ print("-%s rd%d msg.%d %d bytes bucket use %d-", h->name, id, h->inqcount, rn, buffuse); } if(h->inqcount == MAXQ -2){ print("-%s rd%d resetting counter\n", h->name, id); h->inqsize[h->inqcount] = -1; h->inqcount = 0; } if(buffuse > BUCKSIZE){ print("-%s rd%d resetting pointer to start of buffer-\n", h->name, id); for(int i=0; i <5; i++){ sleep(h->sleeptime); } h->inbuckp = &h->bucket[0]; buffuse = 16384; } if(h->entwrlck.locked == 1){ qunlock(&h->entwrlck); } sleep(h->sleeptime); qunlock(&h->iblck); /* End of critical region */ /* master write lock entwrlock unlocked and write procs are active*/ /* iblck unlocked and readers may now add data and adjust the *inbuckp */ sleep(h->sleeptime); } print("-%s rdproc %d TERMINATING!\n", h->name, id); exits(nil); } /* writer process attached to a given hub */ /* waits for messages then writes them to outfd, locks itself when it catches up */ /* h->outq[id] is the writer's message queue counter that follows behind the master inqcount */ void wrproc(Hub *h, int id, int outfd) { int wn; while(h->outfdstat[id] != STOP){ while(h->outq[id] != h->inqcount){ if(paranoia == UP){ ketchup = UP; } if(h->inqsize[h->outq[id]] == -1){ h->outq[id] = 0; print("-%s wr%d resetting-\n", h->name, id); continue; } wn = write(outfd, h->msgptr[h->outq[id]], h->inqsize[h->outq[id]]); if(wn == 0){ print("\n-%s wr%d 0 length write-\n", h->name, id); sleep(100); continue; } if(wn < 0){ print("\n\n-%s wr%d write error!\n", h->name, id); sysfatal("WRITE ERROR!\n"); } if(verbosity == UP){ print("-%s wr%d msg.%d %d bytes-", h->name, id, h->outq[id], h->inqsize[h->outq[id]]); } h->outq[id]++; } sleep(h->sleeptime); if(h->outq[id] != h->inqcount){ continue; } if(paranoia == UP){ ketchup = DOWN; } qlock(&h->wroblck[id]); sleep(h->sleeptime); } print("-%s wrproc %d TERMINATING!\n", h->name, id); exits(nil); } /* NOTE! We deliberately permit the rdproc to LAP the wrproc if it is reading much faster than the wrproc is writing. In other words, if the rdproc can manage to get an entire BUCKSIZE worth of data ahead of the wrproc, the wrproc will 'cross the streams' from the outdated data to the newer data. If this is problematic, the best solution is to adjust the sleeptime of the Hub to throttle the rdproc down to a speed the wrproc can match. The maximum size of a read is 8k. A sleeptime of 100ms (corresponding to the command string 'hub h1t100' in ioshell or 'h1t100' on a direct connection or echo to the ctl/data file) will throttle the rdproc bandwidth down to 40k/sec for instance. Note that this is only necessary if the size of a continuous high speed data stream is significantly larger than BUCKSIZE, compiled by default at around 512k. The design motivation for this behavior is that in a multi-client session, a single slow client will not cause a general slowdown for all users unless the sleeptime is changed to match their bottleneck. The other alternative is to set the -p 'paranoia' flag or send 'fear' to the ctl/data file. This is suboptimal because it locks the readers to the output speed of the writers and also causes the readers to 'bounce' while waiting for them. */