// Five Example: Concurrent Data Processing Pipeline // // Go goroutines + channels for parallel processing. // 10만 건 레코드를 CPU 코어 수만큼 병렬 처리. PROCEDURE Main() LOCAL nRecords, aResult nRecords := 100000 ? "=== Five Concurrent Data Pipeline ===" ? "Processing", nRecords, "records with Go goroutines" ? aResult := GoPipeline(nRecords) ? "Results:" ? " Total records:", aResult["total"] ? " Total amount: ", aResult["amount"] ? " Avg per record:", aResult["average"] ? " Max single: ", aResult["max"] ? " Min single: ", aResult["min"] ? " Processing ms:", aResult["elapsed_ms"] ? " Records/sec: ", aResult["throughput"] ? ? "Category breakdown:" ? " Electronics: ", aResult["cat_electronics"] ? " Clothing: ", aResult["cat_clothing"] ? " Food: ", aResult["cat_food"] ? " Books: ", aResult["cat_books"] RETURN #pragma BEGINDUMP import ( "five/hbrt" "fmt" "math" "math/rand" "runtime" "strings" "sync" "time" ) func init() { hbrt.HB_FUNC("GOPIPELINE", goPipeline) } type record struct { id int category string amount float64 quantity int } type summary struct { total int amount float64 max, min float64 categories map[string]float64 } func goPipeline(ctx *hbrt.HBContext) { nRecords := ctx.ParNIDef(1, 100000) numWorkers := runtime.NumCPU() start := time.Now() // Stage 1: Generate records (simulates DB read) recordCh := make(chan record, 1000) go func() { categories := []string{"Electronics", "Clothing", "Food", "Books"} for i := 0; i < nRecords; i++ { recordCh <- record{ id: i + 1, category: categories[rand.Intn(len(categories))], amount: math.Round(rand.Float64()*1000*100) / 100, quantity: rand.Intn(50) + 1, } } close(recordCh) }() // Stage 2: Transform (parallel workers) type transformed struct { category string total float64 } transformCh := make(chan transformed, 1000) var wg sync.WaitGroup for w := 0; w < numWorkers; w++ { wg.Add(1) go func() { defer wg.Done() for r := range recordCh { total := r.amount * float64(r.quantity) total = math.Round(total*100) / 100 transformCh <- transformed{category: r.category, total: total} } }() } go func() { wg.Wait() close(transformCh) }() // Stage 3: Aggregate sum := summary{ min: math.MaxFloat64, categories: make(map[string]float64), } for t := range transformCh { sum.total++ sum.amount += t.total if t.total > sum.max { sum.max = t.total } if t.total < sum.min { sum.min = t.total } sum.categories[t.category] += t.total } elapsed := time.Since(start) // Build result hash for PRG result := ctx.HashNew() ctx.HashAdd(result, hbrt.MakeString("total"), hbrt.MakeInt(sum.total)) ctx.HashAdd(result, hbrt.MakeString("amount"), hbrt.MakeDouble(sum.amount, 0, 0)) ctx.HashAdd(result, hbrt.MakeString("average"), hbrt.MakeDouble(sum.amount/float64(sum.total), 0, 0)) ctx.HashAdd(result, hbrt.MakeString("max"), hbrt.MakeDouble(sum.max, 0, 0)) ctx.HashAdd(result, hbrt.MakeString("min"), hbrt.MakeDouble(sum.min, 0, 0)) ctx.HashAdd(result, hbrt.MakeString("elapsed_ms"), hbrt.MakeInt(int(elapsed.Milliseconds()))) throughput := fmt.Sprintf("%.0f", float64(nRecords)/elapsed.Seconds()) ctx.HashAdd(result, hbrt.MakeString("throughput"), hbrt.MakeString(throughput)) for _, cat := range []string{"Electronics", "Clothing", "Food", "Books"} { key := "cat_" + strings.ToLower(cat) ctx.HashAdd(result, hbrt.MakeString(key), hbrt.MakeDouble(sum.categories[cat], 0, 0)) } ctx.RetVal(result) } #pragma ENDDUMP