#include "deluge.h"

/*
 * Torrent core: per-torrent setup, helper procs (listener, dialer,
 * handshaker, tracker, per-peer reader/writer, timer) and the main
 * event loop (torrentdo) that multiplexes over their channels.
 */

typedef struct Listenargs Listenargs;
typedef struct Readerargs Readerargs;
typedef struct Writerargs Writerargs;
typedef struct Readmsg Readmsg;
typedef struct Hsin Hsin;
typedef struct Trackreq Trackreq;
typedef struct Stale Stale;

/* arguments for the listener proc; the struct is freed by the proc */
struct Listenargs {
	char *adir;
	Channel *newpeer;
	Torrent *t;
};

/* arguments for a per-peer reader proc; the struct is freed by the proc */
struct Readerargs {
	Channel *out;
	Channel *stalechan;
	Peer *p;
	int nbytes;
};

/* arguments for a per-peer writer proc; the struct is freed by the proc */
struct Writerargs {
	Channel *in;
	Channel *out;
	int fd;
	Channel *stalechan;
	int peern;
};

struct Readmsg {
	Msg *m;
};

/* element type of the needshake channel */
struct Hsin {
	Peer *p;
	uchar *infohash;
	uchar *ourpeerid;
};

/* element type of the track channel */
struct Trackreq {
	char *announce;
	char *listenport;
	uchar *infohash;
	uchar *peerid;
	int npieces;
	vlong ul, dl, left;
	char *event;
};

/*
 * element type of the stalepeer channel: exactly one of
 * peern (!= -1) and p (!= nil) identifies the peer, see stale().
 */
struct Stale {
	int peern;
	Peer *p;
	int bad;
};

/*
 * Fill in t from the decoded metainfo dictionary b: info hash,
 * announce url, piece length, file state (via fileinit) and the
 * per-piece table.  Calls sysfatal when the "pieces" string is not
 * a multiple of Piecehashlen.
 */
void
torrentinit(Torrent *t, Bee *b, int needopen, int maycreate)
{
	Bee *info;
	int i;
	char *p;
	int plen;
	uchar *pieces;
	vlong pieceslen;
	vlong piecelen;

	t->filecreated = 0;

	/* the info hash is the sha1 of the re-encoded info dictionary */
	ebeedictget(b, "info", TDict, &info, nil);
	beepickle(info, &p, &plen);
	sha1((uchar *)p, plen, t->infohash, nil);
	free(p);

	ebeedictget(b, "announce", TString, &t->announce, nil);
	ebeedictget(info, "piece length", TInteger, &piecelen, nil);
	t->piecelen = (ulong)piecelen;

	fileinit(t, info, needopen, maycreate);

	ebeedictget(info, "pieces", TString, &pieces, &pieceslen);
	if(pieceslen % Piecehashlen != 0)
		sysfatal("string pieces has invalid size (%lld %% %d != 0)", pieceslen, Piecehashlen);
	t->npieces = pieceslen / Piecehashlen;
	DEBUG(2, "have npieces=%d\n", t->npieces);

	t->pieces = emalloc(sizeof t->pieces[0] * t->npieces);
	for(i = 0; i < t->npieces; i++){
		piecelen = t->piecelen;
		/* last piece holds the remainder; compute in vlong to avoid 32-bit overflow */
		if(i == t->npieces - 1)
			piecelen = t->length - (vlong)(t->npieces - 1)*t->piecelen;
		pieceinit(&t->pieces[i], i, &pieces[i*Piecehashlen], piecelen);
	}

	t->haves = bitnew(t->npieces, nil);
	t->stored = 0;
	DEBUG(2, "done for torrentinit\n");
}

/*
 * Check every piece with filepiecehashokay; for each that passes,
 * set its bit in t->haves and add its length to t->stored.
 */
void
torrentverify(Torrent *t)
{
	int i;

	for(i = 0; i < t->npieces; i++){
		if(filepiecehashokay(t, &t->pieces[i])){
			bitset(t->haves, i);
			t->stored += t->pieces[i].length;
		}
	}
	DEBUG(2, "done for torrentverify\n");
}

/* Return the number of peers in t->peers for which IsChoked holds. */
int
nchoked(Torrent *t)
{
	int n;
	Peer *p;

	n = 0;
	for(p = t->peers; p; p = p->next)
		if(IsChoked(p))
			n++;
	return n;
}

/*
 * Try a non-blocking hand-off of the next queued message to the
 * peer's writer: first the head of p->meta, otherwise the head of
 * p->dataout.  On success the message is unlinked from its queue.
 */
void
kickwriter(Peer *p)
{
	if(p->meta){
		if(nbsendp(p->write, p->meta) != 0){
			p->meta = p->meta->next;
			return;
		}
	}
	if(p->dataout){
		if(nbsendp(p->write, p->dataout) != 0){
			p->dataout = p->dataout->next;
			return;
		}
	}
}

/* Queue a message of the given type on p->meta and kick the writer. */
void
sendstatusmsg(Peer *p, int type)
{
	Msg *m;

	m = msgnew(type, p->n);
	msgappend(&p->meta, m);
	kickwriter(p);
}

/* Mark p choked, drop all requests it made to us (p->rreq) and tell it. */
void
choke(Peer *p)
{
	Bite *b;

	DEBUG(2, "choked peer=%d\n", p->n);
	p->changed = 1;
	setischoked(p, 1);
	p->lastchange = time(0);
	while(p->rreq){
		b = p->rreq;
		p->rreq = b->next;
		bitefree(b);
	}
	sendstatusmsg(p, MChoke);
}

