diff --git a/api/main.py b/api/main.py
index 27a39548..8c7f2d7f 100644
--- a/api/main.py
+++ b/api/main.py
@@ -86,6 +86,12 @@
 SUBSCRIPTION_CLEANUP_INTERVAL_MINUTES = 15  # How often to run cleanup task
 SUBSCRIPTION_MAX_AGE_MINUTES = 15  # Max age before stale
 SUBSCRIPTION_CLEANUP_RETRY_MINUTES = 1  # Retry interval if cleanup fails
+NODE_PURGE_INTERVAL_HOURS = 24  # How often to purge old nodes
+NODE_PURGE_STARTUP_DELAY_MINUTES = 10  # Delay before first purge after start
+NODE_PURGE_RETRY_HOURS = 1  # Retry interval if purge fails
+# Nodes older than this many days are purged; set via NODE_PURGE_AGE_DAYS
+NODE_PURGE_AGE_DAYS = int(os.getenv("NODE_PURGE_AGE_DAYS", "180"))
+NODE_PURGE_BATCH_SIZE = 10000  # Nodes deleted per batch
 DEFAULT_MONGO_SERVICE = "mongodb://db:27017"
 
 
@@ -169,9 +175,46 @@ async def subscription_cleanup_task():
             await asyncio.sleep(SUBSCRIPTION_CLEANUP_RETRY_MINUTES * 60)
 
 
+# Strong references to running background tasks: the event loop only keeps
+# weak references, so an otherwise unreferenced task could be garbage
+# collected while it sleeps between runs.
+_BACKGROUND_TASKS = set()
+
+
+async def node_purge_task():
+    """Background task to purge old nodes once per day
+
+    Waits NODE_PURGE_STARTUP_DELAY_MINUTES after start-up, then runs
+    purge_old_nodes() and sleeps NODE_PURGE_INTERVAL_HOURS before the next
+    run. The number of deleted nodes is added to the "nodes_purged" metric.
+    If one of the errors listed below is raised, it is printed and the
+    purge is retried after NODE_PURGE_RETRY_HOURS.
+    """
+    await asyncio.sleep(NODE_PURGE_STARTUP_DELAY_MINUTES * 60)
+    while True:
+        try:
+            result = await purge_old_nodes(
+                age_days=NODE_PURGE_AGE_DAYS,
+                batch_size=NODE_PURGE_BATCH_SIZE,
+            )
+            metrics.add("nodes_purged", result["deleted"])
+            await asyncio.sleep(NODE_PURGE_INTERVAL_HOURS * 3600)
+        except (
+            ConnectionError,
+            OSError,
+            RuntimeError,
+            pymongo.errors.PyMongoError,
+        ) as e:
+            print(f"Node purge error: {e}")
+            await asyncio.sleep(NODE_PURGE_RETRY_HOURS * 3600)
+
+
 async def start_background_tasks():
     """Start background cleanup tasks"""
-    asyncio.create_task(subscription_cleanup_task())
+    for coro in (subscription_cleanup_task(), node_purge_task()):
+        task = asyncio.create_task(coro)
+        _BACKGROUND_TASKS.add(task)
+        task.add_done_callback(_BACKGROUND_TASKS.discard)
 
 
 async def create_indexes():
@@ -2281,23 +2324,6 @@ async def get_metrics():
     return PlainTextResponse(response)
 
 
-@app.get("/maintenance/purge-old-nodes")
-async def purge_handler(
-    current_user: User = Depends(get_current_superuser),
-    days: int = 180,
-    batch_size: int = 1000,
-):
-    """Purge old nodes from the database
-    This is a maintenance operation and should be performed
-    only by superusers.
-    Accepts GET parameters:
-    - days: Number of days to keep nodes, default is 180.
-    - batch_size: Number of nodes to delete in one batch, default is 1000.
-    """
-    metrics.add("http_requests_total", 1)
-    return await purge_old_nodes(age_days=days, batch_size=batch_size)
-
-
 # Build versioned app using include_router instead of fastapi-versioning.
 # VersionedFastAPI (fastapi-versioning==0.10.0) is incompatible with
 # FastAPI >= 0.118.0: it transplants route objects by directly appending
