/* Copyright (c) 2008 Richard Bilson */ #include "stdinc.h" #include "dat.h" #include "fns.h" #include "whack.h" static char* doS3write(S3Con *c, VtReq *r) { S3Vhdr hdr; S3Req req; S3Resp resp; char *err = nil; uchar *block; int wlen; hdr.blocktype = r->tx.blocktype; hdr.size = packetsize(r->tx.data); packetsha1(r->tx.data, r->rx.score); memset(&req, 0, sizeof(req)); req.method = "PUT"; req.resource = smprint("/%U/%V", config.bucket, r->rx.score); req.content = vtmalloc(HdrSize + AESbsize + hdr.size); /* compress */ block = vtmalloc(hdr.size); packetcopy(r->tx.data, block, 0, hdr.size); wlen = whackblock(req.content+HdrSize+AESbsize, block, hdr.size); if(wlen > 0 && wlen < hdr.size) { //fprint(2, "dos3write compress %V\n", r->rx.score); hdr.codec = BlockECompress; hdr.csize = wlen; } else { if(wlen > hdr.size) { fprint(2, "whack error: dsize=%d size=%d\n", wlen, hdr.size); abort(); } hdr.codec = BlockENone; hdr.csize = hdr.size; memmove(req.content+HdrSize+AESbsize, block, hdr.size); } req.clen = HdrSize + AESbsize + hdr.csize; /* encrypt */ if(config.key){ encryptblock(req.content+HdrSize, hdr.csize, config.key); } /* write */ vhdrpack(&hdr, req.content); //sha1(req.content+HdrSize+AESbsize, hdr.csize, r->rx.score, nil); //req.resource = smprint("/rcbilson-ventitest/%V", r->rx.score); if(err = S3request(c, &req, &resp)) { err = vtstrdup(err); goto cleanup; } if(resp.result[0] != '2'){ err = vtstrdup(resp.result); S3responsediscard(&resp); goto cleanup; } cleanup: vtfree(req.content); vtfree(block); free(req.resource); return err; } static char* doS3read(S3Con *c, VtReq *r) { S3Vhdr hdr; S3Req req; S3Resp resp; char *err = nil; uchar rawhdr[HdrSize], sha1[VtScoreSize], *buf, *ubuf; long len; Unwhack uw; int nunc; memset(&req, 0, sizeof(req)); req.method = "GET"; req.resource = smprint("/%U/%V", config.bucket, r->tx.score); if(err = S3request(c, &req, &resp)) { err = vtstrdup(err); goto cleanup; } if(memcmp(resp.result, "404", 3) == 0) { r->rx.error = vtstrdup("no such block"); r->rx.msgtype = VtRerror; goto cleanup2; } if(resp.result[0] != '2'){ err = vtstrdup(resp.result); goto cleanup2; } len = 0; do { long n = S3response(&resp, rawhdr, HdrSize - len); if(n < 0) { err = vtstrdup("EOF on response"); goto cleanup; } len += n; } while( len < HdrSize ); vhdrunpack(&hdr, rawhdr); if(hdr.blocktype != r->tx.blocktype) { r->rx.error = vtstrdup("type mismatch"); r->rx.msgtype = VtRerror; goto cleanup; } if(hdr.size > r->tx.count) { r->rx.error = vtstrdup("too big"); r->rx.msgtype = VtRerror; goto cleanup; } buf = vtmalloc(AESbsize + hdr.csize); len = 0; do { long n = S3response(&resp, buf, AESbsize + hdr.csize - len); if(n <= 0) { err = vtstrdup("EOF on response"); vtfree(buf); goto cleanup; } len += n; } while(len < AESbsize + hdr.csize); /* decrypt */ if(config.key) decryptblock(buf, hdr.csize, config.key); /* decompress */ ubuf = vtmalloc(hdr.size); switch(hdr.codec){ case BlockECompress: //fprint(2, "dos3read decompress %V\n", r->tx.score); unwhackinit(&uw); nunc = unwhack(&uw, ubuf, hdr.size, buf+AESbsize, hdr.csize); if(nunc != hdr.size){ if(nunc < 0) { char *msg = smprint("decompression failed: %s", uw.err); r->rx.error = vtstrdup(msg); r->rx.msgtype = VtRerror; free(msg); } else { char *msg = smprint("decompression gave partial block: %d/%d\n", nunc, hdr.size); r->rx.error = vtstrdup(msg); r->rx.msgtype = VtRerror; free(msg); } vtfree(buf); vtfree(ubuf); goto cleanup; } break; case BlockENone: if(hdr.csize != hdr.size){ char *msg = smprint("loading clump: bad uncompressed size for uncompressed block %V", r->tx.score); r->rx.error = vtstrdup(msg); r->rx.msgtype = VtRerror; free(msg); vtfree(buf); vtfree(ubuf); goto cleanup; } memmove(ubuf, buf+AESbsize, hdr.size); break; default: r->rx.error = vtstrdup("unknown encoding in loadlump"); r->rx.msgtype = VtRerror; vtfree(buf); vtfree(ubuf); goto cleanup; } vtfree(buf); r->rx.data = packetalloc(); packetappend(r->rx.data, ubuf, hdr.size); packetsha1(r->rx.data, sha1); if(memcmp(sha1, r->tx.score, VtScoreSize) != 0) { r->rx.error = vtstrdup("score mismatch"); r->rx.msgtype = VtRerror; packetfree(r->rx.data); vtfree(ubuf); goto cleanup2; } vtfree(ubuf); goto cleanup; cleanup2: S3responsediscard(&resp); cleanup: free(req.resource); return err; } int nretries = 2; static void S3retry(S3Con *c, VtReq *r, char *(*op)(S3Con *c, VtReq *r), int nretries) { char *err; int ntries = 0; while(1) { err = op(c, r); if(!err || ntries++ >= nretries ) break; vtfree(err); //fprint(2, "RETRY\n"); S3reopen(c); } if(err) { //fprint(2, "RETRIES FAILED\n"); r->rx.error = err; r->rx.msgtype = VtRerror; } } void S3writeblock(S3Con *c, VtReq *r) { S3retry(c, r, doS3write, nretries); } void S3readblock(S3Con *c, VtReq *r) { S3retry(c, r, doS3read, nretries); }