/* Mark p unchoked and tell it. */
void
unchoke(Peer *p)
{
	DEBUG(2, "unchoked peer=%d\n", p->n);
	p->changed = 1;
	setischoked(p, 0);
	p->lastchange = time(0);
	sendstatusmsg(p, MUnchoke);
}

/* Mark p as interesting to us and tell it. */
void
interestedin(Peer *p)
{
	DEBUG(2, "now interested in peer=%d\n", p->n);
	p->changed = 1;
	setisinteresting(p, 1);
	p->lastchange = time(0);
	sendstatusmsg(p, MInterested);
}

/* Mark p as no longer interesting to us and tell it. */
void
notinterestedin(Peer *p)
{
	DEBUG(2, "now notinterested in peer=%d\n", p->n);
	p->changed = 1;
	setisinteresting(p, 0);
	p->lastchange = time(0);
	sendstatusmsg(p, MNotinterested);
}

/*
 * Withdraw our request for bite b from peer p, if we made one.
 * When the request message is still queued in p->meta it is simply
 * removed; otherwise an MCancel is queued.
 */
void
cancel(Peer *p, Bite *b)
{
	Msg *m;
	Bite *b2;

	b2 = bitefind(p->lreq, b->piecen, b->offset, b->length);
	if(b2 == nil)
		return;
	biteremove(&p->lreq, b2);
	bitefree(b2);

	for(m = p->meta; m; m = m->next){
		/* only an unsent MRequest may be dropped; other types can share these fields */
		if(m->type == MRequest && m->index == b->piecen && m->begin == b->offset && m->length == b->length){
			msgremove(&p->meta, m);
			msgfree(m);
			DEBUG(2, "cancel: cancelled not-yet-sent request\n");
			return;
		}
	}

	m = msgnew(MCancel, p->n);
	m->index = b->piecen;
	m->begin = b->offset;
	m->length = b->length;
	msgappend(&p->meta, m);
	kickwriter(p);
}

/*
 * Record bite b as requested (torrent, piece and peer bookkeeping;
 * b is appended to p->lreq) and queue an MRequest for it.
 */
void
request(Torrent *t, Peer *p, Bite *b)
{
	Piece *pc;
	Msg *m;

	pc = &t->pieces[b->piecen];
	bitset(t->reqpieces, b->piecen);
	bitset(pc->reqbites, b->n);
	biteappend(&p->lreq, b);

	m = msgnew(MRequest, p->n);
	m->index = b->piecen;
	m->begin = b->offset;
	m->length = b->length;
	msgappend(&p->meta, m);
	kickwriter(p);
}

/* Queue an MHave for piece piecen to peer p. */
void
have(Peer *p, int piecen)
{
	Msg *m;

	m = msgnew(MHave, p->n);
	m->have = piecen;
	msgappend(&p->meta, m);
	kickwriter(p);
}

/*
 * Report a peer as stale on channel c.  The peer is identified
 * either by number (peern != -1, p == nil) or by pointer
 * (peern == -1, p != nil); anything else trips the assert.
 */
void
stale(Channel *c, int bad, int peern, Peer *p)
{
	Stale s;

	assert((peern == -1 || p == nil) && (peern != -1 || p != nil));
	s.bad = bad;
	s.p = p;
	s.peern = peern;
	send(c, &s);
}

/*
 * Tear down peer p: release its request and message queues, remove
 * it from the per-piece peer lists, close its fd and unlink it from
 * t->peers.  With bad set the peer is moved to t->bad, otherwise it
 * is freed; callers must not use p afterwards in that case.
 */
void
disconnect(Torrent *t, Peer *p, int bad)
{
	Bite *b, *b2;
	Msg *m, *m2;
	Piece *pc;
	int i;

	/* requests we made to the peer */
	b = p->lreq;
	while(b){
		pc = &t->pieces[b->piecen];
		bitunset(pc->reqbites, b->n);
		b2 = b;
		b = b->next;
		bitefree(b2);
	}

	/* requests the peer made to us */
	b = p->rreq;
	while(b){
		b2 = b;
		b = b->next;
		bitefree(b2);
	}

	m = p->meta;
	while(m){
		m2 = m;
		m = m->next;
		msgfree(m2);
	}
	m = p->dataout;
	while(m){
		m2 = m;
		m = m->next;
		msgfree(m2);
	}
	p->lreq = p->rreq = nil;
	p->meta = p->dataout = nil;

	for(i = 0; i < t->npieces; i++)
		if(bitget(p->pieces, i))
			pieceremoveid(&t->pieces[i], p->n);

	close(p->fd);
	p->status &= ~(Valid|Connecting);
	peerremove(&t->peers, p);
	if(bad)
		peeradd(&t->bad, p);
	else
		peerfree(p);
}

/*
 * Pick, using rand(), one peer that is interested, choked, not
 * changed in this round and Stable.  Returns nil when there is no
 * such peer.
 */
Peer *
randomunchokable(Torrent *t)
{
	Peer *p;
	Peer **r = nil;
	int rn = 0;

	for(p = t->peers; p; p = p->next){
		if(IsInterested(p) && IsChoked(p) && !p->changed && Stable(p)){
			r = erealloc(r, (rn+1) * sizeof r[0]);
			r[rn++] = p;
		}
	}
	if(rn == 0)
		return nil;

	p = r[rand() % rn];
	free(r);
	return p;
}

