Files
obitools4/pkg/obilua/luahttp.go
T

129 lines
3.3 KiB
Go
Raw Normal View History

2026-04-07 08:36:50 +02:00
package obilua
import (
2026-04-13 18:30:16 +02:00
"context"
2026-04-07 08:36:50 +02:00
"io"
"net/http"
"strings"
"sync"
2026-04-07 08:36:50 +02:00
"time"
"git.metabarcoding.org/obitools/obitools4/obitools4/pkg/obidefault"
2026-04-07 08:36:50 +02:00
lua "github.com/yuin/gopher-lua"
)
2026-04-13 18:30:16 +02:00
const httpClientTimeout = 300 * time.Second
2026-04-07 08:36:50 +02:00
var (
_httpClient *http.Client
_httpClientOnce sync.Once
2026-04-13 18:30:16 +02:00
// _httpSemaphore limits the number of concurrent HTTP requests.
// Initialised lazily alongside the client.
_httpSemaphore chan struct{}
)
func getHTTPClient() *http.Client {
_httpClientOnce.Do(func() {
conns := 2 * obidefault.ParallelWorkers()
_httpClient = &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: conns,
MaxConnsPerHost: conns,
IdleConnTimeout: 90 * time.Second,
},
Timeout: httpClientTimeout,
}
2026-04-13 18:30:16 +02:00
_httpSemaphore = make(chan struct{}, obidefault.ParallelWorkers())
})
return _httpClient
2026-04-07 08:36:50 +02:00
}
2026-04-13 14:29:08 +02:00
// RegisterHTTP registers the http module in the Lua state as a global,
// consistent with obicontext and BioSequence.
2026-04-07 08:36:50 +02:00
//
// Exposes:
//
2026-04-13 18:30:16 +02:00
// http.post(url, body [, timeout_ms]) → response string (on success)
// http.post(url, body [, timeout_ms]) → nil, err string (on error)
// http.set_concurrency(n) → set max simultaneous requests
2026-04-07 08:36:50 +02:00
func RegisterHTTP(luaState *lua.LState) {
table := luaState.NewTable()
luaState.SetField(table, "post", luaState.NewFunction(luaHTTPPost))
2026-04-13 18:30:16 +02:00
luaState.SetField(table, "set_concurrency", luaState.NewFunction(luaHTTPSetConcurrency))
2026-04-07 08:36:50 +02:00
luaState.SetGlobal("http", table)
}
2026-04-13 18:30:16 +02:00
// luaHTTPPost implements http.post(url, body [, timeout_ms]) for Lua.
//
// The optional third argument overrides the default timeout (in milliseconds).
// Concurrent requests are throttled through _httpSemaphore so that a
// single-threaded backend server is not overwhelmed by K parallel workers.
2026-04-07 08:36:50 +02:00
//
// Lua signature:
//
2026-04-13 18:30:16 +02:00
// local response = http.post(url, body)
// local response = http.post(url, body, 5000) -- 5 s timeout
// local response, err = http.post(url, body)
2026-04-07 08:36:50 +02:00
func luaHTTPPost(L *lua.LState) int {
url := L.CheckString(1)
body := L.CheckString(2)
2026-04-13 18:30:16 +02:00
client := getHTTPClient()
timeout := httpClientTimeout
if L.GetTop() >= 3 {
ms := L.CheckInt(3)
timeout = time.Duration(ms) * time.Millisecond
}
// Acquire semaphore slot — blocks until a slot is free.
_httpSemaphore <- struct{}{}
defer func() { <-_httpSemaphore }()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, strings.NewReader(body))
2026-04-07 08:36:50 +02:00
if err != nil {
L.Push(lua.LNil)
L.Push(lua.LString(err.Error()))
return 2
}
req.Header.Set("Content-Type", "application/json")
2026-04-13 18:30:16 +02:00
resp, err := client.Do(req)
2026-04-07 08:36:50 +02:00
if err != nil {
L.Push(lua.LNil)
L.Push(lua.LString(err.Error()))
return 2
}
defer resp.Body.Close()
respBytes, err := io.ReadAll(resp.Body)
if err != nil {
L.Push(lua.LNil)
L.Push(lua.LString(err.Error()))
return 2
}
L.Push(lua.LString(respBytes))
return 1
}
2026-04-13 18:30:16 +02:00
// luaHTTPSetConcurrency replaces the semaphore with a new one of size n.
// Must be called before the first http.post (e.g. in begin()).
//
// Lua signature:
//
// http.set_concurrency(1) -- serialise all HTTP requests
func luaHTTPSetConcurrency(L *lua.LState) int {
n := L.CheckInt(1)
if n < 1 {
n = 1
}
getHTTPClient() // ensure singleton is initialised
_httpSemaphore = make(chan struct{}, n)
return 0
}