diff --git a/api/maintenance.py b/api/maintenance.py
index 925b83c5..4c7f572f 100644
--- a/api/maintenance.py
+++ b/api/maintenance.py
@@ -10,67 +10,71 @@ functions to purge old nodes from the database and manage MongoDB connections.
 """
 
+import asyncio
 import datetime
 import os
+import time
 
-from pymongo import MongoClient
+from pymongo import AsyncMongoClient
 
 
 DEFAULT_MONGO_SERVICE = "mongodb://db:27017"
+# Node kinds that must never be purged, whatever their age
+PURGE_EXCLUDED_KINDS = ["checkout"]
+# How often to report purge progress, in seconds
+PURGE_PROGRESS_INTERVAL = 30
+# Pause between deletion batches, in seconds, to let other queries run
+PURGE_BATCH_PAUSE = 1
 
 
-def purge_ids(db, collection, ids):
-    """
-    Delete documents from the specified collection in the database
-    by their IDs.
-
-    Args:
-        db: The MongoDB database instance.
-        collection (str): The name of the collection to purge from.
-        ids (list): List of document IDs to delete.
-    """
-    print("Purging", len(ids), "from", collection)
-    db[collection].delete_many({"_id": {"$in": ids}})
-
-
-def connect_to_db():
-    """
-    Connect to the MongoDB database using the MONGO_SERVICE environment
-    variable, with a default fallback.
-
-    Returns:
-        db: The 'kernelci' MongoDB database instance.
-    """
-    mongo_service = os.getenv("MONGO_SERVICE", DEFAULT_MONGO_SERVICE)
-    client = MongoClient(mongo_service)
-    db = client["kernelci"]
-    return db
-
-
-async def purge_old_nodes(age_days=180, batch_size=1000):
+async def purge_old_nodes(age_days=180, batch_size=10000):
     """
     Purge nodes from the 'node' collection that are older than the
-    specified number of days.
+    specified number of days, except the kinds listed in
+    PURGE_EXCLUDED_KINDS.
 
     Args:
         age_days (int, optional): The age in days to use as the