/*
 * Return the unchoked, unchanged, Stable peer with the lowest
 * 20-tick rate sum (Upload when seeding, Download otherwise), or
 * nil when there is none.
 */
Peer *
worstchokable(Torrent *t)
{
	Peer *p, *rp;
	int type;

	type = (t->strategy == Seeding) ? Upload : Download;
	rp = nil;
	for(p = t->peers; p; p = p->next)
		if(!p->changed && !IsChoked(p) && Stable(p)
		&& (rp == nil || (ratesum(p->rate, 20, type) < ratesum(rp->rate, 20, type))))
			rp = p;
	return rp;
}

/* Three-way compare: -1, 0 or 1 for v1 <, ==, > v2. */
int
cmp(vlong v1, vlong v2)
{
	if(v1 < v2)
		return -1;
	if(v1 > v2)
		return 1;
	return 0;
}

/*
 * qsort helper over Peer* elements: interested peers sort before
 * uninterested ones, ties are broken by the 20-tick rate sum of
 * kind `which`, ascending.
 *
 * NOTE(review): ascending order puts the slowest peers first, while
 * worstchokable treats the lowest rate as worst -- confirm whether
 * this was meant to be descending.
 */
int
deserveunchokecmp(void *pp1, void *pp2, int which)
{
	Peer *p1, *p2;
	vlong v1, v2;

	p1 = *(Peer **)pp1;
	p2 = *(Peer **)pp2;

	if(IsInterested(p1) && !IsInterested(p2))
		return -1;
	if(!IsInterested(p1) && IsInterested(p2))
		return 1;

	v1 = ratesum(p1->rate, 20, which);
	v2 = ratesum(p2->rate, 20, which);
	return cmp(v1, v2);
}

// XXX unchoking in seed mode should be done based on round robin, otherwise a fast leecher dominates a seed.
int
deserveunchokecmp_seeding(void *pp1, void *pp2)
{
	return deserveunchokecmp(pp1, pp2, Upload);
}

int
deserveunchokecmp_notseeding(void *pp1, void *pp2)
{
	return deserveunchokecmp(pp1, pp2, Download);
}

/*
 * Sort all peers with the strategy's comparison function and store
 * the first MIN(npeers, WantUnchokedCount) of them in cp; the count
 * is returned through ncp.  cp must have room for WantUnchokedCount
 * entries.
 */
void
rethinkunchokes(Torrent *t, Peer **cp, int *ncp)
{
	Peer **r, **rp, *p;
	int i, n;

	/* walk the list once for its length instead of on every loop test */
	n = peerlen(t->peers);
	r = emalloc(n * sizeof r[0]);
	rp = r;
	for(p = t->peers; p; p = p->next)
		*rp++ = p;

	qsort(r, n, sizeof (Peer *), t->strategy == Seeding ? deserveunchokecmp_seeding : deserveunchokecmp_notseeding);

	for(i = 0; i < MIN(n, WantUnchokedCount); i++)
		cp[i] = r[i];
	*ncp = i;
	free(r);
}

/*
 * Proc: accept incoming connections on the announced directory and
 * send a fresh Peer (fd, ip and port filled in) for each on the
 * newpeer channel.  Returns when listen fails.
 */
void
listener(void *ap)
{
	Listenargs *la;
	Channel *newpeer;
	char *adir;
	int lcfd, dfd;
	char ldir[NETPATHLEN];
	NetConnInfo *nci;
	Peer *p;
	Torrent *t;

	la = ap;
	newpeer = la->newpeer;
	adir = la->adir;
	t = la->t;
	free(ap);

	for(;;){
		DEBUG(2, "listening for incoming request\n");
		lcfd = listen(adir, ldir);
		if(lcfd < 0){
			fprint(2, "listen proc exiting...\n");
			free(adir);
			return;
		}

		dfd = accept(lcfd, ldir);
		if(dfd < 0){
			DEBUG(2, "accept failed: %r\n");
			close(lcfd);
			continue;
		}
		/* the per-call control fd is not needed once the data fd is open */
		close(lcfd);

		nci = getnetconninfo(nil, dfd);
		if(nci == nil){
			DEBUG(2, "getnetconninfo failed: %r\n");
			close(dfd);
			continue;
		}

		p = peernew(t->npieces);
		p->fd = dfd;
		p->ip = estrdup(nci->rsys);
		p->port = estrdup(nci->rserv);
		sendp(newpeer, p);
		freenetconninfo(nci);
		DEBUG(2, "newpeer has been send\n");
	}
}

/* Proc: send nil on the given channel, then sleep(1000), forever. */
void
timer(void *p)
{
	Channel *c;

	c = p;
	for(;;){
		sendp(c, nil);
		sleep(1000);
	}
}

/*
 * Proc: read length-prefixed messages from a peer's fd, parse them
 * and send them on `out`.  On any read or parse failure the peer is
 * reported on the stale channel (bad=0) and the proc returns.
 */
