pgx defaults to binary wire format for INT2/INT4/INT8/FLOAT4/FLOAT8/ BOOL/NUMERIC/DATE/TIMESTAMP/TIMESTAMPTZ — Go's most-used PG driver ships nearly every typed parameter as binary unless explicitly told to use text mode. The Phase 3 implementation only decoded INT4/INT8/ BOOL, so any pgx call with a decimal price, a timestamp, or a date was silently mis-quoted into the SQL stream. Decoders now cover the seven additional OIDs. The interesting one is NUMERIC: PG's wire format is base-10000 digit groups plus a separate displayed-scale, so the decoder rebuilds the decimal string from weight+sign+ndigits+digits[] without going through float (which would lose precision for NUMERIC(38,*) values). Pinned by vectors covering zero / positive / negative / fractional-only / NaN / multi-group integer + fraction cases. DATE / TIMESTAMP decoders assume integer_datetimes=on (which the server advertises in ParameterStatus); the 8-byte microsecond delta from the PG epoch (2000-01-01 UTC) is converted via Go's time.Time machinery and re-emitted as a quoted SQL literal. Text-format path also broadened: FLOAT4/FLOAT8/INT2 now transit unquoted alongside INT4/INT8/BOOL/NUMERIC; the regression would have been clients sending text-format floats getting them rewritten as '1.5' (string literal) instead of 1.5 (numeric). Verified: all 6 mandatory gates green (go test, SQL 43/43, compat 56/56, std.ch 17/17, FRB 7/7, pgserver 11/11). Five new decoder tests pin each wire format against handcrafted PG payloads. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
646 lines
20 KiB
Go
646 lines
20 KiB
Go
// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
|
||
// All rights reserved.
|
||
|
||
// extended.go — Extended Protocol (Parse/Bind/Execute/Describe/
|
||
// Sync/Close) support for the pgserver. Without this, pgx clients
|
||
// using the default QueryExecModeCacheStatement get an "0A000
|
||
// not supported" error on every query (Phase 3 saw this — the
|
||
// integration script had to force SimpleProtocol).
|
||
//
|
||
// The implementation is deliberately minimal for v1.0:
|
||
// * Per-session named statement cache: name → SQL + paramOIDs
|
||
// * Per-session named portal cache: name → statement + bound params
|
||
// * Parameter substitution happens client-side at Bind time; we
|
||
// build a "rewrite" SQL with literals inlined so five_SQL's
|
||
// existing template-cache pipeline (TFiveSQL.prg's
|
||
// SqlLexAndExtractTemplate path) can re-parameterise the
|
||
// query without us having to teach it the PG-wire param shape.
|
||
// * Describe-statement returns NoData; pgx tolerates this and
|
||
// issues Describe-portal after Bind, by which point we've run
|
||
// the query and can emit a real RowDescription.
|
||
// * Execute streams the cached result set produced at Describe-
|
||
// portal time so we don't run the same SQL twice.
|
||
//
|
||
// Phase 4.1 will replace the literal-substitution rewrite with a
|
||
// proper `?`-param threading through five_SQL's aParams so binary
|
||
// params (timestamps, numerics) avoid round-tripping through text.
|
||
|
||
package pgserver
|
||
|
||
import (
|
||
"encoding/binary"
|
||
"fmt"
|
||
"math"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/jackc/pgx/v5/pgproto3"
|
||
|
||
"five/hbrt"
|
||
)
|
||
|
||
// preparedStmt holds a named statement registered via Parse. It
|
||
// stores the original SQL plus the per-position param OIDs the
|
||
// client declared (or 0 = "any"). The actual parse happens lazily
|
||
// at Execute time inside five_SQL.
|
||
type preparedStmt struct {
|
||
sql string
|
||
paramOIDs []uint32
|
||
}
|
||
|
||
// portal binds a prepared statement to concrete parameter values.
|
||
// We materialise the values into a fully-substituted SQL string so
|
||
// the engine sees a normal Simple-Query-shaped statement; the
|
||
// result is cached on the portal so Describe-portal can emit a
|
||
// RowDescription without a second execution.
|
||
type portal struct {
|
||
stmt *preparedStmt
|
||
resolvedSQL string
|
||
|
||
// Cached result, populated on Describe-portal so Execute can
|
||
// stream without re-running the query. resultArr is the
|
||
// engine's `aResult` array; once executed, used to derive
|
||
// RowDescription + DataRow stream.
|
||
executed bool
|
||
resultArr *hbrt.HbArray
|
||
resultErr *errEnvelope
|
||
}
|
||
|
||
// errEnvelope is the unpacked form of FiveSql2's error sentinel.
|
||
type errEnvelope struct {
|
||
code int
|
||
msg string
|
||
sql string
|
||
}
|
||
|
||
// ensureExtendedState lazily allocates the per-session caches.
|
||
// Called by every extended-protocol handler; the maps stay alive
|
||
// for the whole session because pgx-style clients re-use unnamed
|
||
// statements aggressively.
|
||
func (s *session) ensureExtendedState() {
|
||
if s.stmts == nil {
|
||
s.stmts = make(map[string]*preparedStmt)
|
||
}
|
||
if s.portals == nil {
|
||
s.portals = make(map[string]*portal)
|
||
}
|
||
}
|
||
|
||
// handleParse stores a named prepared statement. Returns
|
||
// ParseComplete; any actual parse error from five_SQL is deferred
|
||
// until Execute so a probe-only Parse + Describe + Sync round-trip
|
||
// (which pgx does for QueryExecModeDescribeExec) stays cheap.
|
||
func (s *session) handleParse(m *pgproto3.Parse) {
|
||
s.ensureExtendedState()
|
||
s.stmts[m.Name] = &preparedStmt{
|
||
sql: m.Query,
|
||
paramOIDs: append([]uint32(nil), m.ParameterOIDs...),
|
||
}
|
||
s.send(&pgproto3.ParseComplete{})
|
||
}
|
||
|
||
// handleBind materialises parameter values into the SQL text and
|
||
// stashes the portal for later Describe/Execute. Binary-format
|
||
// params are decoded for the OIDs we recognise; otherwise the raw
|
||
// bytes are passed through as a quoted text literal (good enough
|
||
// for the common SELECT/INSERT-with-int/string cases).
|
||
func (s *session) handleBind(m *pgproto3.Bind) {
|
||
s.ensureExtendedState()
|
||
stmt := s.stmts[m.PreparedStatement]
|
||
if stmt == nil {
|
||
s.send(buildErrorResponse("26000",
|
||
fmt.Sprintf("prepared statement %q does not exist", m.PreparedStatement), ""))
|
||
return
|
||
}
|
||
resolved, err := substituteParams(stmt.sql, stmt.paramOIDs, m.Parameters, m.ParameterFormatCodes)
|
||
if err != nil {
|
||
s.send(buildErrorResponse("42P02", err.Error(), stmt.sql))
|
||
return
|
||
}
|
||
s.portals[m.DestinationPortal] = &portal{stmt: stmt, resolvedSQL: resolved}
|
||
s.send(&pgproto3.BindComplete{})
|
||
}
|
||
|
||
// handleDescribe answers Describe for either a statement or a
|
||
// portal. ObjectType 'S' = statement, 'P' = portal.
|
||
//
|
||
// Statement Describe is hard to answer fully without running the
|
||
// query (we'd need column-type inference). For v1.0 we return
|
||
// NoData + ParameterDescription with the declared OIDs; pgx
|
||
// tolerates this and re-Describes the portal after Bind.
|
||
//
|
||
// Portal Describe runs the query immediately, caches the result
|
||
// on the portal, and emits RowDescription from the actual result
|
||
// columns. Execute later just streams from the cache.
|
||
func (s *session) handleDescribe(m *pgproto3.Describe) {
|
||
s.ensureExtendedState()
|
||
switch m.ObjectType {
|
||
case 'S':
|
||
stmt := s.stmts[m.Name]
|
||
if stmt == nil {
|
||
s.send(buildErrorResponse("26000",
|
||
fmt.Sprintf("prepared statement %q does not exist", m.Name), ""))
|
||
return
|
||
}
|
||
// pgx-style clients send Parse without explicit OIDs and
|
||
// expect the server to infer them from the SQL. Count the
|
||
// `$N` placeholders and pad paramOIDs with OID 0 ("any")
|
||
// for any that the client didn't pre-declare — otherwise
|
||
// pgx errors with "expected 0 arguments, got N" before
|
||
// the Bind even reaches us.
|
||
nParams := countPgPlaceholders(stmt.sql)
|
||
oids := stmt.paramOIDs
|
||
for len(oids) < nParams {
|
||
oids = append(oids, 0)
|
||
}
|
||
stmt.paramOIDs = oids
|
||
s.send(&pgproto3.ParameterDescription{ParameterOIDs: oids})
|
||
// NoData — we don't know the row shape until execution.
|
||
// pgx tolerates this and asks again via Describe-portal.
|
||
s.send(&pgproto3.NoData{})
|
||
case 'P':
|
||
p := s.portals[m.Name]
|
||
if p == nil {
|
||
s.send(buildErrorResponse("34000",
|
||
fmt.Sprintf("portal %q does not exist", m.Name), ""))
|
||
return
|
||
}
|
||
s.executePortal(p)
|
||
if p.resultErr != nil {
|
||
s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql))
|
||
return
|
||
}
|
||
s.sendRowDescriptionFromArr(p.resultArr)
|
||
}
|
||
}
|
||
|
||
// handleExecute streams the cached result of the portal as a
|
||
// DataRow sequence then CommandComplete. If the portal hasn't
|
||
// been executed yet (Describe-portal was skipped), execute now.
|
||
//
|
||
// pgx's default QueryExecModeCacheStatement skips Describe-portal
|
||
// and only does Describe-statement once at prepare time. Since
|
||
// Describe-statement returns NoData (we don't know the row shape
|
||
// without executing), pgx never gets a RowDescription unless we
|
||
// emit one here too. We emit it conditionally: only when the
|
||
// portal wasn't already Described (described==false) so we don't
|
||
// duplicate the descriptor for clients that *did* call Describe-P.
|
||
func (s *session) handleExecute(m *pgproto3.Execute) {
|
||
s.ensureExtendedState()
|
||
p := s.portals[m.Portal]
|
||
if p == nil {
|
||
s.send(buildErrorResponse("34000",
|
||
fmt.Sprintf("portal %q does not exist", m.Portal), ""))
|
||
return
|
||
}
|
||
wasDescribed := p.executed
|
||
s.executePortal(p)
|
||
if p.resultErr != nil {
|
||
s.send(buildErrorResponse(sqlStateFor(p.resultErr.code), p.resultErr.msg, p.resultErr.sql))
|
||
s.txStatus = currentTxStatusAfterError(s.txStatus)
|
||
return
|
||
}
|
||
// If Describe-portal was never called, the client is still
|
||
// waiting for column metadata. Emit it now from the cached
|
||
// result so DataRow has a context the client can decode.
|
||
if !wasDescribed {
|
||
s.sendRowDescriptionFromArr(p.resultArr)
|
||
}
|
||
s.streamPortalRows(p, int(m.MaxRows))
|
||
tag := commandTagFor(p.resolvedSQL)
|
||
s.send(&pgproto3.CommandComplete{CommandTag: []byte(tag)})
|
||
s.updateTxStatusForTag(tag)
|
||
}
|
||
|
||
// handleClose drops a prepared statement or portal entry.
|
||
func (s *session) handleClose(m *pgproto3.Close) {
|
||
s.ensureExtendedState()
|
||
switch m.ObjectType {
|
||
case 'S':
|
||
delete(s.stmts, m.Name)
|
||
case 'P':
|
||
delete(s.portals, m.Name)
|
||
}
|
||
s.send(&pgproto3.CloseComplete{})
|
||
}
|
||
|
||
// handleSync flushes any pending output and emits ReadyForQuery,
|
||
// closing one cycle of the extended-protocol exchange.
|
||
func (s *session) handleSync() {
|
||
s.sendReadyForQuery()
|
||
}
|
||
|
||
// executePortal runs the resolved SQL through five_SQL once and
|
||
// caches the result on the portal. Idempotent — repeated calls
|
||
// short-circuit on `executed`. pg_catalog probes (BI-tool metadata
|
||
// queries) are intercepted before the engine call so DBeaver /
|
||
// Tableau / DataGrip can negotiate without erroring out.
|
||
func (s *session) executePortal(p *portal) {
|
||
if p.executed {
|
||
return
|
||
}
|
||
p.executed = true
|
||
if handled, val := s.catalogIntercept(p.resolvedSQL); handled {
|
||
p.resultArr = val.AsArray()
|
||
return
|
||
}
|
||
res, err := s.runSQL(p.resolvedSQL)
|
||
if err != nil {
|
||
p.resultErr = &errEnvelope{code: 0, msg: err.Error(), sql: p.resolvedSQL}
|
||
return
|
||
}
|
||
if res.IsNil() || !res.IsArray() {
|
||
// Non-result statement (DDL, transaction control). Synthesise
|
||
// an empty result envelope so the streaming path emits an
|
||
// empty RowDescription + CommandComplete cleanly.
|
||
p.resultArr = &hbrt.HbArray{Items: []hbrt.Value{
|
||
hbrt.MakeArrayFrom([]hbrt.Value{}),
|
||
hbrt.MakeArrayFrom([]hbrt.Value{}),
|
||
}}
|
||
return
|
||
}
|
||
arr := res.AsArray()
|
||
if isErrorEnvelope(arr) {
|
||
code, msg, sql := unpackError(arr)
|
||
p.resultErr = &errEnvelope{code: code, msg: msg, sql: sql}
|
||
return
|
||
}
|
||
p.resultArr = arr
|
||
}
|
||
|
||
// sendRowDescriptionFromArr emits a RowDescription derived from
|
||
// the engine's `{aFieldNames, aRows}` envelope. Mirrors what
|
||
// emitResultSet (Simple Query) does on the field-description side
|
||
// so the wire shape is identical regardless of which protocol the
|
||
// client picks.
|
||
func (s *session) sendRowDescriptionFromArr(arr *hbrt.HbArray) {
|
||
if arr == nil || len(arr.Items) < 1 || !arr.Items[0].IsArray() {
|
||
s.send(&pgproto3.RowDescription{Fields: nil})
|
||
return
|
||
}
|
||
fields := arr.Items[0].AsArray().Items
|
||
var firstRow []hbrt.Value
|
||
if len(arr.Items) >= 2 && arr.Items[1].IsArray() {
|
||
if rows := arr.Items[1].AsArray().Items; len(rows) > 0 && rows[0].IsArray() {
|
||
firstRow = rows[0].AsArray().Items
|
||
}
|
||
}
|
||
descFields := make([]pgproto3.FieldDescription, len(fields))
|
||
for i, f := range fields {
|
||
name := ""
|
||
if f.IsString() {
|
||
name = f.AsString()
|
||
} else {
|
||
name = fmt.Sprintf("column%d", i+1)
|
||
}
|
||
var sample hbrt.Value
|
||
if i < len(firstRow) {
|
||
sample = firstRow[i]
|
||
}
|
||
oid, typeSize := pgTypeFor(sample)
|
||
descFields[i] = pgproto3.FieldDescription{
|
||
Name: []byte(name),
|
||
DataTypeOID: uint32(oid),
|
||
DataTypeSize: typeSize,
|
||
TypeModifier: -1,
|
||
Format: 0,
|
||
}
|
||
}
|
||
s.send(&pgproto3.RowDescription{Fields: descFields})
|
||
}
|
||
|
||
// streamPortalRows iterates the portal's cached rows and emits a
|
||
// DataRow per row, respecting maxRows (0 = unlimited).
|
||
func (s *session) streamPortalRows(p *portal, maxRows int) {
|
||
if p.resultArr == nil || len(p.resultArr.Items) < 2 {
|
||
return
|
||
}
|
||
rowsVal := p.resultArr.Items[1]
|
||
if !rowsVal.IsArray() {
|
||
return
|
||
}
|
||
rows := rowsVal.AsArray().Items
|
||
fieldCount := 0
|
||
if p.resultArr.Items[0].IsArray() {
|
||
fieldCount = len(p.resultArr.Items[0].AsArray().Items)
|
||
}
|
||
emitted := 0
|
||
for _, rowVal := range rows {
|
||
if maxRows > 0 && emitted >= maxRows {
|
||
break
|
||
}
|
||
if !rowVal.IsArray() {
|
||
continue
|
||
}
|
||
cells := rowVal.AsArray().Items
|
||
out := make([][]byte, fieldCount)
|
||
for i := 0; i < fieldCount; i++ {
|
||
if i < len(cells) {
|
||
out[i] = encodeText(cells[i])
|
||
}
|
||
}
|
||
s.send(&pgproto3.DataRow{Values: out})
|
||
emitted++
|
||
}
|
||
}
|
||
|
||
// substituteParams produces a Simple-Query-shaped SQL by inlining
|
||
// the bound parameter values as PG-style literals. Format codes:
|
||
// 0 = text, 1 = binary. We support text for all OIDs and binary
|
||
// for INT2/INT4/INT8/FLOAT4/FLOAT8/BOOL — the rest fall back to
|
||
// hex-escaped strings, which works for VARCHAR but loses fidelity
|
||
// for binary timestamps until Phase 4.1.
|
||
//
|
||
// pgx defaults to binary for ints + text for everything else, so
|
||
// the common case is well covered.
|
||
func substituteParams(sql string, oids []uint32, params [][]byte, formats []int16) (string, error) {
|
||
// Each `$1`/`$2`/… placeholder maps to params[i-1]. We do a
|
||
// linear scan rather than regex to avoid quoting pitfalls
|
||
// (strings containing literal `$1` are rare in practice but
|
||
// the scanner respects single-quoted runs).
|
||
var out strings.Builder
|
||
i := 0
|
||
inStr := byte(0)
|
||
for i < len(sql) {
|
||
c := sql[i]
|
||
if inStr != 0 {
|
||
out.WriteByte(c)
|
||
if c == inStr {
|
||
inStr = 0
|
||
}
|
||
i++
|
||
continue
|
||
}
|
||
if c == '\'' || c == '"' {
|
||
inStr = c
|
||
out.WriteByte(c)
|
||
i++
|
||
continue
|
||
}
|
||
if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' {
|
||
j := i + 1
|
||
for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' {
|
||
j++
|
||
}
|
||
idx, err := strconv.Atoi(sql[i+1 : j])
|
||
if err != nil || idx < 1 || idx > len(params) {
|
||
out.WriteByte(c)
|
||
i++
|
||
continue
|
||
}
|
||
pidx := idx - 1
|
||
oid := uint32(0)
|
||
if pidx < len(oids) {
|
||
oid = oids[pidx]
|
||
}
|
||
format := int16(0)
|
||
if pidx < len(formats) {
|
||
format = formats[pidx]
|
||
} else if len(formats) == 1 {
|
||
format = formats[0] // single format code applies to all
|
||
}
|
||
lit, err := paramToLiteral(params[pidx], oid, format)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
out.WriteString(lit)
|
||
i = j
|
||
continue
|
||
}
|
||
out.WriteByte(c)
|
||
i++
|
||
}
|
||
return out.String(), nil
|
||
}
|
||
|
||
// countPgPlaceholders scans SQL for the highest $N placeholder
|
||
// outside string literals and returns that count. Matches what
|
||
// substituteParams resolves at Bind time. Returns 0 for SQL with
|
||
// no placeholders. Order of placeholders doesn't matter for the
|
||
// count — repeated `$1` still counts as 1 param slot.
|
||
func countPgPlaceholders(sql string) int {
|
||
max := 0
|
||
inStr := byte(0)
|
||
for i := 0; i < len(sql); i++ {
|
||
c := sql[i]
|
||
if inStr != 0 {
|
||
if c == inStr {
|
||
inStr = 0
|
||
}
|
||
continue
|
||
}
|
||
if c == '\'' || c == '"' {
|
||
inStr = c
|
||
continue
|
||
}
|
||
if c == '$' && i+1 < len(sql) && sql[i+1] >= '0' && sql[i+1] <= '9' {
|
||
j := i + 1
|
||
for j < len(sql) && sql[j] >= '0' && sql[j] <= '9' {
|
||
j++
|
||
}
|
||
if n, err := strconv.Atoi(sql[i+1 : j]); err == nil && n > max {
|
||
max = n
|
||
}
|
||
i = j - 1
|
||
}
|
||
}
|
||
return max
|
||
}
|
||
|
||
func paramToLiteral(raw []byte, oid uint32, format int16) (string, error) {
|
||
if raw == nil {
|
||
return "NULL", nil
|
||
}
|
||
if format == 0 {
|
||
// Text format — quote per type. Numerics + bools transit
|
||
// unquoted; everything else (including DATE / TIMESTAMP in
|
||
// text form) gets single-quoted with embedded-quote escape.
|
||
switch oid {
|
||
case oidInt2, oidInt4, oidInt8, oidBool, oidNumeric, oidFloat4, oidFloat8:
|
||
return string(raw), nil
|
||
default:
|
||
return "'" + strings.ReplaceAll(string(raw), "'", "''") + "'", nil
|
||
}
|
||
}
|
||
// Binary format. pgx defaults to binary for INT*, FLOAT*, BOOL,
|
||
// NUMERIC, DATE, TIMESTAMP, TIMESTAMPTZ — decode each into a
|
||
// FiveSql2-shaped literal that the engine's lexer can re-parse.
|
||
switch oid {
|
||
case oidInt2:
|
||
if len(raw) != 2 {
|
||
return "", fmt.Errorf("int2 param: want 2 bytes, got %d", len(raw))
|
||
}
|
||
return strconv.FormatInt(int64(int16(binary.BigEndian.Uint16(raw))), 10), nil
|
||
case oidInt4:
|
||
if len(raw) != 4 {
|
||
return "", fmt.Errorf("int4 param: want 4 bytes, got %d", len(raw))
|
||
}
|
||
return strconv.FormatInt(int64(int32(binary.BigEndian.Uint32(raw))), 10), nil
|
||
case oidInt8:
|
||
if len(raw) != 8 {
|
||
return "", fmt.Errorf("int8 param: want 8 bytes, got %d", len(raw))
|
||
}
|
||
return strconv.FormatInt(int64(binary.BigEndian.Uint64(raw)), 10), nil
|
||
case oidFloat4:
|
||
if len(raw) != 4 {
|
||
return "", fmt.Errorf("float4 param: want 4 bytes, got %d", len(raw))
|
||
}
|
||
f := math.Float32frombits(binary.BigEndian.Uint32(raw))
|
||
return strconv.FormatFloat(float64(f), 'g', -1, 32), nil
|
||
case oidFloat8:
|
||
if len(raw) != 8 {
|
||
return "", fmt.Errorf("float8 param: want 8 bytes, got %d", len(raw))
|
||
}
|
||
f := math.Float64frombits(binary.BigEndian.Uint64(raw))
|
||
return strconv.FormatFloat(f, 'g', -1, 64), nil
|
||
case oidBool:
|
||
if len(raw) != 1 {
|
||
return "", fmt.Errorf("bool param: want 1 byte, got %d", len(raw))
|
||
}
|
||
if raw[0] == 0 {
|
||
return "FALSE", nil
|
||
}
|
||
return "TRUE", nil
|
||
case oidNumeric:
|
||
s, err := decodeBinaryNumeric(raw)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return s, nil
|
||
case oidDate:
|
||
s, err := decodeBinaryDate(raw)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return "'" + s + "'", nil
|
||
case oidTimestamp, oidTimestamptz:
|
||
s, err := decodeBinaryTimestamp(raw)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
return "'" + s + "'", nil
|
||
default:
|
||
// Unknown binary OID — fall back to a quoted hex literal.
|
||
// FiveSql2 won't accept this directly, but the resulting
|
||
// error is at least diagnosable rather than a silent miss.
|
||
return "'\\x" + fmt.Sprintf("%x", raw) + "'", nil
|
||
}
|
||
}
|
||
|
||
// decodeBinaryNumeric converts PostgreSQL's binary NUMERIC wire
|
||
// format (RFC-independent — see PG source utils/adt/numeric.c
|
||
// numeric_send / numeric_recv) to a plain decimal string. The
|
||
// format is:
|
||
//
|
||
// int16 ndigits number of base-10000 "digits"
|
||
// int16 weight weight of the first digit, in base-10000 units
|
||
// uint16 sign 0x0000 positive, 0x4000 negative, 0xC000 NaN
|
||
// uint16 dscale displayed scale (decimal places to show)
|
||
// int16 digits[ndigits] each in 0..9999
|
||
//
|
||
// The numeric value equals sign × Σ d[i] × 10000^(weight − i).
|
||
//
|
||
// Output is a FiveSql2-parseable decimal literal — unquoted, no
|
||
// scientific notation, with exactly `dscale` digits after the
|
||
// decimal point so round-trip width is preserved.
|
||
func decodeBinaryNumeric(raw []byte) (string, error) {
|
||
if len(raw) < 8 {
|
||
return "", fmt.Errorf("numeric param: header too short (%d bytes)", len(raw))
|
||
}
|
||
ndigits := int16(binary.BigEndian.Uint16(raw[0:2]))
|
||
weight := int16(binary.BigEndian.Uint16(raw[2:4]))
|
||
sign := binary.BigEndian.Uint16(raw[4:6])
|
||
dscale := int16(binary.BigEndian.Uint16(raw[6:8]))
|
||
if int(ndigits)*2+8 != len(raw) {
|
||
return "", fmt.Errorf("numeric param: digit count mismatch (ndigits=%d, body=%d)", ndigits, len(raw)-8)
|
||
}
|
||
if sign == 0xC000 {
|
||
return "NaN", nil
|
||
}
|
||
digs := make([]uint16, ndigits)
|
||
for i := 0; i < int(ndigits); i++ {
|
||
digs[i] = binary.BigEndian.Uint16(raw[8+i*2 : 10+i*2])
|
||
}
|
||
|
||
var sb strings.Builder
|
||
if sign == 0x4000 {
|
||
sb.WriteByte('-')
|
||
}
|
||
|
||
// Integer part: weight+1 base-10000 digits. If weight is
|
||
// negative the integer part is just "0".
|
||
intDigits := int(weight) + 1
|
||
if intDigits <= 0 {
|
||
sb.WriteByte('0')
|
||
} else {
|
||
for i := 0; i < intDigits; i++ {
|
||
var d uint16
|
||
if i < int(ndigits) {
|
||
d = digs[i]
|
||
}
|
||
if i == 0 {
|
||
fmt.Fprintf(&sb, "%d", d)
|
||
} else {
|
||
fmt.Fprintf(&sb, "%04d", d)
|
||
}
|
||
}
|
||
}
|
||
|
||
if dscale > 0 {
|
||
sb.WriteByte('.')
|
||
// Build the fractional digit string. When weight < -1, the
|
||
// first array digit is already several base-10000 positions
|
||
// past the decimal point — pad with "0000" groups for those
|
||
// missing leading-zero positions.
|
||
var frac strings.Builder
|
||
leadingZeroGroups := 0
|
||
if intDigits < 0 {
|
||
leadingZeroGroups = -intDigits
|
||
}
|
||
for i := 0; i < leadingZeroGroups; i++ {
|
||
frac.WriteString("0000")
|
||
}
|
||
fracStart := intDigits
|
||
if fracStart < 0 {
|
||
fracStart = 0
|
||
}
|
||
for i := fracStart; i < int(ndigits); i++ {
|
||
fmt.Fprintf(&frac, "%04d", digs[i])
|
||
}
|
||
s := frac.String()
|
||
if len(s) >= int(dscale) {
|
||
sb.WriteString(s[:dscale])
|
||
} else {
|
||
sb.WriteString(s)
|
||
sb.WriteString(strings.Repeat("0", int(dscale)-len(s)))
|
||
}
|
||
}
|
||
return sb.String(), nil
|
||
}
|
||
|
||
// decodeBinaryDate converts a PG binary DATE (4-byte signed days
|
||
// since pgEpoch = 2000-01-01) to "YYYY-MM-DD".
|
||
func decodeBinaryDate(raw []byte) (string, error) {
|
||
if len(raw) != 4 {
|
||
return "", fmt.Errorf("date param: want 4 bytes, got %d", len(raw))
|
||
}
|
||
days := int32(binary.BigEndian.Uint32(raw))
|
||
t := pgEpoch.AddDate(0, 0, int(days))
|
||
return t.Format("2006-01-02"), nil
|
||
}
|
||
|
||
// decodeBinaryTimestamp converts a PG binary TIMESTAMP / TIMESTAMPTZ
|
||
// (8-byte signed microseconds since pgEpoch = 2000-01-01) to
|
||
// "YYYY-MM-DD HH:MM:SS.ffffff". Encoding assumes integer_datetimes
|
||
// = on; we advertise that in ParameterStatus on connect so clients
|
||
// won't send the floating-point variant.
|
||
func decodeBinaryTimestamp(raw []byte) (string, error) {
|
||
if len(raw) != 8 {
|
||
return "", fmt.Errorf("timestamp param: want 8 bytes, got %d", len(raw))
|
||
}
|
||
us := int64(binary.BigEndian.Uint64(raw))
|
||
t := pgEpoch.Add(time.Duration(us) * time.Microsecond)
|
||
return t.Format("2006-01-02 15:04:05.000000"), nil
|
||
}
|