Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 27 additions & 17 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
SUBSCRIPTION_CLEANUP_INTERVAL_MINUTES = 15 # How often to run cleanup task
SUBSCRIPTION_MAX_AGE_MINUTES = 15 # Max age before stale
SUBSCRIPTION_CLEANUP_RETRY_MINUTES = 1 # Retry interval if cleanup fails
NODE_PURGE_INTERVAL_HOURS = 24 # How often to purge old nodes
NODE_PURGE_STARTUP_DELAY_MINUTES = 10 # Delay before first purge after start
NODE_PURGE_RETRY_HOURS = 1 # Retry interval if purge fails
NODE_PURGE_AGE_DAYS = int(os.getenv("NODE_PURGE_AGE_DAYS", "180"))
NODE_PURGE_BATCH_SIZE = 10000
DEFAULT_MONGO_SERVICE = "mongodb://db:27017"


Expand Down Expand Up @@ -169,9 +174,31 @@ async def subscription_cleanup_task():
await asyncio.sleep(SUBSCRIPTION_CLEANUP_RETRY_MINUTES * 60)


async def node_purge_task():
"""Background task to purge old nodes once per day"""
await asyncio.sleep(NODE_PURGE_STARTUP_DELAY_MINUTES * 60)
while True:
try:
result = await purge_old_nodes(
age_days=NODE_PURGE_AGE_DAYS,
batch_size=NODE_PURGE_BATCH_SIZE,
)
metrics.add("nodes_purged", result["deleted"])
await asyncio.sleep(NODE_PURGE_INTERVAL_HOURS * 3600)
except (
ConnectionError,
OSError,
RuntimeError,
pymongo.errors.PyMongoError,
) as e:
print(f"Node purge error: {e}")
await asyncio.sleep(NODE_PURGE_RETRY_HOURS * 3600)


async def start_background_tasks():
"""Start background cleanup tasks"""
asyncio.create_task(subscription_cleanup_task())
asyncio.create_task(node_purge_task())


async def create_indexes():
Expand Down Expand Up @@ -2281,23 +2308,6 @@ async def get_metrics():
return PlainTextResponse(response)


@app.get("/maintenance/purge-old-nodes")
async def purge_handler(
current_user: User = Depends(get_current_superuser),
days: int = 180,
batch_size: int = 1000,
):
"""Purge old nodes from the database
This is a maintenance operation and should be performed
only by superusers.
Accepts GET parameters:
- days: Number of days to keep nodes, default is 180.
- batch_size: Number of nodes to delete in one batch, default is 1000.
"""
metrics.add("http_requests_total", 1)
return await purge_old_nodes(age_days=days, batch_size=batch_size)


# Build versioned app using include_router instead of fastapi-versioning.
# VersionedFastAPI (fastapi-versioning==0.10.0) is incompatible with
# FastAPI >= 0.118.0: it transplants route objects by directly appending
Expand Down
102 changes: 53 additions & 49 deletions api/maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,67 +10,71 @@
functions to purge old nodes from the database and manage MongoDB connections.
"""

import asyncio
import datetime
import os
import time

from pymongo import MongoClient
from pymongo import AsyncMongoClient

DEFAULT_MONGO_SERVICE = "mongodb://db:27017"
# Node kinds that must never be purged, whatever their age
PURGE_EXCLUDED_KINDS = ["checkout"]
# How often to report purge progress, in seconds
PURGE_PROGRESS_INTERVAL = 30
# Pause between deletion batches to let other queries run
PURGE_BATCH_PAUSE = 1


def purge_ids(db, collection, ids):
"""
Delete documents from the specified collection in the database
by their IDs.

Args:
db: The MongoDB database instance.
collection (str): The name of the collection to purge from.
ids (list): List of document IDs to delete.
"""
print("Purging", len(ids), "from", collection)
db[collection].delete_many({"_id": {"$in": ids}})


def connect_to_db():
"""
Connect to the MongoDB database using the MONGO_SERVICE environment
variable, with a default fallback.

Returns:
db: The 'kernelci' MongoDB database instance.
"""
mongo_service = os.getenv("MONGO_SERVICE", DEFAULT_MONGO_SERVICE)
client = MongoClient(mongo_service)
db = client["kernelci"]
return db


async def purge_old_nodes(age_days=180, batch_size=1000):
async def purge_old_nodes(age_days=180, batch_size=10000):
"""
Purge nodes from the 'node' collection that are older than the
specified number of days.
specified number of days, except the kinds listed in
PURGE_EXCLUDED_KINDS.

