From 69dba08714ed200e37c0c9c2a445b337ffcec97a Mon Sep 17 00:00:00 2001 From: Mansur Iqbal Date: Wed, 10 Jun 2026 13:36:53 +0200 Subject: [PATCH] feat(worker): add per-request worker_timeout (hard request timeout) Add an experimental `worker_timeout` worker option: a hard per-request timeout for worker mode, the equivalent of PHP-FPM's request_terminate_timeout. When a worker request runs longer than the timeout it is aborted with a "Worker request timeout of N second(s) exceeded" fatal and the worker restarts cleanly for the next request. Unlike max_execution_time, this also covers time spent blocked in an external call. A signal/EINTR alone cannot abort such a call (PHP retries EINTR, and mysqlnd even drops its socket from EG(regular_list)), so on Linux the watchdog inspects what the thread is parked in via /proc/self/task//syscall and shuts down the socket(s) involved: - read/recvfrom/recvmsg/connect: fd is the syscall's first argument; - poll/ppoll: the pollfd array is read from the process's own memory with process_vm_readv(2) (PHP's stream layer, and Redis/HTTP/DB clients on it, always poll before reading). Both syscalls are matched: glibc and musl implement poll() via the dedicated poll syscall on arches that have one (e.g. amd64) and via ppoll only elsewhere (e.g. arm64); - epoll_wait/epoll_pwait: watched fds are enumerated from /proc/self/fdinfo/ (covers curl_multi, gRPC). Every fd is confirmed to be a socket, and after recovering a pointer/table-derived fd the thread's syscall is re-read to confirm it is still parked there before shutdown, so a stale pointer or reused fd cannot close an unrelated descriptor. The watchdog body runs under the same mutex as its cancellation, so a watchdog racing request completion can never interrupt the wrong request. A long sleep() is woken by the realtime kill signal (Linux/FreeBSD). The fatal is raised at the next opcode via a custom zend_interrupt_function (guarded against double installation across embedded Init/Shutdown cycles). On macOS/Windows only the VM-interrupt flag is set (CPU-bound overruns are caught; a blocking syscall already in progress cannot be unblocked). Configurable per worker via the Caddyfile `worker_timeout` directive and the WithWorkerTimeout API; defaults to 0 (disabled). Co-Authored-By: Claude Opus 4.8 (1M context) Co-Authored-By: Claude Fable 5 --- caddy/app.go | 1 + caddy/config_test.go | 59 ++++ caddy/workerconfig.go | 19 +- docs/config.md | 1 + docs/worker.md | 68 ++++ frankenphp.c | 136 +++++++- frankenphp.go | 6 + frankenphp.h | 15 + go.mod | 2 +- internal/blockio/blockio.go | 15 + internal/blockio/blockio_linux.go | 279 +++++++++++++++++ internal/blockio/blockio_linux_oldabi.go | 15 + .../blockio/blockio_linux_oldabi_other.go | 12 + .../blockio/blockio_linux_socketsyscalls.go | 13 + .../blockio_linux_socketsyscalls_386.go | 16 + internal/blockio/blockio_linux_test.go | 63 ++++ internal/blockio/blockio_other.go | 9 + internal/blockio/doc.go | 10 + options.go | 27 ++ phpmainthread.go | 2 + phpthread.go | 9 + testdata/worker-blocking-read.php | 22 ++ testdata/worker-timeout-sleep.php | 37 +++ threadworker.go | 116 +++++++ worker.go | 2 + workertimeout_test.go | 292 ++++++++++++++++++ 26 files changed, 1243 insertions(+), 3 deletions(-) create mode 100644 internal/blockio/blockio.go create mode 100644 internal/blockio/blockio_linux.go create mode 100644 internal/blockio/blockio_linux_oldabi.go create mode 100644 internal/blockio/blockio_linux_oldabi_other.go create mode 100644 internal/blockio/blockio_linux_socketsyscalls.go create mode 100644 internal/blockio/blockio_linux_socketsyscalls_386.go create mode 100644 internal/blockio/blockio_linux_test.go create mode 100644 internal/blockio/blockio_other.go create mode 100644 internal/blockio/doc.go create mode 100644 testdata/worker-blocking-read.php create mode 100644 testdata/worker-timeout-sleep.php create mode 100644 workertimeout_test.go diff --git a/caddy/app.go b/caddy/app.go index f10bf965bf..6bc21c09c3 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -164,6 +164,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithWorkerWatchMode(w.Watch), frankenphp.WithWorkerMaxFailures(w.MaxConsecutiveFailures), frankenphp.WithWorkerMaxThreads(w.MaxThreads), + frankenphp.WithWorkerTimeout(w.WorkerTimeout), frankenphp.WithWorkerRequestOptions(w.requestOptions...), ) diff --git a/caddy/config_test.go b/caddy/config_test.go index a26d065a80..4e0a32c1de 100644 --- a/caddy/config_test.go +++ b/caddy/config_test.go @@ -2,6 +2,7 @@ package caddy import ( "testing" + "time" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" "github.com/stretchr/testify/require" @@ -35,6 +36,64 @@ func TestModuleWorkerDuplicateFilenamesFail(t *testing.T) { require.Contains(t, err.Error(), "must not have duplicate filenames", "Error message should mention duplicate filenames") } +func TestModuleWorkerTimeoutParses(t *testing.T) { + config := ` + { + php { + worker { + file ../testdata/worker-with-env.php + num 1 + worker_timeout 30s + } + } + }` + + d := caddyfile.NewTestDispenser(config) + module := &FrankenPHPModule{} + + require.NoError(t, module.UnmarshalCaddyfile(d)) + require.Len(t, module.Workers, 1) + require.Equal(t, 30*time.Second, module.Workers[0].WorkerTimeout) +} + +func TestModuleWorkerTimeoutDefaultsToZero(t *testing.T) { + config := ` + { + php { + worker { + file ../testdata/worker-with-env.php + num 1 + } + } + }` + + d := caddyfile.NewTestDispenser(config) + module := &FrankenPHPModule{} + + require.NoError(t, module.UnmarshalCaddyfile(d)) + require.Len(t, module.Workers, 1) + require.Zero(t, module.Workers[0].WorkerTimeout) +} + +func TestModuleWorkerTimeoutInvalidDurationFails(t *testing.T) { + config := ` + { + php { + worker { + file ../testdata/worker-with-env.php + worker_timeout not-a-duration + } + } + }` + + d := caddyfile.NewTestDispenser(config) + module := &FrankenPHPModule{} + + err := module.UnmarshalCaddyfile(d) + require.Error(t, err) + require.Contains(t, err.Error(), "worker_timeout must be a valid duration") +} + func TestModuleWorkersWithDifferentFilenames(t *testing.T) { // Create a test configuration with different worker filenames configWithDifferentFilenames := ` diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index c50f0d0688..4bf198a51b 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -5,6 +5,7 @@ import ( "path" "path/filepath" "strconv" + "time" "github.com/caddyserver/caddy/v2" "github.com/caddyserver/caddy/v2/caddyconfig/caddyfile" @@ -41,6 +42,8 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` + // WorkerTimeout sets a hard per-request timeout (e.g. 30s). A worker request running longer is interrupted so the thread can be reclaimed. 0 (default) disables it. + WorkerTimeout time.Duration `json:"worker_timeout,omitempty"` options []frankenphp.WorkerOption requestOptions []frankenphp.RequestOption @@ -145,8 +148,22 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { } wc.MaxConsecutiveFailures = v + case "worker_timeout": + if !d.NextArg() { + return wc, d.ArgErr() + } + + v, err := time.ParseDuration(d.Val()) + if err != nil { + return wc, d.Errf("worker_timeout must be a valid duration (example: 30s): %v", err) + } + if v < 0 { + return wc, d.Err("worker_timeout must be >= 0") + } + + wc.WorkerTimeout = v default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, worker_timeout", v) } } diff --git a/docs/config.md b/docs/config.md index 5655142bbb..69b3163941 100644 --- a/docs/config.md +++ b/docs/config.md @@ -111,6 +111,7 @@ You can also explicitly configure FrankenPHP using the [global option](https://c watch # Sets the path to watch for file changes. Can be specified more than once for multiple paths. name # Sets the name of the worker, used in logs and metrics. Default: absolute path of worker file max_consecutive_failures # Sets the maximum number of consecutive failures before the worker is considered unhealthy, -1 means the worker will always restart. Default: 6. + worker_timeout # (experimental) Hard per-request timeout (e.g. 30s). A request running longer is interrupted so the worker thread can be reclaimed. Default: 0 (disabled). } } } diff --git a/docs/worker.md b/docs/worker.md index 64f278519c..3d0c2fa193 100644 --- a/docs/worker.md +++ b/docs/worker.md @@ -151,6 +151,74 @@ frankenphp { } ``` +### Request timeout (experimental) + +By default a worker thread blocked on a slow external call (a hung MySQL query, a +stuck HTTP client, a Redis call, a long `sleep()`) holds that thread until the call +returns on its own. The `worker_timeout` option sets a hard per-request timeout — +the worker-mode equivalent of PHP-FPM's `request_terminate_timeout` — after which +FrankenPHP interrupts the PHP thread so the request bails out and the worker is +reclaimed: + +```caddyfile +frankenphp { + worker { + # ... + worker_timeout 30s + } +} +``` + +When the timeout elapses, the request is aborted with a fatal error whose message +is `Worker request timeout of N second(s) exceeded`. The worker script then +restarts cleanly and serves the next request — no special userland code is +required. Note that `max_execution_time` does **not** count time spent inside a +blocking call such as a database query, which is exactly the case `worker_timeout` +is designed to cover. + +How it works (and its limits): + +- A blocking syscall (a stuck database query, a hung Redis/Elasticsearch/HTTP + read, a black-holed `connect()`) cannot be aborted by PHP's timeout flag + alone, because PHP retries the interrupted read. On **Linux**, FrankenPHP + inspects what the worker thread is blocked on and shuts down the socket(s) + involved, so the read fails and the request unwinds. Only sockets are + aborted this way (a read blocked on a file or pipe is not). It recognises: + - `read`/`recvfrom`/`recvmsg` and a blocking `connect` — the descriptor is the + syscall's first argument; + - `poll`/`ppoll` — the descriptors are read out of the poll set (PHP's stream + layer, and thus most Redis/HTTP/DB clients built on it, always poll before + reading). This is what lets a stuck `SELECT SLEEP(30)` actually stop at the + timeout instead of running to completion; + - `epoll_wait`/`epoll_pwait` — the watched descriptors are enumerated from the + epoll instance (covers clients running their own event loop, such as + `curl_multi` and gRPC). + + Every descriptor is confirmed to be a socket before it is shut down. +- A long `sleep()`/`usleep()` (no socket) is interrupted by a realtime signal on + **Linux and FreeBSD**. +- On **macOS** and **Windows**, and for a tight CPU loop inside a C extension that + swallows `EINTR`, only PHP's VM-interrupt flag is set: a CPU-bound overrun is + still caught at the next opcode boundary, but a blocking syscall already in + progress cannot be unblocked. A client blocked in a `select`-based loop (rare on + Linux, where `poll` is preferred) is likewise not aborted. +- The socket abort needs no extra privilege (all inspection is of the process + itself), but it relies on `/proc` and — for poll-based waits, the common case — + on [`process_vm_readv(2)`](https://man7.org/linux/man-pages/man2/process_vm_readv.2.html). + Docker's default seccomp profile allows this syscall on kernels ≥ 4.8 + ([moby#42083](https://github.com/moby/moby/pull/42083)); under an older or + stricter policy (gVisor, custom profiles) the call fails closed: FrankenPHP + logs a warning once and a request blocked in a poll-based socket read can then + not be aborted (sleeps and CPU-bound overruns still are). +- `worker_timeout` aborts the request hard, like `request_terminate_timeout` + does in PHP-FPM. The database server rolls back an open transaction when its + connection is shut down, and PHP's request shutdown still runs (sessions are + released as usual). But application-level sequences are not rolled back: an + e-mail already sent, a file already written or an external lock with a TTL + stay as they are. Set the timeout comfortably above your slowest legitimate + request. +- `worker_timeout` defaults to `0` (disabled). + ## Superglobals behavior [PHP superglobals](https://www.php.net/manual/language.variables.superglobals.php) (`$_SERVER`, `$_ENV`, `$_GET`...) diff --git a/frankenphp.c b/frankenphp.c index 6bc3e0c1da..bca798d1f8 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -139,8 +139,35 @@ static void frankenphp_register_atfork(void) { static void CALLBACK frankenphp_noop_apc(ULONG_PTR param) { (void)param; } #endif +/* ===== Worker request timeout (per-request hard timeout) ===== + * + * A blocking syscall (a stuck SELECT SLEEP(), a hung HTTP read, ...) cannot be + * aborted by the VM-interrupt flag alone: PHP's network layer retries EINTR, so + * the read just resumes, and a driver like mysqlnd even removes its socket from + * EG(regular_list) so it can't be found by walking the resource list. To cut + * such a request short the Go watchdog shuts down the fd the thread is blocked + * on (found via /proc//syscall); the EINTR wakes sleep-style waits. Once + * the thread runs PHP again, this custom zend_interrupt_function raises a clear + * "Worker request timeout" fatal. + * + * Per-thread state is indexed by thread_index and allocated once max_threads is + * known (frankenphp_init_worker_timeout). */ +static zend_atomic_bool *worker_timeout_pending = NULL; +static double *worker_timeout_seconds = NULL; +static int worker_timeout_max_threads = 0; +/* Saved to chain PHP's own interrupt handler (fibers, pcntl, ...). */ +static void (*frankenphp_original_interrupt)(zend_execute_data *) = NULL; + +static bool frankenphp_worker_timeout_is_pending(uintptr_t idx) { + return worker_timeout_pending != NULL && + idx < (uintptr_t)worker_timeout_max_threads && + zend_atomic_bool_load(&worker_timeout_pending[idx]); +} + #ifdef FRANKENPHP_HAS_KILL_SIGNAL -/* No-op: delivery itself is what unblocks the syscall via EINTR. */ +/* No-op: delivery itself is what unblocks an EINTR-abortable wait. The socket + * abort that handles retried blocking reads is done from Go (shutdown on the + * blocked fd), not here. */ static void frankenphp_kill_signal_handler(int sig) { (void)sig; } static pthread_once_t kill_signal_handler_installed = PTHREAD_ONCE_INIT; @@ -222,6 +249,104 @@ void frankenphp_release_thread_for_kill(force_kill_slot slot) { #endif } +/* zend_interrupt_function hook: when a worker timeout is pending for this + * thread, raise a fatal that unwinds the request with a clear message. Any + * exception left over from the aborted I/O (e.g. a mysqli connection error + * caused by the socket shutdown) is dropped so our message is what surfaces. + * E_ERROR triggers a bailout, so the original handler is not chained in that + * case; otherwise we chain it. */ +static void frankenphp_timeout_interrupt(zend_execute_data *execute_data) { + if (is_worker_thread && frankenphp_worker_timeout_is_pending(thread_index)) { + zend_atomic_bool_store(&worker_timeout_pending[thread_index], false); + if (EG(exception)) { + zend_clear_exception(); + } + zend_error_noreturn(E_ERROR, "Worker request timeout of %g second(s) exceeded", + worker_timeout_seconds[thread_index]); + } + + if (frankenphp_original_interrupt != NULL) { + frankenphp_original_interrupt(execute_data); + } +} + +/* Installed on the main thread after SAPI startup. php_main can run more than + * once per process (Init/Shutdown cycles when embedding, and in the test + * suite) and zend_interrupt_function survives a SAPI shutdown, so guard + * against saving ourselves as the "original" handler - the chain call would + * recurse forever the first time the hook fired without a pending timeout. */ +static void frankenphp_install_timeout_interrupt(void) { + if (zend_interrupt_function == frankenphp_timeout_interrupt) { + return; + } + frankenphp_original_interrupt = zend_interrupt_function; + zend_interrupt_function = frankenphp_timeout_interrupt; +} + +/* Allocate per-thread timeout state once max_threads is known. Called from Go + * alongside frankenphp_init_thread_metrics. */ +void frankenphp_init_worker_timeout(int max_threads) { + worker_timeout_max_threads = max_threads; + worker_timeout_pending = calloc(max_threads, sizeof(zend_atomic_bool)); + worker_timeout_seconds = calloc(max_threads, sizeof(double)); +} + +void frankenphp_destroy_worker_timeout(void) { + free(worker_timeout_pending); + worker_timeout_pending = NULL; + free(worker_timeout_seconds); + worker_timeout_seconds = NULL; + worker_timeout_max_threads = 0; +} + +/* Arm the timeout for a thread that has overrun its worker_timeout: record the + * limit (for the message) and set the per-thread flag + VM interrupt so the + * interrupt hook fires the moment the thread next runs PHP. No wakeup yet - the + * caller first shuts down the blocked fd (so the message isn't pre-empted by the + * driver's own connection error), then calls frankenphp_wake_worker_thread. */ +void frankenphp_arm_worker_timeout(uintptr_t thread_index_arg, force_kill_slot slot, + double timeout_seconds) { + if (slot.vm_interrupt == NULL || + thread_index_arg >= (uintptr_t)worker_timeout_max_threads || + worker_timeout_pending == NULL || worker_timeout_seconds == NULL) { + return; + } + + worker_timeout_seconds[thread_index_arg] = timeout_seconds; + zend_atomic_bool_store(&worker_timeout_pending[thread_index_arg], true); + zend_atomic_bool_store(slot.vm_interrupt, true); +} + +/* Wake a thread parked in an EINTR-abortable wait (sleep, usleep) so it returns + * and reaches the VM interrupt. Socket reads are handled by the fd shutdown done + * before this call; this is the fallback for waits that have no fd. Safe on a + * thread that has already gone away (zeroed slot). */ +void frankenphp_wake_worker_thread(force_kill_slot slot) { + if (slot.vm_interrupt == NULL) { + return; + } +#ifdef FRANKENPHP_HAS_KILL_SIGNAL + if (zend_atomic_bool_load(&kill_signal_handler_active)) { + pthread_kill(slot.tid, FRANKENPHP_KILL_SIGNAL); + } +#elif defined(PHP_WIN32) + if (slot.thread_handle != NULL) { + CancelSynchronousIo(slot.thread_handle); + QueueUserAPC((PAPCFUNC)frankenphp_noop_apc, slot.thread_handle, 0); + } +#endif +} + +/* Clear a (possibly stale) pending flag at the start of a worker request so a + * watchdog that raced request completion cannot abort the next request. */ +void frankenphp_clear_worker_timeout(uintptr_t thread_index_arg) { + if (worker_timeout_pending == NULL || + thread_index_arg >= (uintptr_t)worker_timeout_max_threads) { + return; + } + zend_atomic_bool_store(&worker_timeout_pending[thread_index_arg], false); +} + void frankenphp_update_local_thread_context(bool is_worker) { is_worker_thread = is_worker; @@ -1269,6 +1394,12 @@ static void *php_thread(void *arg) { * grace period can wake it from a busy PHP loop or blocking syscall. */ frankenphp_register_thread_for_kill(thread_index); +#ifdef __linux__ + /* Publish the kernel thread id so the worker-timeout watchdog can locate the + * fd this thread blocks on (via /proc//syscall) and shut it down. */ + go_frankenphp_store_thread_tid(thread_index, (int)gettid()); +#endif + bool thread_is_healthy = true; bool has_attempted_shutdown = false; @@ -1467,6 +1598,9 @@ static void *php_main(void *arg) { frankenphp_sapi_module.startup(&frankenphp_sapi_module); + /* Hook the VM interrupt so worker_timeout can raise its own fatal. */ + frankenphp_install_timeout_interrupt(); + /* check if a default filter is set in php.ini and only filter if * it is, this is deprecated and will be removed in PHP 9 */ char *default_filter; diff --git a/frankenphp.go b/frankenphp.go index 52246d01c7..de48646319 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -37,6 +37,8 @@ import ( "unsafe" // debug on Linux //_ "github.com/ianlancetaylor/cgosymbolizer" + + "github.com/dunglas/frankenphp/internal/blockio" ) type contextKeyStruct struct{} @@ -269,6 +271,10 @@ func Init(options ...Option) error { opt.logger = nil } + // Let the worker-timeout watchdog report (once) when the platform denies + // the syscalls it needs to abort a blocked socket read. + blockio.SetLogger(globalLogger) + globalMu.Unlock() if opt.metrics != nil { diff --git a/frankenphp.h b/frankenphp.h index 31df007f18..baf74f72b5 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -226,6 +226,21 @@ size_t frankenphp_get_thread_memory_usage(uintptr_t thread_index); void frankenphp_force_kill_thread(force_kill_slot slot); void frankenphp_release_thread_for_kill(force_kill_slot slot); +/* Per-request worker timeout. When a request overruns its worker_timeout, the + * Go watchdog calls frankenphp_arm_worker_timeout (set the per-thread pending + * flag + VM interrupt so a "Worker request timeout" fatal is raised at the + * next opcode boundary), then shuts down the socket fd(s) the thread is + * blocked on (so a blocked DB/HTTP read fails instead of retrying EINTR), and + * finally calls frankenphp_wake_worker_thread to wake EINTR-abortable waits + * such as sleep. The init/destroy pair allocates the per-thread state; clear + * resets a stale flag at request start. */ +void frankenphp_init_worker_timeout(int max_threads); +void frankenphp_destroy_worker_timeout(void); +void frankenphp_arm_worker_timeout(uintptr_t thread_index, force_kill_slot slot, + double timeout_seconds); +void frankenphp_wake_worker_thread(force_kill_slot slot); +void frankenphp_clear_worker_timeout(uintptr_t thread_index); + void register_extensions(zend_module_entry **m, int len); #endif diff --git a/go.mod b/go.mod index 696d5b5445..7e6bb13bbe 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/stretchr/testify v1.11.1 golang.org/x/net v0.55.0 + golang.org/x/sys v0.45.0 ) require ( @@ -61,7 +62,6 @@ require ( go.opentelemetry.io/otel/trace v1.44.0 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.52.0 // indirect - golang.org/x/sys v0.45.0 // indirect golang.org/x/text v0.37.0 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/internal/blockio/blockio.go b/internal/blockio/blockio.go new file mode 100644 index 0000000000..75ecb82e0d --- /dev/null +++ b/internal/blockio/blockio.go @@ -0,0 +1,15 @@ +package blockio + +import "log/slog" + +// logger, when set, is used to report (once) that the platform denies a +// syscall Abort relies on, so a silently degraded worker_timeout (e.g. under +// a seccomp policy that blocks process_vm_readv) is visible in the logs. +var logger *slog.Logger + +// SetLogger sets the logger used for the one-time degradation warning. +// Optional: without it, degradation stays silent. Not safe for concurrent use +// with Abort; call it during initialization. +func SetLogger(l *slog.Logger) { + logger = l +} diff --git a/internal/blockio/blockio_linux.go b/internal/blockio/blockio_linux.go new file mode 100644 index 0000000000..8289281d95 --- /dev/null +++ b/internal/blockio/blockio_linux.go @@ -0,0 +1,279 @@ +//go:build linux + +package blockio + +import ( + "bufio" + "context" + "encoding/binary" + "errors" + "log/slog" + "os" + "strconv" + "strings" + "sync" + "syscall" + + "golang.org/x/sys/unix" +) + +// maxWatchedFDs bounds how many descriptors we will read out of a poll set or an +// epoll instance, so a misread count can never make us scan or shut down an +// unreasonable number of fds. It is generous on purpose: a client doing a +// dual-stack connect or a small fan-out poll legitimately watches a handful of +// fds (this is NOT clamped to 1 - that would only fit mysqlnd's single-socket +// poll and miss curl, gRPC and friends). +const maxWatchedFDs = 256 + +// Abort best-effort interrupts the blocking I/O the given kernel thread is +// parked in, by shutting down the socket fd(s) so the syscall returns +// terminally instead of being retried after the EINTR our wake-up delivers. This +// is what lets worker_timeout actually cut short a request stuck in an external +// call - a slow DB query (mysqlnd), a hung Redis/Elasticsearch/HTTP read, a +// black-holed connect - whose socket is not reachable via PHP's resource list. +// +// It reads /proc/self/task//syscall to learn the blocked syscall and its +// arguments and handles, in order of how PHP actually blocks: +// +// - read / recvfrom / recvmsg / connect: the fd is the first argument. +// - poll / ppoll: arg0 points to a struct pollfd array (PHP's stream layer, +// and thus Redis/HTTP/DB clients riding on it, always poll before recv); we +// read the array out of the process's own memory with process_vm_readv(2). +// Both syscalls must be matched: glibc and musl implement poll() via the +// dedicated poll syscall on arches that have one (e.g. amd64) and via ppoll +// only where they don't (e.g. arm64). +// - epoll_wait / epoll_pwait: arg0 is the epoll fd; the watched fds are not in +// the syscall arguments at all, so we enumerate them from +// /proc/self/fdinfo/ (covers curl_multi, gRPC and other own-loop +// clients). +// +// Every fd is confirmed to be a socket before shutdown, and after recovering a +// pointer-derived fd we re-read /proc/.../syscall to confirm the thread is still +// parked in the same syscall on the same argument before acting - so a stale +// pointer or a reused fd cannot make us shut down an unrelated descriptor. No-op +// if the thread is not in a recognised blocking syscall (CPU-bound overruns are +// handled by the VM interrupt instead). +func Abort(tid int) { + if tid <= 0 { + return + } + + nr, args, ok := readBlockingSyscall(tid) + if !ok { + return + } + + // switch on booleans (not constant labels): sysConnect/sysRecvfrom/sysRecvmsg, + // sysPoll and sysEpollWait collapse to the same impossible value on arches that + // lack the corresponding syscall, which would be a duplicate case in a value + // switch. + switch { + case nr == syscall.SYS_READ || nr == sysRecvfrom || nr == sysRecvmsg || nr == sysConnect: + // arg0 is the fd. + fd := int(int32(args[0])) + if isSocketFD(fd) && stillBlockedIn(tid, nr, args[0]) { + shutdownSocket(fd) + } + case nr == sysPoll || nr == syscall.SYS_PPOLL: + // arg0 -> struct pollfd array, arg1 = nfds (identical layout for both). + // poll exists only on the older ABIs (e.g. amd64); sysPoll is an + // impossible value elsewhere, leaving this branch ppoll-only there. + shutdownPollFDs(tid, nr, args[0], args[1]) + case nr == sysEpollWait || nr == syscall.SYS_EPOLL_PWAIT: + // arg0 is the epoll fd. Like poll, epoll_wait exists only on the older + // ABIs; sysEpollWait is an impossible value elsewhere, leaving this + // branch epoll_pwait-only there. + shutdownEpollFDs(tid, nr, args[0]) + } +} + +// readBlockingSyscall reads /proc/self/task//syscall and returns the syscall +// number and its arguments. It reports ok == false when the thread is not +// currently in a syscall ("running", or a negative number). +func readBlockingSyscall(tid int) (nr int64, args [6]uint64, ok bool) { + data, err := os.ReadFile("/proc/self/task/" + strconv.Itoa(tid) + "/syscall") + if err != nil { + return 0, args, false + } + + fields := strings.Fields(string(data)) + if len(fields) < 1 { + return 0, args, false + } + + // fields[0] is the syscall number ("running"/"-1" when not in a syscall), + // followed by the (hex) arguments. + nr, err = strconv.ParseInt(fields[0], 10, 64) + if err != nil || nr < 0 { + return 0, args, false + } + + for i := 0; i < len(args) && i+1 < len(fields); i++ { + args[i] = parseSyscallArg(fields[i+1]) + } + + return nr, args, true +} + +// stillBlockedIn re-reads the thread's syscall and reports whether it is still +// parked in the same syscall on the same first argument. We call this after +// recovering an fd from a pointer (or from a process-wide fd table) and right +// before shutting it down, to shrink the window in which the thread could have +// returned and the fd been reused for something else. +func stillBlockedIn(tid int, nr int64, arg0 uint64) bool { + nr2, args2, ok := readBlockingSyscall(tid) + + return ok && nr2 == nr && args2[0] == arg0 +} + +func parseSyscallArg(s string) uint64 { + v, err := strconv.ParseUint(s, 0, 64) // base 0 handles the 0x prefix + + if err != nil { + return 0 + } + + return v +} + +// shutdownPollFDs reads nfds struct pollfd entries from the process's own memory +// at ptr and shuts down each socket fd. struct pollfd is { int fd; short events; +// short revents; } = 8 bytes, fd at offset 0, little-endian on the supported +// arches. +func shutdownPollFDs(tid int, nr int64, ptr, nfds uint64) { + if nfds == 0 || nfds > maxWatchedFDs { + return + } + + buf := make([]byte, nfds*8) + if !readProcessMemory(uintptr(ptr), buf) { + return + } + + // Confirm the thread is still parked in this poll on the same array before we + // trust the bytes we just read and start shutting fds down. + if !stillBlockedIn(tid, nr, ptr) { + return + } + + for i := uint64(0); i < nfds; i++ { + fd := int(int32(binary.LittleEndian.Uint32(buf[i*8:]))) + if isSocketFD(fd) { + shutdownSocket(fd) + } + } +} + +// shutdownEpollFDs enumerates the descriptors registered in the epoll instance +// epfd (from /proc/self/fdinfo/, which lists "tfd: " entries) and shuts +// down those that are sockets. This needs no memory read: the watched fds live +// inside the kernel epoll object, not in the syscall arguments. +func shutdownEpollFDs(tid int, nr int64, epfd uint64) { + fd := int(int32(epfd)) + fds := epollMonitoredFDs(fd) + if len(fds) == 0 || !stillBlockedIn(tid, nr, epfd) { + return + } + + for _, watched := range fds { + if isSocketFD(watched) { + shutdownSocket(watched) + } + } +} + +func epollMonitoredFDs(epfd int) []int { + if epfd < 0 { + return nil + } + + f, err := os.Open("/proc/self/fdinfo/" + strconv.Itoa(epfd)) + if err != nil { + return nil + } + defer f.Close() + + var fds []int + sc := bufio.NewScanner(f) + for sc.Scan() { + // Lines look like: "tfd: 5 events: 19 data: ..." + fields := strings.Fields(sc.Text()) + if len(fields) < 2 || fields[0] != "tfd:" { + continue + } + if fd, err := strconv.Atoi(fields[1]); err == nil { + fds = append(fds, fd) + } + if len(fds) >= maxWatchedFDs { + break + } + } + + return fds +} + +// readProcessMemory copies len(buf) bytes from our own address space at +// remoteAddr into buf using process_vm_readv(2). We read from this process (all +// threads share the address space), so no ptrace privilege is required; the call +// simply fails closed under a seccomp policy that blocks it (and the +// degradation is reported once, see reportPolicyError). +func readProcessMemory(remoteAddr uintptr, buf []byte) bool { + if len(buf) == 0 { + return false + } + + local := unix.Iovec{Base: &buf[0]} + local.SetLen(len(buf)) + remote := unix.RemoteIovec{Base: remoteAddr, Len: len(buf)} + + n, err := unix.ProcessVMReadv(os.Getpid(), []unix.Iovec{local}, []unix.RemoteIovec{remote}, 0) + if err != nil { + reportPolicyError(err) + + return false + } + + return n == len(buf) +} + +// policyOnce guards the one-time warning emitted when the platform denies +// process_vm_readv altogether. +var policyOnce sync.Once + +// reportPolicyError logs (once) when process_vm_readv is denied by policy +// rather than failing transiently: under a seccomp profile that blocks the +// syscall (Docker's default profile only allows it on kernels >= 4.8, and +// gVisor or stricter custom profiles may not at all) the poll-set path of the +// watchdog is unavailable, so a request blocked in a poll-based socket read +// cannot be aborted by worker_timeout. Transient errors (ESRCH when the +// thread returned, EFAULT on a stale pointer) are expected and stay silent. +func reportPolicyError(err error) { + if !errors.Is(err, unix.EPERM) && !errors.Is(err, unix.EACCES) && !errors.Is(err, unix.ENOSYS) { + return + } + + policyOnce.Do(func() { + if logger != nil { + logger.LogAttrs(context.Background(), slog.LevelWarn, + "worker_timeout: process_vm_readv is denied (seccomp policy?); a request blocked in a poll-based socket read cannot be aborted", + slog.Any("error", err), + ) + } + }) +} + +// isSocketFD reports whether fd is a socket, so a misclassified argument can +// never make us shut down an unrelated descriptor (a file, a pipe, ...). +func isSocketFD(fd int) bool { + if fd < 0 { + return false + } + + target, err := os.Readlink("/proc/self/fd/" + strconv.Itoa(fd)) + + return err == nil && strings.HasPrefix(target, "socket:") +} + +func shutdownSocket(fd int) { + _ = syscall.Shutdown(fd, syscall.SHUT_RDWR) +} diff --git a/internal/blockio/blockio_linux_oldabi.go b/internal/blockio/blockio_linux_oldabi.go new file mode 100644 index 0000000000..0ab6d7e6ed --- /dev/null +++ b/internal/blockio/blockio_linux_oldabi.go @@ -0,0 +1,15 @@ +//go:build linux && !arm64 && !riscv64 && !loong64 + +package blockio + +import "syscall" + +// Older Linux ABIs ship dedicated poll and epoll_wait syscalls next to +// ppoll/epoll_pwait, and the libc prefers them: glibc and musl implement +// poll() via SYS_POLL and epoll_wait() via SYS_EPOLL_WAIT on these arches +// (e.g. amd64, 386, arm), so the watchdog must match them — a PHP thread +// parked in a stream poll on amd64 sits in SYS_POLL, not SYS_PPOLL. +const ( + sysPoll int64 = syscall.SYS_POLL + sysEpollWait int64 = syscall.SYS_EPOLL_WAIT +) diff --git a/internal/blockio/blockio_linux_oldabi_other.go b/internal/blockio/blockio_linux_oldabi_other.go new file mode 100644 index 0000000000..9cc1301e46 --- /dev/null +++ b/internal/blockio/blockio_linux_oldabi_other.go @@ -0,0 +1,12 @@ +//go:build linux && (arm64 || riscv64 || loong64) + +package blockio + +// These newer ABIs never had dedicated poll/epoll_wait syscalls: the libc +// implements poll() via ppoll and epoll_wait() via epoll_pwait, which +// Abort matches directly. Impossible values keep the dedicated +// branches dead; readBlockingSyscall never returns a negative number. +const ( + sysPoll int64 = -1 + sysEpollWait int64 = -1 +) diff --git a/internal/blockio/blockio_linux_socketsyscalls.go b/internal/blockio/blockio_linux_socketsyscalls.go new file mode 100644 index 0000000000..2b2ad68e51 --- /dev/null +++ b/internal/blockio/blockio_linux_socketsyscalls.go @@ -0,0 +1,13 @@ +//go:build linux && !386 + +package blockio + +import "syscall" + +// connect/recvfrom/recvmsg are individual syscalls on these arches, so the fd a +// thread blocks on is the syscall's first argument. +const ( + sysConnect int64 = syscall.SYS_CONNECT + sysRecvfrom int64 = syscall.SYS_RECVFROM + sysRecvmsg int64 = syscall.SYS_RECVMSG +) diff --git a/internal/blockio/blockio_linux_socketsyscalls_386.go b/internal/blockio/blockio_linux_socketsyscalls_386.go new file mode 100644 index 0000000000..3280c2ff02 --- /dev/null +++ b/internal/blockio/blockio_linux_socketsyscalls_386.go @@ -0,0 +1,16 @@ +//go:build linux && 386 + +package blockio + +// linux/386 multiplexes socket operations through socketcall(2), so connect, +// recvfrom and recvmsg are not distinct syscalls and Go's syscall package does +// not define their numbers. Set them to an impossible value so those branches +// never match; readBlockingSyscall never returns a negative number. This loses +// little in practice: PHP's stream layer (and the DB/Redis/HTTP clients built on +// it) always polls before reading, so the poll/ppoll path still aborts blocking +// reads on 386. +const ( + sysConnect int64 = -1 + sysRecvfrom int64 = -1 + sysRecvmsg int64 = -1 +) diff --git a/internal/blockio/blockio_linux_test.go b/internal/blockio/blockio_linux_test.go new file mode 100644 index 0000000000..44d371aabd --- /dev/null +++ b/internal/blockio/blockio_linux_test.go @@ -0,0 +1,63 @@ +//go:build linux + +package blockio + +import ( + "os" + "syscall" + "testing" + "unsafe" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestReadProcessMemory verifies the process_vm_readv path reads our own memory +// back faithfully - this is what recovers the struct pollfd array for the ppoll +// case (replacing the /proc//mem file read). +func TestReadProcessMemory(t *testing.T) { + want := []byte("frankenphp-process_vm_readv-roundtrip") + got := make([]byte, len(want)) + + require.True(t, readProcessMemory(uintptr(unsafe.Pointer(&want[0])), got)) + assert.Equal(t, want, got) + + // A zero-length read is rejected (nothing to shut down, no &buf[0]). + assert.False(t, readProcessMemory(uintptr(unsafe.Pointer(&want[0])), nil)) +} + +// TestIsSocketFD verifies we only ever classify real sockets as shutdown +// targets, so a misread argument can never close a file or pipe. +func TestIsSocketFD(t *testing.T) { + s, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0) + require.NoError(t, err) + defer syscall.Close(s) + assert.True(t, isSocketFD(s), "a socket fd must be recognised") + + f, err := os.CreateTemp(t.TempDir(), "notasocket") + require.NoError(t, err) + defer f.Close() + assert.False(t, isSocketFD(int(f.Fd())), "a regular file must not be a socket") + + assert.False(t, isSocketFD(-1), "an invalid fd must not be a socket") +} + +// TestEpollMonitoredFDs verifies we can enumerate the descriptors registered in +// an epoll instance from /proc/self/fdinfo/ - the basis for aborting +// curl_multi/gRPC-style clients parked in epoll_wait, with no memory read. +func TestEpollMonitoredFDs(t *testing.T) { + epfd, err := syscall.EpollCreate1(0) + require.NoError(t, err) + defer syscall.Close(epfd) + + s, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0) + require.NoError(t, err) + defer syscall.Close(s) + + ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(s)} + require.NoError(t, syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, s, &ev)) + + assert.Contains(t, epollMonitoredFDs(epfd), s, + "the registered socket must be discovered via fdinfo") + assert.Empty(t, epollMonitoredFDs(-1)) +} diff --git a/internal/blockio/blockio_other.go b/internal/blockio/blockio_other.go new file mode 100644 index 0000000000..469006037b --- /dev/null +++ b/internal/blockio/blockio_other.go @@ -0,0 +1,9 @@ +//go:build !linux + +package blockio + +// Abort is a no-op off Linux: it relies on /proc//syscall to +// find the fd a thread is blocked on. Elsewhere worker_timeout falls back to the +// VM interrupt plus the wake-up signal, which catches CPU-bound and +// EINTR-abortable waits but cannot unblock a socket read already in progress. +func Abort(tid int) {} diff --git a/internal/blockio/doc.go b/internal/blockio/doc.go new file mode 100644 index 0000000000..a30138873e --- /dev/null +++ b/internal/blockio/doc.go @@ -0,0 +1,10 @@ +// Package blockio interrupts the blocking I/O a kernel thread is parked in. +// +// On Linux, Abort inspects /proc/self/task//syscall to find the socket +// fd(s) the thread is blocked on (directly, through a poll set or through an +// epoll instance) and shuts them down, so the syscall fails terminally instead +// of being retried after an EINTR. The worker_timeout watchdog uses this to +// cut short requests stuck in an external call - a slow DB query, a hung +// Redis/HTTP read - whose socket is not reachable via PHP's resource list. +// On other platforms Abort is a no-op. +package blockio diff --git a/options.go b/options.go index a9cd2a2630..e11758274c 100644 --- a/options.go +++ b/options.go @@ -45,6 +45,7 @@ type workerOpt struct { requestOptions []RequestOption watch []string maxConsecutiveFailures int + workerTimeout time.Duration extensionWorkers *extensionWorkers onThreadReady func(int) onThreadShutdown func(int) @@ -224,6 +225,32 @@ func WithWorkerMaxFailures(maxFailures int) WorkerOption { } } +// EXPERIMENTAL: WithWorkerTimeout sets a hard per-request timeout for the worker +// (0 = disabled, the default). When a worker request runs longer than the +// timeout, FrankenPHP aborts it with a "Worker request timeout of N second(s) +// exceeded" fatal and the worker script restarts cleanly, ready for the next +// request. This is the worker-mode equivalent of PHP-FPM's +// request_terminate_timeout, and unlike max_execution_time it also covers time +// spent inside a blocking call such as a slow database query. +// +// On Linux, a request blocked in a socket read (a stuck DB query, a hung HTTP +// read) is aborted by shutting down the file descriptor it is parked on, so the +// read fails and the request unwinds. A long sleep is woken by a realtime signal +// on Linux/FreeBSD. On macOS/Windows, and for tight CPU loops in extensions that +// swallow EINTR, only the VM-interrupt flag is set: CPU-bound overruns are still +// caught at the next opcode boundary, but a blocking syscall already in progress +// cannot be unblocked. +func WithWorkerTimeout(timeout time.Duration) WorkerOption { + return func(w *workerOpt) error { + if timeout < 0 { + return fmt.Errorf("worker timeout must be >= 0, got %s", timeout) + } + w.workerTimeout = timeout + + return nil + } +} + func WithWorkerOnReady(f func(int)) WorkerOption { return func(w *workerOpt) error { w.onThreadReady = f diff --git a/phpmainthread.go b/phpmainthread.go index b892d52f19..a2b90847e2 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -57,6 +57,7 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) // Must follow start(): maxThreads is only final once // setAutomaticMaxThreads runs on the main PHP thread (before Ready). C.frankenphp_init_thread_metrics(C.int(mainThread.maxThreads)) + C.frankenphp_init_worker_timeout(C.int(mainThread.maxThreads)) // initialize all other threads phpThreads = make([]*phpThread, mainThread.maxThreads) @@ -107,6 +108,7 @@ func drainPHPThreads() { mainThread.state.Set(state.Done) mainThread.state.WaitFor(state.Reserved) C.frankenphp_destroy_thread_metrics() + C.frankenphp_destroy_worker_timeout() phpThreads = nil } diff --git a/phpthread.go b/phpthread.go index 5d94bd62f6..7857836fc2 100644 --- a/phpthread.go +++ b/phpthread.go @@ -26,6 +26,10 @@ type phpThread struct { contextMu sync.RWMutex state *state.ThreadState requestCount atomic.Int64 + // kernelTID is the Linux kernel thread id (gettid), published by the C + // thread at boot. Used by the worker-timeout watchdog to find the fd the + // thread is blocked on via /proc. 0 on platforms without it. + kernelTID atomic.Int64 // forceKill holds &EG() pointers captured on the PHP thread itself. // forceKillMu pairs with go_frankenphp_clear_force_kill_slot's write // lock so a concurrent kill never dereferences pointers freed by @@ -259,6 +263,11 @@ func go_frankenphp_clear_force_kill_slot(threadIndex C.uintptr_t) { thread.forceKillMu.Unlock() } +//export go_frankenphp_store_thread_tid +func go_frankenphp_store_thread_tid(threadIndex C.uintptr_t, tid C.int) { + phpThreads[threadIndex].kernelTID.Store(int64(tid)) +} + //export go_frankenphp_on_thread_shutdown func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] diff --git a/testdata/worker-blocking-read.php b/testdata/worker-blocking-read.php new file mode 100644 index 0000000000..50deac35f2 --- /dev/null +++ b/testdata/worker-blocking-read.php @@ -0,0 +1,22 @@ + 0) { + sleep($seconds); + } + + echo "instance:$instance,count:$count,completed"; +}; + +do { + $ret = \frankenphp_handle_request($fn); +} while ($ret); diff --git a/threadworker.go b/threadworker.go index 21e1034805..40071a1c78 100644 --- a/threadworker.go +++ b/threadworker.go @@ -7,9 +7,11 @@ import ( "fmt" "log/slog" "path/filepath" + "sync" "time" "unsafe" + "github.com/dunglas/frankenphp/internal/blockio" "github.com/dunglas/frankenphp/internal/state" ) @@ -27,6 +29,11 @@ type workerThread struct { isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet failureCount int // number of consecutive startup failures requestCount int // number of requests handled since last restart + + // per-request timeout watchdog (worker.workerTimeout > 0) + timeoutMu sync.Mutex + requestTimer *time.Timer + requestEpoch uint64 // bumped on every request finish to invalidate a stale watchdog } func convertToWorkerThread(thread *phpThread, worker *worker) { @@ -134,6 +141,12 @@ func setupWorkerScript(handler *workerThread, worker *worker) { func tearDownWorkerScript(handler *workerThread, exitStatus int) { worker := handler.worker + + // Stop any pending request-timeout watchdog. On a fatal error (including a + // timeout-induced bailout) go_frankenphp_finish_worker_request is skipped, + // so this is the crash-path counterpart to the cancel done there. + handler.cancelRequestTimeout() + handler.dummyFrankenPHPContext = nil handler.dummyContext = nil @@ -277,9 +290,108 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { } } + handler.armRequestTimeout() + return true, handler.workerFrankenPHPContext.handlerParameters } +// armRequestTimeout starts a watchdog for the request that is about to execute. +// If the request runs longer than worker.workerTimeout, the watchdog arms the +// per-thread timeout flag + VM interrupt (so a "Worker request timeout" fatal +// is raised at the next opcode boundary), on Linux shuts down the socket fd(s) +// the thread is blocked on (so a blocked DB/HTTP read returns instead of +// retrying EINTR) and wakes EINTR-abortable waits (sleep) via the realtime kill +// signal (Linux/FreeBSD). The worker script then restarts. +// +// A timeout of 0 disables the watchdog. On platforms without a realtime kill +// signal (macOS, Windows non-alertable waits) a blocking syscall already in +// progress cannot be unblocked; only the VM-interrupt flag is set, which is +// honored at the next opcode boundary (so CPU-bound overruns are still caught). +func (handler *workerThread) armRequestTimeout() { + timeout := handler.worker.workerTimeout + if timeout <= 0 { + return + } + + thread := handler.thread + + // Reset any stale pending flag from a previous request whose watchdog raced + // completion, so it can't abort this one. Runs on the PHP thread, and always + // after such a stale watchdog has finished: cancelRequestTimeout (called on + // the same PHP thread when the previous request ended) blocks on timeoutMu + // until a mid-flight watchdog body has run to completion. + C.frankenphp_clear_worker_timeout(C.uintptr_t(thread.threadIndex)) + + handler.timeoutMu.Lock() + epoch := handler.requestEpoch + handler.requestTimer = time.AfterFunc(timeout, func() { + // timeoutMu is held for the entire interrupt sequence so the watchdog + // cannot interleave with its request finishing: cancelRequestTimeout + // (and therefore the next request's arm + pending-flag clear) waits + // until this body is done. Checking the epoch under the same mutex + // makes a watchdog whose request already finished a strict no-op - it + // can never arm the timeout for (or shut down the sockets of) a request + // it wasn't armed for, and it can never touch the per-thread C state + // after a teardown's cancelRequestTimeout returned. + handler.timeoutMu.Lock() + defer handler.timeoutMu.Unlock() + + if handler.requestEpoch != epoch { + return + } + + // Only interrupt a thread that is actively handling a request. Any + // other state means the thread is yielding, restarting, rebooting or + // shutting down on its own and its force-kill slot may already be + // cleared (frankenphp_force_kill_thread is still safe on a zeroed slot). + if !handler.state.Is(state.Ready) { + return + } + + if globalLogger.Enabled(globalCtx, slog.LevelWarn) { + globalLogger.LogAttrs(globalCtx, slog.LevelWarn, "worker request timeout, interrupting thread", + slog.String("worker", handler.worker.name), + slog.Int("thread", thread.threadIndex), + slog.Duration("timeout", timeout), + ) + } + + thread.forceKillMu.RLock() + // 1. Arm: set the pending flag + VM interrupt so the interrupt hook + // raises our fatal as soon as the thread runs PHP again. Done before + // any wakeup so the driver's own I/O error can't pre-empt the message. + C.frankenphp_arm_worker_timeout( + C.uintptr_t(thread.threadIndex), + thread.forceKill, + C.double(timeout.Seconds()), + ) + // 2. Abort the fd the thread is blocked on (e.g. a mysqlnd socket that + // isn't reachable via the resource list) so a retried blocking read + // fails terminally instead of resuming. + blockio.Abort(int(thread.kernelTID.Load())) + // 3. Wake EINTR-abortable waits (sleep) that have no fd to shut down. + C.frankenphp_wake_worker_thread(thread.forceKill) + thread.forceKillMu.RUnlock() + }) + handler.timeoutMu.Unlock() +} + +// cancelRequestTimeout stops the watchdog armed by armRequestTimeout and bumps +// the request epoch so a watchdog whose timer already fired but has not yet taken +// timeoutMu becomes a no-op. Because the watchdog body holds timeoutMu for its +// whole run, this call also blocks until a mid-flight watchdog has finished - +// after it returns, the watchdog can no longer interrupt the thread or touch +// the per-thread C state. Safe to call when no watchdog is armed. +func (handler *workerThread) cancelRequestTimeout() { + handler.timeoutMu.Lock() + handler.requestEpoch++ + if handler.requestTimer != nil { + handler.requestTimer.Stop() + handler.requestTimer = nil + } + handler.timeoutMu.Unlock() +} + // go_frankenphp_worker_handle_request_start is called at the start of every php request served. // //export go_frankenphp_worker_handle_request_start @@ -310,6 +422,10 @@ func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) (C.bool, //export go_frankenphp_finish_worker_request func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t, retval *C.zval) { thread := phpThreads[threadIndex] + + // the request completed normally: disarm the timeout watchdog + thread.handler.(*workerThread).cancelRequestTimeout() + ctx := thread.context() fc := ctx.Value(contextKey).(*frankenPHPContext) diff --git a/worker.go b/worker.go index fdc3098da6..5435cc9499 100644 --- a/worker.go +++ b/worker.go @@ -30,6 +30,7 @@ type worker struct { threadMutex sync.RWMutex allowPathMatching bool maxConsecutiveFailures int + workerTimeout time.Duration onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 @@ -146,6 +147,7 @@ func newWorker(o workerOpt) (*worker, error) { threads: make([]*phpThread, 0, o.num), allowPathMatching: allowPathMatching, maxConsecutiveFailures: o.maxConsecutiveFailures, + workerTimeout: o.workerTimeout, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, } diff --git a/workertimeout_test.go b/workertimeout_test.go new file mode 100644 index 0000000000..4050f249ae --- /dev/null +++ b/workertimeout_test.go @@ -0,0 +1,292 @@ +package frankenphp + +import ( + "bytes" + "log/slog" + "net" + "net/http/httptest" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const timeoutTestScript = "worker-timeout-sleep.php" + +// requiresKillSignal skips tests that rely on interrupting a blocking syscall +// (sleep). Only Linux/FreeBSD ship the realtime kill signal used by the +// force-kill primitive; elsewhere the watchdog can only set the VM-interrupt +// flag, which never unblocks sleep(). +func requiresKillSignal(t *testing.T) { + t.Helper() + if runtime.GOOS != "linux" && runtime.GOOS != "freebsd" { + t.Skipf("worker timeout cannot interrupt blocking syscalls on %s", runtime.GOOS) + } +} + +// lockedBuffer is a goroutine-safe io.Writer for capturing logs emitted from +// the watchdog goroutine. +type lockedBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (b *lockedBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + + return b.buf.Write(p) +} + +func (b *lockedBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + + return b.buf.String() +} + +func initTimeoutWorker(t *testing.T, timeout time.Duration, numThreads, numWorkers int, logger *slog.Logger) string { + t.Helper() + + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + + opts := []Option{ + WithNumThreads(numThreads), + WithWorkers("timeout-worker", testDataDir+timeoutTestScript, numWorkers, WithWorkerTimeout(timeout)), + } + if logger != nil { + opts = append(opts, WithLogger(logger)) + } + + require.NoError(t, Init(opts...)) + t.Cleanup(Shutdown) + + return testDataDir +} + +// serveTimeoutRequest issues one request to the timeout worker. It is safe to +// call from a non-test goroutine (it never calls t.FailNow via require). +func serveTimeoutRequest(testDataDir, marker string, sleepSeconds int) (*httptest.ResponseRecorder, error) { + req := httptest.NewRequest("GET", "http://example.com/"+timeoutTestScript, nil) + if marker != "" { + req.Header.Set("Sleep-Marker", marker) + } + req.Header.Set("Sleep-Seconds", strconv.Itoa(sleepSeconds)) + + fr, err := NewRequestWithContext(req, WithRequestDocumentRoot(testDataDir, false)) + if err != nil { + return nil, err + } + + rec := httptest.NewRecorder() + + return rec, ServeHTTP(rec, fr) +} + +func newTimeoutTestLogger() (*slog.Logger, *lockedBuffer) { + buf := &lockedBuffer{} + + return slog.New(slog.NewTextHandler(buf, &slog.HandlerOptions{Level: slog.LevelDebug})), buf +} + +// TestWorkerTimeout_InterruptsSlowRequest spins up a worker with a 1s timeout, +// sends a request whose handler sleeps far longer, and asserts the request is +// interrupted well within budget and that the worker recovers afterwards. +func TestWorkerTimeout_InterruptsSlowRequest(t *testing.T) { + requiresKillSignal(t) + + logger, buf := newTimeoutTestLogger() + testDataDir := initTimeoutWorker(t, time.Second, 2, 1, logger) + + // Per-run marker the worker touches right before sleep(), so we only start + // the budget once the worker is provably parked in sleep(). + marker := filepath.Join(t.TempDir(), "in-sleep") + + var rec *httptest.ResponseRecorder + done := make(chan struct{}) + go func() { + defer close(done) + rec, _ = serveTimeoutRequest(testDataDir, marker, 60) + }() + + require.Eventually(t, func() bool { + _, err := os.Stat(marker) + return err == nil + }, 5*time.Second, 10*time.Millisecond, "worker never entered sleep()") + + // Timeout (1s) + slack for signal dispatch, VM tick and the restart loop. + const budget = 5 * time.Second + select { + case <-done: + case <-time.After(budget): + t.Fatal("request was not interrupted within the worker timeout budget") + } + + assert.NotContains(t, rec.Body.String(), "completed", + "interrupted request must not have produced the post-sleep output") + assert.Contains(t, buf.String(), "worker request timeout", + "watchdog should have logged the interruption") + + // The worker must restart and serve the next request normally. + require.Eventually(t, func() bool { + rec2, _ := serveTimeoutRequest(testDataDir, "", 0) + return rec2.Code == 200 && strings.Contains(rec2.Body.String(), "completed") + }, 5*time.Second, 50*time.Millisecond, "worker did not recover after timeout") +} + +// TestWorkerTimeout_InterruptsBlockingSocketRead verifies the watchdog aborts a +// request blocked in a socket read - the case that the VM-interrupt flag alone +// cannot handle and that the fd shutdown exists for. The worker connects to a +// listener that accepts but never replies, so fread() parks in ppoll exactly +// like a slow DB query. +func TestWorkerTimeout_InterruptsBlockingSocketRead(t *testing.T) { + requiresKillSignal(t) + + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer ln.Close() + + var ( + mu sync.Mutex + conns []net.Conn + ) + go func() { + for { + c, err := ln.Accept() + if err != nil { + return + } + mu.Lock() + conns = append(conns, c) // hold open, never write + mu.Unlock() + } + }() + defer func() { + mu.Lock() + for _, c := range conns { + _ = c.Close() + } + mu.Unlock() + }() + + logger, buf := newTimeoutTestLogger() + cwd, _ := os.Getwd() + testDataDir := cwd + "/testdata/" + require.NoError(t, Init( + WithNumThreads(2), + WithLogger(logger), + WithWorkers("sock-worker", testDataDir+"worker-blocking-read.php", 1, WithWorkerTimeout(time.Second)), + )) + t.Cleanup(Shutdown) + + req := httptest.NewRequest("GET", "http://example.com/worker-blocking-read.php", nil) + req.Header.Set("Upstream-Addr", ln.Addr().String()) + fr, err := NewRequestWithContext(req, WithRequestDocumentRoot(testDataDir, false)) + require.NoError(t, err) + rec := httptest.NewRecorder() + + start := time.Now() + done := make(chan struct{}) + go func() { + defer close(done) + _ = ServeHTTP(rec, fr) + }() + + select { + case <-done: + case <-time.After(6 * time.Second): + t.Fatal("blocking socket read was not aborted by the worker timeout") + } + + assert.Less(t, time.Since(start), 6*time.Second) + assert.Contains(t, buf.String(), "worker request timeout") + assert.NotContains(t, rec.Body.String(), "read returned", + "fread should have been interrupted, not returned") +} + +// TestWorkerTimeout_DoesNotFireOnFastRequest verifies the watchdog is cancelled +// for a request that finishes before the timeout and never interrupts the thread. +func TestWorkerTimeout_DoesNotFireOnFastRequest(t *testing.T) { + logger, buf := newTimeoutTestLogger() + testDataDir := initTimeoutWorker(t, 5*time.Second, 2, 1, logger) + + rec, err := serveTimeoutRequest(testDataDir, "", 0) + require.NoError(t, err) + assert.Equal(t, 200, rec.Code) + assert.Contains(t, rec.Body.String(), "completed") + + // A cancelled watchdog must never fire, even given a moment to misbehave. + time.Sleep(200 * time.Millisecond) + assert.NotContains(t, buf.String(), "worker request timeout") +} + +// TestWorkerTimeout_Disabled verifies that WorkerTimeout = 0 disables the +// watchdog: a slow request runs to natural completion, uninterrupted. +func TestWorkerTimeout_Disabled(t *testing.T) { + logger, buf := newTimeoutTestLogger() + testDataDir := initTimeoutWorker(t, 0, 2, 1, logger) + + start := time.Now() + rec, err := serveTimeoutRequest(testDataDir, "", 1) + require.NoError(t, err) + + assert.GreaterOrEqual(t, time.Since(start), time.Second, "request must have slept its full duration") + assert.Equal(t, 200, rec.Code) + assert.Contains(t, rec.Body.String(), "completed") + assert.NotContains(t, buf.String(), "worker request timeout") +} + +// TestWorkerTimeout_PoolDoesNotCrossSignals fires one stuck request per thread +// in a pool and asserts every request is interrupted within budget. If a signal +// were delivered to the wrong thread, at least one request would hang past the +// budget (and another thread would be killed twice), so the all-interrupted +// outcome proves each thread received its own interrupt. +func TestWorkerTimeout_PoolDoesNotCrossSignals(t *testing.T) { + requiresKillSignal(t) + + const pool = 5 + testDataDir := initTimeoutWorker(t, time.Second, pool+1, pool, nil) + + var wg sync.WaitGroup + results := make([]*httptest.ResponseRecorder, pool) + for i := 0; i < pool; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + results[i], _ = serveTimeoutRequest(testDataDir, "", 60) + }(i) + } + + completed := make(chan struct{}) + go func() { + wg.Wait() + close(completed) + }() + + // Timeout (1s) + generous slack for 5 concurrent restarts. + const budget = 8 * time.Second + select { + case <-completed: + case <-time.After(budget): + t.Fatal("not all stuck requests were interrupted; a signal may have hit the wrong thread") + } + + for i := 0; i < pool; i++ { + assert.NotContains(t, results[i].Body.String(), "completed", + "request %d completed instead of being interrupted", i) + } + + // The whole pool must recover and serve fast follow-up requests. + require.Eventually(t, func() bool { + rec, _ := serveTimeoutRequest(testDataDir, "", 0) + return rec.Code == 200 && strings.Contains(rec.Body.String(), "completed") + }, 5*time.Second, 50*time.Millisecond, "pool did not recover after concurrent timeouts") +}