// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
// All rights reserved.

// extended.go — Extended Protocol (Parse/Bind/Execute/Describe/
// Sync/Close) support for the pgserver. Without this, pgx clients
// using the default QueryExecModeCacheStatement get a "0A000
// not supported" error on every query (Phase 3 saw this — the
// integration script had to force SimpleProtocol).
//
// The implementation is deliberately minimal for v1.0:
//   - Per-session statement cache (filled by Parse): name → SQL +
//     declared param OIDs. The unnamed statement uses the "" key.
//   - Per-session portal cache (filled by Bind): name → statement +
//     the SQL with its parameters already substituted.
//   - Parameter substitution happens server-side, in this package,
//     at Bind time: values are inlined as literals so five_SQL's
//     existing template-cache pipeline (TFiveSQL.prg's
//     SqlLexAndExtractTemplate path) can re-parameterise the query
//     without us having to teach it the PG-wire param shape.
//   - Describe-statement answers ParameterDescription + NoData,
//     because the row shape is unknown until the query runs.
//   - Describe-portal, or the first Execute, runs the query once and
//     caches the result on the portal; Execute streams from that
//     cache so the same SQL never runs twice. When no Describe-portal
//     preceded it, Execute also emits the RowDescription itself
//     (pgx's cache-statement mode never Describes the portal).
//
// Phase 4.1 will replace the literal-substitution rewrite with a
// proper `?`-param threading through five_SQL's aParams so binary
// params (timestamps, numerics) avoid round-tripping through text.
package pgserver

import (
	"encoding/binary"
	"fmt"
	"math"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/v5/pgproto3"

	"five/hbrt"
)

// preparedStmt holds a statement registered via Parse: the original
// SQL plus the per-position parameter OIDs the client declared
// (0 = unspecified / "any"). Describe-statement later pads paramOIDs
// with 0 up to the highest $N placeholder in the SQL. five_SQL only
// parses the SQL when a portal built from it is first executed
// (Describe-portal or Execute).
type preparedStmt struct {
	sql       string
	paramOIDs []uint32
}

