Files
five/hbrtl/pgserver/session.go
CharlesKWON 8472928102 feat(pgserver): Phase 4 — Extended Protocol (Parse/Bind/Execute)
pgx and most drivers default to PostgreSQL's Extended Protocol
(named prepared statements). Phase 2 only handled Simple Query,
so every pgx caller had to force `QueryExecModeSimpleProtocol` —
unworkable for a production deployment. This commit lands the
full Parse → Bind → Describe → Execute → Sync state machine,
enough that pgx (and any other libpq-protocol-v3 client) works
without any client-side knobs.

Implementation lives in `hbrtl/pgserver/extended.go`:

* Per-session caches `stmts map[string]*preparedStmt` and
  `portals map[string]*portal`, lazily allocated on first use.
  Stored as fields on `session` so they don't leak across
  connections.

* Parameters are inlined at Bind time via `substituteParams` —
  the resolved SQL is a normal Simple-Query-shaped string the
  engine sees through the existing `five_SQL(cSQL, …, oSession)`
  pipeline. Avoids teaching FiveSql2 a second param-shape; the
  trade-off is that binary timestamps/numerics round-trip through
  text (Phase 4.1 will plumb `?`-params through aParams for the
  binary fast path).

* `paramToLiteral` decodes the binary-format encodings pgx uses
  by default for INT4/INT8/BOOL (big-endian fixed-width). Other
  binary OIDs fall back to a hex-escaped quoted literal which
  errors loudly rather than silently misparsing.

* `countPgPlaceholders` scans the SQL outside string literals for
  the highest `$N` so the server can answer Describe-statement
  with a correctly-sized ParameterDescription even when the
  client didn't pre-declare param OIDs. Without this, pgx errored
  with "expected 0 arguments, got 2" on the very first prepared
  query.