void
reader(void *ap)
{
	Readerargs *rdargs;
	Channel *out;
	Channel *stalechan;
	Msg *m;
	char *errmsg;
	uchar buf[4];
	int fd, peern;
	int len;
	char *msgbuf;
	int nbytes;
	long n;

	rdargs = ap;
	/*
	 * Copy what we need out of the Peer now: the main loop may free
	 * the peer while this proc is still blocked in a read.
	 */
	fd = rdargs->p->fd;
	peern = rdargs->p->n;
	out = rdargs->out;
	stalechan = rdargs->stalechan;
	nbytes = rdargs->nbytes;
	free(rdargs);

	for(;;){
		n = readn(fd, buf, 4);
		if(n != 4){
			DEBUG(2, "reader: only read %ld out of 4: %r\n", n);
			stale(stalechan, 0, peern, nil);
			return;
		}

		len = GET32(buf);
		/* a length with the top bit set shows up negative here */
		if(len < 0 || len > msgmaxlen(nbytes)){
			DEBUG(2, "reader: message too long (%d)\n", len);
			stale(stalechan, 0, peern, nil);
			return;
		}

		msgbuf = emalloc(len);
		n = readn(fd, msgbuf, len);
		if(n != len){
			DEBUG(2, "reader: only read %ld out of %d: %r\n", n, len);
			free(msgbuf);
			stale(stalechan, 0, peern, nil);
			return;
		}

		m = emalloc(sizeof m[0]);
		errmsg = msgparse(msgbuf, len, m, nbytes);
		free(msgbuf);
		if(errmsg){
			DEBUG(2, "reader: parsing message: %s\n", errmsg);
			free(m);
			free(errmsg);
			stale(stalechan, 0, peern, nil);
			return;
		}

		m->peern = peern;
		DEBUG(2, "reader: %M\n", m);
		sendp(out, m);
	}
}

/*
 * Proc: receive messages on `in`, write each to fd with msgwrite
 * and pass it on to `out` once written.  On a write error the
 * message is freed, the peer is reported stale (bad=0) and the proc
 * returns.
 */
void
writer(void *ap)
{
	Writerargs *wrargs;
	Channel *in, *out, *stalechan;
	Msg *m;
	char *errmsg;
	int fd;
	int peern;

	wrargs = ap;
	in = wrargs->in;
	out = wrargs->out;
	stalechan = wrargs->stalechan;
	fd = wrargs->fd;
	peern = wrargs->peern;
	free(wrargs);

	for(;;){
		m = recvp(in);
		errmsg = msgwrite(fd, m);
		if(errmsg){
			DEBUG(2, "writer: error: %s\n", errmsg);
			free(errmsg);
			msgfree(m);
			stale(stalechan, 0, peern, nil);
			return;
		}
		DEBUG(2, "writer: %M\n", m);
		sendp(out, m);
	}
}

/*
 * Proc: ap is an array of three channels {in, out, stale}, freed
 * here.  Each Peer received on `in` is dialed; on success it is sent
 * on `out`, on failure it is reported stale by pointer with bad=1.
 */
void
dialer(void *ap)
{
	Channel *in;
	Channel *out;
	Channel *stalec;
	Channel **chans;
	Peer *p;

	chans = ap;
	in = chans[0];
	out = chans[1];
	stalec = chans[2];
	free(ap);

	for(;;){
		recv(in, &p);
		DEBUG(2, "dialer: connecting to ip=%s port=%s\n", p->ip, p->port);
		p->fd = dial(netmkaddr(p->ip, 0, p->port), 0, 0, 0);
		if(p->fd < 0){
			DEBUG(2, "dialing %s: %r\n", netmkaddr(p->ip, 0, p->port));
			stale(stalec, 1, -1, p);
			continue;
		}
		sendp(out, p);
	}
}

/*
 * Proc: ap is an array of three channels {in, out, stale}, freed
 * here.  For each Hsin received, write our handshake and read the
 * peer's.  If the peer had no peerid yet (all zero) the received one
 * is stored, otherwise it must match.  Successful peers go to `out`;
 * failures are reported stale by pointer with bad=1.
 */
void
handshaker(void *ap)
{
	Channel *in;
	Channel *out;
	Channel *stalec;
	Channel **chans;
	Hsin hi;
	char *errmsg;
	uchar peerid[Peeridlen];
	Peer *p;
	uchar *infohash;
	uchar *ourpeerid;

	chans = ap;
	in = chans[0];
	out = chans[1];
	stalec = chans[2];
	free(ap);

	for(;;){
		recv(in, &hi);
		p = hi.p;
		infohash = hi.infohash;
		ourpeerid = hi.ourpeerid;

		errmsg = peerwritestart(p->fd, infohash, ourpeerid);
		if(errmsg){
			DEBUG(2, "handshaker: %s\n", errmsg);
			free(errmsg);
			stale(stalec, 1, -1, p);
			continue;
		}

		errmsg = peerreadstart(p->fd, infohash, peerid);
		if(errmsg){
			DEBUG(2, "handshaker: reading handshake: %s\n", errmsg);
			free(errmsg);
			stale(stalec, 1, -1, p);
			continue;
		}

		if(memcmp(p->peerid, "\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", Peeridlen) == 0){
			memmove(p->peerid, peerid, Peeridlen);
		}else{
			if(memcmp(p->peerid, peerid, Peeridlen) != 0){
				DEBUG(2, "handshaker: unexpected peerid\n");
				stale(stalec, 1, -1, p);
				continue;
			}
		}
		sendp(out, p);
	}
}