Args:
age_days (int, optional): The age in days to use as the
threshold for deletion.
Defaults to 180.
threshold for deletion. Defaults to 180.
batch_size (int, optional): Number of nodes to delete in one
batch. Defaults to 10000.
"""
date_end = datetime.datetime.today() - datetime.timedelta(days=age_days)
db = connect_to_db()
nodes = db["node"].find({"created": {"$lt": date_end}})
# We need to delete node in chunks of {batch_size}
# to not block the main thread for too long
mongo_service = os.getenv("MONGO_SERVICE", DEFAULT_MONGO_SERVICE)
client = AsyncMongoClient(mongo_service)
collection = client["kernelci"]["node"]
node_filter = {
"created": {"$lt": date_end},
"kind": {"$nin": PURGE_EXCLUDED_KINDS},
}
total = await collection.count_documents(node_filter)
print(
f"Node purge started: {total} nodes older than {age_days} days "
f"(created before {date_end.isoformat()})"
)
started = time.monotonic()
last_report = started
deleted = 0
del_batch = []
for node in nodes:
del_batch.append(node["_id"])
if len(del_batch) == batch_size:
deleted += len(del_batch)
purge_ids(db, "node", del_batch)
del_batch = []
if del_batch:
deleted += len(del_batch)
purge_ids(db, "node", del_batch)
db = {"response": "ok", "deleted": deleted, "age_days": age_days}
return db
# Paginate on _id (indexed) as there is no index on 'created'
query = dict(node_filter)
while True:
batch = (
await collection.find(query, {"_id": 1})
.sort("_id", 1)
.limit(batch_size)
.to_list(batch_size)
)
if not batch:
break
ids = [doc["_id"] for doc in batch]
result = await collection.delete_many({"_id": {"$in": ids}})
deleted += result.deleted_count
query["_id"] = {"$gt": ids[-1]}
now = time.monotonic()
if now - last_report >= PURGE_PROGRESS_INTERVAL:
print(f"Node purge progress: {deleted}/{total} nodes deleted")
last_report = now
await asyncio.sleep(PURGE_BATCH_PAUSE)
elapsed = round(time.monotonic() - started)
print(f"Node purge finished: {deleted} nodes deleted in {elapsed}s")
await client.close()
return {"response": "ok", "deleted": deleted, "age_days": age_days}
76 changes: 53 additions & 23 deletions api/static/js/viewer.js
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,51 @@ const ViewerApp = (() => {
});
}

/**
* Fetch all nodes matching a query, paginating through every page
*
* The /nodes endpoint returns documents in insertion order without
* sorting, so a single truncated page only covers the oldest part of
* the queried window. Fetching all pages avoids silently dropping
* recent nodes.
*
* @param {string} query - Query string without limit/offset,
* e.g. "kind=checkout&created__gt=..."
* @returns {Promise<Array<Object>>} All matching nodes
*/
async function fetchAllNodes(query) {
const pageSize = 1000;
const maxPages = 20;
const items = [];

for (let page = 0; page < maxPages; page++) {
const offset = page * pageSize;
const url = `${state.apiUrl}/latest/nodes?${query}&limit=${pageSize}&offset=${offset}`;
console.log('Fetching nodes:', url);

const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

const data = await response.json();
items.push(...data.items);

if (data.items.length === 0 || items.length >= data.total) {
break;
}
}

return items;
}

/**
* Fetch available tree+branch combinations from the API
*
* Strategy:
* 1. Query recent kbuilds (last 4 weeks)
* 1. Query recent checkouts (last 4 weeks). Checkouts are used instead
* of kbuilds as there is one per revision rather than dozens of
* builds.
* 2. Extract unique tree+branch combinations from data.kernel_revision
* 3. Sort alphabetically by "tree/branch" format
*
Expand All @@ -653,21 +693,15 @@ const ViewerApp = (() => {
fourWeeksAgo.setDate(fourWeeksAgo.getDate() - 28);
const dateStr = fourWeeksAgo.toISOString().split('.')[0];

// Query kbuilds from last 4 weeks to find active tree/branch combinations
const url = `${state.apiUrl}/latest/nodes?kind=kbuild&created__gt=${dateStr}&limit=1000`;
console.log('Fetching tree/branch combinations from:', url);

const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

const data = await response.json();
// Query checkouts from last 4 weeks to find active tree/branch combinations
const checkouts = await fetchAllNodes(
`kind=checkout&created__gt=${dateStr}`
);

// Extract unique tree+branch combinations
// Use a Map to deduplicate by "tree/branch" key
const treeBranchMap = new Map();
data.items.forEach(item => {
checkouts.forEach(item => {
const tree = item.data?.kernel_revision?.tree;
const branch = item.data?.kernel_revision?.branch;

Expand Down Expand Up @@ -721,22 +755,18 @@ const ViewerApp = (() => {

// Fetch all kbuilds for this tree+branch in the date range
// IMPORTANT: Filter by BOTH tree AND branch
const url = `${state.apiUrl}/latest/nodes?kind=kbuild&data.kernel_revision.tree=${treeBranch.tree}&data.kernel_revision.branch=${treeBranch.branch}&created__gt=${dateStr}&limit=1000`;
console.log('Fetching kbuilds:', url);

const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}

const data = await response.json();
const kbuilds = await fetchAllNodes(
`kind=kbuild&data.kernel_revision.tree=${treeBranch.tree}` +
`&data.kernel_revision.branch=${treeBranch.branch}` +
`&created__gt=${dateStr}`
);
hideModal();

const displayName = `${treeBranch.tree}/${treeBranch.branch}`;
console.log(`Fetched ${data.items.length} kbuilds for ${displayName}`);
console.log(`Fetched ${kbuilds.length} kbuilds for ${displayName}`);

// Process and display the matrix
displayKbuildMatrix(data.items, displayName);
displayKbuildMatrix(kbuilds, displayName);

} catch (error) {
hideModal();
Expand Down