Source code for debusine.tasks.extract_for_signing
# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Task to extract signing input from other artifacts."""
import json
import re
import shlex
import shutil
import subprocess
import tempfile
from pathlib import Path, PurePath
from typing import Any
from debusine.artifacts import SigningInputArtifact
from debusine.artifacts.models import ArtifactCategory, CollectionCategory
from debusine.client.models import RelationType
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask
from debusine.tasks.models import (
ExtractForSigningData,
ExtractForSigningDynamicData,
)
from debusine.tasks.server import TaskDatabaseInterface
# https://www.debian.org/doc/debian-policy/ch-controlfields.html#s-f-source
_re_package_name = re.compile(r"^[a-z0-9][a-z0-9+.-]+$")
class ExtractError(Exception):
"""An error occurred while extracting signing input."""
[docs]
class ExtractForSigning(
RunCommandTask[ExtractForSigningData, ExtractForSigningDynamicData],
BaseTaskWithExecutor[ExtractForSigningData, ExtractForSigningDynamicData],
):
"""Task to extract signing input from other artifacts."""
TASK_VERSION = 1
[docs]
def __init__(
self,
task_data: dict[str, Any],
dynamic_task_data: dict[str, Any] | None = None,
) -> None:
"""Initialize the task."""
super().__init__(task_data, dynamic_task_data)
# Set by fetch_input.
self._template_deb_name: str | None = None
self._template_path: Path | None = None
self._binary_artifacts: dict[str, int] | None = None
self._binary_paths: dict[str, Path] | None = None
# Set by run.
self._remote_execute_directory: PurePath | None = None
self._local_execute_directory: Path | None = None
self._signing_input_artifacts: (
dict[str, SigningInputArtifact] | None
) = None
def _cmdline(self) -> list[str]:
"""Unused abstract method from RunCommandTask."""
raise NotImplementedError()
[docs]
def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
"""Check if the specified worker can run the task."""
if not super().can_run_on(worker_metadata):
return False
executor_available_key = f"executor:{self.backend}:available"
return bool(worker_metadata.get(executor_available_key, False))
[docs]
def compute_dynamic_data(
self, task_database: TaskDatabaseInterface
) -> ExtractForSigningDynamicData:
"""Resolve artifact lookups for this task."""
return ExtractForSigningDynamicData(
environment_id=self.get_environment(
task_database,
self.data.environment,
default_category=CollectionCategory.ENVIRONMENTS,
),
input_template_artifact_id=task_database.lookup_single_artifact(
self.data.input.template_artifact
),
input_binary_artifacts_ids=task_database.lookup_multiple_artifacts(
self.data.input.binary_artifacts
),
)
[docs]
def fetch_input(self, destination: Path) -> bool:
"""Download the required artifacts."""
assert self.dynamic_data is not None
artifact = self.fetch_artifact(
self.dynamic_data.input_template_artifact_id, destination
)
if artifact.category != ArtifactCategory.BINARY_PACKAGE:
self.append_to_log_file(
"fetch_input.log",
[
f"Expected template_artifact to be of category "
f"{ArtifactCategory.BINARY_PACKAGE}; got "
f"{artifact.category}"
],
)
return False
self._template_deb_name = artifact.data["deb_fields"]["Package"]
# Checked by BinaryPackage validator.
assert len(artifact.files) == 1
template_file = list(artifact.files)[0]
if not template_file.endswith(".deb"):
self.append_to_log_file(
"fetch_input.log",
[
f"Expected template_artifact file name to match *.deb; "
f"got {template_file}"
],
)
return False
self._template_path = destination / template_file
self._binary_artifacts = {}
self._binary_paths = {}
for artifact_id in self.dynamic_data.input_binary_artifacts_ids:
artifact = self.fetch_artifact(artifact_id, destination)
if artifact.category != ArtifactCategory.BINARY_PACKAGE:
self.append_to_log_file(
"fetch_input.log",
[
f"Expected each of binary_artifacts to be of category "
f"{ArtifactCategory.BINARY_PACKAGE}; got "
f"{artifact.category}"
],
)
return False
binary_deb_name = artifact.data["deb_fields"]["Package"]
self._binary_artifacts[binary_deb_name] = artifact_id
# Checked by BinaryPackage validator.
assert len(artifact.files) == 1
binary_file = list(artifact.files)[0]
self._binary_paths[binary_deb_name] = destination / binary_file
return True
[docs]
def configure_for_execution(
self, download_directory: Path # noqa: U100
) -> bool:
"""Configure task: create and start an executor instance."""
self._prepare_executor_instance()
if self.executor_instance is None:
raise AssertionError("self.executor_instance cannot be None")
return True
def _run_cmd_or_raise(
self,
cmd: list[str],
execute_directory: PurePath,
*,
override_cmd_name: str | None = None,
) -> None:
cmd_name = override_cmd_name or cmd[0]
self.logger.info("Executing: %s", shlex.join(cmd))
returncode = self.run_cmd(cmd, Path(execute_directory))
self.logger.info("%s exited with code %s", cmd_name, returncode)
if returncode != 0:
raise ExtractError(f"{cmd_name} exited with code {returncode}")
def _pull_from_executor(self, source: PurePath, target: Path) -> None:
"""Pull a subdirectory from the executor, preserving timestamps."""
assert self.executor_instance is not None
assert self._remote_execute_directory is not None
assert self._local_execute_directory is not None
# Guard against accidentally passing
# {local,remote}_execute_directory directly.
assert source != self._remote_execute_directory
assert target != self._local_execute_directory
source_tar_path = PurePath(f"{source}.tar")
target_tar_path = Path(f"{target}.tar")
self._run_cmd_or_raise(
[
"tar",
# Executors don't currently support cwd:
# https://salsa.debian.org/freexian-team/debusine/-/issues/434
"-C",
str(source.parent),
"-cf",
str(source_tar_path),
source.name,
],
source.parent,
)
self.executor_instance.file_pull(source_tar_path, target_tar_path)
subprocess.run(
["tar", "--one-top-level", "-xf", target_tar_path],
cwd=target.parent,
)
def _extract_binary(self, deb: PurePath, target: PurePath) -> None:
"""Extract a binary package."""
self._run_cmd_or_raise(
["dpkg-deb", "-x", str(deb), str(target)],
target.parent,
)
def _read_manifest(self) -> dict[str, Any]:
"""Read the files.json manifest from the template artifact."""
assert self.executor_instance is not None
assert self._template_deb_name is not None
assert self._template_path is not None
assert self._remote_execute_directory is not None
assert self._local_execute_directory is not None
remote_template_path = (
self._remote_execute_directory / self._template_deb_name
)
local_files_json_path = self._local_execute_directory / "files.json"
self._extract_binary(self._template_path, remote_template_path)
self.executor_instance.file_pull(
remote_template_path
/ "usr"
/ "share"
/ "code-signing"
/ self._template_deb_name
/ "files.json",
local_files_json_path,
)
manifest = json.loads(local_files_json_path.read_text())
# For mypy. If the manifest isn't a JSON object, we'll get a
# run-time error almost immediately afterwards anyway.
assert isinstance(manifest, dict)
return manifest
def _make_signing_input_artifact(
self, package: str, metadata: dict[str, Any]
) -> SigningInputArtifact:
"""
Check a package's metadata and make a signing-input artifact.
:raises ExtractError: if the package's metadata is invalid.
"""
assert self.executor_instance is not None
assert self._binary_paths is not None
assert self._remote_execute_directory is not None
assert self._local_execute_directory is not None
if _re_package_name.match(package) is None:
raise ExtractError(f"'{package}' is not a valid package name")
remote_binary_path = self._remote_execute_directory / package
self._extract_binary(self._binary_paths[package], remote_binary_path)
local_binary_path = self._local_execute_directory / package
self._pull_from_executor(remote_binary_path, local_binary_path)
signing_input_paths: list[Path] = []
for file in metadata["files"]:
file_path = PurePath(file["file"])
if ".." in file_path.parts:
raise ExtractError(
f"File name '{file_path}' may not contain '..' segments"
)
if file_path.is_absolute():
raise ExtractError(
f"File name '{file_path}' may not be absolute"
)
if not (
(local_binary_path / file_path)
.resolve()
.is_relative_to(local_binary_path)
):
raise ExtractError(
f"File name '{file_path}' may not traverse symlinks to "
f"outside the package"
)
signing_input_paths.append(local_binary_path / file_path)
return SigningInputArtifact.create(
signing_input_paths,
local_binary_path.parent,
trusted_certs=metadata.get("trusted_certs"),
binary_package_name=package,
)
[docs]
def run(self, execute_directory: Path) -> bool:
"""Do the main extraction work."""
assert self.executor_instance is not None
assert self._binary_paths is not None
self._remote_execute_directory = PurePath(execute_directory)
self._local_execute_directory = Path(
tempfile.mkdtemp(prefix="debusine-extract-for-signing-")
)
manifest = self._read_manifest()
self._signing_input_artifacts = {
package: self._make_signing_input_artifact(package, metadata)
for package, metadata in manifest["packages"].items()
}
return True
[docs]
def upload_artifacts(
self, execute_directory: Path, *, execution_success: bool # noqa: U100
) -> None:
"""Upload artifacts for the task."""
assert self.work_request_id is not None
assert self.debusine is not None
if execution_success:
assert self.dynamic_data is not None
assert self._binary_artifacts is not None
assert self._signing_input_artifacts is not None
for package, artifact in self._signing_input_artifacts.items():
uploaded_artifact = self.debusine.upload_artifact(
artifact,
workspace=self.workspace_name,
work_request=self.work_request_id,
)
for related_to_id in (
self.dynamic_data.input_template_artifact_id,
self._binary_artifacts[package],
):
self.debusine.relation_create(
uploaded_artifact.id,
related_to_id,
RelationType.RELATES_TO,
)
[docs]
def cleanup(self) -> None:
"""Clean up after running the task."""
if self._local_execute_directory is not None:
shutil.rmtree(self._local_execute_directory)
super().cleanup()