pgx and most drivers default to PostgreSQL's Extended Protocol
(named prepared statements). Phase 2 only handled Simple Query,
so every pgx caller had to force `QueryExecModeSimpleProtocol` —
unworkable for a production deployment. This commit lands the
full Parse → Bind → Describe → Execute → Sync state machine,
enough that pgx (and any other libpq-protocol-v3 client) works
without any client-side knobs.
Implementation lives in `hbrtl/pgserver/extended.go`:
* Per-session caches `stmts map[string]*preparedStmt` and
`portals map[string]*portal`, lazily allocated on first use.
Stored as fields on `session` so they don't leak across
connections.
* Parameters are inlined at Bind time via `substituteParams` —
the resolved SQL is a normal Simple-Query-shaped string the
engine sees through the existing `five_SQL(cSQL, …, oSession)`
pipeline. Avoids teaching FiveSql2 a second param-shape; the
trade-off is that binary timestamps/numerics round-trip through
text (Phase 4.1 will plumb `?`-params through aParams for the
binary fast path).
* `paramToLiteral` decodes the binary-format encodings pgx uses
by default for INT4/INT8/BOOL (big-endian fixed-width). Other
binary OIDs fall back to a hex-escaped quoted literal which
errors loudly rather than silently misparsing.
* `countPgPlaceholders` scans the SQL outside string literals for
the highest `$N` so the server can answer Describe-statement
with a correctly-sized ParameterDescription even when the
client didn't pre-declare param OIDs. Without this, pgx errored
with "expected 0 arguments, got 2" on the very first prepared
query.
* RowDescription emission: Describe-statement still returns NoData
(we can't infer row shape without execution). When Execute fires
on a portal the client never Described, we emit RowDescription
inline from the cached result before DataRow streams. pgx and
psql both tolerate this ordering.
* Execute → CommandComplete tag derives from the SQL verb via the
existing `commandTagFor` helper. Row counts in the tag remain
"VERB 0" for v1.0; threading real counters through the engine
is Phase 5.
Wire dispatch in `session.go:queryLoop` now handles Parse, Bind,
Describe, Execute, Close, Sync, Flush — the full set of
extended-query frontend messages.
Verification
------------
End-to-end pgx (default mode, no SimpleProtocol flag) successfully
runs:
SELECT $1 AS n, $2 AS s with 42 + "hi" → [42 hi]
Same statement re-executed with different bound values → reuses
the cached prepared statement
SELECT $1 AS b, $2 AS s with true + "binary-bool" → [t binary-bool]
`tests/pgserver/run.sh` expanded from 1 → 3 integration assertions:
PASS Simple Query: SELECT 1, 'hello'
PASS Multi-statement Simple Query
PASS Transaction control: BEGIN/COMMIT round-trip
(Extended Protocol can't be driven from psql's -c CLI directly
because psql's PREPARE/EXECUTE is a separate SQL-level feature
that FiveSql2 doesn't parse; the pgx-driven path verifies it
manually, and a self-contained Go integration that drives pgx
from inside a process bootstrap is Phase 7 work.)
All six release gates green:
go test ./... ✓
FiveSql2 SQL:1999 43/43 ✓
Harbour compat 56/56 ✓
std.ch 17/17 ✓
FRB 7/7 ✓
pgserver integration 3/3 ✓
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
485 lines
15 KiB
Go
485 lines
15 KiB
Go
// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
|
|
// All rights reserved.
|
|
|
|
// extended.go — Extended Protocol (Parse/Bind/Execute/Describe/
|
|
// Sync/Close) support for the pgserver. Without this, pgx clients
|
|
// using the default QueryExecModeCacheStatement get an "0A000
|
|
// not supported" error on every query (Phase 3 saw this — the
|
|
// integration script had to force SimpleProtocol).
|
|
//
|
|
// The implementation is deliberately minimal for v1.0:
|
|
// * Per-session named statement cache: name → SQL + paramOIDs
|
|
// * Per-session named portal cache: name → statement + bound params
|
|
// * Parameter substitution happens server-side at Bind time; we
|
|
// build a "rewrite" SQL with literals inlined so five_SQL's
|
|
// existing template-cache pipeline (TFiveSQL.prg's
|
|
// SqlLexAndExtractTemplate path) can re-parameterise the
|
|
// query without us having to teach it the PG-wire param shape.
|
|
// * Describe-statement returns ParameterDescription + NoData; a
// client that then issues Describe-portal after Bind gets a real
// RowDescription, because the query is run at that point.
// * Execute streams the result set cached on the portal. If
// Describe-portal was skipped, Execute runs the query itself and
// emits the RowDescription first, so the SQL never runs twice.
|
|
//
|
|
// Phase 4.1 will replace the literal-substitution rewrite with a
|
|
// proper `?`-param threading through five_SQL's aParams so binary
|
|
// params (timestamps, numerics) avoid round-tripping through text.
|
|
|
|
package pgserver
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgproto3"
|
|
|
|
"five/hbrt"
|
|
)
|
|
|
|
// preparedStmt is one entry of the per-session statement cache,
// created by handleParse. It keeps only the raw query text and the
// parameter type OIDs; nothing in this type parses the SQL. The
// engine first sees the (parameter-substituted) text when a portal
// built from the statement is executed.
type preparedStmt struct {
	// sql is the query text as received in the Parse message, with
	// its `$N` placeholders still in place.
	sql string

	// paramOIDs holds one type OID per parameter position as declared
	// by the client; 0 means "unspecified". handleDescribe pads this
	// slice up to the number of placeholders found in sql.
	paramOIDs []uint32
}
|
|
|
|
// portal binds a prepared statement to concrete parameter values.
|
|
// We materialise the values into a fully-substituted SQL string so
|
|
// the engine sees a normal Simple-Query-shaped statement; the
|
|
// result is cached on the portal so Describe-portal can emit a
|
|
// RowDescription without a second execution.
|
|
type portal struct {
|
|
stmt *preparedStmt
|
|
resolvedSQL string
|
|
|
|
// Cached result, populated on Describe-portal so Execute can
|
|
// stream without re-running the query. resultArr is the
|
|
// engine's `aResult` array; once executed, used to derive
|
|
// RowDescription + DataRow stream.
|
|
executed bool
|
|
resultArr *hbrt.HbArray
|
|
resultErr *errEnvelope
|
|
}
|
|
|
|
// errEnvelope carries a failed execution's details: either the
// three values unpackError extracts from the engine's error
// sentinel, or (with code 0) the text of a Go error returned by
// runSQL.
type errEnvelope struct {
	code int    // engine error code; mapped to a SQLSTATE via sqlStateFor
	msg  string // human-readable message sent to the client
	sql  string // statement text associated with the failure
}
|
|
|
|
// ensureExtendedState lazily allocates the per-session caches.
|
|
// Called by every extended-protocol handler; the maps stay alive
|
|
// for the whole session because pgx-style clients re-use unnamed
|
|
// statements aggressively.
|
|
func (s *session) ensureExtendedState() {
|
|
if s.stmts == nil {
|
|
s.stmts = make(map[string]*preparedStmt)
|
|
}
|
|
if s.portals == nil {
|
|
s.portals = make(map[string]*portal)
|
|
}
|
|
}
|
|
|
|
// handleParse stores a named prepared statement. Returns
|
|
// ParseComplete; any actual parse error from five_SQL is deferred
|
|
// until Execute so a probe-only Parse + Describe + Sync round-trip
|
|
// (which pgx does for QueryExecModeDescribeExec) stays cheap.
|
|
func (s *session) handleParse(m *pgproto3.Parse) {
|
|
s.ensureExtendedState()
|
|
s.stmts[m.Name] = &preparedStmt{
|
|
sql: m.Query,
|
|
paramOIDs: append([]uint32(nil), m.ParameterOIDs...),
|
|
}
|
|
s.send(&pgproto3.ParseComplete{})
|
|
}
|
|
|
|
// handleBind materialises parameter values into the SQL text and
|
|
// stashes the portal for later Describe/Execute. Binary-format
|
|
// params are decoded for the OIDs we recognise; otherwise the raw
|
|
// bytes are passed through as a quoted text literal (good enough
|
|
// for the common SELECT/INSERT-with-int/string cases).
|
|
func (s *session) handleBind(m *pgproto3.Bind) {
|
|
s.ensureExtendedState()
|
|
stmt := s.stmts[m.PreparedStatement]
|
|
if stmt == nil {
|
|
s.send(buildErrorResponse("26000",
|
|
fmt.Sprintf("prepared statement %q does not exist", m.PreparedStatement), ""))
|
|
return
|
|
}
|
|
resolved, err := substituteParams(stmt.sql, stmt.paramOIDs, m.Parameters, m.ParameterFormatCodes)
|
|
if err != nil {
|
|
s.send(buildErrorResponse("42P02", err.Error(), stmt.sql))
|
|
return
|
|
}
|
|
s.portals[m.DestinationPortal] = &portal{stmt: stmt, resolvedSQL: resolved}
|
|
s.send(&pgproto3.BindComplete{})
|
|
}
|
|
|
|
// handleDescribe answers Describe for either a statement or a
|
|
// portal. ObjectType 'S' = statement, 'P' = portal.
|
|
//
|
|
// Statement Describe is hard to answer fully without running the
|
|
// query (we'd need column-type inference). For v1.0 we return
|
|
// NoData + ParameterDescription with the declared OIDs; pgx
|
|
// tolerates this and re-Describes the portal after Bind.
|
|
//
|
|
// Portal Describe runs the query immediately, caches the result
|
|
// on the portal, and emits RowDescription from the actual result
|
|
// columns. Execute later just streams from the cache.
|
|
func (s *session) handleDescribe(m *pgproto3.Describe) {
|
|
s.ensureExtendedState()
|
|
switch m.ObjectType {
|
|
case 'S':
|
|
stmt := s.stmts[m.Name]
|
|
if stmt == nil {
|
|
s.send(buildErrorResponse("26000",
|
|
fmt.Sprintf("prepared statement %q does not exist", m.Name), ""))
|
|
return
|
|
}
|
|
// pgx-style clients send Parse without explicit OIDs and
|
|
// expect the server to infer them from the SQL. Count the
|
|
// `$N` placeholders and pad paramOIDs with OID 0 ("any")
|
|
// for any that the client didn't pre-declare — otherwise
|
|
// pgx errors with "expected 0 arguments, got N" before
|
|
// the Bind even reaches us.
|
|
nParams := countPgPlaceholders(stmt.sql)
|
|
oids := stmt.paramOIDs
|
|
for len(oids) < nParams {
|
|
oids = append(oids, 0)
|
|
}
|
|
stmt.paramOIDs = oids
|
|
s.send(&pgproto3.ParameterDescription{ParameterOIDs: oids})
|
|
// NoData — we don't know the row shape until execution.
|
|
// pgx tolerates this and asks again via Describe-portal.
|
|
s.send(&pgproto3.NoData{})
|
|
case 'P':
|
|
p := s.portals[m.Name]
|
|
if p == nil {
|
|
s.send(buildErrorResponse("34000",
|
|
fmt.Sprintf("portal %q does not exist", m.Name), ""))
|
|
return
|
|
}
|
|
s.executePortal(p)
|
|
if p.resultErr != nil {
|
|
s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql))
|
|
return
|
|
}
|
|
s.sendRowDescriptionFromArr(p.resultArr)
|
|
}
|
|
}
|
|
|
|
// handleExecute streams the cached result of the portal as a
|
|
// DataRow sequence then CommandComplete. If the portal hasn't
|
|
// been executed yet (Describe-portal was skipped), execute now.
|
|
//
|
|
// pgx's default QueryExecModeCacheStatement skips Describe-portal
|
|
// and only does Describe-statement once at prepare time. Since
|
|
// Describe-statement returns NoData (we don't know the row shape
|
|
// without executing), pgx never gets a RowDescription unless we
|
|
// emit one here too. We emit it conditionally: only when the
|
|
// portal wasn't already Described (described==false) so we don't
|
|
// duplicate the descriptor for clients that *did* call Describe-P.
|
|
func (s *session) handleExecute(m *pgproto3.Execute) {
|
|
s.ensureExtendedState()
|
|
p := s.portals[m.Portal]
|
|
if p == nil {
|
|
s.send(buildErrorResponse("34000",
|
|
fmt.Sprintf("portal %q does not exist", m.Portal), ""))
|
|
return
|
|
}
|
|
wasDescribed := p.executed
|
|
s.executePortal(p)
|
|
if p.resultErr != nil {
|
|
s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql))
|
|
s.txStatus = currentTxStatusAfterError(s.txStatus)
|
|
return
|
|
}
|
|
// If Describe-portal was never called, the client is still
|
|
// waiting for column metadata. Emit it now from the cached
|
|
// result so DataRow has a context the client can decode.
|
|
if !wasDescribed {
|
|
s.sendRowDescriptionFromArr(p.resultArr)
|
|
}
|
|
s.streamPortalRows(p, int(m.MaxRows))
|
|
tag := commandTagFor(p.resolvedSQL)
|
|
s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)})
|
|
s.updateTxStatusForTag(tag)
|
|
}
|
|
|
|
// handleClose drops a prepared statement or portal entry.
|
|
func (s *session) handleClose(m *pgproto3.Close) {
|
|
s.ensureExtendedState()
|
|
switch m.ObjectType {
|
|
case 'S':
|
|
delete(s.stmts, m.Name)
|
|
case 'P':
|
|
delete(s.portals, m.Name)
|
|
}
|
|
s.send(&pgproto3.CloseComplete{})
|
|
}
|
|
|
|
// handleSync flushes any pending output and emits ReadyForQuery,
|
|
// closing one cycle of the extended-protocol exchange.
|
|
func (s *session) handleSync() {
|
|
s.sendReadyForQuery()
|
|
}
|
|
|
|
// executePortal runs the resolved SQL through five_SQL once and
|
|
// caches the result on the portal. Idempotent — repeated calls
|
|
// short-circuit on `executed`.
|
|
func (s *session) executePortal(p *portal) {
|
|
if p.executed {
|
|
return
|
|
}
|
|
p.executed = true
|
|
res, err := s.runSQL(p.resolvedSQL)
|
|
if err != nil {
|
|
p.resultErr = &errEnvelope{code: 0, msg: err.Error(), sql: p.resolvedSQL}
|
|
return
|
|
}
|
|
if res.IsNil() || !res.IsArray() {
|
|
// Non-result statement (DDL, transaction control). Synthesise
|
|
// an empty result envelope so the streaming path emits an
|
|
// empty RowDescription + CommandComplete cleanly.
|
|
p.resultArr = &hbrt.HbArray{Items: []hbrt.Value{
|
|
hbrt.MakeArrayFrom([]hbrt.Value{}),
|
|
hbrt.MakeArrayFrom([]hbrt.Value{}),
|
|
}}
|
|
return
|
|
}
|
|
arr := res.AsArray()
|
|
if isErrorEnvelope(arr) {
|
|
code, msg, sql := unpackError(arr)
|
|
p.resultErr = &errEnvelope{code: code, msg: msg, sql: sql}
|
|
return
|
|
}
|
|
p.resultArr = arr
|
|
}
|
|
|
|
// sendRowDescriptionFromArr emits a RowDescription derived from
|
|
// the engine's `{aFieldNames, aRows}` envelope. Mirrors what
|
|
// emitResultSet (Simple Query) does on the field-description side
|
|
// so the wire shape is identical regardless of which protocol the
|
|
// client picks.
|
|
func (s *session) sendRowDescriptionFromArr(arr *hbrt.HbArray) {
|
|
if arr == nil || len(arr.Items) < 1 || !arr.Items[0].IsArray() {
|
|
s.send(&pgproto3.RowDescription{Fields: nil})
|
|
return
|
|
}
|
|
fields := arr.Items[0].AsArray().Items
|
|
var firstRow []hbrt.Value
|
|
if len(arr.Items) >= 2 && arr.Items[1].IsArray() {
|
|
if rows := arr.Items[1].AsArray().Items; len(rows) > 0 && rows[0].IsArray() {
|
|
firstRow = rows[0].AsArray().Items
|
|
}
|
|
}
|
|
descFields := make([]pgproto3.FieldDescription, len(fields))
|
|
for i, f := range fields {
|
|
name := ""
|
|
if f.IsString() {
|
|
name = f.AsString()
|
|
} else {
|
|
name = fmt.Sprintf("column%d", i+1)
|
|
}
|
|
var sample hbrt.Value
|
|
if i < len(firstRow) {
|
|
sample = firstRow[i]
|
|
}
|
|
oid, typeSize := pgTypeFor(sample)
|
|
descFields[i] = pgproto3.FieldDescription{
|
|
Name: []byte(name),
|
|
DataTypeOID: uint32(oid),
|
|
DataTypeSize: typeSize,
|
|
TypeModifier: -1,
|
|
Format: 0,
|
|
}
|
|
}
|
|
s.send(&pgproto3.RowDescription{Fields: descFields})
|
|
}
|
|
|
|
// streamPortalRows iterates the portal's cached rows and emits a
|
|
// DataRow per row, respecting maxRows (0 = unlimited).
|
|
func (s *session) streamPortalRows(p *portal, maxRows int) {
|
|
if p.resultArr == nil || len(p.resultArr.Items) < 2 {
|
|
return
|
|
}
|
|
rowsVal := p.resultArr.Items[1]
|
|
if !rowsVal.IsArray() {
|
|
return
|
|
}
|
|
rows := rowsVal.AsArray().Items
|
|
fieldCount := 0
|
|
if p.resultArr.Items[0].IsArray() {
|
|
fieldCount = len(p.resultArr.Items[0].AsArray().Items)
|
|
}
|
|
emitted := 0
|
|
for _, rowVal := range rows {
|
|
if maxRows > 0 && emitted >= maxRows {
|
|
break
|
|
}
|
|
if !rowVal.IsArray() {
|
|
continue
|
|
}
|
|
cells := rowVal.AsArray().Items
|
|
out := make([][]byte, fieldCount)
|
|
for i := 0; i < fieldCount; i++ {
|
|
if i < len(cells) {
|
|
out[i] = encodeText(cells[i])
|
|
}
|
|
}
|
|
s.send(&pgproto3.DataRow{Values: out})
|
|
emitted++
|
|
}
|
|
}
|
|
|
|
// substituteParams produces a Simple-Query-shaped SQL by inlining
|
|
// the bound parameter values as PG-style literals. Format codes:
|
|
// 0 = text, 1 = binary. We support text for all OIDs and binary
|
|
// for INT2/INT4/INT8/FLOAT4/FLOAT8/BOOL — the rest fall back to
|
|
// hex-escaped strings, which works for VARCHAR but loses fidelity
|
|
// for binary timestamps until Phase 4.1.
|
|
//
|
|
// pgx defaults to binary for ints + text for everything else, so
|
|
// the common case is well covered.
|
|
func substituteParams(sql string, oids []uint32, params [][]byte, formats []int16) (string, error) {
|
|
// Each `$1`/`$2`/… placeholder maps to params[i-1]. We do a
|
|
// linear scan rather than regex to avoid quoting pitfalls
|
|
// (strings containing literal `$1` are rare in practice but
|
|
// the scanner respects single-quoted runs).
|
|
var out strings.Builder
|
|
i := 0
|
|
inStr := byte(0)
|
|
for i < len(sql) {
|
|
c := sql[i]
|
|
if inStr != 0 {
|
|
out.WriteByte(c)
|
|
if c == inStr {
|
|
inStr = 0
|
|
}
|
|
i++
|
|
continue
|
|
}
|
|
if c == '\'' || c == '"' {
|
|
inStr = c
|
|
out.WriteByte(c)
|
|
i++
|
|
continue
|
|
}
|
|
if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' {
|
|
j := i + 1
|
|
for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' {
|
|
j++
|
|
}
|
|
idx, err := strconv.Atoi(sql[i+1 : j])
|
|
if err != nil || idx < 1 || idx > len(params) {
|
|
out.WriteByte(c)
|
|
i++
|
|
continue
|
|
}
|
|
pidx := idx - 1
|
|
oid := uint32(0)
|
|
if pidx < len(oids) {
|
|
oid = oids[pidx]
|
|
}
|
|
format := int16(0)
|
|
if pidx < len(formats) {
|
|
format = formats[pidx]
|
|
} else if len(formats) == 1 {
|
|
format = formats[0] // single format code applies to all
|
|
}
|
|
lit, err := paramToLiteral(params[pidx], oid, format)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
out.WriteString(lit)
|
|
i = j
|
|
continue
|
|
}
|
|
out.WriteByte(c)
|
|
i++
|
|
}
|
|
return out.String(), nil
|
|
}
|
|
|
|
// countPgPlaceholders returns the largest N among the `$N`
// placeholders in sql, or 0 when there are none. Because it reports
// the highest index rather than the number of occurrences, a
// statement that uses `$1` twice still needs one parameter and
// `$10 $2` needs ten.
//
// Text between a pair of single quotes or a pair of double quotes is
// skipped, using the same rules as substituteParams. A digit run too
// long for strconv.Atoi is ignored.
func countPgPlaceholders(sql string) int {
	highest := 0
	var quote byte // opening quote character while inside a quoted run, else 0

	pos := 0
	for pos < len(sql) {
		ch := sql[pos]
		switch {
		case quote != 0:
			if ch == quote {
				quote = 0
			}
			pos++

		case ch == '\'' || ch == '"':
			quote = ch
			pos++

		case ch == '$' && pos+1 < len(sql) && sql[pos+1] >= '0' && sql[pos+1] <= '9':
			end := pos + 1
			for end < len(sql) && sql[end] >= '0' && sql[end] <= '9' {
				end++
			}
			if n, err := strconv.Atoi(sql[pos+1 : end]); err == nil && n > highest {
				highest = n
			}
			pos = end

		default:
			pos++
		}
	}
	return highest
}
|
|
|
|
func paramToLiteral(raw []byte, oid uint32, format int16) (string, error) {
|
|
if raw == nil {
|
|
return "NULL", nil
|
|
}
|
|
if format == 0 {
|
|
// Text format — quote per type. For numerics and bools we
|
|
// don't quote; for everything else we single-quote with
|
|
// inline-escape.
|
|
switch oid {
|
|
case oidInt4, oidInt8, oidBool, oidNumeric:
|
|
return string(raw), nil
|
|
default:
|
|
return "'" + strings.ReplaceAll(string(raw), "'", "''") + "'", nil
|
|
}
|
|
}
|
|
// Binary format — decode the OIDs pgx uses by default.
|
|
switch oid {
|
|
case oidInt4:
|
|
if len(raw) != 4 {
|
|
return "", fmt.Errorf("int4 param: want 4 bytes, got %d", len(raw))
|
|
}
|
|
return strconv.FormatInt(int64(int32(binary.BigEndian.Uint32(raw))), 10), nil
|
|
case oidInt8:
|
|
if len(raw) != 8 {
|
|
return "", fmt.Errorf("int8 param: want 8 bytes, got %d", len(raw))
|
|
}
|
|
return strconv.FormatInt(int64(binary.BigEndian.Uint64(raw)), 10), nil
|
|
case oidBool:
|
|
if len(raw) != 1 {
|
|
return "", fmt.Errorf("bool param: want 1 byte, got %d", len(raw))
|
|
}
|
|
if raw[0] == 0 {
|
|
return "FALSE", nil
|
|
}
|
|
return "TRUE", nil
|
|
default:
|
|
// Unknown binary OID — fall back to a quoted hex literal.
|
|
// FiveSql2 won't accept this directly, but the resulting
|
|
// error is at least diagnosable rather than a silent miss.
|
|
return "'\\x" + fmt.Sprintf("%x", raw) + "'", nil
|
|
}
|
|
}
|