// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com) // All rights reserved. package pgserver import ( "fmt" "strings" "github.com/jackc/pgx/v5/pgproto3" "five/hbrt" ) // runSQL invokes the PRG-level `five_SQL(cSQL, NIL, NIL, oSession)` // on this connection's thread and returns the resulting hbrt.Value // (the engine's `aResult` two-element array, or an error array). // // The fourth arg threads the per-connection TSqlSession through so // concurrent connections never collide on transaction state or the // plan cache (see refactor commit 93cf5c8). func (s *session) runSQL(sql string) (result hbrt.Value, runErr error) { if s.thread == nil { s.thread = s.srv.vm.NewThread() } if s.prgSession.IsNil() { // Lazily allocate a TSqlSession on the PRG side. We do it // via a dedicated helper instead of the engine's // SqlDefaultSession() so each connection truly gets its // own object; the default session is reserved for embedded // callers. sessVal, err := callPRG(s.thread, "PG_NEW_SESSION", nil) if err != nil { return hbrt.MakeNil(), err } s.prgSession = sessVal } defer func() { if r := recover(); r != nil { // PRG-side panic (HbError, BreakValue, etc.) — surface // as a runtime error so the caller can map it to an // ErrorResponse without aborting the whole connection. runErr = fmt.Errorf("five_SQL panic: %v", r) } }() args := []hbrt.Value{ hbrt.MakeString(sql), hbrt.MakeNil(), // aParams hbrt.MakeNil(), // bBlock s.prgSession, // oSession } return callPRG(s.thread, "FIVE_SQL", args) } // callPRG performs a single PRG function call on the given thread, // returning the PRG-level return value. nil args means a zero-arg // call. Errors come back as Go errors only for symbol resolution // failures; PRG-level runtime errors panic up to the caller's // recover() (see runSQL). func callPRG(t *hbrt.Thread, name string, args []hbrt.Value) (hbrt.Value, error) { sym := t.VM().FindSymbol(strings.ToUpper(name)) if sym == nil { return hbrt.MakeNil(), fmt.Errorf("pgserver: PRG symbol %q not found", name) } t.PushSymbol(sym) t.PushNil() // self placeholder — matches gengo's call layout for _, a := range args { t.PushValue(a) } t.Function(len(args)) // Function() pushed retVal back onto the stack; pop it. return popValue(t), nil } // popValue retrieves the topmost stack entry as the call's return. // Thread.Pop2() pops and returns (the named non-`return` Pop() is the // statement form used by gengo); we want the value here, so Pop2. func popValue(t *hbrt.Thread) hbrt.Value { return t.Pop2() } // handleSimpleQuery dispatches a Simple Query message: parse the // SQL, run it through five_SQL, and stream RowDescription + // DataRow* + CommandComplete + ReadyForQuery back. // // FiveSql2's result envelope is either: // // { aFieldNames, aRows } — success // { {"__error__"}, { {nCode, cMsg, cSQL} } } — failure // // We detect the error shape by inspecting aResult[0][0] for the // sentinel "__error__" header column. func (s *session) dispatchSimpleQuery(sql string) { if sql == "" { s.send(&pgproto3.EmptyQueryResponse{}) s.sendReadyForQuery() return } // pg_catalog intercept: BI tools (psql, pgx, DBeaver, Tableau) // fire dozens of catalog probes on connect. five_SQL can't parse // most of them, so handle the common shapes (SHOW, SELECT // version(), pg_namespace / pg_class / pg_type / pg_settings) // with synthesised responses BEFORE delegating to the engine. if handled, val := s.catalogIntercept(sql); handled { arr := val.AsArray() fieldsVal := arr.Items[0] rowsVal := hbrt.MakeNil() if len(arr.Items) >= 2 { rowsVal = arr.Items[1] } _ = s.emitResultSet(fieldsVal, rowsVal, sql) s.send(&pgproto3.CommandComplete{CommandTag: []byte(commandTagFor(sql))}) s.sendReadyForQuery() return } result, err := s.runSQL(sql) if err != nil { s.send(buildErrorResponse("XX000", err.Error(), sql)) s.txStatus = currentTxStatusAfterError(s.txStatus) s.sendReadyForQuery() return } if result.IsNil() || !result.IsArray() { // Bare NIL or non-array — most plausibly a successful // statement that doesn't produce a result set (e.g. DDL, // SET, transaction control). Reply CommandComplete with // a generic tag. tag := commandTagFor(sql) s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)}) s.updateTxStatusForTag(tag) s.sendReadyForQuery() return } arr := result.AsArray() if isErrorEnvelope(arr) { nCode, cMsg, cSQL := unpackError(arr) s.send(buildErrorResponse(sqlStateFor(nCode), cMsg, cSQL)) s.txStatus = currentTxStatusAfterError(s.txStatus) s.sendReadyForQuery() return } // Success envelope: { aFieldNames, aRows } fieldsVal := arr.Items[0] rowsVal := hbrt.MakeNil() if len(arr.Items) >= 2 { rowsVal = arr.Items[1] } if err := s.emitResultSet(fieldsVal, rowsVal, sql); err != nil { s.send(buildErrorResponse("XX000", err.Error(), sql)) } tag := commandTagFor(sql) s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)}) s.updateTxStatusForTag(tag) s.sendReadyForQuery() } // emitResultSet writes RowDescription + a DataRow per source row. func (s *session) emitResultSet(fieldsVal, rowsVal hbrt.Value, sql string) error { if !fieldsVal.IsArray() { return fmt.Errorf("malformed result: first element is %s, expected array", fiveTypeName(fieldsVal)) } fields := fieldsVal.AsArray().Items // First-row inference: scan the leftmost non-NIL value per // column to pick a stable PG OID. v1.0 sticks to TEXT for // any column with mixed/NIL types — easy upgrade path to // declared schema lookup later. var firstRow []hbrt.Value if rowsVal.IsArray() && len(rowsVal.AsArray().Items) > 0 { if rowsVal.AsArray().Items[0].IsArray() { firstRow = rowsVal.AsArray().Items[0].AsArray().Items } } descFields := make([]pgproto3.FieldDescription, len(fields)) for i, f := range fields { name := "" if f.IsString() { name = f.AsString() } else { name = fmt.Sprintf("column%d", i+1) } var sample hbrt.Value if i < len(firstRow) { sample = firstRow[i] } oid, typeSize := pgTypeFor(sample) descFields[i] = pgproto3.FieldDescription{ Name: []byte(name), TableOID: 0, TableAttributeNumber: 0, DataTypeOID: uint32(oid), DataTypeSize: typeSize, TypeModifier: -1, Format: 0, // text format } } s.send(&pgproto3.RowDescription{Fields: descFields}) if !rowsVal.IsArray() { return nil } rows := rowsVal.AsArray().Items for _, rowVal := range rows { if !rowVal.IsArray() { continue } cells := rowVal.AsArray().Items out := make([][]byte, len(fields)) for i := range fields { if i < len(cells) { out[i] = encodeText(cells[i]) } } s.send(&pgproto3.DataRow{Values: out}) } _ = sql // reserved for tag generation return nil } // isErrorEnvelope detects the `{ {"__error__"}, ... }` shape that // FiveSql2 returns on failure. The header is the very first cell // of the first row. func isErrorEnvelope(arr *hbrt.HbArray) bool { if arr == nil || len(arr.Items) < 1 { return false } hdr := arr.Items[0] if !hdr.IsArray() { return false } items := hdr.AsArray().Items if len(items) == 0 || !items[0].IsString() { return false } return items[0].AsString() == "__error__" } // unpackError extracts (code, message, sql) from an error envelope. func unpackError(arr *hbrt.HbArray) (int, string, string) { if len(arr.Items) < 2 || !arr.Items[1].IsArray() { return 0, "unknown error", "" } rows := arr.Items[1].AsArray().Items if len(rows) == 0 || !rows[0].IsArray() { return 0, "unknown error", "" } cells := rows[0].AsArray().Items code := 0 msg := "" sql := "" if len(cells) >= 1 && cells[0].IsNumeric() { code = int(cells[0].AsNumInt()) } if len(cells) >= 2 && cells[1].IsString() { msg = cells[1].AsString() } if len(cells) >= 3 && cells[2].IsString() { sql = cells[2].AsString() } return code, msg, sql } // commandTagFor builds the PG-style command tag string. Format: // // "SELECT n" (for SELECT) // "INSERT 0 n" (oid + row count; oid is always 0 in PG ≥ 12) // "UPDATE n" // "DELETE n" // "BEGIN" / "COMMIT" / "ROLLBACK" (verbatim) // // v1.0-skeleton: emits a verb-only tag; row counts come in the // SimpleQuery commit when we have streaming row counters. func commandTagFor(sql string) string { verb := strings.ToUpper(strings.SplitN(strings.TrimSpace(sql), " ", 2)[0]) switch verb { case "SELECT", "INSERT", "UPDATE", "DELETE": return verb + " 0" case "BEGIN", "COMMIT", "ROLLBACK", "SAVEPOINT", "RELEASE": return verb default: return verb } } func (s *session) updateTxStatusForTag(tag string) { switch { case strings.HasPrefix(tag, "BEGIN"): s.txStatus = 'T' case strings.HasPrefix(tag, "COMMIT"), strings.HasPrefix(tag, "ROLLBACK"): s.txStatus = 'I' } } func currentTxStatusAfterError(prev byte) byte { if prev == 'T' { return 'E' // failed transaction — client must ROLLBACK } return prev } // fiveTypeName returns a printable name for the underlying Five // value's runtime tag, used in diagnostic messages. func fiveTypeName(v hbrt.Value) string { switch { case v.IsNil(): return "NIL" case v.IsString(): return "STRING" case v.IsNumeric(): return "NUMERIC" case v.IsArray(): return "ARRAY" case v.IsHash(): return "HASH" case v.IsLogical(): return "LOGICAL" case v.IsDate(): return "DATE" case v.IsTimestamp(): return "TIMESTAMP" default: return "UNKNOWN" } }