Skip to content
Draft
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 2.4.8

### Added: opt-in streaming log channel via `--upload-logs`

- New `--upload-logs` flag (default off). When set, each CLI invocation registers a run, reports a per-run status (`in_progress` / `success` / `failure` / `cancelled`), and uploads a transcript of its own log output to the Socket backend for that run, visible in the Socket admin views. The transcript is captured regardless of the local `--enable-debug` state; the existing terminal verbosity is unchanged.
- New `--no-upload-logs` flag (mutually exclusive with `--upload-logs`) explicitly opts the run out of uploading logs, even when an org-level override would otherwise enable it. Use this when you need a guaranteed no-upload guarantee (e.g. legal/consent reasons).
- The Socket backend can also force-enable streaming for specific orgs in the absence of an explicit opt-out. The feature is best-effort — registration or upload failures silently degrade and never block the scan.

## 2.4.7

### Changed: pin @coana-tech/cli version; auto-update is now opt-in
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "hatchling.build"

[project]
name = "socketsecurity"
version = "2.4.7"
version = "2.4.8"
requires-python = ">= 3.11"
license = {"file" = "LICENSE"}
dependencies = [
Expand Down
2 changes: 1 addition & 1 deletion socketsecurity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__author__ = 'socket.dev'
__version__ = '2.4.7'
__version__ = '2.4.8'
USER_AGENT = f'SocketPythonCLI/{__version__}'
36 changes: 36 additions & 0 deletions socketsecurity/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ class CliConfig:
ignore_commit_files: bool = False
disable_blocking: bool = False
disable_ignore: bool = False
upload_logs: bool = False
decline_logs: bool = False
strict_blocking: bool = False
integration_type: IntegrationType = "api"
integration_org_slug: Optional[str] = None
Expand Down Expand Up @@ -212,6 +214,9 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':

args = parser.parse_args(args_list)

if args.upload_logs and args.decline_logs:
parser.error("--upload-logs and --no-upload-logs are mutually exclusive")

if args.reach_exclude_paths:
logging.warning(
"--reach-exclude-paths is deprecated; use --exclude-paths instead. "
Expand Down Expand Up @@ -282,6 +287,8 @@ def from_args(cls, args_list: Optional[List[str]] = None) -> 'CliConfig':
'ignore_commit_files': args.ignore_commit_files,
'disable_blocking': args.disable_blocking,
'disable_ignore': args.disable_ignore,
'upload_logs': args.upload_logs,
'decline_logs': args.decline_logs,
'strict_blocking': args.strict_blocking,
'integration_type': args.integration,
'pending_head': args.pending_head,
Expand Down Expand Up @@ -866,6 +873,35 @@ def create_argument_parser() -> argparse.ArgumentParser:
action="store_true",
help=argparse.SUPPRESS
)
advanced_group.add_argument(
"--upload-logs",
dest="upload_logs",
action="store_true",
help="Upload the CLI's log output to the Socket backend for this run. "
"When set, the CLI registers the run with share_logs=true and streams "
"its log records in 5s batches. Default off. Mutually exclusive with "
"--no-upload-logs."
)
advanced_group.add_argument(
"--upload_logs",
dest="upload_logs",
action="store_true",
help=argparse.SUPPRESS
)
advanced_group.add_argument(
"--no-upload-logs",
dest="decline_logs",
action="store_true",
help="Explicitly opt out of uploading CLI logs to the Socket backend, even "
"when an org-level override would otherwise enable it. Mutually "
"exclusive with --upload-logs."
)
advanced_group.add_argument(
"--no_upload_logs",
dest="decline_logs",
action="store_true",
help=argparse.SUPPRESS
)
advanced_group.add_argument(
"--strict-blocking",
dest="strict_blocking",
Expand Down
78 changes: 78 additions & 0 deletions socketsecurity/core/cli_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Lifecycle helpers for a CLI run on the Socket backend.

A "run" represents a single CLI invocation. `register_cli_run` opens it and
returns a server-issued `run_id` when streaming is enabled; `finalize_cli_run`
closes it on exit. The run_id keys the rows that `BatchedLogUploader` POSTs to
`/python-cli-runs/<run_id>/logs` during the run so the dashboard can show
what the user saw in their terminal.

Streaming is opt-in via the `share_logs` field on register. The server may
also force-enable streaming for an org regardless of the client's request,
so the CLI always calls register and gates on the response's
`log_streaming_enabled` flag rather than the client's intent.

Both calls are best-effort: failures fall back to no-streaming and never
prevent the scan from running.
"""

import json
import logging
from typing import Optional

from .cli_client import CliClient
from .exceptions import APIFailure

log = logging.getLogger("socketcli")


def register_cli_run(
client: CliClient,
client_version: str,
share_logs: bool,
decline_logs: bool,
) -> Optional[str]:
try:
resp = client.request(
path="python-cli-runs",
method="POST",
payload=json.dumps({
"client_version": client_version,
"share_logs": share_logs,
"decline_logs": decline_logs,
}),
)
except APIFailure as e:
log.debug(f"cli-run register failed (streaming disabled): {e}")
return None

try:
body = resp.json()
except (ValueError, json.JSONDecodeError) as e:
log.debug(f"cli-run register: bad JSON body: {e}")
return None

if not body.get("log_streaming_enabled"):
log.debug("cli-run register: log streaming not enabled by server")
return None

run_id = body.get("run_id")
if not isinstance(run_id, str) or not run_id:
log.debug(f"cli-run register: enabled but missing run_id in response: {body!r}")
return None
return run_id


def finalize_cli_run(
client: CliClient,
run_id: str,
status: str = "success",
report_run_id: Optional[str] = None,
) -> None:
try:
client.request(
path=f"python-cli-runs/{run_id}/finalize",
method="POST",
payload=json.dumps({"status": status, "report_run_id": report_run_id}),
)
except Exception as e:
log.debug(f"cli-run finalize failed (swallowed): {e}")
122 changes: 122 additions & 0 deletions socketsecurity/core/log_uploader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
"""Buffer the CLI's local log records and POST them in batches to
/python-cli-runs/<run_id>/logs so the dashboard's view of a CLI run
mirrors what the user sees in their terminal.

Behavior:
- daemon thread, 5s flush
- swallow all network errors (debug log only)
- skip empty buffers
- drain on shutdown
- at-most-once semantics (failed batches dropped, not retried)

A thread-local recursion guard prevents the uploader's own request-error
log lines (emitted by `cli_client.py`'s `socketdev` logger) from being
re-enqueued during a flush.
"""

import json
import logging
import threading
from datetime import datetime, timezone
from typing import Optional

from .cli_client import CliClient

log = logging.getLogger(__name__)

_FLUSH_GUARD = threading.local()

_LEVEL_MAP = {
logging.DEBUG: "DEBUG",
logging.INFO: "INFO",
logging.WARNING: "WARN",
logging.ERROR: "ERROR",
logging.CRITICAL: "ERROR",
}


def _now_str() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]


class BatchedLogUploader:
def __init__(
self,
client: CliClient,
run_id: str,
flush_interval: float = 5.0,
):
self._client = client
self._run_id = run_id
self._flush_interval = flush_interval
self._buf: list = []
self._lock = threading.Lock()
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None

def add(self, entry: dict) -> None:
with self._lock:
self._buf.append(entry)

def start(self) -> None:
if self._thread is not None:
return
self._thread = threading.Thread(
target=self._run,
name=f"socket-log-uploader-{self._run_id[:8]}",
daemon=True,
)
self._thread.start()

def stop(self, timeout: float = 2.0) -> None:
if self._thread is None:
self._flush()
return
self._stop.set()
self._thread.join(timeout=timeout)
self._thread = None
self._flush()

def _run(self) -> None:
while not self._stop.is_set():
self._flush()
self._stop.wait(self._flush_interval)

def _flush(self) -> None:
with self._lock:
if not self._buf:
return
batch = self._buf
self._buf = []

_FLUSH_GUARD.active = True
try:
self._client.request(
path=f"python-cli-runs/{self._run_id}/logs",
method="POST",
payload=json.dumps({"logs": batch}),
)
except Exception as e:
log.debug(f"log upload failed (swallowed, {len(batch)} entries dropped): {e}")
finally:
_FLUSH_GUARD.active = False


class UploadingLogHandler(logging.Handler):
def __init__(self, uploader: BatchedLogUploader, context: str = "socket-python-cli"):
super().__init__()
self._uploader = uploader
self._context = context

def emit(self, record: logging.LogRecord) -> None:
if getattr(_FLUSH_GUARD, "active", False):
return
try:
self._uploader.add({
"timestamp": _now_str(),
"level": _LEVEL_MAP.get(record.levelno, "INFO"),
"message": self.format(record),
"context": self._context,
})
except Exception:
self.handleError(record)
87 changes: 87 additions & 0 deletions socketsecurity/core/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""Wire the server log streaming pipeline for one CLI run.

`setup_streaming` registers the run with the backend, attaches handlers that
route the CLI's own log output through both the local terminal and a batched
uploader, and forces the loggers into DEBUG so the upload captures everything
regardless of local terminal verbosity.

Returns a teardown callable to invoke on exit (typically via `atexit.register`).
Returns None if registration failed; in that case nothing was wired up.
"""

import logging
from typing import Callable, Optional

from .cli_client import CliClient
from .cli_run import finalize_cli_run, register_cli_run
from .log_uploader import BatchedLogUploader, UploadingLogHandler

_run_status: str = "success"
_report_run_id: Optional[str] = None


def set_run_status(status: str) -> None:
global _run_status
_run_status = status


def set_report_run_id(report_run_id: Optional[str]) -> None:
global _report_run_id
_report_run_id = report_run_id


def setup_streaming(
*,
client: CliClient,
cli_logger: logging.Logger,
sdk_logger: logging.Logger,
client_version: str,
share_logs: bool,
decline_logs: bool,
enable_debug: bool,
) -> Optional[Callable[[], None]]:
run_id = register_cli_run(
client,
client_version=client_version,
share_logs=share_logs,
decline_logs=decline_logs,
)
if not run_id:
cli_logger.debug("server log streaming not active for this run")
return None

log_uploader = BatchedLogUploader(client, run_id)
log_uploader.start()
upload_handler = UploadingLogHandler(log_uploader, context="socket-python-cli")
upload_handler.setFormatter(logging.Formatter("%(message)s"))

terminal_handler = logging.StreamHandler()
terminal_handler.setLevel(logging.DEBUG if enable_debug else logging.INFO)
terminal_handler.setFormatter(logging.Formatter("%(asctime)s: %(message)s"))

saved_levels = (cli_logger.level, sdk_logger.level)
saved_propagate = (cli_logger.propagate, sdk_logger.propagate)
cli_logger.setLevel(logging.DEBUG)
sdk_logger.setLevel(logging.DEBUG)
cli_logger.propagate = False
sdk_logger.propagate = False
cli_logger.addHandler(terminal_handler)
sdk_logger.addHandler(terminal_handler)
cli_logger.addHandler(upload_handler)
sdk_logger.addHandler(upload_handler)

cli_logger.debug(f"server log streaming enabled (run_id={run_id})")

def teardown() -> None:
cli_logger.removeHandler(upload_handler)
sdk_logger.removeHandler(upload_handler)
log_uploader.stop()
finalize_cli_run(client, run_id, status=_run_status, report_run_id=_report_run_id)
cli_logger.removeHandler(terminal_handler)
sdk_logger.removeHandler(terminal_handler)
cli_logger.setLevel(saved_levels[0])
sdk_logger.setLevel(saved_levels[1])
cli_logger.propagate = saved_propagate[0]
sdk_logger.propagate = saved_propagate[1]

return teardown
Loading