/*
 * Proc: ap is an array of three channels {in, prospect, ival},
 * freed here.  For each Trackreq received, query the tracker, send
 * a new Peer (status Connecting) on `prospect` for every usable
 * entry of the "peers" list and finally send the "interval" value
 * on `ival`.  Nothing is sent when the request fails or the
 * response lacks "interval" or "peers".
 */
void
tracker(void *ap)
{
	Channel *in;
	Channel *prospect;
	Channel *ival;
	Channel **chans;
	Bee trackerb;
	Bee *tmp, *tmp2;
	Bee *ipb, *portb, *peeridb;
	Trackreq tr;
	ulong interval;
	char *errmsg;
	int i;
	Peer *p;

	chans = ap;
	in = chans[0];
	prospect = chans[1];
	ival = chans[2];
	free(ap);

	for(;;){
		recv(in, &tr);
		DEBUG(2, "calling trackerget\n");
		errmsg = trackerget(&trackerb, tr.announce, tr.listenport, tr.infohash, tr.peerid, tr.ul, tr.dl, tr.left, tr.event);
		if(errmsg){
			DEBUG(2, "trackerget: %s", errmsg);
			/* freed like the error strings of the other helpers in this file */
			free(errmsg);
			continue;
		}
		if(debug)
			beeprint(2, 0, &trackerb);

		tmp = beedictget(&trackerb, "interval", TInteger);
		if(tmp == nil){
			DEBUG(2, "missing key interval in initial response from tracker");
			beefree(&trackerb);
			continue;
		}
		interval = (ulong)tmp->i;

		tmp = beedictget(&trackerb, "peers", TList);
		if(tmp == nil){
			DEBUG(2, "missing key peers in initial response from tracker");
			beefree(&trackerb);
			continue;
		}

		for(i = 0; i < tmp->nl; i++){
			tmp2 = &tmp->l[i];
			if(tmp2->type != TDict){
				DEBUG(2, "element in peers list is not dict\n");
				continue;
			}
			ipb = beedictget(tmp2, "ip", TString);
			portb = beedictget(tmp2, "port", TInteger);
			peeridb = beedictget(tmp2, "peer id", TString);
			if(ipb == nil || portb == nil || peeridb == nil){
				DEBUG(2, "missing key ipb or portb or peeridb");
				continue;
			}

			/*
			 * XXX check if lengths are okay
			 * NOTE(review): the memmove below copies Peeridlen bytes without
			 * checking the length of the "peer id" string.
			 */
			p = peernew(tr.npieces);
			p->status |= Connecting;
			p->ip = estrdup(ipb->s);
			p->port = smprint("%lld", portb->i);
			memmove(p->peerid, peeridb->s, Peeridlen);
			sendp(prospect, p);
		}
		send(ival, &interval);
		beefree(&trackerb);
	}
}

/*
 * Create the torrent's channels, reset its runtime state, announce
 * a listen port (trying Listenattempts ports starting at
 * Listenport; sysfatal if none works) and start the listener,
 * timer, dialer, handshaker and tracker procs.
 */
void
torrentprep(Torrent *t)
{
	char adir[NETPATHLEN];
	int acfd;
	Listenargs *la;
	int port;
	char buf[16];
	Channel **dialargs;
	Channel **hsargs;
	Channel **trargs;
	int i;

	t->inpeer = chancreate(sizeof (Peer *), 0);
	t->timer = chancreate(sizeof (void *), 0);
	t->inmsg = chancreate(sizeof (void *), 0);
	t->dialpeer = chancreate(sizeof (Peer *), 0);
	t->dialedpeer = chancreate(sizeof (Peer *), 0);
	t->needshake = chancreate(sizeof (Hsin), 0);
	t->shakedpeer = chancreate(sizeof (Peer *), 0);
	t->stalepeer = chancreate(sizeof (Stale), 0);
	t->track = chancreate(sizeof (Trackreq), 0);
	t->prospect = chancreate(sizeof (Peer *), 0);
	t->newinterval = chancreate(sizeof (ulong), 0);
	t->written = chancreate(sizeof (Msg *), 0);

	genrandom(t->peerid, sizeof t->peerid);
	t->strategy = Random;
	t->dl = 0;
	t->ul = 0;
	t->rate = ratenew(60);
	t->interval = -1;
	t->listenport = nil;
	t->peers = nil;
	t->bad = nil;
	t->peern = 0;
	t->todial = nil;
	t->tohandshake = nil;
	t->reqpieces = bitnew(t->npieces, nil);

	port = Listenport;
	do {
		snprint(buf, sizeof buf, "%d", port);
		acfd = announce(netmkaddr("*", 0, buf), adir);
	} while(acfd < 0 && ++port < Listenport + Listenattempts);
	if(acfd < 0)
		sysfatal("could not announce: %r");
	DEBUG(2, "we have announced...\n");
	t->listenport = estrdup(buf);

	/*
	 * NOTE(review): accepted connections are delivered on dialedpeer,
	 * not inpeer; nothing in this file sends on inpeer -- confirm intent.
	 */
	la = emalloc(sizeof la[0]);
	la->adir = estrdup(adir);
	la->newpeer = t->dialedpeer;
	la->t = t;
	proccreate(listener, la, STACKSIZE);

	proccreate(timer, t->timer, STACKSIZE);

	for(i = 0; i < 1; i++){
		dialargs = emalloc(sizeof dialargs[0] * 3);
		dialargs[0] = t->dialpeer;
		dialargs[1] = t->dialedpeer;
		dialargs[2] = t->stalepeer;
		proccreate(dialer, dialargs, STACKSIZE);
	}

	for(i = 0; i < 1; i++){
		hsargs = emalloc(sizeof hsargs[0] * 3);
		hsargs[0] = t->needshake;
		hsargs[1] = t->shakedpeer;
		hsargs[2] = t->stalepeer;
		proccreate(handshaker, hsargs, STACKSIZE);
	}

	trargs = emalloc(sizeof trargs[0] * 3);
	trargs[0] = t->track;
	trargs[1] = t->prospect;
	trargs[2] = t->newinterval;
	proccreate(tracker, trargs, STACKSIZE);
}

