diff --git a/hbrdd/dbf/area_registry.go b/hbrdd/dbf/area_registry.go new file mode 100644 index 0000000..0bb03c2 --- /dev/null +++ b/hbrdd/dbf/area_registry.go @@ -0,0 +1,69 @@ +// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com) +// All rights reserved. + +// area_registry.go — per-file mmap generation tracker. +// +// Each unique on-disk DBF path gets an atomic uint64 generation +// counter. Every *DBFArea instance on that path remembers a snapshot +// of the counter at open time + after each of its own Appends. A +// peer instance's Append (in another goroutine, same process) bumps +// the shared counter; the local reader's loadRecord then sees its +// snapshot < shared and knows its mmap may show stale bytes — it +// bypasses the zero-copy mmap fast path for that load and ReadAts +// the bytes off disk instead. +// +// Without this, two pgserver connections opening the same DBF SHARED +// each got their own mmap snapshot at Open time. A peer Append grew +// the file past our window (loadRecord's existing length-bound check +// caught that case and fell back to ReadAt) — but a peer PutValue +// mutating a record we'd mmap-cached returned the stale pre-mutate +// bytes from our snapshot. That manifested as "own marker missing" +// in concurrent insert-then-select stress runs. + +package dbf + +import ( + "sync" + "sync/atomic" +) + +var ( + pathGenMu sync.RWMutex + pathGens = map[string]*uint64{} +) + +// pathGenFor returns the (shared) atomic counter pointer for path. +// First call per path allocates the counter; subsequent calls return +// the same pointer so peers see each other's bumps. Pointers are +// never invalidated — even after all areas close, the entry stays +// (cheap; one uint64 per file ever opened in this process). 
+func pathGenFor(path string) *uint64 {
+	pathGenMu.RLock() // fast path: a read lock suffices once the counter exists
+	counter, found := pathGens[path]
+	pathGenMu.RUnlock()
+	if found {
+		return counter
+	}
+	pathGenMu.Lock() // slow path: register the counter under the write lock
+	defer pathGenMu.Unlock()
+	if counter, found = pathGens[path]; !found { // re-check: a racing caller may have won
+		counter = new(uint64)
+		pathGens[path] = counter
+	}
+	return counter
+}
+
+// bumpPathGen atomically advances the counter for path and returns
+// the new value, i.e. the generation this call produced. Writers call
+// it after mutating disk state so peers refresh; the result is optional.
+func bumpPathGen(path string) uint64 {
+	return atomic.AddUint64(pathGenFor(path), 1)
+}
+
+// loadPathGen atomically reads the counter behind p; a nil p reads as 0.
+func loadPathGen(p *uint64) uint64 {
+	if p == nil {
+		return 0
+	}
+	return atomic.LoadUint64(p)
+}
diff --git a/hbrdd/dbf/dbf.go b/hbrdd/dbf/dbf.go
index 1034188..ef75b82 100644
--- a/hbrdd/dbf/dbf.go
+++ b/hbrdd/dbf/dbf.go
@@ -32,6 +32,13 @@ type DBFArea struct {
 	shared   bool
 	readOnly bool
 
+	// Per-path mmap generation tracker. Bumped by self/peer Append +
+	// PutValue + Pack + Zap. loadRecord compares pathGenSeen against
+	// the live counter and bypasses the mmap fast path if a peer
+	// modified the file since our snapshot. See area_registry.go.
+ pathGen *uint64 + pathGenSeen uint64 + // Header header Header fieldDescs []FieldDesc @@ -181,7 +188,9 @@ func openDBF(drv *DBFDriver, params hbrdd.OpenParams) (*DBFArea, error) { filePath: path, shared: params.Shared, readOnly: params.ReadOnly, + pathGen: pathGenFor(path), } + area.pathGenSeen = loadPathGen(area.pathGen) area.BaseArea = hbrdd.BaseArea{} // Step 1: Read header (32 bytes) @@ -350,7 +359,9 @@ func createDBF(drv *DBFDriver, params hbrdd.CreateParams) (*DBFArea, error) { ownBuf: make([]byte, recordLen), recOwned: true, recCount: 0, + pathGen: pathGenFor(path), } + area.pathGenSeen = loadPathGen(area.pathGen) area.recBuf = area.ownBuf fieldInfos := make([]hbrdd.FieldInfo, len(params.Fields)) @@ -522,10 +533,27 @@ func (a *DBFArea) GoTo(recNo uint32) error { return nil } - // Read record — COW: mmap slice reference (zero-copy), fallback to file read + // Read record — COW: mmap slice reference (zero-copy), fallback + // to file read. Two reasons the mmap fast path can't be trusted + // and we have to ReadAt: + // 1. The record offset is past our mmap snapshot's length + // (file grew since we mmap-ed — covered by the length check). + // 2. SHARED mode AND the per-path mmap-gen counter has advanced + // since our last sync — a peer DBFArea modified the file + // bytes (Append/PutValue/Pack). Our snapshot still holds the + // pre-mutation bytes; ReadAt pulls the current on-disk + // contents. See area_registry.go.
+	mmapStale := false
+	if a.shared && a.pathGen != nil {
+		if cur := loadPathGen(a.pathGen); cur > a.pathGenSeen {
+			mmapStale = true
+			a.pathGenSeen = cur
+		}
+	}
 	offset := a.header.RecordOffset(recNo)
 	recLen := int(a.header.RecordLen)
-	if a.mmapData != nil && int(offset)+recLen <= len(a.mmapData) {
+	useMmap := !mmapStale && a.mmapData != nil && int(offset)+recLen <= len(a.mmapData)
+	if useMmap {
 		// Zero-copy: recBuf points into mmap (read-only until PutValue)
 		a.recBuf = a.mmapData[offset : offset+int64(recLen)]
 		a.recOwned = false
@@ -822,6 +850,11 @@ func (a *DBFArea) Append() error {
 		// the cost is one Seek+stat per peer's next RecCount call,
 		// which is exactly what cross-connection freshness needs.
 		InvalidateRecCountCache()
+		// Also bump the per-path mmap-gen so peer DBFAreas on this
+		// file know their mmap snapshot may be stale and force a
+		// ReadAt on next load. See area_registry.go.
+		bumpPathGen(a.filePath)
+		a.pathGenSeen = loadPathGen(a.pathGen)
 	}
 
 	// Promote to owned buffer for writing
@@ -1074,6 +1107,7 @@ func (a *DBFArea) flushRecord() error {
 		if err == nil {
 			a.dirty = false
 			a.drainPendingIndexInserts()
+			a.markPathDirty()
 		}
 		return err
 	}
@@ -1082,10 +1116,32 @@ func (a *DBFArea) flushRecord() error {
 	if err == nil {
 		a.dirty = false
 		a.drainPendingIndexInserts()
+		a.markPathDirty()
 	}
 	return err
 }
 
+// markPathDirty publishes a completed write to peer DBFAreas on the
+// same file by bumping the per-path mmap-gen counter, so their next
+// record load sees a newer generation and bypasses its mmap. It is a
+// no-op for non-shared areas (EXCLUSIVE mode has no peers to
+// invalidate) and for areas without a counter.
+//
+// pathGenSeen is advanced only when the counter moved by exactly one
+// (our own bump). A larger jump means a peer also wrote since our
+// last sync; copying the live value then would mark that peer write
+// as already seen, so the snapshot is left behind and the next record
+// load treats the mmap as stale.
+func (a *DBFArea) markPathDirty() {
+	if !a.shared || a.pathGen == nil {
+		return
+	}
+	bumpPathGen(a.filePath)
+	if cur := loadPathGen(a.pathGen); cur == a.pathGenSeen+1 {
+		a.pathGenSeen = cur
+	}
+}
+
 // drainPendingIndexInserts pushes any queued APPENDed records into
 // every open IndexWriter so a follow-up dbSeek finds them.
Engines
 // that don't implement IndexWriter (CDX today) are skipped — those
diff --git a/hbrtl/rdd.go b/hbrtl/rdd.go
index 5e81b32..bd21701 100644
--- a/hbrtl/rdd.go
+++ b/hbrtl/rdd.go
@@ -109,24 +109,25 @@ func rtlFieldGet(t *hbrt.Thread) {
 	t.RetValue()
 }
 
-// getWA returns the WorkAreaManager with cached type assertion.
-var waCache = struct {
-	iface interface{}
-	wam   *hbrdd.WorkAreaManager
-}{}
-
+// getWA returns the *hbrdd.WorkAreaManager attached to thread t, or
+// nil when t.WA is unset or holds some other type. t must be non-nil.
+//
+// History: an earlier version memoised the last interface→*WAM pair
+// in a process-global struct (fields iface and wam) to skip the type
+// assertion. Under multi-pgserver-connection load that was the
+// worst of both worlds: each connection's thread has its own WAM,
+// so the cache missed on every call and both shared fields were
+// immediately rewritten without synchronisation. An interface is a
+// two-word value, so a concurrent write+read could observe a torn
+// pair — goroutines saw the WRONG WAM as their own, leading to the
+// FieldPosCache + WAM.aliases "concurrent map writes" panics.
+//
+// The assertion itself is fast (~ns), so no cache is kept. If the
+// micro-bench matters again, use a sync/atomic.Pointer or a
+// sync.Map keyed by thread, not a single global slot.
 func getWA(t *hbrt.Thread) *hbrdd.WorkAreaManager {
-	if t.WA == nil {
-		return nil
-	}
-	if t.WA == waCache.iface {
-		return waCache.wam
-	}
-	wa, ok := t.WA.(*hbrdd.WorkAreaManager)
-	if !ok {
-		return nil
-	}
-	waCache.iface = t.WA
-	waCache.wam = wa
-	return wa
+	if wam, ok := t.WA.(*hbrdd.WorkAreaManager); ok {
+		return wam
+	}
+	return nil
 }