Files
five/hbrtl/pgserver/extended.go
CharlesKWON d7a81af7db feat(pgserver): binary-format param decoding (Phase 4.1)
pgx defaults to binary wire format for INT2/INT4/INT8/FLOAT4/FLOAT8/
BOOL/NUMERIC/DATE/TIMESTAMP/TIMESTAMPTZ — Go's most-used PG driver
ships nearly every typed parameter as binary unless explicitly told
to use text mode. The Phase 3 implementation only decoded INT4/INT8/
BOOL, so any pgx call with a decimal price, a timestamp, or a date
was silently mis-quoted into the SQL stream.

Decoders now cover the seven additional OIDs. The interesting one is
NUMERIC: PG's wire format is base-10000 digit groups plus a separate
displayed-scale, so the decoder rebuilds the decimal string from
weight+sign+ndigits+digits[] without going through float (which would
lose precision for NUMERIC(38,*) values). Pinned by vectors covering
zero / positive / negative / fractional-only / NaN / multi-group
integer + fraction cases.

DATE / TIMESTAMP decoders assume integer_datetimes=on (which the
server advertises in ParameterStatus); the 8-byte microsecond delta
from the PG epoch (2000-01-01 UTC) is converted via Go's time.Time
machinery and re-emitted as a quoted SQL literal.

Text-format path also broadened: FLOAT4/FLOAT8/INT2 now transit
unquoted alongside INT4/INT8/BOOL/NUMERIC; the regression would have
been clients sending text-format floats getting them rewritten as
'1.5' (string literal) instead of 1.5 (numeric).

Verified: all 6 mandatory gates green (go test, SQL 43/43, compat
56/56, std.ch 17/17, FRB 7/7, pgserver 11/11). Five new decoder
tests pin each wire format against handcrafted PG payloads.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 10:02:15 +09:00

