five/hbrtl/pgserver/extended.go
CharlesKWON ed1aeeb212 feat(pgserver): pg_catalog stub for BI-tool connection compatibility
PostgreSQL clients (psql, pgx, DBeaver, Tableau, DataGrip,
pgAdmin) fire a barrage of catalog probes at connection time —
SELECT version(), SHOW server_version, SELECT … FROM pg_namespace
/ pg_class / pg_type / pg_database / pg_settings. FiveSql2 can't
parse most of them. Without interception the BI tool either
errors out on connect or proceeds with a half-broken view of
the database (zero tables, no type info, no schema list). This
commit lands the minimum-viable catalog shim so the common
connect-and-list-tables flow succeeds.

Strategy
--------

Pattern-match catalog probes BEFORE handing the SQL to five_SQL.
Recognised shapes get synthesised result envelopes — same
`{ aFieldNames, aRows }` hbrt.Value shape the engine returns,
so the existing dispatchSimpleQuery / executePortal pipelines
stream them identically to a normal query.
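
A minimal sketch of the intercept-then-fall-through flow described
above, assuming simple prefix matching. The `Result` type, `intercept`,
`runEngine`, and `dispatch` are hypothetical stand-ins for illustration;
the real shim returns an hbrt.Value and its matching rules may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// Result mirrors the two-slot { aFieldNames, aRows } envelope.
// Hypothetical type: the real shim uses hbrt.Value.
type Result struct {
	FieldNames []string
	Rows       [][]string
}

// intercept reports whether sql is a recognised probe and, if so,
// returns a synthesised result. Matching is on the normalised text only.
func intercept(sql string) (bool, Result) {
	q := strings.TrimSuffix(strings.TrimSpace(sql), ";")
	q = strings.ToLower(strings.TrimSpace(q))
	switch {
	case strings.HasPrefix(q, "set "),
		strings.HasPrefix(q, "reset "),
		strings.HasPrefix(q, "discard "):
		return true, Result{} // success, no rows
	case q == "select version()":
		return true, Result{
			FieldNames: []string{"version"},
			Rows:       [][]string{{"PostgreSQL 14.0 (illustrative)"}},
		}
	}
	return false, Result{}
}

// runEngine stands in for the real SQL engine call.
func runEngine(sql string) Result {
	return Result{FieldNames: []string{"engine"}, Rows: [][]string{{sql}}}
}

// dispatch tries the shim first and only falls through to the engine
// when the statement is not a recognised probe.
func dispatch(sql string) Result {
	if handled, res := intercept(sql); handled {
		return res
	}
	return runEngine(sql)
}

func main() {
	fmt.Println(dispatch("SELECT version();").FieldNames)
	fmt.Println(dispatch("SELECT * FROM orders").FieldNames)
}
```

Because the synthesised value has the same shape as an engine result,
the caller needs no special case after the intercept returns.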

Covered (v1.0)
--------------

  * SET / RESET / DISCARD <name>           → success, no-op
  * SHOW <name>                            → single-row response
                                             (server_version, server_encoding,
                                              client_encoding, DateStyle,
                                              transaction_isolation, etc.)
  * SELECT version() / current_database() / current_schema() /
    current_user / session_user / pg_backend_pid()  → single-row
  * SELECT … FROM pg_namespace             → 2 rows (pg_catalog + public)
  * SELECT … FROM pg_class                 → list of open workareas
                                             (relkind='r', relnamespace=public)
  * SELECT … FROM pg_attribute             → empty (stub; column-shape
                                             introspection deferred to v1.1)
  * SELECT … FROM pg_type                  → 7 OIDs FiveSql2 actually emits
                                             (bool, int4, int8, text, numeric,
                                              date, timestamp)
  * SELECT … FROM pg_database              → 1 row, the connect-time db name
  * SELECT … FROM pg_settings              → name/setting pairs matching SHOW
  * Anything else mentioning pg_catalog. / pg_<name> / information_schema.
    → empty result with generic field names (BI tool sees "0 rows" rather
    than a parse error)
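
As an illustration of two entries in the list above, the sketch below
builds the seven pg_type rows and shows one possible way to detect the
"anything else" fallback case. The OIDs and lengths are the ones shown
in the psql output in this message; `pgType`, `pgTypeRows`, and
`mentionsCatalog` are hypothetical names, and the regular expression is
an assumption about what "mentioning" means, not the shim's actual rule.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// pgType is one synthesised pg_type row: OID, name, and fixed length
// (-1 marks a variable-length type).
type pgType struct {
	OID  uint32
	Name string
	Len  int
}

var pgTypes = []pgType{
	{16, "bool", 1},
	{23, "int4", 4},
	{20, "int8", 8},
	{25, "text", -1},
	{1700, "numeric", -1},
	{1082, "date", 4},
	{1114, "timestamp", 8},
}

// pgTypeRows renders the table as text cells, one slice per row.
func pgTypeRows() [][]string {
	rows := make([][]string, 0, len(pgTypes))
	for _, t := range pgTypes {
		rows = append(rows, []string{
			strconv.FormatUint(uint64(t.OID), 10), t.Name, strconv.Itoa(t.Len),
		})
	}
	return rows
}

// catalogRef matches "pg_catalog.", "information_schema.", or a bare
// pg_<name> identifier. It does not skip string literals, so a query
// such as SELECT 'pg_x' would also match (a known limit of this sketch).
var catalogRef = regexp.MustCompile(`(?i)\b(pg_catalog\.|information_schema\.|pg_[a-z0-9_]+)`)

func mentionsCatalog(sql string) bool {
	return catalogRef.MatchString(sql)
}

func main() {
	for _, r := range pgTypeRows() {
		fmt.Println(strings.Join(r, "|"))
	}
	fmt.Println(mentionsCatalog("SELECT * FROM pg_stat_activity"))
	fmt.Println(mentionsCatalog("SELECT * FROM orders"))
}
```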

Deliberate non-goals
--------------------

  * WHERE / JOIN evaluation — psql, pgx, DBeaver all filter
    client-side on the rows we return. We send the whole
    catalog and let them apply their predicates.
  * pg_attribute introspection — would need to re-derive
    column types from the open workarea + map back to PG OIDs.
    Tracked as v1.1 work.
  * Recursive CTE catalog queries (pgAdmin's tree builder uses
    them) — too brittle to pattern-match. Falls through to
    five_SQL where it errors loudly. pgAdmin's table-tree pane
    will then show "0 tables" but the connection itself stays
    alive.

Files
-----

  hbrtl/pgserver/catalog.go  (new, ~280 LOC)
    catalogIntercept(sql) → (handled, value)
    synthPgNamespace / synthPgClass / synthPgAttribute /
    synthPgType / synthPgDatabase / synthPgSettings
    simpleSelectFunction (version/current_*/pg_backend_pid)
    showResponse (SHOW <name>)

  hbrtl/pgserver/dispatch.go
    dispatchSimpleQuery: catalogIntercept ahead of runSQL.

  hbrtl/pgserver/extended.go
    executePortal: same intercept, ahead of runSQL.

Verification
------------

psql against a running pgserver, with sslmode=require + MD5:

  $ psql -c 'SELECT version()' -At
  PostgreSQL 14.0 (FiveSql2) (FiveSql2 wire-compat shim)

  $ psql -c 'SELECT * FROM pg_namespace' -At
  11|pg_catalog|10
  2200|public|10

  $ psql -c 'SELECT * FROM pg_type' -At
  16|bool|1
  23|int4|4
  20|int8|8
  25|text|-1
  1700|numeric|-1
  1082|date|4
  1114|timestamp|8

  $ psql -l    # \l now works
         List of databases
   oid | datname | datdba | Encoding
  -----+---------+--------+----------
     1 | alice   |     10 |        6

Integration script gates grew from 6/6 → 9/9:
  PASS  Catalog probe: SELECT version()
  PASS  Catalog probe: pg_namespace lists public + pg_catalog
  PASS  Catalog probe: SHOW server_version_num

All six release gates green:
  go test ./...               ✓
  FiveSql2 SQL:1999 43/43     ✓
  Harbour compat 56/56        ✓
  std.ch 17/17                ✓
  FRB 7/7                     ✓
  pgserver integration 9/9    ✓ (+3 from catalog stubs)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-21 22:31:52 +09:00


// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
// All rights reserved.
// extended.go — Extended Protocol (Parse/Bind/Execute/Describe/
// Sync/Close) support for the pgserver. Without this, pgx clients
// using the default QueryExecModeCacheStatement get a "0A000
// not supported" error on every query (Phase 3 saw this — the
// integration script had to force SimpleProtocol).
//
// The implementation is deliberately minimal for v1.0:
// * Per-session named statement cache: name → SQL + paramOIDs
// * Per-session named portal cache: name → statement + bound params
// * Parameter substitution happens server-side at Bind time; we
// build a "rewrite" SQL with literals inlined so five_SQL's
// existing template-cache pipeline (TFiveSQL.prg's
// SqlLexAndExtractTemplate path) can re-parameterise the
// query without us having to teach it the PG-wire param shape.
// * Describe-statement returns ParameterDescription + NoData;
// clients that need row metadata up front issue Describe-portal
// after Bind, at which point we run the query and emit a real
// RowDescription.
// * Execute streams the result cached at Describe-portal time,
// or runs the query itself if Describe-portal was skipped, so
// the same SQL never runs twice.
//
// Phase 4.1 will replace the literal-substitution rewrite with a
// proper `?`-param threading through five_SQL's aParams so binary
// params (timestamps, numerics) avoid round-tripping through text.
package pgserver
import (
"encoding/binary"
"fmt"
"strconv"
"strings"
"github.com/jackc/pgx/v5/pgproto3"
"five/hbrt"
)
// preparedStmt holds a named statement registered via Parse. It
// stores the original SQL plus the per-position param OIDs the
// client declared (or 0 = "any"). five_SQL does the actual parse
// lazily, the first time a portal bound to it is executed.
type preparedStmt struct {
sql string
paramOIDs []uint32
}
// portal binds a prepared statement to concrete parameter values.
// We materialise the values into a fully-substituted SQL string so
// the engine sees a normal Simple-Query-shaped statement; the
// result is cached on the portal so Describe-portal can emit a
// RowDescription without a second execution.
type portal struct {
stmt *preparedStmt
resolvedSQL string
// Cached result, populated by the first executePortal call
// (Describe-portal or Execute) so the query runs only once.
// resultArr is the engine's `aResult` array, from which both
// the RowDescription and the DataRow stream are derived.
executed bool
resultArr *hbrt.HbArray
resultErr *errEnvelope
}
// errEnvelope is the unpacked form of FiveSql2's error sentinel.
type errEnvelope struct {
code int
msg string
sql string
}
// ensureExtendedState lazily allocates the per-session caches.
// Called by every extended-protocol handler; the maps stay alive
// for the whole session because pgx-style clients re-use unnamed
// statements aggressively.
func (s *session) ensureExtendedState() {
if s.stmts == nil {
s.stmts = make(map[string]*preparedStmt)
}
if s.portals == nil {
s.portals = make(map[string]*portal)
}
}
// handleParse stores a named prepared statement. Returns
// ParseComplete; any actual parse error from five_SQL is deferred
// until Execute so a probe-only Parse + Describe + Sync round-trip
// (which pgx does for QueryExecModeDescribeExec) stays cheap.
func (s *session) handleParse(m *pgproto3.Parse) {
s.ensureExtendedState()
s.stmts[m.Name] = &preparedStmt{
sql: m.Query,
paramOIDs: append([]uint32(nil), m.ParameterOIDs...),
}
s.send(&pgproto3.ParseComplete{})
}
// handleBind materialises parameter values into the SQL text and
// stashes the portal for later Describe/Execute. Binary-format
// params are decoded for the OIDs we recognise; any other binary
// value is emitted as a quoted hex literal. Text-format params
// cover the common SELECT/INSERT-with-int/string cases.
func (s *session) handleBind(m *pgproto3.Bind) {
s.ensureExtendedState()
stmt := s.stmts[m.PreparedStatement]
if stmt == nil {
s.send(buildErrorResponse("26000",
fmt.Sprintf("prepared statement %q does not exist", m.PreparedStatement), ""))
return
}
resolved, err := substituteParams(stmt.sql, stmt.paramOIDs, m.Parameters, m.ParameterFormatCodes)
if err != nil {
s.send(buildErrorResponse("42P02", err.Error(), stmt.sql))
return
}
s.portals[m.DestinationPortal] = &portal{stmt: stmt, resolvedSQL: resolved}
s.send(&pgproto3.BindComplete{})
}
// handleDescribe answers Describe for either a statement or a
// portal. ObjectType 'S' = statement, 'P' = portal.
//
// Statement Describe is hard to answer fully without running the
// query (we'd need column-type inference). For v1.0 we return
// NoData + ParameterDescription with the declared OIDs; pgx
// tolerates this and re-Describes the portal after Bind.
//
// Portal Describe runs the query immediately, caches the result
// on the portal, and emits RowDescription from the actual result
// columns. Execute later just streams from the cache.
func (s *session) handleDescribe(m *pgproto3.Describe) {
s.ensureExtendedState()
switch m.ObjectType {
case 'S':
stmt := s.stmts[m.Name]
if stmt == nil {
s.send(buildErrorResponse("26000",
fmt.Sprintf("prepared statement %q does not exist", m.Name), ""))
return
}
// pgx-style clients send Parse without explicit OIDs and
// expect the server to infer them from the SQL. Count the
// `$N` placeholders and pad paramOIDs with OID 0 ("any")
// for any that the client didn't pre-declare — otherwise
// pgx errors with "expected 0 arguments, got N" before
// the Bind even reaches us.
nParams := countPgPlaceholders(stmt.sql)
oids := stmt.paramOIDs
for len(oids) < nParams {
oids = append(oids, 0)
}
stmt.paramOIDs = oids
s.send(&pgproto3.ParameterDescription{ParameterOIDs: oids})
// NoData — we don't know the row shape until execution.
// pgx tolerates this and asks again via Describe-portal.
s.send(&pgproto3.NoData{})
case 'P':
p := s.portals[m.Name]
if p == nil {
s.send(buildErrorResponse("34000",
fmt.Sprintf("portal %q does not exist", m.Name), ""))
return
}
s.executePortal(p)
if p.resultErr != nil {
s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql))
return
}
s.sendRowDescriptionFromArr(p.resultArr)
}
}
// handleExecute streams the cached result of the portal as a
// DataRow sequence then CommandComplete. If the portal hasn't
// been executed yet (Describe-portal was skipped), execute now.
//
// pgx's default QueryExecModeCacheStatement skips Describe-portal
// and only does Describe-statement once at prepare time. Since
// Describe-statement returns NoData (we don't know the row shape
// without executing), pgx never gets a RowDescription unless we
// emit one here too. We emit it conditionally: only when the
// portal had not been executed before this call (p.executed was
// false, our proxy for "Describe-portal never ran"), so we don't
// duplicate the descriptor for clients that *did* call Describe-P.
func (s *session) handleExecute(m *pgproto3.Execute) {
s.ensureExtendedState()
p := s.portals[m.Portal]
if p == nil {
s.send(buildErrorResponse("34000",
fmt.Sprintf("portal %q does not exist", m.Portal), ""))
return
}
wasDescribed := p.executed
s.executePortal(p)
if p.resultErr != nil {
s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql))
s.txStatus = currentTxStatusAfterError(s.txStatus)
return
}
// If Describe-portal was never called, the client is still
// waiting for column metadata. Emit it now from the cached
// result so DataRow has a context the client can decode.
if !wasDescribed {
s.sendRowDescriptionFromArr(p.resultArr)
}
s.streamPortalRows(p, int(m.MaxRows))
tag := commandTagFor(p.resolvedSQL)
s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)})
s.updateTxStatusForTag(tag)
}
// handleClose drops a prepared statement or portal entry.
func (s *session) handleClose(m *pgproto3.Close) {
s.ensureExtendedState()
switch m.ObjectType {
case 'S':
delete(s.stmts, m.Name)
case 'P':
delete(s.portals, m.Name)
}
s.send(&pgproto3.CloseComplete{})
}
// handleSync flushes any pending output and emits ReadyForQuery,
// closing one cycle of the extended-protocol exchange.
func (s *session) handleSync() {
s.sendReadyForQuery()
}
// executePortal runs the resolved SQL through five_SQL once and
// caches the result on the portal. Idempotent — repeated calls
// short-circuit on `executed`. pg_catalog probes (BI-tool metadata
// queries) are intercepted before the engine call so DBeaver /
// Tableau / DataGrip can negotiate without erroring out.
func (s *session) executePortal(p *portal) {
if p.executed {
return
}
p.executed = true
if handled, val := s.catalogIntercept(p.resolvedSQL); handled {
p.resultArr = val.AsArray()
return
}
res, err := s.runSQL(p.resolvedSQL)
if err != nil {
p.resultErr = &errEnvelope{code: 0, msg: err.Error(), sql: p.resolvedSQL}
return
}
if res.IsNil() || !res.IsArray() {
// Non-result statement (DDL, transaction control). Synthesise
// an empty result envelope so the streaming path emits an
// empty RowDescription + CommandComplete cleanly.
p.resultArr = &hbrt.HbArray{Items: []hbrt.Value{
hbrt.MakeArrayFrom([]hbrt.Value{}),
hbrt.MakeArrayFrom([]hbrt.Value{}),
}}
return
}
arr := res.AsArray()
if isErrorEnvelope(arr) {
code, msg, sql := unpackError(arr)
p.resultErr = &errEnvelope{code: code, msg: msg, sql: sql}
return
}
p.resultArr = arr
}
// sendRowDescriptionFromArr emits a RowDescription derived from
// the engine's `{aFieldNames, aRows}` envelope. Mirrors what
// emitResultSet (Simple Query) does on the field-description side
// so the wire shape is identical regardless of which protocol the
// client picks.
func (s *session) sendRowDescriptionFromArr(arr *hbrt.HbArray) {
if arr == nil || len(arr.Items) < 1 || !arr.Items[0].IsArray() {
s.send(&pgproto3.RowDescription{Fields: nil})
return
}
fields := arr.Items[0].AsArray().Items
var firstRow []hbrt.Value
if len(arr.Items) >= 2 && arr.Items[1].IsArray() {
if rows := arr.Items[1].AsArray().Items; len(rows) > 0 && rows[0].IsArray() {
firstRow = rows[0].AsArray().Items
}
}
descFields := make([]pgproto3.FieldDescription, len(fields))
for i, f := range fields {
name := ""
if f.IsString() {
name = f.AsString()
} else {
name = fmt.Sprintf("column%d", i+1)
}
var sample hbrt.Value
if i < len(firstRow) {
sample = firstRow[i]
}
oid, typeSize := pgTypeFor(sample)
descFields[i] = pgproto3.FieldDescription{
Name: []byte(name),
DataTypeOID: uint32(oid),
DataTypeSize: typeSize,
TypeModifier: -1,
Format: 0,
}
}
s.send(&pgproto3.RowDescription{Fields: descFields})
}
// streamPortalRows iterates the portal's cached rows and emits a
// DataRow per row, respecting maxRows (0 = unlimited).
func (s *session) streamPortalRows(p *portal, maxRows int) {
if p.resultArr == nil || len(p.resultArr.Items) < 2 {
return
}
rowsVal := p.resultArr.Items[1]
if !rowsVal.IsArray() {
return
}
rows := rowsVal.AsArray().Items
fieldCount := 0
if p.resultArr.Items[0].IsArray() {
fieldCount = len(p.resultArr.Items[0].AsArray().Items)
}
emitted := 0
for _, rowVal := range rows {
if maxRows > 0 && emitted >= maxRows {
break
}
if !rowVal.IsArray() {
continue
}
cells := rowVal.AsArray().Items
out := make([][]byte, fieldCount)
for i := 0; i < fieldCount; i++ {
if i < len(cells) {
out[i] = encodeText(cells[i])
}
}
s.send(&pgproto3.DataRow{Values: out})
emitted++
}
}
// substituteParams produces a Simple-Query-shaped SQL by inlining
// the bound parameter values as PG-style literals. Format codes:
// 0 = text, 1 = binary. We support text for all OIDs and binary
// for INT4/INT8/BOOL only. Any other binary OID falls back to a
// quoted hex literal, which FiveSql2 rejects with a diagnosable
// error, so binary timestamps and numerics wait for Phase 4.1.
//
// pgx defaults to binary for ints + text for everything else, so
// the common case is well covered.
func substituteParams(sql string, oids []uint32, params [][]byte, formats []int16) (string, error) {
// Each `$1`/`$2`/… placeholder maps to params[i-1]. We do a
// linear scan rather than regex to avoid quoting pitfalls: the
// scanner copies single- and double-quoted runs verbatim, so a
// literal `$1` inside a string or quoted identifier is untouched.
var out strings.Builder
i := 0
inStr := byte(0)
for i < len(sql) {
c := sql[i]
if inStr != 0 {
out.WriteByte(c)
if c == inStr {
inStr = 0
}
i++
continue
}
if c == '\'' || c == '"' {
inStr = c
out.WriteByte(c)
i++
continue
}
if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' {
j := i + 1
for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' {
j++
}
idx, err := strconv.Atoi(sql[i+1 : j])
if err != nil || idx < 1 || idx > len(params) {
out.WriteByte(c)
i++
continue
}
pidx := idx - 1
oid := uint32(0)
if pidx < len(oids) {
oid = oids[pidx]
}
format := int16(0)
if pidx < len(formats) {
format = formats[pidx]
} else if len(formats) == 1 {
format = formats[0] // single format code applies to all
}
lit, err := paramToLiteral(params[pidx], oid, format)
if err != nil {
return "", err
}
out.WriteString(lit)
i = j
continue
}
out.WriteByte(c)
i++
}
return out.String(), nil
}
// countPgPlaceholders scans SQL for the highest $N placeholder
// outside string literals and returns that count. Matches what
// substituteParams resolves at Bind time. Returns 0 for SQL with
// no placeholders. Order of placeholders doesn't matter for the
// count — repeated `$1` still counts as 1 param slot.
func countPgPlaceholders(sql string) int {
max := 0
inStr := byte(0)
for i := 0; i < len(sql); i++ {
c := sql[i]
if inStr != 0 {
if c == inStr {
inStr = 0
}
continue
}
if c == '\'' || c == '"' {
inStr = c
continue
}
if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' {
j := i + 1
for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' {
j++
}
if n, err := strconv.Atoi(sql[i+1 : j]); err == nil && n > max {
max = n
}
i = j - 1
}
}
return max
}
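// paramToLiteral renders one bound parameter as a SQL literal. A
// nil value becomes NULL. Text-format values are inlined unquoted
// for int4/int8/bool/numeric and single-quoted (with '' escaping)
// otherwise; binary-format values are decoded for int4/int8/bool
// and hex-quoted for any other OID.
func paramToLiteral(raw []byte, oid uint32, format int16) (string, error) {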
if raw == nil {
return "NULL", nil
}
if format == 0 {
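// Text format. Values inlined without quotes are checked first,
// because they are spliced into the SQL verbatim; everything
// else is single-quoted with '' escaping.
switch oid {
case oidInt4, oidInt8, oidNumeric:
if _, err := strconv.ParseFloat(string(raw), 64); err != nil {
return "", fmt.Errorf("numeric param: invalid text value %q", raw)
}
return string(raw), nil
case oidBool:
b, err := strconv.ParseBool(string(raw))
if err != nil {
return "", fmt.Errorf("bool param: invalid text value %q", raw)
}
return strings.ToUpper(strconv.FormatBool(b)), nil
default:
return "'" + strings.ReplaceAll(string(raw), "'", "''") + "'", nil
}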
}
// Binary format — decode the OIDs pgx uses by default.
switch oid {
case oidInt4:
if len(raw) != 4 {
return "", fmt.Errorf("int4 param: want 4 bytes, got %d", len(raw))
}
return strconv.FormatInt(int64(int32(binary.BigEndian.Uint32(raw))), 10), nil
case oidInt8:
if len(raw) != 8 {
return "", fmt.Errorf("int8 param: want 8 bytes, got %d", len(raw))
}
return strconv.FormatInt(int64(binary.BigEndian.Uint64(raw)), 10), nil
case oidBool:
if len(raw) != 1 {
return "", fmt.Errorf("bool param: want 1 byte, got %d", len(raw))
}
if raw[0] == 0 {
return "FALSE", nil
}
return "TRUE", nil
default:
// Unknown binary OID — fall back to a quoted hex literal.
// FiveSql2 won't accept this directly, but the resulting
// error is at least diagnosable rather than a silent miss.
return "'\\x" + fmt.Sprintf("%x", raw) + "'", nil
}
}
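
The quote-aware scan shared by substituteParams and countPgPlaceholders
can be tried in isolation. The sketch below is a simplified,
text-format-only variant: `inlineTextParams` is a hypothetical helper
that ignores OIDs and format codes and single-quotes every non-nil
value, so it illustrates the scanning rule rather than the full Bind path.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// inlineTextParams replaces $N placeholders outside quoted runs with
// single-quoted literals; a nil parameter becomes NULL.
func inlineTextParams(sql string, params [][]byte) string {
	var out strings.Builder
	inStr := byte(0) // 0 = outside quotes, else the opening quote byte
	for i := 0; i < len(sql); {
		c := sql[i]
		if inStr != 0 { // inside a quoted run: copy verbatim
			if c == inStr {
				inStr = 0
			}
			out.WriteByte(c)
			i++
			continue
		}
		if c == '\'' || c == '"' {
			inStr = c
			out.WriteByte(c)
			i++
			continue
		}
		if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' {
			j := i + 1
			for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' {
				j++
			}
			n, err := strconv.Atoi(sql[i+1 : j])
			if err == nil && n >= 1 && n <= len(params) {
				if p := params[n-1]; p == nil {
					out.WriteString("NULL")
				} else {
					out.WriteString("'" + strings.ReplaceAll(string(p), "'", "''") + "'")
				}
				i = j
				continue
			}
		}
		out.WriteByte(c) // ordinary byte, or a $N with no matching param
		i++
	}
	return out.String()
}

func main() {
	sql := "SELECT * FROM t WHERE name = $1 AND note <> '$2' AND id = $2"
	fmt.Println(inlineTextParams(sql, [][]byte{[]byte("O'Brien"), []byte("7")}))
}
```

In this example the `$2` inside the quoted string is left alone, while
the bare `$1` and `$2` are replaced and the apostrophe in the first
value is doubled.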