First end-to-end working version of the PostgreSQL-wire-compatible
TCP server frontend. A standard `psql` client now connects, runs
`SELECT * FROM employees`, and gets back a properly typed result
set rendered by psql with the right column alignment:
ID | NAME | SALARY
----+----------------------+----------
1 | Alice | 50000.00
2 | Bob | 42000.50
3 | Cho | 77500.00
This is the Phase 2 deliverable from the approved plan at
/Users/charleskwon/.claude/plans/compiled-launching-shore.md.
Builds on the session-state refactor in 93cf5c8 — each connection
gets its own TSqlSession on the PRG side via the new PG_NEW_SESSION
HB_FUNC, so concurrent psql clients won't share transaction logs
or plan caches.
Scope
-----
v1.0 MVP: Simple Query only, trust auth, no TLS yet. SELECT works
against the full FiveSql2 surface (CTEs, window functions, JOINs,
aggregates). DML + per-session transactions are Phase 3, extended
protocol is Phase 4, auth + TLS are Phases 5/6.
Architecture
------------
psql/pgx/JDBC ──TCP:5432──▶ pgserver.Listener
│ accept()
▼ go handleConn(net.Conn)
┌─────────────────────────────┐
│ Session goroutine │
│ 1. SSLRequest peek │
│ 2. StartupMessage │
│ 3. AuthenticationOk (trust) │
│ 4. ParameterStatus×7 │
│ 5. BackendKeyData │
│ 6. ReadyForQuery('I') │
│ 7. loop: Receive() → │
│ dispatchSimpleQuery → │
│ hbrt.Thread.Function( │
│ FIVE_SQL,sql,...,sess) │
│ emit RowDescription │
│ emit DataRow×N │
│ emit CommandComplete │
│ emit ReadyForQuery │
└─────────────────────────────┘
One goroutine per connection, each owning its own *hbrt.Thread and
TSqlSession instance. Uses the existing audit-fixed NewThread()
(cde8673) so statics + WA factory propagate.
New files (hbrtl/pgserver/)
---------------------------
* server.go — Config, Server, Serve loop with MaxConnections gate
via semaphore, Close drains in-flight sessions.
* session.go — full lifecycle: SSLRequest peek + prefixedConn
byte-injection trick for StartupMessage, ParameterStatus
broadcast (server_version "14.0 (FiveSql2)" so pgx negotiates),
BackendKeyData (random pid+secret per session, no CancelRequest
yet), query loop dispatching only Simple Query in v1.0 with a
loud "0A000 not supported" for Extended messages.
* dispatch.go — runSQL invokes FIVE_SQL via PushSymbol+Function,
unpacks the engine's `{aFieldNames, aRows}` envelope or the
`{{"__error__"}, {{nCode, cMsg, cSQL}}}` error shape, emits
RowDescription with text-format OIDs and DataRow per row.
* typemap.go — pgTypeFor() picks INT4 / INT8 / NUMERIC / TEXT /
DATE / TIMESTAMP / BOOL by sampling the first row's value type;
encodeText() formats each cell, returning nil-slice for NULL
(the PG length=-1 convention).
* errmap.go — sqlStateFor() maps FiveSql2 SQL_ERR_* codes to
canonical PG SQLSTATEs (42601/42P01/42703/42804/23505/23514/
23503/25P02/42501/02000/XX000).
* auth.go — trust mode in v1.0; password/MD5/SCRAM lands Phase 5
but the dispatch sentinel is already in place.
* tls.go — upgradeToTLS stub for SSLRequest handling; the byte-
ordering is already wired so Phase 6 just plugs in tls.Config.
* register.go — package init() registers pg_server_start /
pg_server_stop HB_FUNCs. Importing the package (done from
hbrtl/register.go via blank import) is enough to enable them.
* pgserver_test.go — unit tests for encodeText (numeric, string,
NIL), pgTypeFor (OID dispatch), sqlStateFor (error mapping),
commandTagFor (SELECT/INSERT/UPDATE/DELETE/BEGIN/COMMIT).
Other changes
-------------
* _FiveSql2/src/TSqlSession.prg — added PG_NEW_SESSION() factory
used by the Go dispatcher to allocate a per-connection session
bypassing the embedded process default.
* hbrtl/register.go — blank-import five/hbrtl/pgserver so its
init() fires and the HB_FUNCs land in the global dynamic-func
table for VM symbol lookup.
* go.mod / go.sum — github.com/jackc/pgx/v5 v5.9.2 (pgproto3
subpackage). MIT license. Same library pgx itself uses, so
protocol coverage matches the de-facto Go PG ecosystem.
Verification
------------
$ pg_server_start(15432, "trust") /* PRG one-liner */
$ psql -h 127.0.0.1 -p 15432 -U fiveuser -c 'SELECT * FROM employees'
→ 3 rows rendered correctly by psql (ID as INT4, NAME as TEXT,
SALARY as NUMERIC(10,2) with 2 decimal places)
All six release gates green:
go test ./... ✓ (incl. new hbrtl/pgserver tests)
FiveSql2 SQL:1999 43/43 ✓
Harbour compat 56/56 ✓
std.ch 17/17 ✓
FRB 7/7 ✓
examples 65/71 ✓ (unchanged baseline)
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
315 lines
8.7 KiB
Go
315 lines
8.7 KiB
Go
// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
|
|
// All rights reserved.
|
|
|
|
package pgserver
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/jackc/pgx/v5/pgproto3"
|
|
|
|
"five/hbrt"
|
|
)
|
|
|
|
// runSQL invokes the PRG-level `five_SQL(cSQL, NIL, NIL, oSession)`
|
|
// on this connection's thread and returns the resulting hbrt.Value
|
|
// (the engine's `aResult` two-element array, or an error array).
|
|
//
|
|
// The fourth arg threads the per-connection TSqlSession through so
|
|
// concurrent connections never collide on transaction state or the
|
|
// plan cache (see refactor commit 93cf5c8).
|
|
func (s *session) runSQL(sql string) (result hbrt.Value, runErr error) {
|
|
if s.thread == nil {
|
|
s.thread = s.srv.vm.NewThread()
|
|
}
|
|
if s.prgSession.IsNil() {
|
|
// Lazily allocate a TSqlSession on the PRG side. We do it
|
|
// via a dedicated helper instead of the engine's
|
|
// SqlDefaultSession() so each connection truly gets its
|
|
// own object; the default session is reserved for embedded
|
|
// callers.
|
|
sessVal, err := callPRG(s.thread, "PG_NEW_SESSION", nil)
|
|
if err != nil {
|
|
return hbrt.MakeNil(), err
|
|
}
|
|
s.prgSession = sessVal
|
|
}
|
|
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
// PRG-side panic (HbError, BreakValue, etc.) — surface
|
|
// as a runtime error so the caller can map it to an
|
|
// ErrorResponse without aborting the whole connection.
|
|
runErr = fmt.Errorf("five_SQL panic: %v", r)
|
|
}
|
|
}()
|
|
|
|
args := []hbrt.Value{
|
|
hbrt.MakeString(sql),
|
|
hbrt.MakeNil(), // aParams
|
|
hbrt.MakeNil(), // bBlock
|
|
s.prgSession, // oSession
|
|
}
|
|
return callPRG(s.thread, "FIVE_SQL", args)
|
|
}
|
|
|
|
// callPRG performs a single PRG function call on the given thread,
|
|
// returning the PRG-level return value. nil args means a zero-arg
|
|
// call. Errors come back as Go errors only for symbol resolution
|
|
// failures; PRG-level runtime errors panic up to the caller's
|
|
// recover() (see runSQL).
|
|
func callPRG(t *hbrt.Thread, name string, args []hbrt.Value) (hbrt.Value, error) {
|
|
sym := t.VM().FindSymbol(strings.ToUpper(name))
|
|
if sym == nil {
|
|
return hbrt.MakeNil(), fmt.Errorf("pgserver: PRG symbol %q not found", name)
|
|
}
|
|
t.PushSymbol(sym)
|
|
t.PushNil() // self placeholder — matches gengo's call layout
|
|
for _, a := range args {
|
|
t.PushValue(a)
|
|
}
|
|
t.Function(len(args))
|
|
// Function() pushed retVal back onto the stack; pop it.
|
|
return popValue(t), nil
|
|
}
|
|
|
|
// popValue retrieves the topmost stack entry as the call's return.
|
|
// Thread.Pop2() pops and returns (the named non-`return` Pop() is the
|
|
// statement form used by gengo); we want the value here, so Pop2.
|
|
func popValue(t *hbrt.Thread) hbrt.Value {
|
|
return t.Pop2()
|
|
}
|
|
|
|
// handleSimpleQuery dispatches a Simple Query message: parse the
|
|
// SQL, run it through five_SQL, and stream RowDescription +
|
|
// DataRow* + CommandComplete + ReadyForQuery back.
|
|
//
|
|
// FiveSql2's result envelope is either:
|
|
//
|
|
// { aFieldNames, aRows } — success
|
|
// { {"__error__"}, { {nCode, cMsg, cSQL} } } — failure
|
|
//
|
|
// We detect the error shape by inspecting aResult[0][0] for the
|
|
// sentinel "__error__" header column.
|
|
func (s *session) dispatchSimpleQuery(sql string) {
|
|
if sql == "" {
|
|
s.send(&pgproto3.EmptyQueryResponse{})
|
|
s.sendReadyForQuery()
|
|
return
|
|
}
|
|
|
|
result, err := s.runSQL(sql)
|
|
if err != nil {
|
|
s.send(buildErrorResponse("XX000", err.Error(), sql))
|
|
s.txStatus = currentTxStatusAfterError(s.txStatus)
|
|
s.sendReadyForQuery()
|
|
return
|
|
}
|
|
|
|
if result.IsNil() || !result.IsArray() {
|
|
// Bare NIL or non-array — most plausibly a successful
|
|
// statement that doesn't produce a result set (e.g. DDL,
|
|
// SET, transaction control). Reply CommandComplete with
|
|
// a generic tag.
|
|
tag := commandTagFor(sql)
|
|
s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)})
|
|
s.updateTxStatusForTag(tag)
|
|
s.sendReadyForQuery()
|
|
return
|
|
}
|
|
|
|
arr := result.AsArray()
|
|
if isErrorEnvelope(arr) {
|
|
nCode, cMsg, cSQL := unpackError(arr)
|
|
s.send(buildErrorResponse(sqlStateFor(nCode), cMsg, cSQL))
|
|
s.txStatus = currentTxStatusAfterError(s.txStatus)
|
|
s.sendReadyForQuery()
|
|
return
|
|
}
|
|
|
|
// Success envelope: { aFieldNames, aRows }
|
|
fieldsVal := arr.Items[0]
|
|
rowsVal := hbrt.MakeNil()
|
|
if len(arr.Items) >= 2 {
|
|
rowsVal = arr.Items[1]
|
|
}
|
|
if err := s.emitResultSet(fieldsVal, rowsVal, sql); err != nil {
|
|
s.send(buildErrorResponse("XX000", err.Error(), sql))
|
|
}
|
|
tag := commandTagFor(sql)
|
|
s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)})
|
|
s.updateTxStatusForTag(tag)
|
|
s.sendReadyForQuery()
|
|
}
|
|
|
|
// emitResultSet writes RowDescription + a DataRow per source row.
|
|
func (s *session) emitResultSet(fieldsVal, rowsVal hbrt.Value, sql string) error {
|
|
if !fieldsVal.IsArray() {
|
|
return fmt.Errorf("malformed result: first element is %s, expected array", fiveTypeName(fieldsVal))
|
|
}
|
|
fields := fieldsVal.AsArray().Items
|
|
|
|
// First-row inference: scan the leftmost non-NIL value per
|
|
// column to pick a stable PG OID. v1.0 sticks to TEXT for
|
|
// any column with mixed/NIL types — easy upgrade path to
|
|
// declared schema lookup later.
|
|
var firstRow []hbrt.Value
|
|
if rowsVal.IsArray() && len(rowsVal.AsArray().Items) > 0 {
|
|
if rowsVal.AsArray().Items[0].IsArray() {
|
|
firstRow = rowsVal.AsArray().Items[0].AsArray().Items
|
|
}
|
|
}
|
|
|
|
descFields := make([]pgproto3.FieldDescription, len(fields))
|
|
for i, f := range fields {
|
|
name := ""
|
|
if f.IsString() {
|
|
name = f.AsString()
|
|
} else {
|
|
name = fmt.Sprintf("column%d", i+1)
|
|
}
|
|
var sample hbrt.Value
|
|
if i < len(firstRow) {
|
|
sample = firstRow[i]
|
|
}
|
|
oid, typeSize := pgTypeFor(sample)
|
|
descFields[i] = pgproto3.FieldDescription{
|
|
Name: []byte(name),
|
|
TableOID: 0,
|
|
TableAttributeNumber: 0,
|
|
DataTypeOID: uint32(oid),
|
|
DataTypeSize: typeSize,
|
|
TypeModifier: -1,
|
|
Format: 0, // text format
|
|
}
|
|
}
|
|
s.send(&pgproto3.RowDescription{Fields: descFields})
|
|
|
|
if !rowsVal.IsArray() {
|
|
return nil
|
|
}
|
|
rows := rowsVal.AsArray().Items
|
|
for _, rowVal := range rows {
|
|
if !rowVal.IsArray() {
|
|
continue
|
|
}
|
|
cells := rowVal.AsArray().Items
|
|
out := make([][]byte, len(fields))
|
|
for i := range fields {
|
|
if i < len(cells) {
|
|
out[i] = encodeText(cells[i])
|
|
}
|
|
}
|
|
s.send(&pgproto3.DataRow{Values: out})
|
|
}
|
|
_ = sql // reserved for tag generation
|
|
return nil
|
|
}
|
|
|
|
// isErrorEnvelope detects the `{ {"__error__"}, ... }` shape that
|
|
// FiveSql2 returns on failure. The header is the very first cell
|
|
// of the first row.
|
|
func isErrorEnvelope(arr *hbrt.HbArray) bool {
|
|
if arr == nil || len(arr.Items) < 1 {
|
|
return false
|
|
}
|
|
hdr := arr.Items[0]
|
|
if !hdr.IsArray() {
|
|
return false
|
|
}
|
|
items := hdr.AsArray().Items
|
|
if len(items) == 0 || !items[0].IsString() {
|
|
return false
|
|
}
|
|
return items[0].AsString() == "__error__"
|
|
}
|
|
|
|
// unpackError extracts (code, message, sql) from an error envelope.
|
|
func unpackError(arr *hbrt.HbArray) (int, string, string) {
|
|
if len(arr.Items) < 2 || !arr.Items[1].IsArray() {
|
|
return 0, "unknown error", ""
|
|
}
|
|
rows := arr.Items[1].AsArray().Items
|
|
if len(rows) == 0 || !rows[0].IsArray() {
|
|
return 0, "unknown error", ""
|
|
}
|
|
cells := rows[0].AsArray().Items
|
|
code := 0
|
|
msg := ""
|
|
sql := ""
|
|
if len(cells) >= 1 && cells[0].IsNumeric() {
|
|
code = int(cells[0].AsNumInt())
|
|
}
|
|
if len(cells) >= 2 && cells[1].IsString() {
|
|
msg = cells[1].AsString()
|
|
}
|
|
if len(cells) >= 3 && cells[2].IsString() {
|
|
sql = cells[2].AsString()
|
|
}
|
|
return code, msg, sql
|
|
}
|
|
|
|
// commandTagFor builds the PG-style command tag string. Format:
|
|
//
|
|
// "SELECT n" (for SELECT)
|
|
// "INSERT 0 n" (oid + row count; oid is always 0 in PG ≥ 12)
|
|
// "UPDATE n"
|
|
// "DELETE n"
|
|
// "BEGIN" / "COMMIT" / "ROLLBACK" (verbatim)
|
|
//
|
|
// v1.0-skeleton: emits a verb-only tag; row counts come in the
|
|
// SimpleQuery commit when we have streaming row counters.
|
|
func commandTagFor(sql string) string {
|
|
verb := strings.ToUpper(strings.SplitN(strings.TrimSpace(sql), " ", 2)[0])
|
|
switch verb {
|
|
case "SELECT", "INSERT", "UPDATE", "DELETE":
|
|
return verb + " 0"
|
|
case "BEGIN", "COMMIT", "ROLLBACK", "SAVEPOINT", "RELEASE":
|
|
return verb
|
|
default:
|
|
return verb
|
|
}
|
|
}
|
|
|
|
func (s *session) updateTxStatusForTag(tag string) {
|
|
switch {
|
|
case strings.HasPrefix(tag, "BEGIN"):
|
|
s.txStatus = 'T'
|
|
case strings.HasPrefix(tag, "COMMIT"), strings.HasPrefix(tag, "ROLLBACK"):
|
|
s.txStatus = 'I'
|
|
}
|
|
}
|
|
|
|
func currentTxStatusAfterError(prev byte) byte {
|
|
if prev == 'T' {
|
|
return 'E' // failed transaction — client must ROLLBACK
|
|
}
|
|
return prev
|
|
}
|
|
|
|
// fiveTypeName returns a printable name for the underlying Five
|
|
// value's runtime tag, used in diagnostic messages.
|
|
func fiveTypeName(v hbrt.Value) string {
|
|
switch {
|
|
case v.IsNil():
|
|
return "NIL"
|
|
case v.IsString():
|
|
return "STRING"
|
|
case v.IsNumeric():
|
|
return "NUMERIC"
|
|
case v.IsArray():
|
|
return "ARRAY"
|
|
case v.IsHash():
|
|
return "HASH"
|
|
case v.IsLogical():
|
|
return "LOGICAL"
|
|
case v.IsDate():
|
|
return "DATE"
|
|
case v.IsTimestamp():
|
|
return "TIMESTAMP"
|
|
default:
|
|
return "UNKNOWN"
|
|
}
|
|
}
|