646 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
// All rights reserved.
// extended.go — Extended Protocol (Parse/Bind/Execute/Describe/
// Sync/Close) support for the pgserver. Without this, pgx clients
// using the default QueryExecModeCacheStatement get an "0A000
// not supported" error on every query (Phase 3 saw this — the
// integration script had to force SimpleProtocol).
//
// The implementation is deliberately minimal for v1.0:
// * Per-session named statement cache: name → SQL + paramOIDs
// * Per-session named portal cache: name → statement + bound params
// * Parameter substitution happens in this server layer at Bind
// time (not in the client and not in the engine); we build a
// "rewrite" SQL with literals inlined so five_SQL's existing
// template-cache pipeline (TFiveSQL.prg's
// SqlLexAndExtractTemplate path) can re-parameterise the
// query without us having to teach it the PG-wire param shape.
// * Describe-statement returns NoData; pgx tolerates this and
// issues Describe-portal after Bind, by which point we've run
// the query and can emit a real RowDescription.
// * Execute streams the cached result set produced at Describe-
// portal time so we don't run the same SQL twice.
//
// Phase 4.1 will replace the literal-substitution rewrite with a
// proper `?`-param threading through five_SQL's aParams so binary
// params (timestamps, numerics) avoid round-tripping through text.
package pgserver
import (
"encoding/binary"
"fmt"
"math"
"strconv"
"strings"
"time"
"github.com/jackc/pgx/v5/pgproto3"
"five/hbrt"
)
// preparedStmt holds a named statement registered via Parse. It
// stores the original SQL plus the per-position param OIDs the
// client declared (or 0 = "any"). Nothing is parsed at Parse time;
// the SQL only reaches the engine once a portal bound to this
// statement is executed (see executePortal).
type preparedStmt struct {
// sql is the query text exactly as received in the Parse message,
// still containing its `$N` placeholders.
sql string
// paramOIDs holds the parameter type OIDs, indexed by placeholder
// number minus one. handleDescribe pads it with zeros up to the
// highest `$N` found in sql.
paramOIDs []uint32
}
// portal binds a prepared statement to concrete parameter values.
// We materialise the values into a fully-substituted SQL string so
// the engine sees a normal Simple-Query-shaped statement; the
// result is cached on the portal so Describe-portal can emit a
// RowDescription without a second execution.
type portal struct {
// stmt is the prepared statement this portal was bound from.
stmt *preparedStmt
// resolvedSQL is stmt.sql with every bound parameter inlined as a
// literal; handleBind fills it from substituteParams.
resolvedSQL string
// Cached result, populated by executePortal (called from both
// Describe-portal and Execute) so the query runs at most once.
// executed flips to true on the first executePortal call, whether
// or not it succeeded. resultArr is the engine's `aResult` array
// used to derive the RowDescription + DataRow stream; resultErr
// is set instead when execution failed.
executed bool
resultArr *hbrt.HbArray
resultErr *errEnvelope
}
// errEnvelope is the unpacked form of FiveSql2's error sentinel.
// executePortal builds it either from unpackError or, for errors
// returned directly by runSQL, with code 0.
type errEnvelope struct {
// code is the engine error code; callers map it to a SQLSTATE via
// sqlStateFor before sending it to the client.
code int
// msg is the human-readable error message.
msg string
// sql is the statement text associated with the error.
sql string
}
// ensureExtendedState makes sure the session's statement and portal
// maps exist before a handler touches them. Every extended-protocol
// handler calls it first; a map that is already allocated is left
// alone, so cached statements and portals survive for as long as
// the session keeps them.
func (s *session) ensureExtendedState() {
	if s.portals == nil {
		s.portals = map[string]*portal{}
	}
	if s.stmts == nil {
		s.stmts = map[string]*preparedStmt{}
	}
}
// handleParse registers (or overwrites) the prepared statement named
// in the Parse message and replies with ParseComplete. The SQL is
// stored verbatim and not validated here, so syntax errors surface
// only when a portal bound to the statement is executed; that keeps
// a probe-only Parse + Describe + Sync round-trip cheap.
func (s *session) handleParse(m *pgproto3.Parse) {
	s.ensureExtendedState()

	// Take a private copy so later padding of paramOIDs never writes
	// into the decoder-owned slice of the incoming message.
	var declared []uint32
	declared = append(declared, m.ParameterOIDs...)

	entry := &preparedStmt{sql: m.Query, paramOIDs: declared}
	s.stmts[m.Name] = entry
	s.send(&pgproto3.ParseComplete{})
}
// handleBind turns a Bind message into a portal: the referenced
// prepared statement's SQL gets the bound parameter values inlined
// as literals (see substituteParams) and the result is stored under
// the destination portal name for a later Describe/Execute.
//
// Replies:
//   - ErrorResponse 26000 when the prepared statement is unknown;
//   - ErrorResponse 42P02 when a parameter cannot be turned into a
//     literal;
//   - BindComplete otherwise.
func (s *session) handleBind(m *pgproto3.Bind) {
	s.ensureExtendedState()

	stmt := s.stmts[m.PreparedStatement]
	if stmt == nil {
		msg := fmt.Sprintf("prepared statement %q does not exist", m.PreparedStatement)
		s.send(buildErrorResponse("26000", msg, ""))
		return
	}

	resolved, err := substituteParams(stmt.sql, stmt.paramOIDs, m.Parameters, m.ParameterFormatCodes)
	if err != nil {
		s.send(buildErrorResponse("42P02", err.Error(), stmt.sql))
		return
	}

	bound := &portal{stmt: stmt, resolvedSQL: resolved}
	s.portals[m.DestinationPortal] = bound
	s.send(&pgproto3.BindComplete{})
}
// handleDescribe answers a Describe message. ObjectType 'S' targets
// a prepared statement, 'P' a portal; any other object type falls
// through without a reply.
//
// Statement: the row shape is unknown until the query runs, so the
// reply is ParameterDescription followed by NoData. Clients that
// send Parse without explicit OIDs expect the server to report one
// OID per placeholder, so the declared list is padded with OID 0
// ("any") up to the highest `$N` in the SQL, and the padded list is
// stored back on the statement.
//
// Portal: the query is executed right away (executePortal caches the
// outcome on the portal) and either the execution error or a
// RowDescription derived from the actual result is sent. A later
// Execute streams from that cache.
func (s *session) handleDescribe(m *pgproto3.Describe) {
	s.ensureExtendedState()

	if m.ObjectType == 'S' {
		stmt := s.stmts[m.Name]
		if stmt == nil {
			msg := fmt.Sprintf("prepared statement %q does not exist", m.Name)
			s.send(buildErrorResponse("26000", msg, ""))
			return
		}
		// Pad with "any" for every placeholder the client left
		// undeclared.
		if missing := countPgPlaceholders(stmt.sql) - len(stmt.paramOIDs); missing > 0 {
			stmt.paramOIDs = append(stmt.paramOIDs, make([]uint32, missing)...)
		}
		s.send(&pgproto3.ParameterDescription{ParameterOIDs: stmt.paramOIDs})
		// Row shape is not known before execution.
		s.send(&pgproto3.NoData{})
		return
	}

	if m.ObjectType != 'P' {
		return
	}

	p := s.portals[m.Name]
	if p == nil {
		msg := fmt.Sprintf("portal %q does not exist", m.Name)
		s.send(buildErrorResponse("34000", msg, ""))
		return
	}
	s.executePortal(p)
	if failure := p.resultErr; failure != nil {
		s.send(buildErrorResponse(sqlStateFor(failure.code), failure.msg, failure.sql))
		return
	}
	s.sendRowDescriptionFromArr(p.resultArr)
}
// handleExecute runs (or re-uses the cached run of) the named portal
// and streams its rows followed by CommandComplete.
//
// A client that skipped Describe-portal has only ever received
// NoData from Describe-statement, so it still lacks column metadata.
// For that case a RowDescription is emitted here first. Whether the
// portal was already described is read from p.executed before
// executePortal is called, so a client that did issue Describe-portal
// does not get the descriptor twice.
//
// On an execution error the ErrorResponse is sent and the session's
// transaction status is moved to its after-error value; no rows or
// CommandComplete follow.
func (s *session) handleExecute(m *pgproto3.Execute) {
	s.ensureExtendedState()

	p := s.portals[m.Portal]
	if p == nil {
		msg := fmt.Sprintf("portal %q does not exist", m.Portal)
		s.send(buildErrorResponse("34000", msg, ""))
		return
	}

	// Must be sampled before executePortal flips the flag.
	needsRowDescription := !p.executed
	s.executePortal(p)

	if failure := p.resultErr; failure != nil {
		s.send(buildErrorResponse(sqlStateFor(failure.code), failure.msg, failure.sql))
		s.txStatus = currentTxStatusAfterError(s.txStatus)
		return
	}

	if needsRowDescription {
		s.sendRowDescriptionFromArr(p.resultArr)
	}
	s.streamPortalRows(p, int(m.MaxRows))

	tag := commandTagFor(p.resolvedSQL)
	s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)})
	s.updateTxStatusForTag(tag)
}
// handleClose forgets the prepared statement ('S') or portal ('P')
// named in the Close message. Closing a name that does not exist,
// or sending an unrecognised object type, is not an error: the
// reply is CloseComplete in every case.
func (s *session) handleClose(m *pgproto3.Close) {
	s.ensureExtendedState()
	if m.ObjectType == 'S' {
		delete(s.stmts, m.Name)
	} else if m.ObjectType == 'P' {
		delete(s.portals, m.Name)
	}
	s.send(&pgproto3.CloseComplete{})
}
// handleSync answers a Sync message, which closes one cycle of the
// extended-protocol exchange. It only delegates to
// sendReadyForQuery; whether that call also flushes buffered output
// is decided there (not visible in this file section).
func (s *session) handleSync() {
s.sendReadyForQuery()
}
// executePortal runs the portal's resolved SQL at most once and
// stores the outcome on the portal: resultArr on success, resultErr
// on failure. The executed flag is set before anything runs, so a
// second call is a no-op even when the first one failed.
//
// Order of attempts:
//  1. catalogIntercept gets first refusal, so metadata probes it
//     recognises never reach the engine.
//  2. runSQL executes the statement. A Go error becomes an
//     errEnvelope with code 0.
//  3. A nil or non-array result (statements that return no rows)
//     is replaced by an envelope of two empty arrays so the
//     streaming path has a uniform shape to work with.
//  4. An array that isErrorEnvelope recognises is unpacked into
//     resultErr; any other array is the result set.
func (s *session) executePortal(p *portal) {
	if p.executed {
		return
	}
	p.executed = true

	query := p.resolvedSQL
	if intercepted, val := s.catalogIntercept(query); intercepted {
		p.resultArr = val.AsArray()
		return
	}

	res, runErr := s.runSQL(query)
	switch {
	case runErr != nil:
		p.resultErr = &errEnvelope{code: 0, msg: runErr.Error(), sql: query}
	case res.IsNil() || !res.IsArray():
		// No result set: synthesise {fieldNames: [], rows: []}.
		empty := []hbrt.Value{
			hbrt.MakeArrayFrom([]hbrt.Value{}),
			hbrt.MakeArrayFrom([]hbrt.Value{}),
		}
		p.resultArr = &hbrt.HbArray{Items: empty}
	default:
		arr := res.AsArray()
		if !isErrorEnvelope(arr) {
			p.resultArr = arr
			return
		}
		code, msg, sql := unpackError(arr)
		p.resultErr = &errEnvelope{code: code, msg: msg, sql: sql}
	}
}
// sendRowDescriptionFromArr sends a RowDescription built from the
// engine's `{aFieldNames, aRows}` envelope.
//
// Items[0] supplies the column names; a non-string entry is named
// "columnN" (1-based). Column types are not carried by the envelope,
// so each column's OID and size come from pgTypeFor applied to the
// matching cell of the first row; when there is no first row (or it
// is shorter than the field list) pgTypeFor sees a zero Value.
// Every field is advertised in text format with type modifier -1.
//
// A nil envelope, or one whose first item is not an array, yields a
// RowDescription with no fields.
func (s *session) sendRowDescriptionFromArr(arr *hbrt.HbArray) {
	hasHeader := arr != nil && len(arr.Items) >= 1 && arr.Items[0].IsArray()
	if !hasHeader {
		s.send(&pgproto3.RowDescription{Fields: nil})
		return
	}
	names := arr.Items[0].AsArray().Items

	// The first data row, if present, is the type sample.
	var sampleRow []hbrt.Value
	if len(arr.Items) >= 2 && arr.Items[1].IsArray() {
		rows := arr.Items[1].AsArray().Items
		if len(rows) > 0 && rows[0].IsArray() {
			sampleRow = rows[0].AsArray().Items
		}
	}

	descs := make([]pgproto3.FieldDescription, 0, len(names))
	for idx, nameVal := range names {
		var label string
		switch {
		case nameVal.IsString():
			label = nameVal.AsString()
		default:
			label = fmt.Sprintf("column%d", idx+1)
		}

		var sample hbrt.Value
		if idx < len(sampleRow) {
			sample = sampleRow[idx]
		}
		oid, typeSize := pgTypeFor(sample)

		descs = append(descs, pgproto3.FieldDescription{
			Name:         []byte(label),
			DataTypeOID:  uint32(oid),
			DataTypeSize: typeSize,
			TypeModifier: -1,
			Format:       0,
		})
	}
	s.send(&pgproto3.RowDescription{Fields: descs})
}
// streamPortalRows sends one DataRow per cached row of the portal.
//
// maxRows caps the number of rows sent; zero or a negative value
// means no cap. Each DataRow carries exactly as many values as the
// envelope has field names: cells beyond that count are dropped and
// missing cells are sent as nil (NULL). Entries of the row list
// that are not arrays are skipped and do not count towards maxRows.
// Nothing is sent when the portal has no result or the result has
// no row list.
func (s *session) streamPortalRows(p *portal, maxRows int) {
	res := p.resultArr
	if res == nil || len(res.Items) < 2 || !res.Items[1].IsArray() {
		return
	}

	width := 0
	if header := res.Items[0]; header.IsArray() {
		width = len(header.AsArray().Items)
	}

	sent := 0
	for _, row := range res.Items[1].AsArray().Items {
		if maxRows > 0 && sent >= maxRows {
			return
		}
		if !row.IsArray() {
			continue
		}
		values := make([][]byte, width)
		for col, cell := range row.AsArray().Items {
			if col >= width {
				break
			}
			values[col] = encodeText(cell)
		}
		s.send(&pgproto3.DataRow{Values: values})
		sent++
	}
}
// substituteParams produces a Simple-Query-shaped SQL string by
// replacing every `$N` placeholder with the literal form of
// params[N-1] (see paramToLiteral).
//
// Parameters:
//   - sql:     statement text containing `$1`, `$2`, … placeholders.
//   - oids:    declared type OID per parameter; positions beyond the
//     slice are treated as OID 0 ("unknown").
//   - params:  raw parameter values from Bind; a nil entry is NULL.
//   - formats: Bind format codes (0 = text, 1 = binary). An empty
//     slice means all text, a single entry applies to every
//     parameter, otherwise the entry at the parameter's position is
//     used (text when the slice is too short).
//
// Regions the scanner copies verbatim and never substitutes in:
//   - single-quoted strings and double-quoted identifiers (a doubled
//     quote simply closes and immediately reopens the run);
//   - `--` line comments up to the next newline;
//   - `/* … */` block comments, including nested ones.
//
// Skipping comments matters for correctness, not just tidiness: an
// apostrophe inside a comment would otherwise be taken for the start
// of a string literal and hide every placeholder after it.
//
// A `$N` whose number is zero, exceeds len(params), or does not fit
// an int is left in the output unchanged.
//
// The only error source is paramToLiteral rejecting a value; in that
// case the returned string is empty.
func substituteParams(sql string, oids []uint32, params [][]byte, formats []int16) (string, error) {
	var out strings.Builder
	out.Grow(len(sql))

	for i := 0; i < len(sql); {
		c := sql[i]
		switch {
		case c == '\'' || c == '"':
			// Quoted run: copy through the matching quote, or to the
			// end of input when it is unterminated.
			end := i + 1
			for end < len(sql) && sql[end] != c {
				end++
			}
			if end < len(sql) {
				end++
			}
			out.WriteString(sql[i:end])
			i = end

		case strings.HasPrefix(sql[i:], "--"):
			// Line comment: runs up to (not including) the newline.
			end := i + 2
			for end < len(sql) && sql[end] != '\n' {
				end++
			}
			out.WriteString(sql[i:end])
			i = end

		case strings.HasPrefix(sql[i:], "/*"):
			// Block comment; PostgreSQL allows these to nest.
			end, depth := i+2, 1
			for end < len(sql) && depth > 0 {
				switch {
				case strings.HasPrefix(sql[end:], "/*"):
					depth++
					end += 2
				case strings.HasPrefix(sql[end:], "*/"):
					depth--
					end += 2
				default:
					end++
				}
			}
			out.WriteString(sql[i:end])
			i = end

		case c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9':
			end := i + 1
			for end < len(sql) && sql[end] >= '0' && sql[end] <= '9' {
				end++
			}
			idx, err := strconv.Atoi(sql[i+1 : end])
			if err != nil || idx < 1 || idx > len(params) {
				// Not a resolvable placeholder: emit the '$' and let
				// the digits be copied as ordinary text.
				out.WriteByte(c)
				i++
				continue
			}
			pidx := idx - 1

			oid := uint32(0)
			if pidx < len(oids) {
				oid = oids[pidx]
			}

			format := int16(0)
			switch {
			case pidx < len(formats):
				format = formats[pidx]
			case len(formats) == 1:
				format = formats[0] // single format code applies to all
			}

			lit, err := paramToLiteral(params[pidx], oid, format)
			if err != nil {
				return "", err
			}
			out.WriteString(lit)
			i = end

		default:
			out.WriteByte(c)
			i++
		}
	}
	return out.String(), nil
}
// countPgPlaceholders returns the highest `$N` placeholder number
// found in sql, which is the number of parameter slots the statement
// needs (a repeated `$1` still counts as one slot). It returns 0
// when there are no placeholders.
//
// Ignored regions:
//   - single-quoted strings and double-quoted identifiers;
//   - `--` line comments and (nested) `/* … */` block comments, so
//     an apostrophe in a comment cannot hide later placeholders.
//
// Placeholder numbers above 65535 are ignored. The Bind message
// carries its parameter count as a 16-bit integer, so no client can
// ever supply such a parameter, and honouring e.g. `$2000000000`
// would make the caller allocate one OID slot per phantom parameter.
// Numbers too large for an int are ignored as well.
func countPgPlaceholders(sql string) int {
	const maxWireParams = 65535

	highest := 0
	for i := 0; i < len(sql); {
		c := sql[i]
		switch {
		case c == '\'' || c == '"':
			// Skip to just past the matching quote (or past the end
			// of input for an unterminated run).
			i++
			for i < len(sql) && sql[i] != c {
				i++
			}
			i++

		case strings.HasPrefix(sql[i:], "--"):
			i += 2
			for i < len(sql) && sql[i] != '\n' {
				i++
			}

		case strings.HasPrefix(sql[i:], "/*"):
			i += 2
			for depth := 1; i < len(sql) && depth > 0; {
				switch {
				case strings.HasPrefix(sql[i:], "/*"):
					depth++
					i += 2
				case strings.HasPrefix(sql[i:], "*/"):
					depth--
					i += 2
				default:
					i++
				}
			}

		case c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9':
			end := i + 1
			for end < len(sql) && sql[end] >= '0' && sql[end] <= '9' {
				end++
			}
			n, err := strconv.Atoi(sql[i+1 : end])
			if err == nil && n <= maxWireParams && n > highest {
				highest = n
			}
			i = end

		default:
			i++
		}
	}
	return highest
}
// paramToLiteral converts one bound parameter into SQL literal text
// that can be spliced into the statement.
//
// Parameters:
//   - raw:    the parameter bytes from Bind; nil means SQL NULL.
//   - oid:    the declared type OID (0 when unknown).
//   - format: 0 for text, anything else is treated as binary.
//
// Text format: numeric OIDs are emitted unquoted, but only after the
// value has been checked to be a plain number (optional sign,
// digits, optional fraction, optional exponent, or NaN/Inf/Infinity).
// Because the value lands in the SQL stream verbatim, passing
// arbitrary client text through here would be SQL injection.
// Booleans are normalised to TRUE/FALSE from the spellings
// PostgreSQL accepts (t/true/y/yes/on/1 and f/false/n/no/off/0),
// matching what the binary path emits. Every other OID is
// single-quoted with embedded quotes doubled.
//
// Binary format: INT2/INT4/INT8, FLOAT4/FLOAT8, BOOL, NUMERIC, DATE
// and TIMESTAMP/TIMESTAMPTZ are decoded; a payload of the wrong
// length is an error. Any other OID falls back to a quoted `\x…` hex
// string, which the engine is not expected to accept but which makes
// the failure diagnosable.
//
// On error the returned string is empty.
func paramToLiteral(raw []byte, oid uint32, format int16) (string, error) {
	if raw == nil {
		return "NULL", nil
	}
	if format == 0 {
		return textParamLiteral(raw, oid)
	}

	switch oid {
	case oidInt2:
		if len(raw) != 2 {
			return "", fmt.Errorf("int2 param: want 2 bytes, got %d", len(raw))
		}
		return strconv.FormatInt(int64(int16(binary.BigEndian.Uint16(raw))), 10), nil
	case oidInt4:
		if len(raw) != 4 {
			return "", fmt.Errorf("int4 param: want 4 bytes, got %d", len(raw))
		}
		return strconv.FormatInt(int64(int32(binary.BigEndian.Uint32(raw))), 10), nil
	case oidInt8:
		if len(raw) != 8 {
			return "", fmt.Errorf("int8 param: want 8 bytes, got %d", len(raw))
		}
		return strconv.FormatInt(int64(binary.BigEndian.Uint64(raw)), 10), nil
	case oidFloat4:
		if len(raw) != 4 {
			return "", fmt.Errorf("float4 param: want 4 bytes, got %d", len(raw))
		}
		f := math.Float32frombits(binary.BigEndian.Uint32(raw))
		// bitSize 32 keeps the shortest text that round-trips as a
		// float32 instead of exposing float64 widening noise.
		return strconv.FormatFloat(float64(f), 'g', -1, 32), nil
	case oidFloat8:
		if len(raw) != 8 {
			return "", fmt.Errorf("float8 param: want 8 bytes, got %d", len(raw))
		}
		f := math.Float64frombits(binary.BigEndian.Uint64(raw))
		return strconv.FormatFloat(f, 'g', -1, 64), nil
	case oidBool:
		if len(raw) != 1 {
			return "", fmt.Errorf("bool param: want 1 byte, got %d", len(raw))
		}
		if raw[0] == 0 {
			return "FALSE", nil
		}
		return "TRUE", nil
	case oidNumeric:
		return decodeBinaryNumeric(raw)
	case oidDate:
		s, err := decodeBinaryDate(raw)
		if err != nil {
			return "", err
		}
		return "'" + s + "'", nil
	case oidTimestamp, oidTimestamptz:
		s, err := decodeBinaryTimestamp(raw)
		if err != nil {
			return "", err
		}
		return "'" + s + "'", nil
	default:
		// Unknown binary OID — fall back to a quoted hex literal.
		// FiveSql2 won't accept this directly, but the resulting
		// error is at least diagnosable rather than a silent miss.
		return "'\\x" + fmt.Sprintf("%x", raw) + "'", nil
	}
}

