Files
fivenode_go/hbrtl_ext/pgrtl/pg.go
Charles KWON OhJun 176f4e5cf5 feat(pgrtl): minimal PostgreSQL client RTL (pgxpool + 4 HB_FUNCs)
PG_OPEN(cDsn)                 -> integer handle, -1 on failure
  PG_CLOSE(nH)                  -> NIL
  PG_QUERY(nH, cSQL [, aArgs])  -> array of { col => val } hashes
  PG_EXEC (nH, cSQL [, aArgs])  -> rows affected, -1 on error
  PG_LAST_ERROR(nH)             -> last error string

Backed by github.com/jackc/pgx/v5/pgxpool, which is already in Five's
indirect dep tree (pgserver uses pgproto3 from the same repo). Pool
limits: MaxConns 8, MinConns 1, 5-min idle. Query timeout is capped at
30s so a runaway query can't pin a goroutine forever.

aArgs uses standard Postgres $1/$2/... placeholders — pgx parameter
binding prevents SQL injection. Never concatenate user input into cSQL.

Smoke-tested with app/pg_test.prg: bad DSN returns -1 cleanly (no
panic), the error path prints the expected fallback message, and the
real round-trip path is wired so setting LABDB_DSN to a live database
exercises SELECT + parameter binding + multi-row return without any
further code change.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 11:09:49 +09:00

289 lines
6.9 KiB
Go

