Skip to content
96 changes: 77 additions & 19 deletions src/specify_cli/commands/bundle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,14 @@ def catalog_remove(
console.print(f"[green]✓[/green] Removed catalog source '{removed}'.")


# ZIP magic-byte signatures used to detect .zip payloads from REST API asset
# URLs, which carry no file extension. The three signatures cover all valid
# ZIP variants (PK\x03\x04 = local file header, PK\x05\x06 = empty archive,
# PK\x07\x08 = spanning marker) without the false-positive risk of checking
# only the 2-byte "PK" prefix.
_ZIP_SIGNATURES = (b"PK\x03\x04", b"PK\x05\x06", b"PK\x07\x08")


# ===== internal helpers =====


Expand Down Expand Up @@ -794,41 +802,91 @@ def _download_remote_manifest(entry_id: str, url: str):
"""Fetch a remote bundle artifact over HTTPS and extract its manifest."""
import io
import tempfile
from pathlib import PurePosixPath
from urllib.parse import urlparse as _urlparse

import yaml as _yaml

from ...authentication.http import open_url
from ...authentication.http import github_provider_hosts, open_url
from ..._github_http import resolve_github_release_asset_api_url
from ...bundler.models.manifest import BundleManifest

def _validate_redirect(old_url: str, new_url: str) -> None:
_require_https(f"bundle '{entry_id}'", new_url)

_require_https(f"bundle '{entry_id}'", url)

# For private/SSO-protected GitHub repos, browser release download URLs
# (https://github.com/<owner>/<repo>/releases/download/<tag>/<asset>)
# redirect to an HTML/SSO page instead of delivering the asset. Resolve
# such URLs to the GitHub REST API asset URL so the authenticated client
# can download the actual file.
extra_headers = None
effective_url = url
resolved = resolve_github_release_asset_api_url(
url, open_url, timeout=30, github_hosts=github_provider_hosts()
)
if resolved:
effective_url = resolved
_require_https(f"bundle '{entry_id}'", effective_url)
extra_headers = {"Accept": "application/octet-stream"}

try:
with open_url(url, timeout=30, redirect_validator=_validate_redirect) as resp:
with open_url(
effective_url,
timeout=30,
redirect_validator=_validate_redirect,
extra_headers=extra_headers,
) as resp:
_require_https(f"bundle '{entry_id}'", resp.geturl())
raw = resp.read()
except BundlerError:
raise
except Exception as exc: # noqa: BLE001
raise BundlerError(f"Failed to download bundle '{entry_id}' from {url}: {exc}") from exc
# Report the original catalog URL so users know which entry to fix,
# and include the resolved URL when it differs for easier debugging.
if effective_url != url:
msg = f"Failed to download bundle '{entry_id}' from {url} (resolved to {effective_url}): {exc}"
else:
msg = f"Failed to download bundle '{entry_id}' from {url}: {exc}"
raise BundlerError(msg) from exc

# A .zip artifact is written to a temp file and parsed via the local-source
# path (which extracts bundle.yml); any other payload is treated as YAML.
if url.lower().endswith(".zip"):
with tempfile.TemporaryDirectory() as tmp:
artifact = Path(tmp) / "bundle.zip"
artifact.write_bytes(raw)
manifest = _local_manifest_source(str(artifact))
if manifest is None:
raise BundlerError(
f"Downloaded artifact for bundle '{entry_id}' is not a valid bundle."
)
return manifest

import yaml as _yaml

from ...bundler.models.manifest import BundleManifest
# Detection uses the path component of the original catalog URL (via
# PurePosixPath so query strings and fragments are ignored, and URL paths
# are always treated as POSIX regardless of host OS), falling back to the
# module-level _ZIP_SIGNATURES magic-byte check for direct REST API asset
# URLs which carry no file extension.
_url_ext = PurePosixPath(_urlparse(url).path).suffix.lower()
try:
if _url_ext == ".zip" or raw[:4] in _ZIP_SIGNATURES:
with tempfile.TemporaryDirectory() as tmp:
artifact = Path(tmp) / "bundle.zip"
artifact.write_bytes(raw)
manifest = _local_manifest_source(str(artifact))
# _local_manifest_source returns None only when the file does
# not exist; since we just wrote *artifact* that cannot happen
# here. The explicit guard ensures callers never receive None
# and silently degrade instead of raising a clear error.
if manifest is None:
raise BundlerError(
f"Downloaded artifact for bundle '{entry_id}' is not a valid bundle."
)
return manifest

data = _yaml.safe_load(io.BytesIO(raw))
return BundleManifest.from_dict(data)
data = _yaml.safe_load(io.BytesIO(raw))
return BundleManifest.from_dict(data)
except BundlerError:
raise
except _yaml.YAMLError as exc:
raise BundlerError(
f"Downloaded content for bundle '{entry_id}' is not valid YAML: {exc}"
) from exc
except Exception as exc: # noqa: BLE001
raise BundlerError(
f"Failed to parse downloaded bundle '{entry_id}': {exc}"
) from exc


def register(app: typer.Typer) -> None:
Expand Down
Loading