// textParamLiteral renders a non-NULL text-format parameter as a SQL
// literal, validating the types that are emitted without quotes.
func textParamLiteral(raw []byte, oid uint32) (string, error) {
	switch oid {
	case oidInt2, oidInt4, oidInt8, oidNumeric, oidFloat4, oidFloat8:
		txt := strings.TrimSpace(string(raw))
		if !isPlainNumericText(txt) {
			return "", fmt.Errorf("numeric text param: invalid input syntax %q", txt)
		}
		return txt, nil
	case oidBool:
		txt := strings.TrimSpace(string(raw))
		switch strings.ToLower(txt) {
		case "t", "true", "y", "yes", "on", "1":
			return "TRUE", nil
		case "f", "false", "n", "no", "off", "0":
			return "FALSE", nil
		}
		return "", fmt.Errorf("bool text param: invalid input syntax %q", txt)
	default:
		return "'" + strings.ReplaceAll(string(raw), "'", "''") + "'", nil
	}
}

// isPlainNumericText reports whether s is safe to splice into SQL as
// an unquoted number: [+-] digits [. digits] [e [+-] digits], with at
// least one mantissa digit, or a signed NaN/Inf/Infinity keyword.
func isPlainNumericText(s string) bool {
	if s != "" && (s[0] == '+' || s[0] == '-') {
		s = s[1:]
	}
	for _, word := range [...]string{"nan", "inf", "infinity"} {
		if strings.EqualFold(s, word) {
			return true
		}
	}

	isDigit := func(b byte) bool { return b >= '0' && b <= '9' }

	i, mantissa := 0, 0
	for i < len(s) && isDigit(s[i]) {
		i++
		mantissa++
	}
	if i < len(s) && s[i] == '.' {
		i++
		for i < len(s) && isDigit(s[i]) {
			i++
			mantissa++
		}
	}
	if mantissa == 0 {
		return false
	}
	if i < len(s) && (s[i] == 'e' || s[i] == 'E') {
		i++
		if i < len(s) && (s[i] == '+' || s[i] == '-') {
			i++
		}
		expStart := i
		for i < len(s) && isDigit(s[i]) {
			i++
		}
		if i == expStart {
			return false
		}
	}
	return i == len(s)
}
// decodeBinaryNumeric converts PostgreSQL's binary NUMERIC wire
// format (see PG source utils/adt/numeric.c, numeric_send /
// numeric_recv) to a plain decimal string. The layout is:
//
//	int16  ndigits          number of base-10000 "digits"
//	int16  weight           weight of the first digit, in base-10000 units
//	uint16 sign             0x0000 positive, 0x4000 negative,
//	                        0xC000 NaN, 0xD000 +Infinity, 0xF000 -Infinity
//	uint16 dscale           displayed scale (decimal places to show)
//	int16  digits[ndigits]  each in 0..9999
//
// The numeric value equals sign × Σ digits[i] × 10000^(weight − i).
//
// Output is an unquoted decimal literal without scientific
// notation and with exactly dscale digits after the decimal point,
// so the displayed width round-trips. The special values are
// returned as "NaN", "Infinity" and "-Infinity".
//
// The decoder never goes through a float, so no precision is lost
// for wide NUMERIC values.
//
// Errors (the returned string is then empty):
//   - fewer than 8 header bytes;
//   - payload length not equal to 8 + 2*ndigits (covers negative
//     ndigits);
//   - a sign word other than the five listed above;
//   - dscale above 0x3FFF;
//   - a digit group outside 0..9999. Such a group would print as
//     five characters and shift every following digit.
func decodeBinaryNumeric(raw []byte) (string, error) {
	const (
		headerLen  = 8
		numBase    = 10000
		signPos    = 0x0000
		signNeg    = 0x4000
		signNaN    = 0xC000
		signPosInf = 0xD000
		signNegInf = 0xF000
		dscaleMax  = 0x3FFF
	)

	if len(raw) < headerLen {
		return "", fmt.Errorf("numeric param: header too short (%d bytes)", len(raw))
	}
	ndigits := int(int16(binary.BigEndian.Uint16(raw[0:2])))
	weight := int(int16(binary.BigEndian.Uint16(raw[2:4])))
	sign := binary.BigEndian.Uint16(raw[4:6])
	dscale := int(binary.BigEndian.Uint16(raw[6:8]))

	if ndigits < 0 || ndigits*2+headerLen != len(raw) {
		return "", fmt.Errorf("numeric param: digit count mismatch (ndigits=%d, body=%d)", ndigits, len(raw)-headerLen)
	}

	switch sign {
	case signNaN:
		return "NaN", nil
	case signPosInf:
		return "Infinity", nil
	case signNegInf:
		return "-Infinity", nil
	case signPos, signNeg:
		// ordinary finite value, decoded below
	default:
		return "", fmt.Errorf("numeric param: invalid sign 0x%04X", sign)
	}
	if dscale > dscaleMax {
		return "", fmt.Errorf("numeric param: invalid scale %d", dscale)
	}

	digs := make([]uint16, ndigits)
	for i := range digs {
		d := binary.BigEndian.Uint16(raw[headerLen+2*i:])
		if d >= numBase {
			return "", fmt.Errorf("numeric param: invalid digit %d at position %d", d, i)
		}
		digs[i] = d
	}

	var sb strings.Builder
	if sign == signNeg {
		sb.WriteByte('-')
	}

	// Integer part: weight+1 base-10000 groups. Groups the sender
	// omitted (trailing zeros are stripped on the wire) count as 0.
	// A negative weight means the integer part is just "0".
	intGroups := weight + 1
	if intGroups <= 0 {
		sb.WriteByte('0')
	} else {
		for i := 0; i < intGroups; i++ {
			var d uint16
			if i < ndigits {
				d = digs[i]
			}
			if i == 0 {
				// Leading group is printed without zero padding.
				fmt.Fprintf(&sb, "%d", d)
			} else {
				fmt.Fprintf(&sb, "%04d", d)
			}
		}
	}

	if dscale > 0 {
		sb.WriteByte('.')

		var frac strings.Builder
		// With weight < -1 the first transmitted group already sits
		// several base-10000 positions behind the decimal point;
		// fill the gap with zero groups.
		for i := intGroups; i < 0; i++ {
			frac.WriteString("0000")
		}
		first := intGroups
		if first < 0 {
			first = 0
		}
		for i := first; i < ndigits; i++ {
			fmt.Fprintf(&frac, "%04d", digs[i])
		}

		// Cut or zero-pad to exactly dscale places.
		s := frac.String()
		if len(s) >= dscale {
			sb.WriteString(s[:dscale])
		} else {
			sb.WriteString(s)
			sb.WriteString(strings.Repeat("0", dscale-len(s)))
		}
	}
	return sb.String(), nil
}
// decodeBinaryDate converts a PG binary DATE (4-byte big-endian
// signed day count relative to pgEpoch = 2000-01-01) to
// "YYYY-MM-DD".
//
// PostgreSQL reserves the two extreme day counts as sentinels: the
// largest int32 is "infinity" and the smallest is "-infinity". They
// are returned as those keywords instead of being run through the
// calendar arithmetic, which would produce a meaningless date
// millions of years away.
//
// A payload that is not exactly 4 bytes is an error.
func decodeBinaryDate(raw []byte) (string, error) {
	if len(raw) != 4 {
		return "", fmt.Errorf("date param: want 4 bytes, got %d", len(raw))
	}
	days := int32(binary.BigEndian.Uint32(raw))
	switch days {
	case math.MaxInt32:
		return "infinity", nil
	case math.MinInt32:
		return "-infinity", nil
	}
	t := pgEpoch.AddDate(0, 0, int(days))
	return t.Format("2006-01-02"), nil
}
// decodeBinaryTimestamp converts a PG binary TIMESTAMP / TIMESTAMPTZ
// (8-byte big-endian signed microseconds relative to pgEpoch =
// 2000-01-01) to "YYYY-MM-DD HH:MM:SS.ffffff". The encoding is the
// integer_datetimes = on variant.
//
// The offset is applied as whole seconds plus a sub-second remainder
// rather than as a time.Duration: a Duration counts nanoseconds in
// an int64 and therefore overflows for offsets beyond roughly ±292
// years, which would silently corrupt any timestamp before about
// 1708 or after about 2292.
//
// PostgreSQL reserves the two extreme values as sentinels: the
// largest int64 is "infinity" and the smallest is "-infinity". They
// are returned as those keywords.
//
// The result is formatted in pgEpoch's location. A payload that is
// not exactly 8 bytes is an error.
func decodeBinaryTimestamp(raw []byte) (string, error) {
	if len(raw) != 8 {
		return "", fmt.Errorf("timestamp param: want 8 bytes, got %d", len(raw))
	}
	us := int64(binary.BigEndian.Uint64(raw))
	switch us {
	case math.MaxInt64:
		return "infinity", nil
	case math.MinInt64:
		return "-infinity", nil
	}

	const usPerSecond = 1000000
	// Go's division truncates toward zero, so for negative offsets
	// the remainder is negative too; time.Unix accepts a nanosecond
	// argument outside [0, 1e9) and normalises it.
	sec := us / usPerSecond
	nsec := (us % usPerSecond) * 1000
	t := time.Unix(pgEpoch.Unix()+sec, int64(pgEpoch.Nanosecond())+nsec).In(pgEpoch.Location())
	return t.Format("2006-01-02 15:04:05.000000"), nil
}