// Package pgrtl exposes a small PostgreSQL client surface to PRG.
// Backed by github.com/jackc/pgx/v5 — already in Five's indirect
// dependency tree because pgserver uses pgproto3 from the same repo.
//
// PRG surface
//
// nH := PG_OPEN(cConnStr) -> integer handle, -1 on error
// PG_CLOSE(nH) -> NIL
// aRows := PG_QUERY(nH, cSQL [, aArgs]) -> array of hashes, NIL on error
// n := PG_EXEC(nH, cSQL [, aArgs]) -> rows affected (-1 on error)
// cErr := PG_LAST_ERROR(nH) -> last error message ("" if none)
//
// aArgs (optional) is a PRG array of scalar values; positions in cSQL
// use the Postgres $1, $2, ... numeric placeholders. The Go side does
// SQL injection prevention via pgx parameter binding — never string-
// concatenate user input into cSQL.
//
// PG_QUERY returns rows as an array of { columnName => value } hashes.
// Column names are lower-cased to match PostgreSQL's default casefold.
// Values come back typed: strings, numerics, booleans, nil; arrays /
// json fields arrive as their text representation (PRG can decode with
// hb_jsonDecode if needed).
//
// Handles are integers indexing into an internal map; PG_CLOSE removes
// the entry so the pool is GC'd. Concurrent PG_QUERY calls on the same
// handle are safe — pgxpool serialises through the connection pool.
package pgrtl
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"five/hbrt"
)
type pool struct {
pool *pgxpool.Pool
lastErr string
mu sync.Mutex
}
var (
pools sync.Map // map[int64]*pool
nextID atomic.Int64
queryCtx = func() context.Context {
// 30-second default cap so a runaway query can't pin a
// goroutine forever. Apps that need longer queries can run
// their own goroutine outside this RTL.
c, _ := context.WithTimeout(context.Background(), 30*time.Second)
return c
}
)
func init() {
hbrt.HB_FUNC("PG_OPEN", pgOpen)
hbrt.HB_FUNC("PG_CLOSE", pgClose)
hbrt.HB_FUNC("PG_QUERY", pgQuery)
hbrt.HB_FUNC("PG_EXEC", pgExec)
hbrt.HB_FUNC("PG_LAST_ERROR", pgLastError)
}
func loadPool(handle int64) *pool {
if v, ok := pools.Load(handle); ok {
return v.(*pool)
}
return nil
}
func setErr(p *pool, msg string) {
p.mu.Lock()
p.lastErr = msg
p.mu.Unlock()
}
func pgOpen(ctx *hbrt.HBContext) {
if ctx.PCount() < 1 || !ctx.IsChar(1) {
ctx.RetNI(-1)
return
}
conn := ctx.ParC(1)
cfg, err := pgxpool.ParseConfig(conn)
if err != nil {
// No handle yet — surface via return value only.
ctx.RetNI(-1)
return
}
cfg.MaxConns = 8
cfg.MinConns = 1
cfg.MaxConnIdleTime = 5 * time.Minute
dialCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
pp, err := pgxpool.NewWithConfig(dialCtx, cfg)
if err != nil {
ctx.RetNI(-1)
return
}
if err := pp.Ping(dialCtx); err != nil {
pp.Close()
ctx.RetNI(-1)
return
}
h := nextID.Add(1)
pools.Store(h, &pool{pool: pp})
ctx.RetNI(int(h))
}
func pgClose(ctx *hbrt.HBContext) {
if ctx.PCount() < 1 || !ctx.IsNumeric(1) {
ctx.RetNil()
return
}
h := int64(ctx.ParNI(1))
if v, ok := pools.LoadAndDelete(h); ok {
v.(*pool).pool.Close()
}
ctx.RetNil()
}
func pgLastError(ctx *hbrt.HBContext) {
if ctx.PCount() < 1 || !ctx.IsNumeric(1) {
ctx.RetC("")
return
}
p := loadPool(int64(ctx.ParNI(1)))
if p == nil {
ctx.RetC("invalid handle")
return
}
p.mu.Lock()
v := p.lastErr
p.mu.Unlock()
ctx.RetC(v)
}
func pgQuery(ctx *hbrt.HBContext) {
if ctx.PCount() < 2 || !ctx.IsNumeric(1) || !ctx.IsChar(2) {
ctx.RetNil()
return
}
p := loadPool(int64(ctx.ParNI(1)))
if p == nil {
ctx.RetNil()
return
}
sql := ctx.ParC(2)
args := extractArgs(ctx, 3)
qCtx := queryCtx()
rows, err := p.pool.Query(qCtx, sql, args...)
if err != nil {
setErr(p, err.Error())
ctx.RetNil()
return
}
defer rows.Close()
fields := rows.FieldDescriptions()
colNames := make([]hbrt.Value, len(fields))
for i, f := range fields {
colNames[i] = hbrt.MakeString(strings.ToLower(string(f.Name)))
}
result := hbrt.MakeArray(0)
resultArr := result.AsArray()
for rows.Next() {
vals, err := rows.Values()
if err != nil {
setErr(p, err.Error())
ctx.RetNil()
return
}
row := hbrt.MakeHash()
rh := row.AsHash()
for i, raw := range vals {
rh.Set(colNames[i], goToHbValue(raw))
}
resultArr.Items = append(resultArr.Items, row)
}
if err := rows.Err(); err != nil {
setErr(p, err.Error())
ctx.RetNil()
return
}
setErr(p, "")
ctx.RetVal(result)
}
func pgExec(ctx *hbrt.HBContext) {
if ctx.PCount() < 2 || !ctx.IsNumeric(1) || !ctx.IsChar(2) {
ctx.RetNI(-1)
return
}
p := loadPool(int64(ctx.ParNI(1)))
if p == nil {
ctx.RetNI(-1)
return
}
sql := ctx.ParC(2)
args := extractArgs(ctx, 3)
qCtx := queryCtx()
tag, err := p.pool.Exec(qCtx, sql, args...)
if err != nil {
setErr(p, err.Error())
ctx.RetNI(-1)
return
}
setErr(p, "")
ctx.RetNI(int(tag.RowsAffected()))
}
// extractArgs walks the PRG array passed at argIdx and converts each
// element into a Go scalar suitable for pgx parameter binding. Missing
// or non-array arg returns an empty slice (a no-param query).
func extractArgs(ctx *hbrt.HBContext, argIdx int) []interface{} {
if ctx.PCount() < argIdx {
return nil
}
v := ctx.Param(argIdx)
arr := v.AsArray()
if arr == nil {
return nil
}
out := make([]interface{}, len(arr.Items))
for i, item := range arr.Items {
out[i] = hbValueToGo(item)
}
return out
}
// hbValueToGo turns a PRG Value into the most natural Go type for pgx.
// Strings, integers, doubles, booleans, and NIL cover the common
// labdb query parameters; anything fancier (arrays, hashes, dates)
// falls through to its fmt.Stringer form which pgx will likely reject
// — callers should pre-serialise those into JSON or text.
func hbValueToGo(v hbrt.Value) interface{} {
switch {
case v.IsNil():
return nil
case v.IsString():
return v.AsString()
case v.IsLogical():
return v.AsBool()
case v.IsNumeric():
if v.AsNumDouble() == float64(v.AsNumInt()) {
return v.AsNumInt()
}
return v.AsNumDouble()
}
return fmt.Sprintf("%v", v)
}
// goToHbValue converts a pgx-decoded value into the PRG Value the row
// hash will store. Lossy for advanced PG types (numeric, json, uuid,
// arrays) which arrive here as their pgx-default Go form — callers
// can layer hb_jsonDecode / Val() on top if needed.
func goToHbValue(g interface{}) hbrt.Value {
if g == nil {
return hbrt.MakeNil()
}
switch v := g.(type) {
case string:
return hbrt.MakeString(v)
case []byte:
return hbrt.MakeString(string(v))
case bool:
return hbrt.MakeBool(v)
case int:
return hbrt.MakeInt(v)
case int32:
return hbrt.MakeInt(int(v))
case int64:
return hbrt.MakeInt(int(v))
case float32:
return hbrt.MakeDouble(float64(v), 0, 0)
case float64:
return hbrt.MakeDouble(v, 0, 0)
case time.Time:
return hbrt.MakeString(v.Format(time.RFC3339))
}
return hbrt.MakeString(fmt.Sprintf("%v", g))
}