pgx and most drivers default to PostgreSQL's Extended Protocol
(named prepared statements). Phase 2 only handled Simple Query,
so every pgx caller had to force `QueryExecModeSimpleProtocol` —
unworkable for a production deployment. This commit lands the
full Parse → Bind → Describe → Execute → Sync state machine,
enough that pgx (and any other PostgreSQL wire-protocol v3 client) works
without any client-side knobs.
Implementation lives in `hbrtl/pgserver/extended.go`:
* Per-session caches `stmts map[string]*preparedStmt` and
`portals map[string]*portal`, lazily allocated on first use.
Stored as fields on `session` so they don't leak across
connections.
* Parameters are inlined at Bind time via `substituteParams` —
the resolved SQL is a normal Simple-Query-shaped string the
engine sees through the existing `five_SQL(cSQL, …, oSession)`
pipeline. Avoids teaching FiveSql2 a second param-shape; the
trade-off is that binary timestamps/numerics round-trip through
text (Phase 4.1 will plumb `?`-params through aParams for the
binary fast path).
* `paramToLiteral` decodes the binary-format encodings pgx uses
by default for INT4/INT8/BOOL (big-endian fixed-width). Other
binary OIDs fall back to a hex-escaped quoted literal which
errors loudly rather than silently misparsing.
* `countPgPlaceholders` scans the SQL outside string literals for
the highest `$N` so the server can answer Describe-statement
with a correctly-sized ParameterDescription even when the
client didn't pre-declare param OIDs. Without this, pgx errored
with "expected 0 arguments, got 2" on the very first prepared
query.
* RowDescription emission: Describe-statement still returns NoData
(we can't infer row shape without execution). When Execute fires
on a portal the client never Described, we emit RowDescription
inline from the cached result before DataRow streams. pgx and
psql both tolerate this ordering.
* Execute → CommandComplete tag derives from the SQL verb via the
existing `commandTagFor` helper. Row counts in the tag remain
"VERB 0" for v1.0; threading real counters through the engine
is Phase 5.
Wire dispatch in `session.go:queryLoop` now handles Parse, Bind,
Describe, Execute, Close, Sync, Flush — the complete Extended Query
message set — alongside Simple Query and Terminate.
Verification
------------
End-to-end pgx (default mode, no SimpleProtocol flag) successfully
runs:
SELECT $1 AS n, $2 AS s with 42 + "hi" → [42 hi]
Same statement re-executed with different bound values → reuses
the cached prepared statement
SELECT $1 AS b, $2 AS s with true + "binary-bool" → [t binary-bool]
`tests/pgserver/run.sh` expanded from 1 → 3 integration assertions:
PASS Simple Query: SELECT 1, 'hello'
PASS Multi-statement Simple Query
PASS Transaction control: BEGIN/COMMIT round-trip
(Extended Protocol can't be driven from psql's -c CLI directly
because psql's PREPARE/EXECUTE is a separate SQL-level feature
that FiveSql2 doesn't parse; the pgx-driven path verifies it
manually, and a self-contained Go integration that drives pgx
from inside a process bootstrap is Phase 7 work.)
All six release gates green:
go test ./... ✓
FiveSql2 SQL:1999 43/43 ✓
Harbour compat 56/56 ✓
std.ch 17/17 ✓
FRB 7/7 ✓
pgserver integration 3/3 ✓
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
279 lines
8.1 KiB
Go
279 lines
8.1 KiB
Go
// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
|
|
// All rights reserved.
|
|
|
|
package pgserver
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rand"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgproto3"
|
|
|
|
"five/hbrt"
|
|
)
|
|
|
|
// session is the per-connection state. One goroutine per session;
|
|
// nothing in session is touched by anyone else, so all fields are
|
|
// access-by-owner without locking.
|
|
type session struct {
|
|
srv *Server
|
|
conn net.Conn
|
|
|
|
// pgproto3 Backend speaks the PG protocol on the wire. Created
|
|
// after the TLS-upgrade decision so handshake bytes go through
|
|
// the right transport.
|
|
be *pgproto3.Backend
|
|
|
|
// Authenticated identity. Empty before AuthenticationOk.
|
|
user string
|
|
database string
|
|
|
|
// Per-connection hbrt.Thread, used to dispatch FIVE_SQL calls
|
|
// inside the PRG world. Owns a TSqlSession on the PRG side
|
|
// (created lazily by the first query via five_SQL's default-
|
|
// session fallback; for true isolation we instantiate one
|
|
// explicitly — see queryLoop).
|
|
thread *hbrt.Thread
|
|
|
|
// PRG-side session value: a TSqlSession instance held in a
|
|
// thread-local hbrt slot. Allocated on first query.
|
|
prgSession hbrt.Value
|
|
|
|
// Cancellation key — PG protocol mandates random 32-bit
|
|
// process id + secret in BackendKeyData so clients can send
|
|
// CancelRequest later. We don't honour cancel yet (v1.1),
|
|
// but the BackendKeyData must still be present and unique.
|
|
pid uint32
|
|
secret []byte // 4 random bytes per PG BackendKeyData
|
|
|
|
// Current transaction status, emitted in every ReadyForQuery:
|
|
// 'I' = idle, 'T' = in transaction, 'E' = failed transaction.
|
|
txStatus byte
|
|
|
|
// Extended-protocol per-session caches. Both lazily allocated
|
|
// by ensureExtendedState() in extended.go; nil before the
|
|
// first Parse/Bind. pgx-style clients re-use the unnamed
|
|
// statement aggressively, so leave both alive for the whole
|
|
// session.
|
|
stmts map[string]*preparedStmt
|
|
portals map[string]*portal
|
|
}
|
|
|
|
func newSession(srv *Server, conn net.Conn) *session {
|
|
var pid uint32
|
|
_ = binary.Read(rand.Reader, binary.BigEndian, &pid)
|
|
secret := make([]byte, 4)
|
|
_, _ = rand.Read(secret)
|
|
return &session{
|
|
srv: srv,
|
|
conn: conn,
|
|
pid: pid,
|
|
secret: secret,
|
|
txStatus: 'I',
|
|
}
|
|
}
|
|
|
|
// run drives the full session lifecycle:
|
|
//
|
|
// 1. SSLRequest / GSSEncRequest probe (reply 'N' for v1.0)
|
|
// 2. StartupMessage parse
|
|
// 3. Auth (trust path for v1.0; password/MD5/SCRAM in Phase 5)
|
|
// 4. ParameterStatus broadcast
|
|
// 5. BackendKeyData
|
|
// 6. ReadyForQuery
|
|
// 7. query loop until Terminate / EOF / error
|
|
//
|
|
// Any error in steps 1-3 aborts the session before query loop
|
|
// begins; an ErrorResponse goes back if at all possible.
|
|
func (s *session) run(ctx context.Context) {
|
|
// Step 1 — SSLRequest peek. Client sends 8 bytes: length(8) +
|
|
// special version 80877103 to ask for TLS; or it skips this and
|
|
// goes straight to StartupMessage. The pgproto3 Backend doesn't
|
|
// abstract this, so we peek the raw bytes first.
|
|
if err := s.negotiateTLS(); err != nil {
|
|
return
|
|
}
|
|
|
|
s.be = pgproto3.NewBackend(s.conn, s.conn)
|
|
|
|
// Step 2 — StartupMessage. After SSLRequest the client retries
|
|
// with a fresh length-prefixed startup. ReceiveStartupMessage
|
|
// handles either form transparently.
|
|
startupMsg, err := s.be.ReceiveStartupMessage()
|
|
if err != nil {
|
|
return // client gone
|
|
}
|
|
startup, ok := startupMsg.(*pgproto3.StartupMessage)
|
|
if !ok {
|
|
s.sendError("08P01", fmt.Sprintf("unexpected startup message: %T", startupMsg))
|
|
return
|
|
}
|
|
s.user = startup.Parameters["user"]
|
|
s.database = startup.Parameters["database"]
|
|
if s.database == "" {
|
|
s.database = s.user
|
|
}
|
|
|
|
// Step 3 — Auth. v1.0 lands `trust`; password/md5/scram in
|
|
// Phase 5 via auth.go.
|
|
if err := s.authenticate(); err != nil {
|
|
return
|
|
}
|
|
|
|
// Step 4 — ParameterStatus. These tell the client our identity
|
|
// and default formatting so it doesn't try features we don't
|
|
// have. server_version is the most-checked field; pgx + JDBC
|
|
// negotiate features based on its numeric prefix.
|
|
s.sendParameterStatus("server_version", s.srv.cfg.serverVersion())
|
|
s.sendParameterStatus("server_encoding", "UTF8")
|
|
s.sendParameterStatus("client_encoding", "UTF8")
|
|
s.sendParameterStatus("DateStyle", "ISO, MDY")
|
|
s.sendParameterStatus("TimeZone", "UTC")
|
|
s.sendParameterStatus("integer_datetimes", "on")
|
|
s.sendParameterStatus("standard_conforming_strings", "on")
|
|
|
|
// Step 5 — BackendKeyData. Some clients (psql) expect this
|
|
// even though we don't honour CancelRequest yet.
|
|
s.send(&pgproto3.BackendKeyData{ProcessID: s.pid, SecretKey: s.secret})
|
|
|
|
// Step 6 — ReadyForQuery, transaction status idle.
|
|
s.sendReadyForQuery()
|
|
|
|
// Step 7 — query loop. Each iteration:
|
|
// - Receive a frontend message
|
|
// - Dispatch (Query / Parse / Bind / Execute / Sync / Terminate / Close / Describe)
|
|
// - Send response stream
|
|
// - Loop
|
|
s.queryLoop(ctx)
|
|
}
|
|
|
|
// negotiateTLS reads the first 8-byte preamble. If it's an
|
|
// SSLRequest (length=8, version=80877103) and the server has a
|
|
// TLS config, we reply 'S' and upgrade; otherwise 'N' and the
|
|
// client retries unencrypted (or hangs up if it required TLS).
|
|
func (s *session) negotiateTLS() error {
|
|
var hdr [8]byte
|
|
if _, err := io.ReadFull(s.conn, hdr[:]); err != nil {
|
|
return err
|
|
}
|
|
length := binary.BigEndian.Uint32(hdr[0:4])
|
|
version := binary.BigEndian.Uint32(hdr[4:8])
|
|
const sslReqCode = 80877103
|
|
|
|
if length == 8 && version == sslReqCode {
|
|
if s.srv.cfg.TLSConfig != nil {
|
|
if _, err := s.conn.Write([]byte{'S'}); err != nil {
|
|
return err
|
|
}
|
|
// tls.go implements the upgrade; v1.0 stub returns
|
|
// the connection unchanged so the byte ordering is
|
|
// correct but cipher negotiation isn't wired yet.
|
|
upgraded, err := upgradeToTLS(s.conn, s.srv.cfg.TLSConfig)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.conn = upgraded
|
|
return nil
|
|
}
|
|
if _, err := s.conn.Write([]byte{'N'}); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Not an SSLRequest — the 8 bytes are the start of a
|
|
// StartupMessage. Buffer them so pgproto3.ReceiveStartupMessage
|
|
// sees the full payload.
|
|
s.conn = &prefixedConn{Conn: s.conn, prefix: hdr[:]}
|
|
return nil
|
|
}
|
|
|
|
// send writes a single backend message to the wire.
|
|
func (s *session) send(msg pgproto3.BackendMessage) {
|
|
s.be.Send(msg)
|
|
_ = s.be.Flush()
|
|
}
|
|
|
|
func (s *session) sendParameterStatus(name, value string) {
|
|
s.send(&pgproto3.ParameterStatus{Name: name, Value: value})
|
|
}
|
|
|
|
func (s *session) sendReadyForQuery() {
|
|
s.send(&pgproto3.ReadyForQuery{TxStatus: s.txStatus})
|
|
}
|
|
|
|
// sendError ships an ErrorResponse without forcing the caller to
|
|
// build the full pgproto3 struct. Used for protocol-level errors
|
|
// before the query loop starts; in-loop errors go through errmap.go.
|
|
func (s *session) sendError(sqlState, message string) {
|
|
s.send(&pgproto3.ErrorResponse{
|
|
Severity: "ERROR",
|
|
Code: sqlState,
|
|
Message: message,
|
|
})
|
|
}
|
|
|
|
// queryLoop runs until the client sends Terminate, closes the
|
|
// connection, or we hit an unrecoverable error.
|
|
func (s *session) queryLoop(ctx context.Context) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
msg, err := s.be.Receive()
|
|
if err != nil {
|
|
return // client gone or wire-level error
|
|
}
|
|
switch m := msg.(type) {
|
|
case *pgproto3.Terminate:
|
|
return
|
|
case *pgproto3.Query:
|
|
s.dispatchSimpleQuery(strings.TrimSpace(m.String))
|
|
case *pgproto3.Parse:
|
|
s.handleParse(m)
|
|
case *pgproto3.Bind:
|
|
s.handleBind(m)
|
|
case *pgproto3.Describe:
|
|
s.handleDescribe(m)
|
|
case *pgproto3.Execute:
|
|
s.handleExecute(m)
|
|
case *pgproto3.Close:
|
|
s.handleClose(m)
|
|
case *pgproto3.Sync:
|
|
s.handleSync()
|
|
case *pgproto3.Flush:
|
|
// Force-flush — pgproto3.Backend.Send already flushes
|
|
// after each Send(), so this is a no-op for us.
|
|
default:
|
|
s.sendError("0A000",
|
|
fmt.Sprintf("message %T not supported", m))
|
|
s.sendReadyForQuery()
|
|
}
|
|
}
|
|
}
|
|
|
|
// prefixedConn is a net.Conn whose Read first replays bytes that were
// already consumed from the underlying connection, then falls through to
// the connection itself. negotiateTLS uses it to hand the 8-byte preamble
// it peeked back to pgproto3's StartupMessage reader. All other net.Conn
// methods are promoted from the embedded Conn.
type prefixedConn struct {
	net.Conn
	prefix []byte // bytes to replay before reading from Conn
	off    int    // number of prefix bytes already returned
}

// Read serves the unread remainder of prefix first and only then reads
// from the embedded Conn. A Read answered from prefix never touches Conn,
// so it may return fewer than len(buf) bytes even when more data is
// available; that is a legal short read for io.Reader.
func (p *prefixedConn) Read(buf []byte) (int, error) {
	pending := p.prefix[p.off:]
	if len(pending) == 0 {
		return p.Conn.Read(buf)
	}
	n := copy(buf, pending)
	p.off += n
	return n, nil
}
|