/*
 * Send a tracker request with the given event string and the
 * torrent's current counters on t->track.
 */
void
trackrequest(Torrent *t, char *event)
{
	Trackreq tr;

	tr.announce = t->announce;
	tr.listenport = t->listenport;
	tr.infohash = t->infohash;
	tr.peerid = t->peerid;
	tr.npieces = t->npieces;
	tr.ul = t->ul;
	tr.dl = t->dl;
	tr.left = t->length - t->stored;
	tr.event = event;
	send(t->track, &tr);
}

/*
 * Act on message m received from peer p.  The caller keeps
 * ownership of m.  When the handler disconnects the peer it returns
 * immediately, because p may have been freed.
 */
static void
handlemsg(Torrent *t, Peer *p, Msg *m)
{
	Bite *b;
	Peer *p2;
	Piece *pc;
	Msg *m2;
	Msg **mp;

	switch(m->type){
	case MKeepalive:
		DEBUG(2, "handlemsg: keepalive...\n");
		break;

	case MChoke:
		setischoking(p, 1);

		/* all requests we made to peer are now invalid, remove them */
		while(b = p->lreq){
			/* XXX we may still have this bite requested at another peer */
			pc = &t->pieces[b->piecen];
			bitunset(pc->reqbites, b->n);
			p->lreq = b->next;
			bitefree(b);
		}

		/* also remove requests we were still going to send */
		mp = &p->meta;
		while(m2 = *mp){
			if(m2->type == MRequest){
				/* unlink; *mp now names the successor, so do not advance */
				*mp = m2->next;
				msgfree(m2);
			}else
				mp = &m2->next;
		}
		DEBUG(2, "handlemsg: we are now choked by peer=%d\n", p->n);
		break;

	case MUnchoke:
		setischoking(p, 0);
		DEBUG(2, "handlemsg: we are now unchoked by peer=%d\n", p->n);
		if(IsInteresting(p))
			piecepeerschedule(t, p);
		break;

	case MInterested:
		DEBUG(2, "handlemsg: peer=%d is now interested\n", p->n);
		setisinterested(p, 1);
		if(IsChoked(p) && nchoked(t) < WantUnchokedCount)
			unchoke(p);
		break;

	case MNotinterested:
		DEBUG(2, "handlemsg: peer=%d is not interested anymore\n", p->n);
		setisinterested(p, 0);
		if(!IsChoked(p))
			choke(p);
		break;

	case MHave:
		DEBUG(2, "handlemsg: peer=%d now has piece %uld\n", p->n, m->have);
		if(m->have >= t->npieces){
			DEBUG(2, "handlemsg: peer=%d sent m->have=%uld (>= npieces=%d)\n", p->n, m->have, t->npieces);
			break;
		}
		if(bitget(p->pieces, m->have)){
			DEBUG(2, "handlemsg: we already knew peer=%d had piece %uld\n", p->n, m->have);
			break;
		}
		bitset(p->pieces, m->have);
		if(!IsInteresting(p) && bitinvertandlen(t->haves, p->pieces) > 0)
			interestedin(p);
		break;

	case MBitfield:
		if(p->havefirst){
			DEBUG(2, "handlemsg: have bitfield after first message\n");
			disconnect(t, p, 1);
			return;
		}
		if(bitnbytes(m->haves) != bitnbytes(p->pieces)){
			DEBUG(2, "handlemsg: wrong size of bitfield\n");
			disconnect(t, p, 1);
			return;
		}
		bitcopy(p->pieces, m->haves, bitlen(p->pieces));
		if(t->strategy == Seeding && bithaveall(p->pieces)){
			DEBUG(2, "handlemsg: disconnecting from other seeder\n");
			/* disconnect with bad=0 frees p */
			disconnect(t, p, 0);
			return;
		}
		pieceaddpeerhaves(t, p);
		DEBUG(2, "handlemsg: first message is bitfield (peer has %.2f%%)\n", 100*bithave(p->pieces));
		if(bitinvertandlen(t->haves, p->pieces) > 0)
			interestedin(p);
		break;

	case MRequest:
		DEBUG(2, "handlemsg: peer=%d requests piece=%uld begin=%uld length=%uld\n", p->n, m->index, m->begin, m->length);
		if(IsChoked(p)){
			DEBUG(2, "handlemsg: received request from choked peer, ignoring...\n");
			break;
		}
		/* the fields come straight off the wire: validate before queueing a read */
		if(m->index >= t->npieces || !bitget(t->haves, m->index)){
			DEBUG(2, "handlemsg: peer=%d requested piece we do not have, ignoring...\n", p->n);
			break;
		}
		pc = &t->pieces[m->index];
		if((vlong)m->begin + m->length > pc->length){
			DEBUG(2, "handlemsg: peer=%d requested range outside of piece, ignoring...\n", p->n);
			break;
		}

		/* XXX see that queue isn't growing too large */
		b = bitenew(-1, m->index, m->begin, m->length);
		biteappend(&p->rreq, b);
		kickwriter(p);
		break;

	case MPiece:
		if(m->index >= t->npieces){
			DEBUG(2, "handlemsg: peer=%d sent out-of-bound piecen=%uld\n", p->n, m->index);
			break;
		}
		b = bitefind(p->lreq, m->index, m->begin, m->length);
		if(b == nil){
			DEBUG(2, "handlemsg: incoming piece wasn't requested, ignoring...\n");
			break;
		}
		biteremove(&p->lreq, b);
		if(bitehave(t, b)){
			DEBUG(2, "handlemsg: already have incoming piece, ignoring...\n");
			/* b is off the list now, nobody else will free it */
			bitefree(b);
			break;
		}

		/* withdraw the same request from every other peer */
		for(p2 = t->peers; p2; p2 = p2->next)
			cancel(p2, b);

		pc = &t->pieces[b->piecen];
		bitunset(pc->reqbites, b->n);

		/* vlong arithmetic: index*piecelen does not fit 32 bits for large torrents */
		filewrite(t, (vlong)m->index*t->piecelen + m->begin, m->length, m->piece);
		rateadd(p->rate, 0, m->length);
		rateadd(t->rate, 0, m->length);

		pc = &t->pieces[m->index];
		bitset(pc->bites, b->n);
		if(bithaveall(pc->bites)){
			if(filepiecehashokay(t, pc)){
				/* account for the whole piece, as torrentverify does */
				bitset(t->haves, pc->n);
				t->stored += pc->length;

				/*
				 * Announce the piece to everyone and drop interest in
				 * peers that have nothing left for us; this must look at
				 * each peer in turn and at the updated t->haves.
				 */
				for(p2 = t->peers; p2; p2 = p2->next){
					have(p2, m->index);
					if(IsInteresting(p2) && bitinvertandlen(t->haves, p2->pieces) == 0)
						notinterestedin(p2);
				}
				strategy(t);
				DEBUG(2, "handlemsg: have valid hash for piecen=%d\n", pc->n);
			}else{
				bitclear(pc->bites);
				bitclear(pc->reqbites);
				DEBUG(2, "handlemsg: invalid hash, have to recheck all\n");
			}
		}
		bitefree(b);
		piecepeerschedule(t, p);
		break;

	case MCancel:
		DEBUG(2, "handlemsg: peer=%d cancels piece=%uld begin=%uld length=%uld\n", p->n, m->index, m->begin, m->length);
		b = bitefind(p->rreq, m->index, m->begin, m->length);
		if(b == nil){
			DEBUG(2, "handlemsg: could not find rreq to cancel\n");
			break;
		}
		biteremove(&p->rreq, b);
		bitefree(b);
		break;

	default:
		sysfatal("cannot happen in handlemsg\n");
	}
	p->havefirst = 1;
}