* RowDescription emission: Describe-statement still returns NoData
  (we can't infer row shape without execution). When Execute fires
  on a portal the client never Described, we emit RowDescription
  inline from the cached result before DataRow streams. pgx and
  psql both tolerate this ordering.

* Execute → CommandComplete tag derives from the SQL verb via the
  existing `commandTagFor` helper. Row counts in the tag remain
  "VERB 0" for v1.0; threading real counters through the engine
  is Phase 5.

Wire dispatch in `session.go:queryLoop` now handles Parse, Bind,
Describe, Execute, Close, Sync, Flush — the full v3 message set.

Verification
------------

End-to-end pgx (default mode, no SimpleProtocol flag) successfully
runs:
  SELECT $1 AS n, $2 AS s with 42 + "hi" → [42 hi]
  Same statement re-executed with different bound values → reuses
    the cached prepared statement
  SELECT $1 AS b, $2 AS s with true + "binary-bool" → [t binary-bool]

`tests/pgserver/run.sh` expanded from 1 → 3 integration assertions:

  PASS  Simple Query: SELECT 1, 'hello'
  PASS  Multi-statement Simple Query
  PASS  Transaction control: BEGIN/COMMIT round-trip

(Extended Protocol can't be driven from psql's -c CLI directly
because psql's PREPARE/EXECUTE is a separate SQL-level feature
that FiveSql2 doesn't parse; the pgx-driven path verifies it
manually, and a self-contained Go integration that drives pgx
from inside a process bootstrap is Phase 7 work.)

All six release gates green:
  go test ./...                       ✓
  FiveSql2 SQL:1999 43/43             ✓
  Harbour compat 56/56                ✓
  std.ch 17/17                        ✓
  FRB 7/7                             ✓
  pgserver integration 3/3            ✓

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 12:55:41 +09:00

279 lines
8.1 KiB
Go

// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
// All rights reserved.
package pgserver
import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
"io"
"net"
"strings"
"github.com/jackc/pgx/v5/pgproto3"
"five/hbrt"
)
// session is the per-connection state. One goroutine per session;
// nothing in session is touched by anyone else, so all fields are
// access-by-owner without locking.
type session struct {
srv *Server
conn net.Conn
// pgproto3 Backend speaks the PG protocol on the wire. Created
// after the TLS-upgrade decision so handshake bytes go through
// the right transport.
be *pgproto3.Backend
// Authenticated identity. Empty before AuthenticationOk.
user string
database string
// Per-connection hbrt.Thread, used to dispatch FIVE_SQL calls
// inside the PRG world. Owns a TSqlSession on the PRG side
// (created lazily by the first query via five_SQL's default-
// session fallback; for true isolation we instantiate one
// explicitly — see queryLoop).
thread *hbrt.Thread
// PRG-side session value: a TSqlSession instance held in a
// thread-local hbrt slot. Allocated on first query.
prgSession hbrt.Value
// Cancellation key — PG protocol mandates random 32-bit
// process id + secret in BackendKeyData so clients can send
// CancelRequest later. We don't honour cancel yet (v1.1),
// but the BackendKeyData must still be present and unique.
pid uint32
secret []byte // 4 random bytes per PG BackendKeyData
// Current transaction status, emitted in every ReadyForQuery:
// 'I' = idle, 'T' = in transaction, 'E' = failed transaction.
txStatus byte
// Extended-protocol per-session caches. Both lazily allocated
// by ensureExtendedState() in extended.go; nil before the
// first Parse/Bind. pgx-style clients re-use the unnamed
// statement aggressively, so leave both alive for the whole
// session.
stmts map[string]*preparedStmt
portals map[string]*portal
}
func newSession(srv *Server, conn net.Conn) *session {
var pid uint32
_ = binary.Read(rand.Reader, binary.BigEndian, &pid)
secret := make([]byte, 4)
_, _ = rand.Read(secret)
return &session{
srv: srv,
conn: conn,
pid: pid,
secret: secret,
txStatus: 'I',
}
}
// run drives the full session lifecycle:
//
// 1. SSLRequest / GSSEncRequest probe (reply 'N' for v1.0)
// 2. StartupMessage parse
// 3. Auth (trust path for v1.0; password/MD5/SCRAM in Phase 5)
// 4. ParameterStatus broadcast
// 5. BackendKeyData
// 6. ReadyForQuery
// 7. query loop until Terminate / EOF / error
//
// Any error in steps 1-3 aborts the session before query loop
// begins; an ErrorResponse goes back if at all possible.
func (s *session) run(ctx context.Context) {
// Step 1 — SSLRequest peek. Client sends 8 bytes: length(8) +
// special version 80877103 to ask for TLS; or it skips this and
// goes straight to StartupMessage. The pgproto3 Backend doesn't
// abstract this, so we peek the raw bytes first.
if err := s.negotiateTLS(); err != nil {
return
}
s.be = pgproto3.NewBackend(s.conn, s.conn)
// Step 2 — StartupMessage. After SSLRequest the client retries
// with a fresh length-prefixed startup. ReceiveStartupMessage
// handles either form transparently.
startupMsg, err := s.be.ReceiveStartupMessage()
if err != nil {
return // client gone
}
startup, ok := startupMsg.(*pgproto3.StartupMessage)
if !ok {
s.sendError("08P01", fmt.Sprintf("unexpected startup message: %T", startupMsg))
return
}
s.user = startup.Parameters["user"]
s.database = startup.Parameters["database"]
if s.database == "" {
s.database = s.user
}
// Step 3 — Auth. v1.0 lands `trust`; password/md5/scram in
// Phase 5 via auth.go.
if err := s.authenticate(); err != nil {
return
}
// Step 4 — ParameterStatus. These tell the client our identity
// and default formatting so it doesn't try features we don't
// have. server_version is the most-checked field; pgx + JDBC
// negotiate features based on its numeric prefix.
s.sendParameterStatus("server_version", s.srv.cfg.serverVersion())
s.sendParameterStatus("server_encoding", "UTF8")
s.sendParameterStatus("client_encoding", "UTF8")
s.sendParameterStatus("DateStyle", "ISO, MDY")
s.sendParameterStatus("TimeZone", "UTC")
s.sendParameterStatus("integer_datetimes", "on")
s.sendParameterStatus("standard_conforming_strings", "on")
// Step 5 — BackendKeyData. Some clients (psql) expect this
// even though we don't honour CancelRequest yet.
s.send(&pgproto3.BackendKeyData{ProcessID: s.pid, SecretKey: s.secret})
// Step 6 — ReadyForQuery, transaction status idle.
s.sendReadyForQuery()
// Step 7 — query loop. Each iteration:
// - Receive a frontend message
// - Dispatch (Query / Parse / Bind / Execute / Sync / Terminate / Close / Describe)
// - Send response stream
// - Loop
s.queryLoop(ctx)
}
// negotiateTLS reads the first 8-byte preamble. If it's an
// SSLRequest (length=8, version=80877103) and the server has a
// TLS config, we reply 'S' and upgrade; otherwise 'N' and the
// client retries unencrypted (or hangs up if it required TLS).
func (s *session) negotiateTLS() error {
var hdr [8]byte
if _, err := io.ReadFull(s.conn, hdr[:]); err != nil {
return err
}
length := binary.BigEndian.Uint32(hdr[0:4])
version := binary.BigEndian.Uint32(hdr[4:8])
const sslReqCode = 80877103
if length == 8 && version == sslReqCode {
if s.srv.cfg.TLSConfig != nil {
if _, err := s.conn.Write([]byte{'S'}); err != nil {
return err
}
// tls.go implements the upgrade; v1.0 stub returns
// the connection unchanged so the byte ordering is
// correct but cipher negotiation isn't wired yet.
upgraded, err := upgradeToTLS(s.conn, s.srv.cfg.TLSConfig)
if err != nil {
return err
}
s.conn = upgraded
return nil
}
if _, err := s.conn.Write([]byte{'N'}); err != nil {
return err
}
return nil
}
// Not an SSLRequest — the 8 bytes are the start of a
// StartupMessage. Buffer them so pgproto3.ReceiveStartupMessage
// sees the full payload.
s.conn = &prefixedConn{Conn: s.conn, prefix: hdr[:]}
return nil
}
// send writes a single backend message to the wire.
func (s *session) send(msg pgproto3.BackendMessage) {
s.be.Send(msg)
_ = s.be.Flush()
}
func (s *session) sendParameterStatus(name, value string) {
s.send(&pgproto3.ParameterStatus{Name: name, Value: value})
}
func (s *session) sendReadyForQuery() {
s.send(&pgproto3.ReadyForQuery{TxStatus: s.txStatus})
}
// sendError ships an ErrorResponse without forcing the caller to
// build the full pgproto3 struct. Used for protocol-level errors
// before the query loop starts; in-loop errors go through errmap.go.
func (s *session) sendError(sqlState, message string) {
s.send(&pgproto3.ErrorResponse{
Severity: "ERROR",
Code: sqlState,
Message: message,
})
}
// queryLoop runs until the client sends Terminate, closes the
// connection, or we hit an unrecoverable error.
func (s *session) queryLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}
msg, err := s.be.Receive()
if err != nil {
return // client gone or wire-level error
}
switch m := msg.(type) {
case *pgproto3.Terminate:
return
case *pgproto3.Query:
s.dispatchSimpleQuery(strings.TrimSpace(m.String))
case *pgproto3.Parse:
s.handleParse(m)
case *pgproto3.Bind:
s.handleBind(m)
case *pgproto3.Describe:
s.handleDescribe(m)
case *pgproto3.Execute:
s.handleExecute(m)
case *pgproto3.Close:
s.handleClose(m)
case *pgproto3.Sync:
s.handleSync()
case *pgproto3.Flush:
// Force-flush — pgproto3.Backend.Send already flushes
// after each Send(), so this is a no-op for us.
default:
s.sendError("0A000",
fmt.Sprintf("message %T not supported", m))
s.sendReadyForQuery()
}
}
}
// prefixedConn injects pre-read bytes back into a net.Conn so a
// caller that needs those bytes for parsing (pgproto3 reading the
// StartupMessage after our SSLRequest peek) sees them seamlessly.
type prefixedConn struct {
net.Conn
prefix []byte
off int
}
func (p *prefixedConn) Read(buf []byte) (int, error) {
if p.off < len(p.prefix) {
n := copy(buf, p.prefix[p.off:])
p.off += n
return n, nil
}
return p.Conn.Read(buf)
}