Files
five/hbrdd/ntx/build.go
CharlesKWON 6c5374778a perf(rdd): index build 38% faster — sort.Interface + fast path for numeric/UPPER
Benchmark (50k records, 4 indexes on Apple M-series):
             before   after   Δ
  INDEX     53.7ms  33.3ms  -38%  (now 10% faster than Harbour 37.3ms)
  TOTAL    156.2ms 133.0ms  -15%

Fixes:

1. sort.Slice(reflection) → concrete sort.Interface
   Benchmarked in isolation on 200k KeyRecords:
   sort.Slice(closure):  50.0ms
   sort.Sort(interface): 30.4ms  (40% faster, no reflection)

   - indexer.go: add keyRecordAsc/Desc concrete types
   - Branch hoist descending check out of Less()

2. buildOnePage zero allocation
   Was allocating a temp padded []byte per key (~50k allocs per index).
   Now writes padded key directly into the page buffer via padCopy.

3. bulkBuildBTree separator reuse
   sepKey can alias the source KeyRecord.Key when it's already keyLen-sized
   (true for all slab-allocated keys), avoiding ~n/maxItem small allocations.
   Pre-size the children slice.

4. Fast path extended to numeric fields and UPPER/LOWER
   Previously only bare CHAR field references hit the zero-alloc fast path.
   Now:
     - Numeric fields (N/F type) copy DBF bytes directly
       (same-length ASCII compare matches numeric order for non-negatives)
     - UPPER(field) / LOWER(field) wrappers on CHAR fields apply ASCII
       case folding inline during byte copy

   Per-index timing on the micro benchmark:
               before   after
     NAME       7.7ms   7.5ms  (fast path, unchanged)
     CITY       6.0ms   6.2ms  (fast path, unchanged)
     AGE       14.1ms   7.1ms  -50%  (was slow path)
     UPPER(NM) 17.0ms   7.9ms  -54%  (was slow path)

5. Slow path single-pass scan
   When an expression is too complex for fast path, we still avoid the
   double GoTo per record. The evaluation loop now sequentially walks
   records with one GoTo each, restoring the original position only at
   the end, and shares a single slab for padded keys.

Also fixes a hbrt bug surfaced while writing the benchmark:

6. Date + Numeric promoted to Date
   Plus()/Minus() previously required the integer side to be NumInt.
   Modulus returns a promoted type, so `SToD("...") + (i % 365)` panicked.
   Now accepts any Numeric on either side and truncates the fractional
   part before adding Julian days.

   - hbrt/ops_arith.go: Date±Numeric (was Date±NumInt only)

Tests:
  go test ./...        — ALL PASS (17 packages)
  FiveSql2 43/43       — 100%
  compat_harbour 51/51 — 100%
  Harbour vs Five diff — 0 lines differ (281-line RDD parity test)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-11 17:24:49 +09:00

828 lines
24 KiB
Go

