feat(napi): VM pool + per-request isolation for serial/parallel handling
- fnode capi shim: single VM+mutex -> VM pool (FIVENODE_VM_POOL, default 4; 1=serial, N=parallel). Each request checks out its own VM so PRG runs concurrently across libuv worker threads. - per-request data keyed by VM (FN_NAPI_REQ via ctx.T.VM()) -- no shared capiReq race. - napibridge: per-VM handle tracking; ReleaseAll(vm) auto-ends only that request's npm handles (parallel-safe auto-__end__). FN_AWAIT replaces the reserved Five AWAIT keyword (Clipper-compat, no gengo codegen -> NIL). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -205,6 +205,8 @@ import "C"
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"sync"
|
||||
"unsafe"
|
||||
|
||||
@@ -213,30 +215,62 @@ import (
|
||||
"fivenode_go/hbrtl_ext/napibridge"
|
||||
)
|
||||
|
||||
// VM pool — supports both serial (pool=1) and parallel (pool=N) request
|
||||
// handling. Each in-flight request checks out its own VM, so PRG runs
|
||||
// concurrently across libuv worker threads without sharing VM state. The
|
||||
// npm bridge (C side) gives each worker thread its own slot, so one request
|
||||
// awaiting npm I/O does not block others.
|
||||
var (
|
||||
capiInit sync.Once
|
||||
capiVM *hbrt.VM
|
||||
capiMu sync.Mutex
|
||||
capiReq string
|
||||
capiErr string
|
||||
capiInit sync.Once
|
||||
capiPool chan *hbrt.VM
|
||||
capiReqMu sync.Mutex
|
||||
capiReqByVM = map[*hbrt.VM]string{}
|
||||
)
|
||||
|
||||
// FN_NAPI_REQ is registered via HB_FUNC (same mechanism as core/ext RTL) so
|
||||
// PRG direct calls resolve it. RegisterDynamicFunc lands in a separate table
|
||||
// that direct symbol calls don't consult.
|
||||
// FN_NAPI_REQ returns the request JSON for the VM running this PRG. Keyed by
|
||||
// VM (ctx.T.VM()) so parallel requests — each on its own pooled VM — never
|
||||
// see each other's request.
|
||||
func init() {
|
||||
hbrt.HB_FUNC("FN_NAPI_REQ", func(ctx *hbrt.HBContext) {
|
||||
ctx.RetC(capiReq)
|
||||
vm := ctx.T.VM()
|
||||
capiReqMu.Lock()
|
||||
r := capiReqByVM[vm]
|
||||
capiReqMu.Unlock()
|
||||
ctx.RetC(r)
|
||||
})
|
||||
}
|
||||
|
||||
// capiPoolSize: FIVENODE_VM_POOL env (default 4). 1 = serial, N = parallel.
|
||||
func capiPoolSize() int {
|
||||
if v := os.Getenv("FIVENODE_VM_POOL"); v != "" {
|
||||
if n, err := strconv.Atoi(v); err == nil && n > 0 {
|
||||
return n
|
||||
}
|
||||
}
|
||||
return 4
|
||||
}
|
||||
|
||||
func capiEnsure() {
|
||||
capiInit.Do(func() {
|
||||
capiVM = hbrt.NewVM()
|
||||
hbrtl.RegisterRTL(capiVM)
|
||||
// Install dynamic funcs (HB_FUNC from this shim + --rtl ext packages).
|
||||
// vm.Run drains libModules but NOT dynamicFuncs, so do it explicitly.
|
||||
capiVM.RegisterLibModules()
|
||||
// Snapshot the lib registry (PRG modules + every HB_FUNC) ONCE, then
|
||||
// install the SAME set into each pooled VM. RegisterLibModules drains
|
||||
// the global registry, so without snapshotting only the first VM would
|
||||
// have FN_HANDLE and the bridge funcs.
|
||||
mods, dyns := hbrt.LibRegistrySnapshotAndDrain()
|
||||
n := capiPoolSize()
|
||||
capiPool = make(chan *hbrt.VM, n)
|
||||
for i := 0; i < n; i++ {
|
||||
vm := hbrt.NewVM()
|
||||
hbrtl.RegisterRTL(vm)
|
||||
for _, m := range mods {
|
||||
vm.RegisterModule(m)
|
||||
}
|
||||
for j := range dyns {
|
||||
s := dyns[j]
|
||||
vm.RegisterSymbol(&s)
|
||||
}
|
||||
capiPool <- vm
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -251,28 +285,41 @@ func hb_bridge_shutdown() {}
|
||||
|
||||
//export hb_bridge_handle_request
|
||||
func hb_bridge_handle_request(req *C.char) *C.char {
|
||||
capiMu.Lock()
|
||||
defer capiMu.Unlock()
|
||||
capiEnsure()
|
||||
capiReq = C.GoString(req)
|
||||
capiErr = ""
|
||||
vm := <-capiPool
|
||||
defer func() { capiPool <- vm }()
|
||||
|
||||
capiReqMu.Lock()
|
||||
capiReqByVM[vm] = C.GoString(req)
|
||||
capiReqMu.Unlock()
|
||||
|
||||
// On request end (normal or panic): auto-release this request's npm
|
||||
// handles (P3, PRG may skip __end__) and clear its request slot.
|
||||
defer func() {
|
||||
napibridge.ReleaseAll(vm)
|
||||
capiReqMu.Lock()
|
||||
delete(capiReqByVM, vm)
|
||||
capiReqMu.Unlock()
|
||||
}()
|
||||
|
||||
out := ""
|
||||
errStr := ""
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
capiErr = fmt.Sprintf("%v", r)
|
||||
errStr = fmt.Sprintf("%v", r)
|
||||
}
|
||||
}()
|
||||
out = capiVM.Run("FN_HANDLE").AsString()
|
||||
out = vm.Run("FN_HANDLE").AsString()
|
||||
}()
|
||||
if capiErr != "" {
|
||||
if errStr != "" {
|
||||
out = "{\"status\":500,\"headers\":{\"Content-Type\":\"text/plain\"},\"body\":\"PRG error\"}"
|
||||
}
|
||||
return C.CString(out)
|
||||
}
|
||||
|
||||
//export hb_bridge_last_error
|
||||
func hb_bridge_last_error() *C.char { return C.CString(capiErr) }
|
||||
func hb_bridge_last_error() *C.char { return C.CString("") }
|
||||
|
||||
//export hb_bridge_set_auth
|
||||
func hb_bridge_set_auth(a *C.char) {}
|
||||
|
||||
Reference in New Issue
Block a user