-            threshold for deletion.
-            Defaults to 180.
+            threshold for deletion. Defaults to 180.
+        batch_size (int, optional): Number of nodes to delete in one
+            batch. Defaults to 10000.
+
+    Returns:
+        dict: "response", "deleted" (nodes removed) and "age_days" keys.
     """
     date_end = datetime.datetime.today() - datetime.timedelta(days=age_days)
-    db = connect_to_db()
-    nodes = db["node"].find({"created": {"$lt": date_end}})
-    # We need to delete node in chunks of {batch_size}
-    # to not block the main thread for too long
+    mongo_service = os.getenv("MONGO_SERVICE", DEFAULT_MONGO_SERVICE)
+    node_filter = {
+        "created": {"$lt": date_end},
+        "kind": {"$nin": PURGE_EXCLUDED_KINDS},
+    }
     deleted = 0
-    del_batch = []
-    for node in nodes:
-        del_batch.append(node["_id"])
-        if len(del_batch) == batch_size:
-            deleted += len(del_batch)
-            purge_ids(db, "node", del_batch)
-            del_batch = []
-    if del_batch:
-        deleted += len(del_batch)
-        purge_ids(db, "node", del_batch)
-    db = {"response": "ok", "deleted": deleted, "age_days": age_days}
-    return db
+    client = AsyncMongoClient(mongo_service)
+    try:
+        collection = client["kernelci"]["node"]
+        total = await collection.count_documents(node_filter)
+        print(
+            f"Node purge started: {total} nodes older than {age_days} days "
+            f"(created before {date_end.isoformat()})"
+        )
+        started = last_report = time.monotonic()
+        # Paginate on _id (indexed) as there is no index on 'created'
+        query = dict(node_filter)
+        while True:
+            cursor = collection.find(query, {"_id": 1}).sort("_id", 1)
+            batch = await cursor.limit(batch_size).to_list(batch_size)
+            if not batch:
+                break
+            ids = [doc["_id"] for doc in batch]
+            result = await collection.delete_many({"_id": {"$in": ids}})
+            deleted += result.deleted_count
+            query["_id"] = {"$gt": ids[-1]}
+            now = time.monotonic()
+            if now - last_report >= PURGE_PROGRESS_INTERVAL:
+                print(f"Node purge progress: {deleted}/{total} nodes deleted")
+                last_report = now
+            await asyncio.sleep(PURGE_BATCH_PAUSE)
+        elapsed = round(time.monotonic() - started)
+        print(f"Node purge finished: {deleted} nodes deleted in {elapsed}s")
+    finally:
+        await client.close()  # also runs when a query fails
+    return {"response": "ok", "deleted": deleted, "age_days": age_days}
diff --git a/api/static/js/viewer.js b/api/static/js/viewer.js
index a1ca6f39..b2a3b1a5 100644
--- a/api/static/js/viewer.js
+++ b/api/static/js/viewer.js
@@ -635,11 +635,51 @@ const ViewerApp = (() => {
         });
     }
 
+    /**
+     * Fetch all nodes matching a query, paginating through every page
+     *
+     * The /nodes endpoint returns documents in insertion order without
+     * sorting, so a single truncated page only covers the oldest part of
+     * the queried window. Fetching all pages avoids silently dropping
+     * recent nodes.
+     *
+     * @param {string} query - Query string without limit/offset,
+     *     e.g. "kind=checkout&created__gt=..."
+     * @returns {Promise<Array<Object>>} All matching nodes
+     */
+    async function fetchAllNodes(query) {
+        const pageSize = 1000;
+        const maxPages = 20;
+        const items = [];
+
+        for (let page = 0; page < maxPages; page++) {
+            const offset = page * pageSize;
+            const url = `${state.apiUrl}/latest/nodes?${query}&limit=${pageSize}&offset=${offset}`;
+            console.log('Fetching nodes:', url);
+
+            const response = await fetch(url);
+            if (!response.ok) {
+                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
+            }
+
+            const data = await response.json();
+            items.push(...data.items);
+            if (data.items.length === 0 || items.length >= data.total) {
+                return items;
+            }
+        }
+        // Page cap reached before the API reported completion: do not truncate silently
+        console.warn(`fetchAllNodes: stopped after ${maxPages} pages, results may be incomplete`);
+        return items;
+    }
+
     /**
      * Fetch available tree+branch combinations from the API
      *
      * Strategy:
-     * 1. Query recent kbuilds (last 4 weeks)
+     * 1. Query recent checkouts (last 4 weeks). Checkouts are used instead
+     *    of kbuilds as there is one per revision rather than dozens of
+     *    builds.
      * 2. Extract unique tree+branch combinations from data.kernel_revision
      * 3. Sort alphabetically by "tree/branch" format
      *
@@ -653,21 +693,15 @@ const ViewerApp = (() => {
             fourWeeksAgo.setDate(fourWeeksAgo.getDate() - 28);
             const dateStr = fourWeeksAgo.toISOString().split('.')[0];
 
-            // Query kbuilds from last 4 weeks to find active tree/branch combinations
-            const url = `${state.apiUrl}/latest/nodes?kind=kbuild&created__gt=${dateStr}&limit=1000`;
-            console.log('Fetching tree/branch combinations from:', url);
-
-            const response = await fetch(url);
-            if (!response.ok) {
-                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
-            }
-
-            const data = await response.json();
+            // Query checkouts from last 4 weeks to find active tree/branch combinations
+            const checkouts = await fetchAllNodes(
+                `kind=checkout&created__gt=${dateStr}`
+            );
 
             // Extract unique tree+branch combinations
             // Use a Map to deduplicate by "tree/branch" key
             const treeBranchMap = new Map();
-            data.items.forEach(item => {
+            checkouts.forEach(item => {
                 const tree = item.data?.kernel_revision?.tree;
                 const branch = item.data?.kernel_revision?.branch;
 
@@ -721,22 +755,18 @@ const ViewerApp = (() => {
 
             // Fetch all kbuilds for this tree+branch in the date range
             // IMPORTANT: Filter by BOTH tree AND branch
-            const url = `${state.apiUrl}/latest/nodes?kind=kbuild&data.kernel_revision.tree=${treeBranch.tree}&data.kernel_revision.branch=${treeBranch.branch}&created__gt=${dateStr}&limit=1000`;
-            console.log('Fetching kbuilds:', url);
-
-            const response = await fetch(url);
-            if (!response.ok) {
-                throw new Error(`HTTP ${response.status}: ${response.statusText}`);
-            }
-
-            const data = await response.json();
+            const kbuilds = await fetchAllNodes(
+                `kind=kbuild&data.kernel_revision.tree=${encodeURIComponent(treeBranch.tree)}` +
+                `&data.kernel_revision.branch=${encodeURIComponent(treeBranch.branch)}` +
+                `&created__gt=${dateStr}`
+            );
 
             hideModal();
             const displayName = `${treeBranch.tree}/${treeBranch.branch}`;
-            console.log(`Fetched ${data.items.length} kbuilds for ${displayName}`);
+            console.log(`Fetched ${kbuilds.length} kbuilds for ${displayName}`);
 
             // Process and display the matrix
-            displayKbuildMatrix(data.items, displayName);
+            displayKbuildMatrix(kbuilds, displayName);
 
         } catch (error) {
             hideModal();