implement Pg; include "sys.m"; sys: Sys; include "bufio.m"; bufio: Bufio; Iobuf: import bufio; include "keyring.m"; keyring: Keyring; include "bytelib.m"; include "pg.m"; debug : con 0; chomp(io : ref Iobuf) : string { t := io.gets(0); if(len(t) > 0) t = t[0:len(t) -1]; return t; } Data.extend_bytes(data : self ref Data, n : int) : int { if(data.bytes == nil) { data.bytes = array[n] of byte; return n; } bytes := array[len(data.bytes) + n] of byte; for(i := 0; i < len(data.bytes); i++) bytes[i] = data.bytes[i]; data.bytes = bytes; return len(data.bytes); } Data.write(data : self ref Data, txt : string) : int { if(data.bytes == nil) data.extend_bytes(3 * len(txt)); bytes_consumed : int; bytes_written := 0; for(i := 0; i < len(txt); i++) { if(len(data.bytes) - data.ptr > 2) bytes_consumed = sys->char2byte(txt[i], data.bytes, data.ptr); else bytes_consumed = 0; if(bytes_consumed == 0) break; data.ptr += bytes_consumed; bytes_written += bytes_consumed; } if(bytes_consumed == 0) { data.extend_bytes(3 * (len(txt) - i)); bytes_written += data.write(txt[i:]); } return bytes_written; } Data.append(data : self ref Data, bytes : array of byte) : int { if(data.bytes == nil) data.extend_bytes(len bytes); else if (len(data.bytes) - data.ptr < len(bytes)) data.extend_bytes(len(bytes) - (len(data.bytes) - data.ptr)); data.bytes[data.ptr:] = bytes; data.ptr += len(bytes); return len(bytes); } Data.md5(data : self ref Data) : string { keyring = load Keyring Keyring->PATH; digest := array[16] of byte; keyring->md5(data.bytes[0:data.ptr], data.ptr, digest, nil); digest_text := "md5"; for(i:=0; i<16; i++) { digest_text += sys->sprint("%02x", int(digest[i])); } return digest_text; } Data.puts(data : self ref Data, txt : string) : int { if(txt == nil) return 0; return data.write(txt) + data.write("\0"); } Data.put_notnil_s(data : self ref Data, key, value : string) : int { if(value != nil) return data.puts(key) + data.puts(value); return 0; } Data.send(data : self ref Data, io : ref Iobuf, tag : int) : int { written := 0; if(tag > 0) { written += io.putc(tag); } written += int32_write(io, 4 + data.ptr); if(data.bytes != nil && data.ptr <= len(data.bytes)) written += io.write(data.bytes, data.ptr); return written; } Data.to_string(data : self ref Data) : string { i, j, c, k : int; bytes := int_to_bytes(data.ptr, 4); out := ""; for(j = 0; j < 4; j++) { out += sys->sprint("%02x ", int(bytes[j])); } out += sys->sprint("[%d]", data.ptr); k = 0; for(i = 0; k < data.ptr ; i += 16) { out += sys->sprint("\n%02X : ", k); for(j = 0; k < data.ptr && j < 16; j++) { c = int(data.bytes[k]); case c { 32 to 126 => out += sys->sprint(".%c", c); * => out += sys->sprint(".%02X", c); } k++; } } return out; } Parameter.read(p : self ref Parameter, io : ref Iobuf) : int { p.key = chomp(io); if(len p.key == 0) return 0; p.value = chomp(io); return 1; } Response.read(r : self ref Response, io : ref Iobuf) : int { r.code = io.getc(); if(r.code == 0) return 0; r.data = chomp(io); return r.code; } Recordset.to_string(r : self ref Recordset) : string { str := "Recordset\n"; str += " Fields: " + string len r.fields + "\n"; for(i:=0; i < len(r.fields); i++) str += " " + string i + " " + r.fields[i].to_string() + "\n"; numrows := len r.rows; str += " Rows: " + string numrows + "\n"; numcols := len r.rows[i]; str += " Cols: " + string numcols + "\n"; for(i=0; i < numrows; i++) { str += " " + string i + " "; for(j:=0; j < numcols; j++) { str += " "; if(r.fields[j].format_code == 0) for(k:=0; k < len(r.rows[i][j]); k++) str += sys->sprint(".%02X", int r.rows[i][j][k]); else str += "X" + string r.rows[i][j]; } str += "\n"; } return str; } read_responses(io : ref Iobuf) : list of ref Response { responses : list of ref Response; r := ref Response; while(r.read(io)) { responses = r :: responses; r = ref Response; } return responses; } Field.read(f : self ref Field, io : ref Iobuf) { f.name = chomp(io); f.table_oid = int32_read(io); f.column_attribute_number = int16_read(io); f.data_type_oid = int32_read(io); f.data_type_size = int16_read(io); f.type_modifier = int32_read(io); f.format_code = int16_read(io); } Field.to_string(f : self ref Field) : string { fcode : string; if(f.format_code == 0) fcode = "ascii"; else fcode = "bin"; fmod : string; if(f.type_modifier != -1) fmod = "(" + string f.type_modifier + ")"; else fmod = ""; flen : string; if(f.data_type_size == -1) flen = "?"; else flen = string f.data_type_size; return sys->sprint("\"%s\" oid:%d.%d type %d[%s]%s %s", f.name, f.table_oid, f.column_attribute_number, f.data_type_oid, flen, fmod, fcode); } read_fields(io : ref Iobuf) : array of ref Field { field_count := int16_read(io); fields := array[field_count] of ref Field; for(i:= 0; i < field_count; i++) { fields[i] = ref Field; fields[i].read(io); } return fields; } Response.to_string(r : self ref Response) : string { case r.code { 'P' or 'p'=> return "position " + r.data; 'F'=> return "in file " + r.data; 'L'=> return ":" + r.data; * => return r.data; } } Parameter.to_string(p : self ref Parameter) : string { return p.key + "=" + string p.value; } response_by_code(responses : list of ref Response, code : int) : string { h : ref Response; for(t:=responses; t != nil; t = tl t) { h = hd t; if(h.code == code) { return h.to_string(); break; } } return nil; } Backend_Message.to_string(b_msg : self ref Backend_Message) : string { str : string; pick msg := b_msg { Error => str = sys->sprint("%s (%s) %s %s%s %s", response_by_code(msg.responses, 'S'), response_by_code(msg.responses, 'C'), response_by_code(msg.responses, 'M'), response_by_code(msg.responses, 'F'), response_by_code(msg.responses, 'L'), response_by_code(msg.responses, 'R') ); h : int; for(t:=list of {'D', 'H', 'P', 'p', 'q', 'W'}; t != nil; t = tl t) { h = hd t; s := response_by_code(msg.responses, h); if(s != nil) { str += sys->sprint(" (%c) %s", h, s); } } Authentication => case msg.auth_type { 0 => str = "AuthenticationOK"; 2 => str = "Kerberos V5 requested"; 3 => str = "Clear Text requested"; 4 => str = sys->sprint("Crypt requested salt %02X%02X", int(msg.salt[0]), int(msg.salt[1])); 5 => str = sys->sprint("MD5 requested salt %02X%02X%02X%02X", int(msg.salt[0]), int(msg.salt[1]), int(msg.salt[2]), int(msg.salt[3])); 6 => str = "SCM Credential requested"; * => str = "Unknown Auth Type"; } ReadyForQuery => str = "ReadyForQuery"; RowDescription => str = sys->sprint("Row Description : %d columns\n", len(msg.fields)); for(i:=0; i < len(msg.fields); i++) { str += sys->sprint(" %s\n", msg.fields[i].to_string()); } DataRow => str = sys->sprint("Data Row column(s):%d \n", len(msg.columns)); for(i:=0; isprint("Column %02d\n", i); if(msg.columns[i] == nil) { str += "NULL\n"; } else { d := ref Data; d.bytes = msg.columns[i]; d.ptr = len(d.bytes); str += sys->sprint("%s\n", d.to_string()); } } CopyData => str = "Copy Data\n"; if(msg.data == nil) str += "NULL"; else { d := ref Data(msg.data, len msg.data); str += d.to_string(); } CopyDone => str = "Copy Done"; CopyInResponse => # G str = sys->sprint("Copy In Response copy format:%d columns:%d\n", int msg.copy_format, len msg.format_codes); for(i:=0; isprint("Column %02d: %d\n", i, msg.format_codes[i]); CopyOutResponse => # H str = sys->sprint("Copy Out Response copy format:%d columns:%d\n", int msg.copy_format, len msg.format_codes); for(i:=0; isprint("Column %02d: %d\n", i, msg.format_codes[i]); CommandComplete => # C str = msg.cmd + " oid:" + string msg.oid + " " + string msg.rows + " row(s)"; ParseComplete => # 1 str = "Parse Complete"; BindComplete => # 2 str = "Bind Complete"; CloseComplete => # 3 str = "Close Complete"; PortalSuspended => # s str = "Portal Suspended"; EmptyQueryResponse => # l str = "Empty Query Response"; NoData => str = "No Data"; ParameterDescription => str = sys->sprint("Parameters %d\n", len(msg.oids)); for(i:=0; isprint(" %d oid: %d\n", i, msg.oids[i]); Unknown => str = sys->sprint("Unknown TAG %c:\n", msg.tag); d := ref Data(nil, 0); d.append(msg.data); str += sys->sprint("%s\n", d.to_string()); } return str; } Connection.connect(connection: self ref Connection, ip, port, options, parameters : string) : int { sys = load Sys Sys->PATH; bufio = load Bufio Bufio->PATH; addr := sys->sprint("tcp!%s!%s", ip, port); (i, c) := sys->dial(addr, nil); if(i == -1) { return 0; } connection.status = 1; connection.fd = c.dfd; connection.rx = chan of ref Backend_Message; connection.tx = chan of ref Frontend_Message; connection.notices = chan of ref Backend_Message; spawn deal_with_notices(connection); spawn deal_with_incoming(connection); spawn deal_with_outgoing(connection); connection.tx <-= ref Frontend_Message.StartupMessage(0, 196608, connection.user, connection.database, options, parameters); for(msg := <- connection.rx; msg != nil && msg.tag != 'Z'; msg = <- connection.rx); return 1; } Connection.set_parameter(c: self ref Connection, p : ref Parameter) { h : ref Parameter; for(t:=c.parameters; t != nil; t = tl t) { h = hd t; if(h.key == p.key) { h.value = p.value; break; } } if(t == nil) { c.parameters = p :: c.parameters; } } Connection.query(c: self ref Connection, sql : string) : ref Recordset { c.tx <-= ref Frontend_Message.Query('Q', sql); r := ref Recordset(nil, nil); rows : list of array of array of byte = nil; for(msg := <- c.rx; msg.tag != 'Z'; msg = <- c.rx) { pick m := msg { RowDescription => r.fields = m.fields; DataRow => rows = m.columns :: rows; } } r.rows = array[len(rows)] of array of array of byte; for(i := len(rows) -1 ; i >= 0; i--) { r.rows[i] = hd rows; rows = tl rows; } return r; } Connection.parse(c: self ref Connection, name, query : string, data_type_oids : array of int) : int { c.tx <-= ref Frontend_Message.Parse('P', name, query, data_type_oids); c.tx <- = ref Frontend_Message.Sync('S'); for(m := <- c.rx; m.tag != 'E' && m.tag != '1'; m = <- c.rx) c.notices <- = m; if(m.tag == '1') for(m = <- c.rx; m.tag != 'E' && m.tag != 'Z'; m = <- c.rx) c.notices <- = m; return m.tag == 'Z'; } Connection.describe(c: self ref Connection, item_type : byte, name : string) { c.tx <- = ref Frontend_Message.Describe('D', item_type, name); } Connection.execute(c: self ref Connection, portal, name : string, parameter_format_codes : array of int, parameters : array of array of byte, result_format_codes : array of int, rows_to_return : int) : ref Recordset { recordset := ref Recordset(nil, nil); pfc : array of int; if(parameter_format_codes == nil) pfc = array[0] of int; # else # parameter_format_codes = pfc; c.tx <-= ref Frontend_Message.Bind('B', portal, name, pfc, parameters, result_format_codes); c.tx <-= ref Frontend_Message.Flush('H'); for(msg := <- c.rx; msg.tag != '2' && msg.tag != 'E'; msg = <-c.rx) { pick m := msg { RowDescription => recordset.fields = m.fields; } } c.tx <-= ref Frontend_Message.Describe('D', byte 'S', name); c.tx <-= ref Frontend_Message.Flush('H'); # a 't' followed by a 'T' for(msg = <- c.rx; msg.tag == 't' || msg.tag == 'T'; msg = <-c.rx) { pick m := msg { RowDescription => recordset.fields = m.fields; } if(msg.tag == 'T') break; } if(msg.tag != 'T') return nil; c.tx <-= ref Frontend_Message.Execute('E', portal, rows_to_return); c.tx <-= ref Frontend_Message.Sync('S'); rows : list of array of array of byte = nil; for(msg = <- c.rx; msg.tag != 'C' && msg.tag != 'E' && msg.tag != 'Z'; msg = <-c.rx) { pick m := msg { DataRow => rows = m.columns :: rows; } } recordset.rows = array[len(rows)] of array of array of byte; i := len(rows) -1; while(rows != nil) { recordset.rows[i--] = hd rows; rows = tl rows; } return recordset; } Connection.disconnect(c: self ref Connection) { c.status = 0; c.tx <-= ref Frontend_Message.Terminate; c.tx <-= nil; c.notices <- = nil; } peek_at_data(io : ref Iobuf, size : int) { data := ref Data(array[size] of byte, size); io.seek(big(-5), bufio->SEEKRELA); tag := io.getc(); io.seek(big(4), bufio->SEEKRELA); data.ptr = io.read(data.bytes, size); io.seek(big(-data.ptr), bufio->SEEKRELA); sys->print("RX '%c' %s\n", tag, data.to_string()); } deal_with_outgoing(c : ref Connection) { Iobuf : import bufio; d : ref Data; d = nil; tag : int; io := bufio->fopen(c.fd, Bufio->OWRITE); for(f_msg := <- c.tx; f_msg != nil; f_msg = <- c.tx) { pick msg := f_msg { StartupMessage => tag = msg.tag; if(msg.user == nil) return; d = ref Data(int_to_bytes(16r00030000, 4), 4); d.put_notnil_s("user", msg.user); d.put_notnil_s("database", msg.database); d.put_notnil_s("options", msg.options); d.puts(msg.parameters); d.write("\0"); PasswordMessage => tag = msg.tag; d = ref Data(nil, 0); d.puts(msg.password); Query => tag = msg.tag; d = ref Data(nil, 0); d.puts(msg.sql); CopyData => tag = msg.tag; if(msg.data == nil) d = ref Data(nil, 0); else d = ref Data(msg.data, len msg.data); CopyFail => tag = msg.tag; d = ref Data(nil, 0); CopyDone => tag = msg.tag; d = ref Data(nil, 0); Parse => tag = msg.tag; d = ref Data(nil, 0); if(msg.name == nil) d.write("\0"); else d.puts(msg.name); d.puts(msg.query); d.append(int_to_bytes(len msg.data_type_oids, 2)); for(i:= 0; i < len(msg.data_type_oids); i++) d.append(int_to_bytes(msg.data_type_oids[i], 4)); Bind => tag = msg.tag; d = ref Data(nil, 0); if(msg.portal == "") d.write("\0"); else d.puts(msg.portal); if(msg.name == "") d.write("\0"); else d.puts(msg.name); d.append(int_to_bytes(len msg.parameter_format_codes, 2)); i : int; for(i = 0; i < len msg.parameter_format_codes; i++) d.append(int_to_bytes(msg.parameter_format_codes[i], 2)); d.append(int_to_bytes(len msg.parameters, 2)); for(i = 0; i < len msg.parameters; i++) { if(msg.parameters[i] == nil) d.append(int_to_bytes(-1, 4)); else { d.append(int_to_bytes(len msg.parameters[i], 4)); d.append(msg.parameters[i]); } } d.append(int_to_bytes(len msg.result_format_codes, 2)); for(i = 0; i < len msg.result_format_codes; i++) d.append(int_to_bytes(msg.result_format_codes[i], 2)); Execute => tag = msg.tag; d = ref Data(nil, 0); if(msg.portal == nil) d.write("\0"); else d.puts(msg.portal); d.append(int_to_bytes(msg.rows_to_return, 4)); Sync => tag = msg.tag; d = ref Data(nil, 0); Describe => tag = msg.tag; d = ref Data(array[1] of byte, 1); d.bytes[0] = msg.item_type; if(msg.name == nil) d.write("\0"); else d.puts(msg.name); Close => tag = msg.tag; d = ref Data(array[1] of byte, 1); d.bytes[0] = msg.item_type; d.puts(msg.name); Flush => tag = msg.tag; d = ref Data(nil, 0); Terminate => tag = msg.tag; d = ref Data(nil, 0); } if(d != nil) { written := d.send(io, tag); io.flush(); if(debug) sys->print("TX %d '%c': %s\n\n", written, tag, d.to_string()); } else { if(debug) sys->print("Outgoing data was nil, not sent\n"); } } } deal_with_incoming(c : ref Connection) { Iobuf : import bufio; msg : ref Backend_Message; size : int; io := bufio->fopen(c.fd, Bufio->OREAD); for(tag := io.getc(); tag > 0 && c.status > 0; tag = io.getc()) { size = int32_read(io) - 4; # includes self if(debug) peek_at_data(io, size); msg = nil; case tag { 'E' => msg = ref Backend_Message.Error(tag, read_responses(io)); c.notices <-= msg; 'N' => msg = ref Backend_Message.NoticeResponse(tag, read_responses(io)); c.notices <-= msg; 'R' => auth_type := int32_read(io); salt : array of byte = nil; case auth_type { 3 => c.tx <-= ref Frontend_Message.PasswordMessage('p', c.password); 4 => salt = array[2] of byte; io.read(salt, 2); 5 => # doesn't work :=( hash is wrong salt = array[4] of byte; io.read(salt, 4); d := ref Data(nil, 0); d.write(c.password); d.write(c.user); un_pw_digest := d.md5(); d = ref Data(nil, 0); d.write(un_pw_digest); d.append(salt); c.tx <-= ref Frontend_Message.PasswordMessage('p', d.md5()); } msg = ref Backend_Message.Authentication(tag, auth_type, salt); 'S' => p := ref Parameter; if(p.read(io) > 0) c.set_parameter(p); 'K' => c.process_id = int32_read(io); c.key = int32_read(io); 'Z' => msg = ref Backend_Message.ReadyForQuery(tag,io.getc()); 'T' => msg = ref Backend_Message.RowDescription(tag,read_fields(io)); 'D' => num_cols := int16_read(io); columns := array[num_cols] of array of byte; for(i:= 0; i= 0) { columns[i] = array[data_size] of byte; if(data_size > 0) io.read(columns[i], data_size); } else { columns[i] = nil; } } msg = ref Backend_Message.DataRow(tag, columns); 'd' => data := array[size] of byte; io.read(data, size); msg = ref Backend_Message.CopyData(tag, data); 'c' => msg = ref Backend_Message.CopyDone(tag); 'G' => copy_format := byte io.getc(); column_count := int16_read(io); format_codes := array[column_count] of int; for(i := 0; i < column_count; i++) format_codes[i] = int16_read(io); msg = ref Backend_Message.CopyInResponse(tag, copy_format, format_codes); 'H' => copy_format := byte io.getc(); column_count := int16_read(io); format_codes := array[column_count] of int; for(i := 0; i < column_count; i++) format_codes[i] = int16_read(io); msg = ref Backend_Message.CopyOutResponse(tag, copy_format, format_codes); 'C' => oid, rows : int; (numwords, words) := sys->tokenize(chomp(io), " "); cmd := hd words; words = tl words; case numwords { 3 => oid = int hd words; rows = int hd tl words; 2 => oid = 0; rows = int hd words; * => oid = 0; rows = 0; } msg = ref Backend_Message.CommandComplete(tag, cmd, oid, rows); c.notices <-= msg; msg = nil; '1' => msg = ref Backend_Message.ParseComplete(tag); '2' => msg = ref Backend_Message.BindComplete(tag); '3' => msg = ref Backend_Message.CloseComplete(tag); 'I' => msg = ref Backend_Message.EmptyQueryResponse(tag); 's' => msg = ref Backend_Message.PortalSuspended(tag); 'n' => msg = ref Backend_Message.NoData(tag); 't' => oid_count := int16_read(io); oids := array[oid_count] of int; for(i := 0; i < oid_count; i++) oids[i] = int32_read(io); msg = ref Backend_Message.ParameterDescription(tag, oids); * => data := array[size] of byte; io.read(data, size); msg = ref Backend_Message.Unknown(tag, data); } if(msg != nil) { if(debug) sys->print("%s\n\n", msg.to_string()); c.rx <-= msg; } } } deal_with_notices(c : ref Connection) { stderr := sys->fildes(2); for(msg := <-c.notices; msg != nil; msg = <-c.notices) { pick m:= msg { Error => if(response_by_code(m.responses, 'S') == "FATAL") { c.disconnect(); } } if(debug) sys->fprint(stderr, "%s\n", msg.to_string()); } }