/*
 * Main event loop of a torrent; never returns.  Waits with alt on
 * the channels created by torrentprep and handles timer ticks, new
 * and handshaked peers, incoming and written messages, stale peers,
 * tracker prospects and tracker intervals.
 */
void
torrentdo(Torrent *t)
{
	ulong interval;
	Peer *p;
	Hsin hi;
	Msg *m;
	Readerargs *rdargs;
	Writerargs *wrargs;
	Peer *nc[WantUnchokedCount];
	int n;
	int i;
	ulong now;
	Peer *pnew, *pworst;
	Stale stale;
	Bite *b;
	/* the case labels below are indexes into this array */
	Alt a[] = {
		{t->timer, nil, CHANRCV},
		{t->inpeer, &p, CHANRCV},
		{t->inmsg, &m, CHANRCV},
		{t->dialedpeer, &p, CHANRCV},
		{t->shakedpeer, &p, CHANRCV},
		{t->stalepeer, &stale, CHANRCV},
		{t->prospect, &p, CHANRCV},
		{t->newinterval, &interval, CHANRCV},
		{t->dialpeer, nil, CHANSND},
		{t->needshake, nil, CHANSND},
		{t->written, &m, CHANRCV},
		{nil, nil, CHANEND},
	};

	strategy(t);

	for(;;){
		/* the two send entries are only armed while there is something queued */
		a[8].v = &t->todial;
		a[8].op = t->todial ? CHANSND : CHANNOP;

		hi.p = t->tohandshake;
		hi.infohash = t->infohash;
		hi.ourpeerid = t->peerid;
		a[9].v = &hi;
		a[9].op = hi.p ? CHANSND : CHANNOP;

		switch(alt(a)){
		case 0:
			/* timer */
			DEBUG(2, "do: timer npeers=%d nchoked=%d\n", peerlen(t->peers), nchoked(t));
			now = time(0);
			ratetick(t->rate);
			for(p = t->peers; p; p = p->next)
				ratetick(p->rate);

			for(p = t->peers; p; p = p->next)
				p->changed = 0;

			if(now % 10 == 0){
				rethinkunchokes(t, nc, &n);
				for(i = 0; i < n; i++)
					if(IsChoked(nc[i]))
						unchoke(nc[i]);
				/*
				 * NOTE(review): this also chokes peers in nc that were
				 * already unchoked (their changed flag is still 0) --
				 * confirm intent.
				 */
				for(p = t->peers; p; p = p->next)
					if(!IsChoked(p) && !p->changed)
						choke(p);
			}

			if(now % 30 == 0){
				pnew = randomunchokable(t);
				if(pnew){
					pworst = worstchokable(t);
					if(pworst){
						choke(pworst);
						unchoke(pnew);
					}
				}
			}

			if(now % 60 == 0){
				for(p = t->peers; p; p = p->next)
					if(!p->changed && p->lastchange < now-60
					&& ratesum(p->rate, 60, (t->strategy == Seeding) ? Upload : Download) == 0)
						choke(p);
			}

			if(t->interval < 0 || time(0) % t->interval == 0){
				trackrequest(t, (t->interval < 0) ? "started" : "");
				if(t->interval < 0)
					t->interval = DefaultInterval;
			}

			for(p = t->peers; p; p = p->next)
				p->changed = 0;
			break;

		case 1:
			/* new peer that connected to us */
			if(peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
				disconnect(t, p, 0);
				break;
			}
			peeradd(&t->tohandshake, p);
			break;

		case 2:
			/* incoming message */
			p = peerfind(t->peers, m->peern);
			if(p)
				handlemsg(t, p, m);
			msgfree(m);
			break;

		case 3:
			/* dialed peer */
			if(t->strategy == Seeding){
				disconnect(t, p, 0);
				break;
			}
			peeradd(&t->tohandshake, p);
			break;

		case 4:
			/* handshaked peer */
			if(peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
				disconnect(t, p, 0);
				break;
			}
			p->status &= ~Connecting;
			p->status |= Valid;
			p->n = t->peern++;
			p->next = nil;
			peeradd(&t->peers, p);

			rdargs = emalloc(sizeof rdargs[0]);
			rdargs->out = t->inmsg;
			rdargs->stalechan = t->stalepeer;
			rdargs->p = p;
			rdargs->nbytes = bitnbytes(t->haves);
			proccreate(reader, rdargs, STACKSIZE);

			wrargs = emalloc(sizeof wrargs[0]);
			wrargs->in = p->write;
			wrargs->out = t->written;
			wrargs->fd = p->fd;
			wrargs->stalechan = t->stalepeer;
			wrargs->peern = p->n;
			proccreate(writer, wrargs, STACKSIZE);

			/* first message: our bitfield if we have anything, else a keepalive */
			if(bitnhave(t->haves) > 0){
				m = msgnew(MBitfield, p->n);
				m->haves = bitnew(bitlen(t->haves), nil);
				bitcopy(m->haves, t->haves, bitlen(m->haves));
			}else{
				m = msgnew(MKeepalive, p->n);
			}
			msgappend(&p->meta, m);
			kickwriter(p);

			if(IsInterested(p) && IsChoked(p) && nchoked(t) < WantUnchokedCount){
				unchoke(p);
			}
			break;

		case 5:
			/* stale peer */
			DEBUG(2, "do: stale peer\n");
			/* stale() identifies the peer by pointer or by number, never both */
			p = stale.p;
			if(p == nil)
				p = peerfind(t->peers, stale.peern);
			if(p == nil)
				break;
			disconnect(t, p, stale.bad);
			break;

		case 6:
			/* new prospect peer */
			if(t->strategy == Seeding || peermatch(t->peers, p) || peermatchbyip(t->bad, p)){
				disconnect(t, p, 0);
				break;
			}
			DEBUG(2, "do: new prospect peer (%s %s %H)\n", p->ip, p->port, p->peerid);
			peeradd(&t->todial, p);
			break;

		case 7:
			/* new interval */
			t->interval = MAX(MinInterval, interval);
			DEBUG(2, "new interval=%d\n", t->interval);
			break;

		case 8:
			/* send to dialer */
			peerpop(&t->todial);
			break;

		case 9:
			/* send to handshaker */
			peerpop(&t->tohandshake);
			break;

		case 10:
			/* message has been written to peer */
			p = peerfind(t->peers, m->peern);
			if(p == nil){
				/* peer is gone; the message was already unlinked from its queue */
				msgfree(m);
				break;
			}
			if(m->type == MPiece){
				rateadd(p->rate, m->length, 0);
				rateadd(t->rate, m->length, 0);
			}
			msgfree(m);

			/* feed the writer its next message: control first, then data, then a fresh piece */
			if(p->meta){
				sendp(p->write, p->meta);
				p->meta = p->meta->next;
			}else if(p->dataout){
				sendp(p->write, p->dataout);
				p->dataout = p->dataout->next;
			}else if(p->rreq){
				b = p->rreq;
				p->rreq = p->rreq->next;
				b->next = nil;

				/* tag with the peer number so the completion above can find the peer */
				m = msgnew(MPiece, p->n);
				m->piece = emalloc(b->length);
				m->length = b->length;
				m->index = b->piecen;
				m->begin = b->offset;
				/* read at the requested byte offset; b->n is -1 for remote requests */
				fileread(t, (vlong)b->piecen*t->piecelen + b->offset, b->length, m->piece);
				bitefree(b);
				sendp(p->write, m);
			}
			break;

		default:
			sysfatal("invalid case in alt\n");
		}
	}
}