// portal binds a prepared statement to concrete parameter values.
// We materialise the values into a fully-substituted SQL string so // the engine sees a normal Simple-Query-shaped statement; the // result is cached on the portal so Describe-portal can emit a // RowDescription without a second execution. type portal struct { stmt *preparedStmt resolvedSQL string // Cached result, populated on Describe-portal so Execute can // stream without re-running the query. resultArr is the // engine's `aResult` array; once executed, used to derive // RowDescription + DataRow stream. executed bool resultArr *hbrt.HbArray resultErr *errEnvelope } // errEnvelope is the unpacked form of FiveSql2's error sentinel. type errEnvelope struct { code int msg string sql string } // ensureExtendedState lazily allocates the per-session caches. // Called by every extended-protocol handler; the maps stay alive // for the whole session because pgx-style clients re-use unnamed // statements aggressively. func (s *session) ensureExtendedState() { if s.stmts == nil { s.stmts = make(map[string]*preparedStmt) } if s.portals == nil { s.portals = make(map[string]*portal) } } // handleParse stores a named prepared statement. Returns // ParseComplete; any actual parse error from five_SQL is deferred // until Execute so a probe-only Parse + Describe + Sync round-trip // (which pgx does for QueryExecModeDescribeExec) stays cheap. func (s *session) handleParse(m *pgproto3.Parse) { s.ensureExtendedState() s.stmts[m.Name] = &preparedStmt{ sql: m.Query, paramOIDs: append([]uint32(nil), m.ParameterOIDs...), } s.send(&pgproto3.ParseComplete{}) } // handleBind materialises parameter values into the SQL text and // stashes the portal for later Describe/Execute. Binary-format // params are decoded for the OIDs we recognise; otherwise the raw // bytes are passed through as a quoted text literal (good enough // for the common SELECT/INSERT-with-int/string cases). 
func (s *session) handleBind(m *pgproto3.Bind) { s.ensureExtendedState() stmt := s.stmts[m.PreparedStatement] if stmt == nil { s.send(buildErrorResponse("26000", fmt.Sprintf("prepared statement %q does not exist", m.PreparedStatement), "")) return } resolved, err := substituteParams(stmt.sql, stmt.paramOIDs, m.Parameters, m.ParameterFormatCodes) if err != nil { s.send(buildErrorResponse("42P02", err.Error(), stmt.sql)) return } s.portals[m.DestinationPortal] = &portal{stmt: stmt, resolvedSQL: resolved} s.send(&pgproto3.BindComplete{}) } // handleDescribe answers Describe for either a statement or a // portal. ObjectType 'S' = statement, 'P' = portal. // // Statement Describe is hard to answer fully without running the // query (we'd need column-type inference). For v1.0 we return // NoData + ParameterDescription with the declared OIDs; pgx // tolerates this and re-Describes the portal after Bind. // // Portal Describe runs the query immediately, caches the result // on the portal, and emits RowDescription from the actual result // columns. Execute later just streams from the cache. func (s *session) handleDescribe(m *pgproto3.Describe) { s.ensureExtendedState() switch m.ObjectType { case 'S': stmt := s.stmts[m.Name] if stmt == nil { s.send(buildErrorResponse("26000", fmt.Sprintf("prepared statement %q does not exist", m.Name), "")) return } // pgx-style clients send Parse without explicit OIDs and // expect the server to infer them from the SQL. Count the // `$N` placeholders and pad paramOIDs with OID 0 ("any") // for any that the client didn't pre-declare — otherwise // pgx errors with "expected 0 arguments, got N" before // the Bind even reaches us. nParams := countPgPlaceholders(stmt.sql) oids := stmt.paramOIDs for len(oids) < nParams { oids = append(oids, 0) } stmt.paramOIDs = oids s.send(&pgproto3.ParameterDescription{ParameterOIDs: oids}) // NoData — we don't know the row shape until execution. // pgx tolerates this and asks again via Describe-portal. 
s.send(&pgproto3.NoData{}) case 'P': p := s.portals[m.Name] if p == nil { s.send(buildErrorResponse("34000", fmt.Sprintf("portal %q does not exist", m.Name), "")) return } s.executePortal(p) if p.resultErr != nil { s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql)) return } s.sendRowDescriptionFromArr(p.resultArr) } } // handleExecute streams the cached result of the portal as a // DataRow sequence then CommandComplete. If the portal hasn't // been executed yet (Describe-portal was skipped), execute now. // // pgx's default QueryExecModeCacheStatement skips Describe-portal // and only does Describe-statement once at prepare time. Since // Describe-statement returns NoData (we don't know the row shape // without executing), pgx never gets a RowDescription unless we // emit one here too. We emit it conditionally: only when the // portal wasn't already Described (described==false) so we don't // duplicate the descriptor for clients that *did* call Describe-P. func (s *session) handleExecute(m *pgproto3.Execute) { s.ensureExtendedState() p := s.portals[m.Portal] if p == nil { s.send(buildErrorResponse("34000", fmt.Sprintf("portal %q does not exist", m.Portal), "")) return } wasDescribed := p.executed s.executePortal(p) if p.resultErr != nil { s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql)) s.txStatus = currentTxStatusAfterError(s.txStatus) return } // If Describe-portal was never called, the client is still // waiting for column metadata. Emit it now from the cached // result so DataRow has a context the client can decode. if !wasDescribed { s.sendRowDescriptionFromArr(p.resultArr) } s.streamPortalRows(p, int(m.MaxRows)) tag := commandTagFor(p.resolvedSQL) s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)}) s.updateTxStatusForTag(tag) } // handleClose drops a prepared statement or portal entry. 
func (s *session) handleClose(m *pgproto3.Close) { s.ensureExtendedState() switch m.ObjectType { case 'S': delete(s.stmts, m.Name) case 'P': delete(s.portals, m.Name) } s.send(&pgproto3.CloseComplete{}) } // handleSync flushes any pending output and emits ReadyForQuery, // closing one cycle of the extended-protocol exchange. func (s *session) handleSync() { s.sendReadyForQuery() } // executePortal runs the resolved SQL through five_SQL once and // caches the result on the portal. Idempotent — repeated calls // short-circuit on `executed`. pg_catalog probes (BI-tool metadata // queries) are intercepted before the engine call so DBeaver / // Tableau / DataGrip can negotiate without erroring out. func (s *session) executePortal(p *portal) { if p.executed { return } p.executed = true if handled, val := s.catalogIntercept(p.resolvedSQL); handled { p.resultArr = val.AsArray() return } res, err := s.runSQL(p.resolvedSQL) if err != nil { p.resultErr = &errEnvelope{code: 0, msg: err.Error(), sql: p.resolvedSQL} return } if res.IsNil() || !res.IsArray() { // Non-result statement (DDL, transaction control). Synthesise // an empty result envelope so the streaming path emits an // empty RowDescription + CommandComplete cleanly. p.resultArr = &hbrt.HbArray{Items: []hbrt.Value{ hbrt.MakeArrayFrom([]hbrt.Value{}), hbrt.MakeArrayFrom([]hbrt.Value{}), }} return } arr := res.AsArray() if isErrorEnvelope(arr) { code, msg, sql := unpackError(arr) p.resultErr = &errEnvelope{code: code, msg: msg, sql: sql} return } p.resultArr = arr } // sendRowDescriptionFromArr emits a RowDescription derived from // the engine's `{aFieldNames, aRows}` envelope. Mirrors what // emitResultSet (Simple Query) does on the field-description side // so the wire shape is identical regardless of which protocol the // client picks. 
func (s *session) sendRowDescriptionFromArr(arr *hbrt.HbArray) { if arr == nil || len(arr.Items) < 1 || !arr.Items[0].IsArray() { s.send(&pgproto3.RowDescription{Fields: nil}) return } fields := arr.Items[0].AsArray().Items var firstRow []hbrt.Value if len(arr.Items) >= 2 && arr.Items[1].IsArray() { if rows := arr.Items[1].AsArray().Items; len(rows) > 0 && rows[0].IsArray() { firstRow = rows[0].AsArray().Items } } descFields := make([]pgproto3.FieldDescription, len(fields)) for i, f := range fields { name := "" if f.IsString() { name = f.AsString() } else { name = fmt.Sprintf("column%d", i+1) } var sample hbrt.Value if i < len(firstRow) { sample = firstRow[i] } oid, typeSize := pgTypeFor(sample) descFields[i] = pgproto3.FieldDescription{ Name: []byte(name), DataTypeOID: uint32(oid), DataTypeSize: typeSize, TypeModifier: -1, Format: 0, } } s.send(&pgproto3.RowDescription{Fields: descFields}) } // streamPortalRows iterates the portal's cached rows and emits a // DataRow per row, respecting maxRows (0 = unlimited). func (s *session) streamPortalRows(p *portal, maxRows int) { if p.resultArr == nil || len(p.resultArr.Items) < 2 { return } rowsVal := p.resultArr.Items[1] if !rowsVal.IsArray() { return } rows := rowsVal.AsArray().Items fieldCount := 0 if p.resultArr.Items[0].IsArray() { fieldCount = len(p.resultArr.Items[0].AsArray().Items) } emitted := 0 for _, rowVal := range rows { if maxRows > 0 && emitted >= maxRows { break } if !rowVal.IsArray() { continue } cells := rowVal.AsArray().Items out := make([][]byte, fieldCount) for i := 0; i < fieldCount; i++ { if i < len(cells) { out[i] = encodeText(cells[i]) } } s.send(&pgproto3.DataRow{Values: out}) emitted++ } } // substituteParams produces a Simple-Query-shaped SQL by inlining // the bound parameter values as PG-style literals. Format codes: // 0 = text, 1 = binary. 
We support text for all OIDs and binary // for INT2/INT4/INT8/FLOAT4/FLOAT8/BOOL — the rest fall back to // hex-escaped strings, which works for VARCHAR but loses fidelity // for binary timestamps until Phase 4.1. // // pgx defaults to binary for ints + text for everything else, so // the common case is well covered. func substituteParams(sql string, oids []uint32, params [][]byte, formats []int16) (string, error) { // Each `$1`/`$2`/… placeholder maps to params[i-1]. We do a // linear scan rather than regex to avoid quoting pitfalls // (strings containing literal `$1` are rare in practice but // the scanner respects single-quoted runs). var out strings.Builder i := 0 inStr := byte(0) for i < len(sql) { c := sql[i] if inStr != 0 { out.WriteByte(c) if c == inStr { inStr = 0 } i++ continue } if c == '\'' || c == '"' { inStr = c out.WriteByte(c) i++ continue } if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' { j := i + 1 for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' { j++ } idx, err := strconv.Atoi(sql[i+1 : j]) if err != nil || idx < 1 || idx > len(params) { out.WriteByte(c) i++ continue } pidx := idx - 1 oid := uint32(0) if pidx < len(oids) { oid = oids[pidx] } format := int16(0) if pidx < len(formats) { format = formats[pidx] } else if len(formats) == 1 { format = formats[0] // single format code applies to all } lit, err := paramToLiteral(params[pidx], oid, format) if err != nil { return "", err } out.WriteString(lit) i = j continue } out.WriteByte(c) i++ } return out.String(), nil } // countPgPlaceholders scans SQL for the highest $N placeholder // outside string literals and returns that count. Matches what // substituteParams resolves at Bind time. Returns 0 for SQL with // no placeholders. Order of placeholders doesn't matter for the // count — repeated `$1` still counts as 1 param slot. 
func countPgPlaceholders(sql string) int { max := 0 inStr := byte(0) for i := 0; i < len(sql); i++ { c := sql[i] if inStr != 0 { if c == inStr { inStr = 0 } continue } if c == '\'' || c == '"' { inStr = c continue } if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' { j := i + 1 for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' { j++ } if n, err := strconv.Atoi(sql[i+1 : j]); err == nil && n > max { max = n } i = j - 1 } } return max } func paramToLiteral(raw []byte, oid uint32, format int16) (string, error) { if raw == nil { return "NULL", nil } if format == 0 { // Text format — quote per type. Numerics + bools transit // unquoted; everything else (including DATE / TIMESTAMP in // text form) gets single-quoted with embedded-quote escape. switch oid { case oidInt2, oidInt4, oidInt8, oidBool, oidNumeric, oidFloat4, oidFloat8: return string(raw), nil default: return "'" + strings.ReplaceAll(string(raw), "'", "''") + "'", nil } } // Binary format. pgx defaults to binary for INT*, FLOAT*, BOOL, // NUMERIC, DATE, TIMESTAMP, TIMESTAMPTZ — decode each into a // FiveSql2-shaped literal that the engine's lexer can re-parse. 
switch oid { case oidInt2: if len(raw) != 2 { return "", fmt.Errorf("int2 param: want 2 bytes, got %d", len(raw)) } return strconv.FormatInt(int64(int16(binary.BigEndian.Uint16(raw))), 10), nil case oidInt4: if len(raw) != 4 { return "", fmt.Errorf("int4 param: want 4 bytes, got %d", len(raw)) } return strconv.FormatInt(int64(int32(binary.BigEndian.Uint32(raw))), 10), nil case oidInt8: if len(raw) != 8 { return "", fmt.Errorf("int8 param: want 8 bytes, got %d", len(raw)) } return strconv.FormatInt(int64(binary.BigEndian.Uint64(raw)), 10), nil case oidFloat4: if len(raw) != 4 { return "", fmt.Errorf("float4 param: want 4 bytes, got %d", len(raw)) } f := math.Float32frombits(binary.BigEndian.Uint32(raw)) return strconv.FormatFloat(float64(f), 'g', -1, 32), nil case oidFloat8: if len(raw) != 8 { return "", fmt.Errorf("float8 param: want 8 bytes, got %d", len(raw)) } f := math.Float64frombits(binary.BigEndian.Uint64(raw)) return strconv.FormatFloat(f, 'g', -1, 64), nil case oidBool: if len(raw) != 1 { return "", fmt.Errorf("bool param: want 1 byte, got %d", len(raw)) } if raw[0] == 0 { return "FALSE", nil } return "TRUE", nil case oidNumeric: s, err := decodeBinaryNumeric(raw) if err != nil { return "", err } return s, nil case oidDate: s, err := decodeBinaryDate(raw) if err != nil { return "", err } return "'" + s + "'", nil case oidTimestamp, oidTimestamptz: s, err := decodeBinaryTimestamp(raw) if err != nil { return "", err } return "'" + s + "'", nil default: // Unknown binary OID — fall back to a quoted hex literal. // FiveSql2 won't accept this directly, but the resulting // error is at least diagnosable rather than a silent miss. return "'\\x" + fmt.Sprintf("%x", raw) + "'", nil } } // decodeBinaryNumeric converts PostgreSQL's binary NUMERIC wire // format (RFC-independent — see PG source utils/adt/numeric.c // numeric_send / numeric_recv) to a plain decimal string. 
The // format is: // // int16 ndigits number of base-10000 "digits" // int16 weight weight of the first digit, in base-10000 units // uint16 sign 0x0000 positive, 0x4000 negative, 0xC000 NaN // uint16 dscale displayed scale (decimal places to show) // int16 digits[ndigits] each in 0..9999 // // The numeric value equals sign × Σ d[i] × 10000^(weight − i). // // Output is a FiveSql2-parseable decimal literal — unquoted, no // scientific notation, with exactly `dscale` digits after the // decimal point so round-trip width is preserved. func decodeBinaryNumeric(raw []byte) (string, error) { if len(raw) < 8 { return "", fmt.Errorf("numeric param: header too short (%d bytes)", len(raw)) } ndigits := int16(binary.BigEndian.Uint16(raw[0:2])) weight := int16(binary.BigEndian.Uint16(raw[2:4])) sign := binary.BigEndian.Uint16(raw[4:6]) dscale := int16(binary.BigEndian.Uint16(raw[6:8])) if int(ndigits)*2+8 != len(raw) { return "", fmt.Errorf("numeric param: digit count mismatch (ndigits=%d, body=%d)", ndigits, len(raw)-8) } if sign == 0xC000 { return "NaN", nil } digs := make([]uint16, ndigits) for i := 0; i < int(ndigits); i++ { digs[i] = binary.BigEndian.Uint16(raw[8+i*2 : 10+i*2]) } var sb strings.Builder if sign == 0x4000 { sb.WriteByte('-') } // Integer part: weight+1 base-10000 digits. If weight is // negative the integer part is just "0". intDigits := int(weight) + 1 if intDigits <= 0 { sb.WriteByte('0') } else { for i := 0; i < intDigits; i++ { var d uint16 if i < int(ndigits) { d = digs[i] } if i == 0 { fmt.Fprintf(&sb, "%d", d) } else { fmt.Fprintf(&sb, "%04d", d) } } } if dscale > 0 { sb.WriteByte('.') // Build the fractional digit string. When weight < -1, the // first array digit is already several base-10000 positions // past the decimal point — pad with "0000" groups for those // missing leading-zero positions. 
var frac strings.Builder leadingZeroGroups := 0 if intDigits < 0 { leadingZeroGroups = -intDigits } for i := 0; i < leadingZeroGroups; i++ { frac.WriteString("0000") } fracStart := intDigits if fracStart < 0 { fracStart = 0 } for i := fracStart; i < int(ndigits); i++ { fmt.Fprintf(&frac, "%04d", digs[i]) } s := frac.String() if len(s) >= int(dscale) { sb.WriteString(s[:dscale]) } else { sb.WriteString(s) sb.WriteString(strings.Repeat("0", int(dscale)-len(s))) } } return sb.String(), nil } // decodeBinaryDate converts a PG binary DATE (4-byte signed days // since pgEpoch = 2000-01-01) to "YYYY-MM-DD". func decodeBinaryDate(raw []byte) (string, error) { if len(raw) != 4 { return "", fmt.Errorf("date param: want 4 bytes, got %d", len(raw)) } days := int32(binary.BigEndian.Uint32(raw)) t := pgEpoch.AddDate(0, 0, int(days)) return t.Format("2006-01-02"), nil } // decodeBinaryTimestamp converts a PG binary TIMESTAMP / TIMESTAMPTZ // (8-byte signed microseconds since pgEpoch = 2000-01-01) to // "YYYY-MM-DD HH:MM:SS.ffffff". Encoding assumes integer_datetimes // = on; we advertise that in ParameterStatus on connect so clients // won't send the floating-point variant. func decodeBinaryTimestamp(raw []byte) (string, error) { if len(raw) != 8 { return "", fmt.Errorf("timestamp param: want 8 bytes, got %d", len(raw)) } us := int64(binary.BigEndian.Uint64(raw)) t := pgEpoch.Add(time.Duration(us) * time.Microsecond) return t.Format("2006-01-02 15:04:05.000000"), nil }