// Copyright (c) 2026 Charles KWON OhJun (charleskwonohjun@gmail.com)
// All rights reserved.
// NTX index creation (INDEX ON) and key insertion.
package ntx
import (
"encoding/binary"
"fmt"
"os"
"sort"
"syscall"
)
// KeyRecord pairs a key value with its record number for sorting.
type KeyRecord struct {
Key []byte
RecNo uint32
}
// CreateIndex builds a new NTX index file from sorted key-record pairs.
func CreateIndex(path string, keyExpr string, keyLen int, unique bool, descend bool, keys []KeyRecord) (*Index, error) {
if len(path) < 4 || path[len(path)-4:] != ".ntx" {
path += ".ntx"
}
f, err := os.Create(path)
if err != nil {
return nil, fmt.Errorf("create NTX %s: %w", path, err)
}
itemSize := 8 + keyLen
maxItem := calculateMaxItems(itemSize)
halfPage := maxItem / 2
// Proper B-tree bulk build — works for any depth.
// Strategy: split sorted keys into groups of maxItem with separators between.
// keys[0..M-1] → leaf, keys[M] → separator, keys[M+1..2M] → leaf, ...
// Each separator exists ONLY in the parent, never in a leaf.
var buf pageBuffer
buf.init()
var rootOffset uint32
if len(keys) == 0 {
off := buf.allocPage()
initPageOffsets(buf.getPage(off), maxItem, itemSize)
rootOffset = uint32(off)
} else {
rootOffset = uint32(bulkBuildBTree(&buf, keys, keyLen, maxItem, itemSize))
}
nextPage := uint32(int64(HeaderSize) + int64(buf.count)*BlockSize)
// Write header
hdr := Header{
Type: 0x0401,
Version: 1,
Root: rootOffset,
NextPage: nextPage,
ItemSize: uint16(itemSize),
KeySize: uint16(keyLen),
KeyDec: 0,
MaxItem: uint16(maxItem),
HalfPage: uint16(halfPage),
}
copy(hdr.KeyExpr[:], keyExpr)
if unique {
hdr.Unique = 1
}
if descend {
hdr.Descend = 1
}
if err := WriteHeader(f, &hdr); err != nil {
f.Close()
return nil, err
}
// Bulk write all pages at correct offset (after header)
if buf.count > 0 {
if _, err := f.WriteAt(buf.data[:int64(buf.count)*BlockSize], int64(HeaderSize)); err != nil {
f.Close()
return nil, fmt.Errorf("bulk write NTX pages: %w", err)
}
}
f.Close()
return OpenIndex(path)
}
// bulkBuildBTree builds a proper B-tree in memory from sorted keys.
// Returns the file offset of the root page.
// Algorithm: distribute keys into leaf groups with separators extracted between them.
// [leaf0: M keys] [sep0] [leaf1: M keys] [sep1] ... [leafN: remaining keys]
// Then recursively build interior level from (leaf offsets + separator keys).
func bulkBuildBTree(buf *pageBuffer, keys []KeyRecord, keyLen, maxItem, itemSize int) int64 {
if len(keys) <= maxItem {
// Single leaf — base case
return buildOnePage(buf, keys, keyLen, maxItem, itemSize, nil)
}
// Split keys into leaf groups + separators
type childInfo struct {
offset int64
sepKey []byte // separator AFTER this child (nil for last)
sepRec uint32
}
// Pre-size to avoid slice growth during leaf splitting.
children := make([]childInfo, 0, len(keys)/maxItem+2)
i := 0
for i < len(keys) {
end := i + maxItem
if end > len(keys) {
end = len(keys)
}
// Prevent orphan: if taking maxItem keys leaves only 1 key for separator+next leaf,
// adjust so the last two leaves share keys evenly.
if end < len(keys) {
remaining := len(keys) - end
if remaining <= 2 {
if end-i+remaining <= maxItem {
// Absorb: total fits in one page
end = len(keys)
} else {
// Split evenly: give half of remaining+current to each of last 2 leaves.
// Total keys for last 2 leaves + 1 separator = len(keys) - i
total := len(keys) - i
end = i + (total-1)/2 // -1 accounts for separator between them
}
}
}
chunk := keys[i:end]
leafOff := buildOnePage(buf, chunk, keyLen, maxItem, itemSize, nil)
ci := childInfo{offset: leafOff}
i = end
// Extract separator only if 2+ keys remain (1 for sep + 1+ for next leaf)
if i < len(keys) && i+1 < len(keys) {
// At least 1 more key after separator → safe to promote.
// Reference the source key directly (caller's slab allocation is
// keyLen-aligned from OrderCreate's fast path, so no padding copy
// is needed). For slow path, the key was already padded in-place.
if len(keys[i].Key) == keyLen {
ci.sepKey = keys[i].Key
} else {
ci.sepKey = make([]byte, keyLen)
padCopy(ci.sepKey, keys[i].Key, keyLen)
}
ci.sepRec = keys[i].RecNo
i++ // skip separator key — it goes to parent only
}
children = append(children, ci)
}
// Build interior levels bottom-up
for len(children) > 1 {
var nextChildren []childInfo
j := 0
for j < len(children) {
// Collect up to maxItem+1 children for one parent page
end := j + maxItem + 1
if end > len(children) {
end = len(children)
}
group := children[j:end]
// Build parent page: separators come from group[0..n-2].sepKey
nKeys := len(group) - 1
parentOff := buf.allocPage()
pg := buf.getPage(parentOff)
initPageOffsets(pg, maxItem, itemSize)
binary.LittleEndian.PutUint16(pg[0:2], uint16(nKeys))
for k := 0; k <= nKeys; k++ {
entOff := int(binary.LittleEndian.Uint16(pg[2+k*2 : 4+k*2]))
binary.LittleEndian.PutUint32(pg[entOff:entOff+4], uint32(group[k].offset))
if k < nKeys && group[k].sepKey != nil {
binary.LittleEndian.PutUint32(pg[entOff+4:entOff+8], group[k].sepRec)
copy(pg[entOff+8:entOff+8+keyLen], group[k].sepKey)
}
}
// This parent becomes a child for the next level
ci := childInfo{offset: parentOff}
// Separator for this parent = last group member's separator
// (the separator that would follow this parent's range)
if end < len(children) {
// Use the last child's separator as the parent's separator
ci.sepKey = group[nKeys].sepKey
ci.sepRec = group[nKeys].sepRec
}
nextChildren = append(nextChildren, ci)
j = end
}
children = nextChildren
}
return children[0].offset
}
// buildOnePage creates a single leaf or interior page with the given keys.
// Zero-allocation: writes padded keys directly into the page buffer.
func buildOnePage(buf *pageBuffer, keys []KeyRecord, keyLen, maxItem, itemSize int, childOffsets []int64) int64 {
off := buf.allocPage()
pg := buf.getPage(off)
initPageOffsets(pg, maxItem, itemSize)
for i, kr := range keys {
entOff := int(binary.LittleEndian.Uint16(pg[2+i*2 : 4+i*2]))
if childOffsets != nil && i < len(childOffsets) {
binary.LittleEndian.PutUint32(pg[entOff:entOff+4], uint32(childOffsets[i]))
} else {
binary.LittleEndian.PutUint32(pg[entOff:entOff+4], 0) // leaf
}
binary.LittleEndian.PutUint32(pg[entOff+4:entOff+8], kr.RecNo)
// Write padded key directly into page buffer (no intermediate alloc).
padCopy(pg[entOff+8:entOff+8+keyLen], kr.Key, keyLen)
}
binary.LittleEndian.PutUint16(pg[0:2], uint16(len(keys)))
return off
}
// padCopy copies src into dst, padding with spaces. Zero extra allocation.
func padCopy(dst, src []byte, keyLen int) {
n := copy(dst, src)
for i := n; i < keyLen; i++ {
dst[i] = ' '
}
}
// rebuildWithInsert creates an NTX using per-key insertion (proper B-tree).
// Used for 3+ level trees where bulk build has separator duplication issues.
func rebuildWithInsert(path, keyExpr string, keyLen int, unique, descend bool, keys []KeyRecord) (*Index, error) {
itemSize := 8 + keyLen
maxItem := calculateMaxItems(itemSize)
halfPage := maxItem / 2
f, err := os.Create(path)
if err != nil {
return nil, err
}
rootOff := int64(HeaderSize)
emptyRoot := [BlockSize]byte{}
initPageOffsets(emptyRoot[:], maxItem, itemSize)
hdr := Header{
Type: 0x0401, Version: 1,
Root: uint32(rootOff), NextPage: uint32(rootOff + BlockSize),
ItemSize: uint16(itemSize), KeySize: uint16(keyLen),
MaxItem: uint16(maxItem), HalfPage: uint16(halfPage),
}
copy(hdr.KeyExpr[:], keyExpr)
if unique {
hdr.Unique = 1
}
if descend {
hdr.Descend = 1
}
WriteHeader(f, &hdr)
f.WriteAt(emptyRoot[:], rootOff)
f.Close()
idx, err := OpenIndex(path)
if err != nil {
return nil, err
}
for _, kr := range keys {
k := make([]byte, keyLen)
copy(k, kr.Key)
if err := idx.insertKeyBTree(k, kr.RecNo); err != nil {
idx.Close()
return nil, err
}
}
// Re-enable mmap now that file is complete
idx.mmapFile()
return idx, nil
}
// --- Bulk build buffer (ported from rddfive/ntx_engine.c) ---
type pageBuffer struct {
data []byte
count int
capacity int
}
func (pb *pageBuffer) init() {
pb.capacity = 64
pb.data = make([]byte, int64(pb.capacity)*BlockSize)
}
func (pb *pageBuffer) grow() {
if pb.count >= pb.capacity {
newCap := pb.capacity * 2
newData := make([]byte, int64(newCap)*BlockSize)
copy(newData, pb.data[:int64(pb.count)*BlockSize])
pb.data = newData
pb.capacity = newCap
}
}
// allocPage allocates a new page and returns its file offset.
func (pb *pageBuffer) allocPage() int64 {
pb.grow()
off := int64(HeaderSize) + int64(pb.count)*BlockSize
pb.count++
return off
}
// getPage returns a slice into the buffer for the page at the given file offset.
func (pb *pageBuffer) getPage(fileOffset int64) []byte {
idx := (fileOffset - int64(HeaderSize)) / BlockSize
start := idx * BlockSize
return pb.data[start : start+BlockSize]
}
// initPageOffsets pre-initializes the offset table for a page.
func initPageOffsets(pg []byte, maxItem, itemSize int) {
dataStart := 2 + (maxItem+1)*2
for i := 0; i <= maxItem; i++ {
binary.LittleEndian.PutUint16(pg[2+i*2:4+i*2], uint16(dataStart+i*itemSize))
}
}
// --- Internal build structures (legacy) ---
type buildPage struct {
data [BlockSize]byte
fileOffset int64
keyCount int
firstKey []byte
firstRecNo uint32
}
func calculateMaxItems(itemSize int) int {
// Page layout: [keyCount:2] [offsets:(max+1)*2] [entries:(max+1)*itemSize]
// Total must fit in BlockSize (1024).
// 2 + (max+1)*2 + (max+1)*itemSize <= 1024
// (max+1) * (itemSize + 2) <= 1022
// max+1 <= 1022 / (itemSize + 2)
// max <= 1022/(itemSize+2) - 1
max := 1022/(itemSize+2) - 1
if max < 2 {
max = 2
}
if max > 250 {
max = 250
}
return max
}
func makeEmptyPage(keyLen, itemSize, maxItem int, offset int64) *buildPage {
pg := &buildPage{
fileOffset: offset,
firstKey: make([]byte, keyLen),
}
binary.LittleEndian.PutUint16(pg.data[0:2], 0)
return pg
}
func buildLeafPages(keys []KeyRecord, keyLen, itemSize, maxItem int, nextOffset *int64) []*buildPage {
if len(keys) == 0 {
return nil
}
var pages []*buildPage
for i := 0; i < len(keys); i += maxItem {
end := i + maxItem
if end > len(keys) {
end = len(keys)
}
chunk := keys[i:end]
pg := encodeLeafPage(chunk, keyLen, itemSize, maxItem, *nextOffset)
*nextOffset += BlockSize
pages = append(pages, pg)
}
return pages
}
func encodeLeafPage(keys []KeyRecord, keyLen, itemSize, maxItem int, offset int64) *buildPage {
pg := &buildPage{
fileOffset: offset,
keyCount: len(keys),
firstKey: make([]byte, keyLen),
firstRecNo: keys[0].RecNo,
}
copy(pg.firstKey, keys[0].Key)
binary.LittleEndian.PutUint16(pg.data[0:2], uint16(len(keys)))
dataStart := 2 + (maxItem+1)*2
for i, kr := range keys {
entryOffset := dataStart + i*itemSize
binary.LittleEndian.PutUint16(pg.data[2+i*2:4+i*2], uint16(entryOffset))
binary.LittleEndian.PutUint32(pg.data[entryOffset:entryOffset+4], 0) // leaf
binary.LittleEndian.PutUint32(pg.data[entryOffset+4:entryOffset+8], kr.RecNo)
key := make([]byte, keyLen)
for j := range key {
key[j] = ' '
}
copy(key, kr.Key)
copy(pg.data[entryOffset+8:entryOffset+8+keyLen], key)
}
// Trailing offset
binary.LittleEndian.PutUint16(
pg.data[2+len(keys)*2:4+len(keys)*2],
uint16(dataStart+len(keys)*itemSize),
)
return pg
}
func buildInternalLevel(children []*buildPage, keyLen, itemSize, maxItem int, nextOffset *int64) []*buildPage {
var pages []*buildPage
// Each internal page holds up to maxItem keys and maxItem+1 child pointers.
// So each parent page covers maxItem+1 children.
fanout := maxItem + 1
for i := 0; i < len(children); i += fanout {
end := i + fanout
if end > len(children) {
end = len(children)
}
chunk := children[i:end]
pg := encodeInternalPage(chunk, keyLen, itemSize, maxItem, *nextOffset)
*nextOffset += BlockSize
pages = append(pages, pg)
}
return pages
}
func encodeInternalPage(children []*buildPage, keyLen, itemSize, maxItem int, offset int64) *buildPage {
nKeys := len(children) - 1
if nKeys < 0 {
nKeys = 0
}
pg := &buildPage{
fileOffset: offset,
keyCount: nKeys,
}
if len(children) > 0 {
pg.firstKey = make([]byte, keyLen)
copy(pg.firstKey, children[0].firstKey)
pg.firstRecNo = children[0].firstRecNo
}
binary.LittleEndian.PutUint16(pg.data[0:2], uint16(nKeys))
dataStart := 2 + (maxItem+1)*2
for i := 0; i <= nKeys; i++ {
entryOffset := dataStart + i*itemSize
if entryOffset+itemSize > BlockSize {
// Page overflow — cap keys
binary.LittleEndian.PutUint16(pg.data[0:2], uint16(i))
break
}
binary.LittleEndian.PutUint16(pg.data[2+i*2:4+i*2], uint16(entryOffset))
// Child pointer
binary.LittleEndian.PutUint32(pg.data[entryOffset:entryOffset+4],
uint32(children[i].fileOffset))
if i < nKeys {
// Separator = first key of child[i+1], promoted (not duplicated)
binary.LittleEndian.PutUint32(pg.data[entryOffset+4:entryOffset+8],
children[i+1].firstRecNo)
key := make([]byte, keyLen)
for j := range key {
key[j] = ' '
}
copy(key, children[i+1].firstKey)
copy(pg.data[entryOffset+8:entryOffset+8+keyLen], key)
} else {
binary.LittleEndian.PutUint32(pg.data[entryOffset+4:entryOffset+8], 0)
}
}
return pg
}
// --- B-tree insertion ---
// insertKeyBTree inserts a single key into the B-tree with proper page splitting.
// Harbour: hb_ntxTagKeyAdd in dbfntx1.c
func (idx *Index) insertKeyBTree(key []byte, recNo uint32) error {
// Disable mmap during insertion (file grows, mmap stale)
if idx.mmapData != nil {
syscall.Munmap(idx.mmapData)
idx.mmapData = nil
}
// Search for insertion position
idx.stackLevel = 0
pageOff := int64(idx.header.Root)
for {
page, err := LoadPage(idx.file, pageOff)
if err != nil {
return err
}
iKey := idx.insertSearch(page, key, recNo)
if idx.stackLevel < StackSize {
idx.stack[idx.stackLevel] = StackEntry{PageOffset: pageOff, KeyIndex: iKey}
idx.stackLevel++
}
childOff := page.KeyChild(iKey)
if childOff == 0 {
break // at leaf
}
pageOff = int64(childOff)
}
// Insert at leaf, propagate splits up
var promoteKey []byte
var promoteRecNo uint32
var promoteChild uint32
for level := idx.stackLevel - 1; level >= 0; level-- {
page, err := LoadPage(idx.file, idx.stack[level].PageOffset)
if err != nil {
return err
}
iKey := idx.stack[level].KeyIndex
var insertKey []byte
var insertRecNo uint32
var insertChild uint32
if level == idx.stackLevel-1 {
// Leaf insertion
insertKey = key
insertRecNo = recNo
insertChild = 0
} else {
// Promoted key from child split
insertKey = promoteKey
insertRecNo = promoteRecNo
insertChild = promoteChild
}
if int(page.keyCount) < int(idx.header.MaxItem) {
// Page has room — insert directly
idx.pageInsertKey(page, iKey, insertKey, insertRecNo, insertChild)
page.writeTo(idx.file, idx.stack[level].PageOffset)
return nil
}
// Page full — split
promoteKey, promoteRecNo, promoteChild, err = idx.pageSplit(page, iKey, insertKey, insertRecNo, insertChild, idx.stack[level].PageOffset)
if err != nil {
return err
}
}
// Split propagated to root — create new root
// Harbour: new page with promoted key at pos 0
// child[0] = promoteChild (newPage = LEFT half)
// child[1] = old root (RIGHT half)
newRootOff := int64(idx.header.NextPage)
idx.header.NextPage += uint32(BlockSize)
maxItem := int(idx.header.MaxItem)
itemSize := int(idx.header.ItemSize)
dataStart := 2 + (maxItem+1)*2
newRoot := &Page{data: make([]byte, BlockSize), keyCount: 0}
for i := 0; i <= maxItem; i++ {
binary.LittleEndian.PutUint16(newRoot.data[2+i*2:4+i*2], uint16(dataStart+i*itemSize))
}
// Insert promoted key at position 0 with child = promoteChild (left half)
idx.pageInsertKey(newRoot, 0, promoteKey, promoteRecNo, promoteChild)
// Set trailing child (position 1) = old root (right half)
trailOff := int(newRoot.keyOffset(1))
binary.LittleEndian.PutUint32(newRoot.data[trailOff:trailOff+4], idx.header.Root)
newRoot.writeTo(idx.file, newRootOff)
idx.header.Root = uint32(newRootOff)
idx.file.Seek(0, 0)
WriteHeader(idx.file, &idx.header)
return nil
}
// insertSearch finds the insertion position in a page (binary search).
func (idx *Index) insertSearch(page *Page, key []byte, recNo uint32) int {
lo, hi := 0, int(page.keyCount)-1
for lo <= hi {
mid := (lo + hi) / 2
cmp := idx.compareKeys(key, page.KeyValue(mid, idx.keyLen))
if cmp == 0 {
// Equal keys: sort by recNo
midRec := page.KeyRecNo(mid)
if recNo <= midRec {
hi = mid - 1
} else {
lo = mid + 1
}
} else if cmp < 0 {
hi = mid - 1
} else {
lo = mid + 1
}
}
return lo
}
// pageInsertKey inserts a key at position iKey in a page.
// Harbour: hb_ntxPageKeyAdd — swaps offsets, writes key data at freed slot.
func (idx *Index) pageInsertKey(page *Page, iKey int, key []byte, recNo uint32, childPage uint32) {
kc := int(page.keyCount)
// The offset at position kc+1 points to the next free data slot
freeOff := page.keyOffset(kc + 1)
// Shift offset table right: move [iKey..kc] to [iKey+1..kc+1]
for i := kc + 1; i > iKey; i-- {
prev := page.keyOffset(i - 1)
binary.LittleEndian.PutUint16(page.data[2+i*2:4+i*2], prev)
}
// Put the free slot offset at position iKey
binary.LittleEndian.PutUint16(page.data[2+iKey*2:4+iKey*2], freeOff)
// Write key data at the free offset
off := int(freeOff)
binary.LittleEndian.PutUint32(page.data[off:off+4], childPage)
binary.LittleEndian.PutUint32(page.data[off+4:off+8], recNo)
padKey := make([]byte, idx.keyLen)
for j := range padKey {
padKey[j] = ' '
}
copy(padKey, key)
copy(page.data[off+8:off+8+idx.keyLen], padKey)
page.keyCount++
binary.LittleEndian.PutUint16(page.data[0:2], page.keyCount)
}
// pageSplit splits a full page — exact port of Harbour hb_ntxPageSplit.
// NewPage = LEFT (lower half), OldPage = RIGHT (upper half, offset-swapped).
// Returns: promoted key, its recNo, and newPage offset (left child of promoted key).
func (idx *Index) pageSplit(page *Page, iKey int, key []byte, recNo uint32, childPage uint32, pageOff int64) ([]byte, uint32, uint32, error) {
maxItem := int(idx.header.MaxItem)
itemSize := int(idx.header.ItemSize)
dataStart := 2 + (maxItem+1)*2
// Allocate new page (will be the LEFT / lower half)
newPageOff := int64(idx.header.NextPage)
idx.header.NextPage += uint32(BlockSize)
newPage := &Page{data: make([]byte, BlockSize)}
// Initialize offset table
for i := 0; i <= maxItem; i++ {
binary.LittleEndian.PutUint16(newPage.data[2+i*2:4+i*2], uint16(dataStart+i*itemSize))
}
uiKeys := int(page.keyCount) + 1 // total including new key
uiHalf := uiKeys / 2
// Phase 1: Copy lower half to newPage
j := 0 // index into old page entries
for newPageCount := 0; newPageCount < uiHalf; newPageCount++ {
if newPageCount == iKey {
// Insert the new key here
idx.setKeyEntry(newPage, newPageCount, childPage, recNo, key)
} else {
// Copy from old page entry j
idx.copyKeyEntry(newPage, newPageCount, page, j)
j++
}
newPage.keyCount++
}
binary.LittleEndian.PutUint16(newPage.data[0:2], newPage.keyCount)
// Phase 2: Extract promoted key (middle key)
var promKey []byte
var promRecNo uint32
if uiHalf == iKey {
// The new key IS the promoted separator
promRecNo = recNo
promKey = append([]byte{}, key...)
// newPage's trailing child = the new key's child pointer
trailOff := int(newPage.keyOffset(int(newPage.keyCount)))
binary.LittleEndian.PutUint32(newPage.data[trailOff:trailOff+4], childPage)
} else {
// Promoted key comes from old page entry j
promRecNo = page.KeyRecNo(j)
promKey = append([]byte{}, page.KeyValue(j, idx.keyLen)...)
// newPage's trailing child = old page entry j's child
trailOff := int(newPage.keyOffset(int(newPage.keyCount)))
binary.LittleEndian.PutUint32(newPage.data[trailOff:trailOff+4], page.KeyChild(j))
j++
}
// Phase 3: Upper half stays in old page (offset swapping — Harbour style)
// Rearrange the old page's offset table so that entries j..keyCount-1
// become positions 0..i-1, using offset swapping (no data copying).
i := 0
for uiHalf++; uiHalf < uiKeys; uiHalf++ {
if uiHalf == iKey {
// Insert new key at position i in old page
idx.setKeyEntry(page, i, childPage, recNo, key)
} else {
// Swap offset[i] and offset[j] — data stays in place
offI := page.keyOffset(i)
offJ := page.keyOffset(j)
binary.LittleEndian.PutUint16(page.data[2+j*2:4+j*2], offI)
binary.LittleEndian.PutUint16(page.data[2+i*2:4+i*2], offJ)
j++
}
i++
}
// Move trailing child: old page's last child → position i
lastChild := page.KeyChild(int(page.keyCount))
trailOff := int(page.keyOffset(int(page.keyCount)))
binary.LittleEndian.PutUint32(page.data[trailOff:trailOff+4], 0) // clear old
newTrailOff := int(page.keyOffset(i))
binary.LittleEndian.PutUint32(page.data[newTrailOff:newTrailOff+4], lastChild)
page.keyCount = uint16(i)
binary.LittleEndian.PutUint16(page.data[0:2], page.keyCount)
// Write both pages
newPage.writeTo(idx.file, newPageOff)
page.writeTo(idx.file, pageOff)
// Update header
idx.file.Seek(0, 0)
WriteHeader(idx.file, &idx.header)
// Harbour: pKeyNew->Tag = pNewPage->Page (left child = newPage)
return promKey, promRecNo, uint32(newPageOff), nil
}
// setKeyEntry writes a key entry at position i in a page. Zero extra allocation.
func (idx *Index) setKeyEntry(page *Page, i int, child uint32, recNo uint32, key []byte) {
off := int(page.keyOffset(i))
binary.LittleEndian.PutUint32(page.data[off:off+4], child)
binary.LittleEndian.PutUint32(page.data[off+4:off+8], recNo)
dest := page.data[off+8 : off+8+idx.keyLen]
n := copy(dest, key)
for j := n; j < idx.keyLen; j++ {
dest[j] = ' '
}
}
// copyKeyEntry copies a full key entry (child+recNo+key) from src page position srcI to dst position dstI.
func (idx *Index) copyKeyEntry(dst *Page, dstI int, src *Page, srcI int) {
itemSize := int(idx.header.ItemSize)
dstOff := int(dst.keyOffset(dstI))
srcOff := int(src.keyOffset(srcI))
copy(dst.data[dstOff:dstOff+itemSize], src.data[srcOff:srcOff+itemSize])
}
// writeTo writes a page to file at the given offset.
func (p *Page) writeTo(f *os.File, offset int64) {
f.WriteAt(p.data[:], offset)
}
// --- Single key operations (legacy, uses rebuild) ---
func (idx *Index) InsertKey(key []byte, recNo uint32) error {
keys := idx.collectAllKeys()
newKey := make([]byte, idx.keyLen)
copy(newKey, key)
keys = append(keys, KeyRecord{Key: newKey, RecNo: recNo})
sort.Slice(keys, func(i, j int) bool {
cmp := idx.compareKeys(keys[i].Key, keys[j].Key)
if cmp == 0 {
return keys[i].RecNo < keys[j].RecNo
}
return cmp < 0
})
path := idx.file.Name()
idx.Close()
newIdx, err := CreateIndex(path, idx.header.GetKeyExpr(), idx.keyLen,
idx.uniqueKey, !idx.ascendKey, keys)
if err != nil {
return err
}
*idx = *newIdx
return nil
}
func (idx *Index) DeleteKey(recNo uint32) error {
keys := idx.collectAllKeys()
filtered := keys[:0]
for _, kr := range keys {
if kr.RecNo != recNo {
filtered = append(filtered, kr)
}
}
path := idx.file.Name()
idx.Close()
newIdx, err := CreateIndex(path, idx.header.GetKeyExpr(), idx.keyLen,
idx.uniqueKey, !idx.ascendKey, filtered)
if err != nil {
return err
}
*idx = *newIdx
return nil
}
func (idx *Index) collectAllKeys() []KeyRecord {
var keys []KeyRecord
if !idx.GoTop() {
return keys
}
keys = append(keys, KeyRecord{
Key: append([]byte{}, idx.curKey[:idx.keyLen]...),
RecNo: idx.curRecNo,
})
for idx.SkipNext() {
keys = append(keys, KeyRecord{
Key: append([]byte{}, idx.curKey[:idx.keyLen]...),
RecNo: idx.curRecNo,
})
}
return keys
}