// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com) // All rights reserved. package pgserver import ( "context" "crypto/rand" "encoding/binary" "fmt" "io" "net" "strings" "github.com/jackc/pgx/v5/pgproto3" "five/hbrt" ) // session is the per-connection state. One goroutine per session; // nothing in session is touched by anyone else, so all fields are // access-by-owner without locking. type session struct { srv *Server conn net.Conn // pgproto3 Backend speaks the PG protocol on the wire. Created // after the TLS-upgrade decision so handshake bytes go through // the right transport. be *pgproto3.Backend // Authenticated identity. Empty before AuthenticationOk. user string database string // Per-connection hbrt.Thread, used to dispatch FIVE_SQL calls // inside the PRG world. Owns a TSqlSession on the PRG side // (created lazily by the first query via five_SQL's default- // session fallback; for true isolation we instantiate one // explicitly — see queryLoop). thread *hbrt.Thread // PRG-side session value: a TSqlSession instance held in a // thread-local hbrt slot. Allocated on first query. prgSession hbrt.Value // Cancellation key — PG protocol mandates random 32-bit // process id + secret in BackendKeyData so clients can send // CancelRequest later. We don't honour cancel yet (v1.1), // but the BackendKeyData must still be present and unique. pid uint32 secret []byte // 4 random bytes per PG BackendKeyData // Current transaction status, emitted in every ReadyForQuery: // 'I' = idle, 'T' = in transaction, 'E' = failed transaction. txStatus byte } func newSession(srv *Server, conn net.Conn) *session { var pid uint32 _ = binary.Read(rand.Reader, binary.BigEndian, &pid) secret := make([]byte, 4) _, _ = rand.Read(secret) return &session{ srv: srv, conn: conn, pid: pid, secret: secret, txStatus: 'I', } } // run drives the full session lifecycle: // // 1. SSLRequest / GSSEncRequest probe (reply 'N' for v1.0) // 2. StartupMessage parse // 3. Auth (trust path for v1.0; password/MD5/SCRAM in Phase 5) // 4. ParameterStatus broadcast // 5. BackendKeyData // 6. ReadyForQuery // 7. query loop until Terminate / EOF / error // // Any error in steps 1-3 aborts the session before query loop // begins; an ErrorResponse goes back if at all possible. func (s *session) run(ctx context.Context) { // Step 1 — SSLRequest peek. Client sends 8 bytes: length(8) + // special version 80877103 to ask for TLS; or it skips this and // goes straight to StartupMessage. The pgproto3 Backend doesn't // abstract this, so we peek the raw bytes first. if err := s.negotiateTLS(); err != nil { return } s.be = pgproto3.NewBackend(s.conn, s.conn) // Step 2 — StartupMessage. After SSLRequest the client retries // with a fresh length-prefixed startup. ReceiveStartupMessage // handles either form transparently. startupMsg, err := s.be.ReceiveStartupMessage() if err != nil { return // client gone } startup, ok := startupMsg.(*pgproto3.StartupMessage) if !ok { s.sendError("08P01", fmt.Sprintf("unexpected startup message: %T", startupMsg)) return } s.user = startup.Parameters["user"] s.database = startup.Parameters["database"] if s.database == "" { s.database = s.user } // Step 3 — Auth. v1.0 lands `trust`; password/md5/scram in // Phase 5 via auth.go. if err := s.authenticate(); err != nil { return } // Step 4 — ParameterStatus. These tell the client our identity // and default formatting so it doesn't try features we don't // have. server_version is the most-checked field; pgx + JDBC // negotiate features based on its numeric prefix. s.sendParameterStatus("server_version", s.srv.cfg.serverVersion()) s.sendParameterStatus("server_encoding", "UTF8") s.sendParameterStatus("client_encoding", "UTF8") s.sendParameterStatus("DateStyle", "ISO, MDY") s.sendParameterStatus("TimeZone", "UTC") s.sendParameterStatus("integer_datetimes", "on") s.sendParameterStatus("standard_conforming_strings", "on") // Step 5 — BackendKeyData. Some clients (psql) expect this // even though we don't honour CancelRequest yet. s.send(&pgproto3.BackendKeyData{ProcessID: s.pid, SecretKey: s.secret}) // Step 6 — ReadyForQuery, transaction status idle. s.sendReadyForQuery() // Step 7 — query loop. Each iteration: // - Receive a frontend message // - Dispatch (Query / Parse / Bind / Execute / Sync / Terminate / Close / Describe) // - Send response stream // - Loop s.queryLoop(ctx) } // negotiateTLS reads the first 8-byte preamble. If it's an // SSLRequest (length=8, version=80877103) and the server has a // TLS config, we reply 'S' and upgrade; otherwise 'N' and the // client retries unencrypted (or hangs up if it required TLS). func (s *session) negotiateTLS() error { var hdr [8]byte if _, err := io.ReadFull(s.conn, hdr[:]); err != nil { return err } length := binary.BigEndian.Uint32(hdr[0:4]) version := binary.BigEndian.Uint32(hdr[4:8]) const sslReqCode = 80877103 if length == 8 && version == sslReqCode { if s.srv.cfg.TLSConfig != nil { if _, err := s.conn.Write([]byte{'S'}); err != nil { return err } // tls.go implements the upgrade; v1.0 stub returns // the connection unchanged so the byte ordering is // correct but cipher negotiation isn't wired yet. upgraded, err := upgradeToTLS(s.conn, s.srv.cfg.TLSConfig) if err != nil { return err } s.conn = upgraded return nil } if _, err := s.conn.Write([]byte{'N'}); err != nil { return err } return nil } // Not an SSLRequest — the 8 bytes are the start of a // StartupMessage. Buffer them so pgproto3.ReceiveStartupMessage // sees the full payload. s.conn = &prefixedConn{Conn: s.conn, prefix: hdr[:]} return nil } // send writes a single backend message to the wire. func (s *session) send(msg pgproto3.BackendMessage) { s.be.Send(msg) _ = s.be.Flush() } func (s *session) sendParameterStatus(name, value string) { s.send(&pgproto3.ParameterStatus{Name: name, Value: value}) } func (s *session) sendReadyForQuery() { s.send(&pgproto3.ReadyForQuery{TxStatus: s.txStatus}) } // sendError ships an ErrorResponse without forcing the caller to // build the full pgproto3 struct. Used for protocol-level errors // before the query loop starts; in-loop errors go through errmap.go. func (s *session) sendError(sqlState, message string) { s.send(&pgproto3.ErrorResponse{ Severity: "ERROR", Code: sqlState, Message: message, }) } // queryLoop runs until the client sends Terminate, closes the // connection, or we hit an unrecoverable error. func (s *session) queryLoop(ctx context.Context) { for { select { case <-ctx.Done(): return default: } msg, err := s.be.Receive() if err != nil { return // client gone or wire-level error } switch m := msg.(type) { case *pgproto3.Terminate: return case *pgproto3.Query: s.dispatchSimpleQuery(strings.TrimSpace(m.String)) default: // v1.0 ignores Extended-protocol messages with a // loud diagnostic so clients see they're unsupported // instead of hanging on a silent stall. s.sendError("0A000", fmt.Sprintf("message %T not supported in this protocol version (Simple Query only)", m)) s.sendReadyForQuery() } } } // prefixedConn injects pre-read bytes back into a net.Conn so a // caller that needs those bytes for parsing (pgproto3 reading the // StartupMessage after our SSLRequest peek) sees them seamlessly. type prefixedConn struct { net.Conn prefix []byte off int } func (p *prefixedConn) Read(buf []byte) (int, error) { if p.off < len(p.prefix) { n := copy(buf, p.prefix[p.off:]) p.off += n return n, nil } return p.Conn.Read(buf) }