// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com) // All rights reserved. // Package pgserver implements a PostgreSQL-wire-protocol-compatible // TCP/IP frontend for FiveSql2. The protocol layer is provided by // github.com/jackc/pgx/v5/pgproto3 (the same library pgx itself uses // internally); this package wires it into Five's runtime so that // psql / pgx / JDBC / DBeaver / Tableau and any other PostgreSQL // driver can connect to a Five process as if it were Postgres. // // Lifecycle // // five pgserver --listen :5432 starts Listen() in the main goroutine. // accept() spawns one goroutine per connection. Each connection // owns a *hbrt.Thread + a TSqlSession on the PRG side, so concurrent // clients don't share transaction state or plan caches (see // refactor commit 93cf5c8). // // Scope // // v1.0 ships Simple Query (SELECT/INSERT/UPDATE/DELETE), per-session // BEGIN/COMMIT/ROLLBACK, trust + password/MD5/SCRAM auth, and TLS. // Extended protocol (Parse/Bind/Execute) and pg_catalog shim are // v1.1+. See /Users/charleskwon/.claude/plans/compiled-launching-shore.md. package pgserver import ( "context" "crypto/tls" "fmt" "net" "sync" "five/hbrt" ) // Config holds runtime knobs for the server. All fields optional; // zero values yield a trust-auth, no-TLS server bound to :5432. type Config struct { // Listen is the bind address (":5432" by default). Listen string // MaxConnections caps concurrent accepted sessions. Excess // connections block in accept() until a slot frees. 0 → 100. MaxConnections int // AuthMode: "trust" (default), "password", "md5", "scram-sha-256". // Roles + password hashes are loaded from __five_roles.dbf when // not in trust mode; see auth.go. AuthMode string // TLSConfig, if non-nil, is presented when a client sends // SSLRequest. tls.go provides helpers for loading a cert/key // pair or auto-generating a self-signed pair for dev. TLSConfig *tls.Config // ParameterStatus values broadcast right after auth. Five // announces itself as PostgreSQL 14 by default so probing // clients (pgx, JDBC) negotiate without erroring out on // unsupported features. ServerVersion string // default "14.0 (FiveSql2)" } func (c *Config) listenAddr() string { if c.Listen == "" { return ":5432" } return c.Listen } func (c *Config) maxConns() int { if c.MaxConnections <= 0 { return 100 } return c.MaxConnections } func (c *Config) serverVersion() string { if c.ServerVersion == "" { return "14.0 (FiveSql2)" } return c.ServerVersion } // Server is the live server state. Create via NewServer and drive // with Serve(); Close() drains in-flight connections gracefully. type Server struct { vm *hbrt.VM cfg Config listener net.Listener sem chan struct{} // accept gate, sized to MaxConnections // allowList snapshots tls.go's pending CIDR list at start // time. Empty → accept any source IP. The accept loop // closes any connection whose RemoteAddr falls outside. allowList []*net.IPNet mu sync.Mutex conns map[*session]struct{} // live sessions for clean shutdown closed bool closeCh chan struct{} } // NewServer constructs an unstarted Server. The hbrt.VM is the // runtime instance whose `FIVE_SQL` function will execute incoming // queries — every accepted connection spawns a fresh thread on this // VM via vm.NewThread() so statics + workarea factory are inherited // (see hbrt/vm.go NewThread, fixed in cde8673 to propagate both). func NewServer(vm *hbrt.VM, cfg Config) *Server { return &Server{ vm: vm, cfg: cfg, sem: make(chan struct{}, cfg.maxConns()), conns: make(map[*session]struct{}), closeCh: make(chan struct{}), } } // Serve binds the listener and runs the accept loop until Close() // fires or a fatal accept error is hit. Each accepted connection // runs handleConn in its own goroutine. func (s *Server) Serve(ctx context.Context) error { if s.vm == nil { return fmt.Errorf("pgserver: nil VM") } ln, err := net.Listen("tcp", s.cfg.listenAddr()) if err != nil { return fmt.Errorf("pgserver: listen %s: %w", s.cfg.listenAddr(), err) } s.mu.Lock() s.listener = ln s.mu.Unlock() // Stop the listener when ctx cancels so Accept() returns. go func() { select { case <-ctx.Done(): case <-s.closeCh: } _ = ln.Close() }() for { conn, err := ln.Accept() if err != nil { s.mu.Lock() closed := s.closed s.mu.Unlock() if closed { return nil } return fmt.Errorf("pgserver: accept: %w", err) } // Source-IP allowlist: drop connections from any peer not // matching a configured CIDR before allocating a slot. // Empty list → unconditional accept (the default). if !peerAllowed(conn.RemoteAddr(), s.allowList) { _ = conn.Close() continue } // Backpressure: block here when MaxConnections sessions are // already in flight. Holding the slot through handleConn // (released in defer) is the natural way to gate. s.sem <- struct{}{} go func(c net.Conn) { defer func() { <-s.sem }() s.handleConn(ctx, c) }(conn) } } // Close signals the accept loop to exit, drops the listener, and // closes every in-flight connection. Outstanding handleConn // goroutines exit on the first failed Receive. func (s *Server) Close() error { s.mu.Lock() if s.closed { s.mu.Unlock() return nil } s.closed = true ln := s.listener conns := make([]*session, 0, len(s.conns)) for c := range s.conns { conns = append(conns, c) } s.mu.Unlock() close(s.closeCh) if ln != nil { _ = ln.Close() } for _, c := range conns { _ = c.conn.Close() } return nil } // handleConn is the entry point for each accepted connection. // The full implementation (TLS upgrade, startup handshake, auth, // query loop) lives in session.go; this function just allocates // the session and tracks it for clean shutdown. func (s *Server) handleConn(ctx context.Context, conn net.Conn) { sess := newSession(s, conn) s.mu.Lock() s.conns[sess] = struct{}{} s.mu.Unlock() defer func() { s.mu.Lock() delete(s.conns, sess) s.mu.Unlock() _ = conn.Close() }() sess.run(ctx) }