From 72efa30cd1b974a182c3e90d1d3ce365d309e0b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Fri, 12 Jun 2026 17:47:55 +0200 Subject: [PATCH 1/3] libev reactor: Defer socket close until after watchers stop `close` can be called from anywhere, not only reactor threads. If such `close` call closes socket during `handle_write` / `handle_read`, then those functions may try to operate on closed socket. Solution implemented in this commit: defer socket closing until both watchers are stopped. --- cassandra/io/libevreactor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index 3da809931f..f3b0541834 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -124,6 +124,7 @@ def _cleanup(self): for watcher in (conn._write_watcher, conn._read_watcher): if watcher: watcher.stop() + conn._socket.close() self.notify() # wake the timer watcher @@ -221,6 +222,8 @@ def _loop_will_run(self, prepare): conn._read_watcher.stop() # clear reference cycles from IO callback del conn._read_watcher + conn._socket.close() + log.debug("Closed socket to %s", conn.endpoint) changed = True @@ -233,7 +236,7 @@ def _loop_will_run(self, prepare): def _atexit_cleanup(): """Cleanup function called by atexit that uses the current _global_loop value. - + This wrapper ensures that cleanup receives the actual LibevLoop instance instead of None, which was the value of _global_loop when the module was imported. @@ -308,8 +311,6 @@ def close(self): log.debug("Closing connection (%s) to %s", id(self), self.endpoint) _global_loop.connection_destroyed(self) - self._socket.close() - log.debug("Closed socket to %s", self.endpoint) # don't leave in-progress operations hanging if not self.is_defunct: From 0ec3fd31893cbb9fee6fa9127efc640539ea400e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Fri, 12 Jun 2026 17:50:24 +0200 Subject: [PATCH 2/3] libev reactor: Return from watchers for closed connection Previous commit defered socket close until watchers are stopped, but there is one more case worth considering. If during one libev loop iteration socket gets ready for both read and write, then both watchers will be called. If one decides to close the connection, the other one will still get called anyway. This shouldn't cause EBADF, because socket won't be closed yet, but I see no reason to perform unnecessary work. --- cassandra/io/libevreactor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cassandra/io/libevreactor.py b/cassandra/io/libevreactor.py index f3b0541834..6cceb6c6bc 100644 --- a/cassandra/io/libevreactor.py +++ b/cassandra/io/libevreactor.py @@ -321,6 +321,8 @@ def close(self): self.connected_event.set() def handle_write(self, watcher, revents, errno=None): + if self.is_closed: + return if revents & libev.EV_ERROR: if errno: exc = IOError(errno, os.strerror(errno)) @@ -362,6 +364,8 @@ def handle_write(self, watcher, revents, errno=None): return def handle_read(self, watcher, revents, errno=None): + if self.is_closed: + return if revents & libev.EV_ERROR: if errno: exc = IOError(errno, os.strerror(errno)) From 7a9211fdccf964a921ca0b5ac6186f1db2af8150 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Karol=20Bary=C5=82a?= Date: Fri, 12 Jun 2026 17:54:37 +0200 Subject: [PATCH 3/3] factory: raise on closed connections When connection is closed by the server, but there is no other error, it will be close (is_cloes == True) without setting `last_error`. This is true for all reactors apart from Twisted as far as I can tell. If we try to use such connection, we'll quickly discover that its broken, but we can slightly optimize this process by raising directly from factory(). --- cassandra/connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cassandra/connection.py b/cassandra/connection.py index f07160e385..eae018649b 100644 --- a/cassandra/connection.py +++ b/cassandra/connection.py @@ -986,6 +986,8 @@ def factory(cls, endpoint, timeout, host_conn = None, *args, **kwargs): conn.close() raise OperationTimedOut("Timed out creating connection (%s seconds)" % timeout, timeout=timeout) + elif conn.is_closed: + raise ConnectionShutdown("Connection to %s was closed by server" % conn.endpoint) else: return conn