# Copyright 2021 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""
Task to build Debian packages with sbuild.
This task implements the PackageBuild generic task for its task_data:
https://freexian-team.pages.debian.net/debusine/reference/tasks.html#task-packagebuild
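
Example task_data (an illustrative sketch based on the fields this module
reads; the values are made up, and the reference above is authoritative for
the full schema):

    distribution: bookworm
    host_architecture: amd64
    backend: schroot
    build_components:
      - any
      - all
    input:
        source_artifact: 123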
"""
import email.utils
import os
import subprocess
from pathlib import Path
from typing import Any, cast
try:
import pydantic.v1 as pydantic
except ImportError:
import pydantic # type: ignore
import debian.deb822 as deb822
import yaml
import debusine.utils
from debusine.artifacts import (
BinaryPackage,
BinaryPackages,
PackageBuildLog,
SigningInputArtifact,
Upload,
)
from debusine.artifacts.models import (
ArtifactCategory,
CollectionCategory,
DoseDistCheck,
)
from debusine.client.models import RemoteArtifact
from debusine.tasks import BaseTaskWithExecutor, RunCommandTask, TaskConfigError
from debusine.tasks.models import SbuildData, SbuildDynamicData
from debusine.tasks.sbuild_validator_mixin import SbuildValidatorMixin
from debusine.tasks.server import TaskDatabaseInterface
from debusine.utils import read_dsc
class Sbuild(
SbuildValidatorMixin,
RunCommandTask[SbuildData, SbuildDynamicData],
BaseTaskWithExecutor[SbuildData, SbuildDynamicData],
):
"""Task implementing a Debian package build with sbuild."""
TASK_VERSION = 1
def __init__(
self,
task_data: dict[str, Any],
dynamic_task_data: dict[str, Any] | None = None,
) -> None:
"""Initialize the sbuild task."""
super().__init__(task_data, dynamic_task_data)
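        # Cached list of available sbuild schroots; populated lazily by
        # self._update_chroots_list() when the schroot backend is used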
self.chroots = None
self.builder = "sbuild"
        # Path to the .dsc file. Set by self.configure_for_execution()
        self._dsc_file: Path | None = None
self._extra_packages: list[Path] = []
def compute_dynamic_data(
self, task_database: TaskDatabaseInterface
) -> SbuildDynamicData:
"""Resolve artifact lookups for this task."""
DEBUSINE_FQDN = task_database.get_server_setting("DEBUSINE_FQDN")
return SbuildDynamicData(
environment_id=(
None
if self.data.environment is None
else self.get_environment(
task_database,
self.data.environment,
default_category=CollectionCategory.ENVIRONMENTS,
)
),
input_source_artifact_id=task_database.lookup_single_artifact(
self.data.input.source_artifact
),
input_extra_binary_artifacts_ids=(
task_database.lookup_multiple_artifacts(
self.data.input.extra_binary_artifacts
)
),
binnmu_maintainer=f"Debusine <noreply@{DEBUSINE_FQDN}>",
)
def get_source_artifacts_ids(self) -> list[int]:
"""
Return the list of source artifact IDs used by this task.
This refers to the artifacts actually used by the task. If
dynamic_data is empty, this returns the empty list.
"""
if not self.dynamic_data:
return []
result: list[int] = []
if val := self.dynamic_data.environment_id:
result.append(val)
result.append(self.dynamic_data.input_source_artifact_id)
result.extend(self.dynamic_data.input_extra_binary_artifacts_ids)
return result
@property
def chroot_name(self) -> str:
"""Build name of required chroot."""
return "{}-{}".format(
self.data.distribution,
self.data.host_architecture,
)
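    # Return the host's native architecture as reported by
    # "dpkg --print-architecture".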
@staticmethod
def _call_dpkg_architecture(): # pragma: no cover
return (
subprocess.check_output(["dpkg", "--print-architecture"])
.decode("utf-8")
.strip()
)
@classmethod
def analyze_worker(cls):
"""Report metadata for this task on this worker."""
metadata = super().analyze_worker()
available_key = cls.prefix_with_task_name("available")
metadata[available_key] = debusine.utils.is_command_available("sbuild")
if debusine.utils.is_command_available("schroot"):
chroots_key = cls.prefix_with_task_name("chroots")
metadata[chroots_key] = cls._list_chroots()
return metadata
def can_run_on(self, worker_metadata: dict[str, Any]) -> bool:
"""Check the specified worker can run the requested task."""
if not super().can_run_on(worker_metadata):
return False
available_key = self.prefix_with_task_name("available")
if not worker_metadata.get(available_key, False):
return False
if self.backend == "schroot":
chroot_key = self.prefix_with_task_name("chroots")
if self.chroot_name not in worker_metadata.get(chroot_key, []):
return False
else:
executor_available_key = f"executor:{self.backend}:available"
if not worker_metadata.get(executor_available_key, False):
return False
if self.backend != "unshare":
if not worker_metadata.get("autopkgtest:available", False):
return False
return True
@staticmethod
def _call_schroot_list(): # pragma: no cover
return (
subprocess.check_output(["schroot", "--list"])
.decode("utf-8")
.strip()
)
@classmethod
def _list_chroots(cls) -> list[str]:
"""
Provide support for finding available chroots.
Ensure that aliases are supported as the DSC may explicitly refer to
<codename>-security (or -backports) etc.
Return the list of detected chroots.
"""
chroots = []
output = cls._call_schroot_list()
for line in output.split("\n"):
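            # schroot prints one "chroot:<name>" entry per line; sbuild
            # chroots are conventionally named "<codename>-<arch>-sbuild"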
if line.startswith("chroot:") and line.endswith("-sbuild"):
                chroots.append(line[len("chroot:") : -len("-sbuild")])
return chroots
def _update_chroots_list(self):
"""
Populate the self.chroots list, if the list is empty.
No return value, this is a find, not a get.
"""
if self.chroots is not None:
return
self.chroots = self._list_chroots()
def _verify_schroot(self):
"""Verify a suitable schroot exists."""
self._update_chroots_list()
if not self.chroots:
self.logger.error("No sbuild chroots found")
return False
if self.chroot_name in self.chroots:
return True
self.logger.error("No suitable chroot found for %s", self.chroot_name)
return False
def _cmdline(self) -> list[str]:
"""
Build the sbuild command line.
Use self.data and self._dsc_file.
"""
cmd = [
self.builder,
"--no-clean",
"--purge-deps=never",
]
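        # Map the requested build components onto sbuild's
        # --[no-]arch-any/--[no-]arch-all/--[no-]source flags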
if "any" in self.data.build_components:
cmd.append("--arch-any")
else:
cmd.append("--no-arch-any")
if "all" in self.data.build_components:
cmd.append("--arch-all")
else:
cmd.append("--no-arch-all")
if "source" in self.data.build_components:
cmd.append("--source")
else:
cmd.append("--no-source")
cmd.append("--arch=" + self.data.host_architecture)
if self.backend == "schroot":
# Using cast because SbuildTaskData validators enforce that
# distribution is not None if backend is schroot
cmd.append("--dist=" + cast(str, self.data.distribution))
else:
# set in configure_for_execution
if self.executor is None:
raise AssertionError("self.executor not set")
distribution = self.executor.system_image.data["codename"]
cmd.append(f"--dist={distribution}")
if self.backend == "unshare":
cmd += [
"--chroot-mode=unshare",
f"--chroot={self.executor.image_name()}",
# Remove any dangling resolv.conf symlink (from
# systemd-resolved installed in the environment, #1071736)
"--chroot-setup-commands=rm -f /etc/resolv.conf",
]
else:
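                # Build inside an autopkgtest virtualization server.
                # sbuild applies %-escapes to these option values, so
                # literal "%" characters must be doubled to survive the
                # expansion.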
virt_server = self.executor.autopkgtest_virt_server()
cmd += [
"--chroot-mode=autopkgtest",
f"--autopkgtest-virt-server={virt_server}",
] + [
f"--autopkgtest-virt-server-opt={opt.replace('%', '%%')}"
for opt in self.executor.autopkgtest_virt_args()
]
# binNMU
if self.data.binnmu is not None:
cmd.append(f"--make-binNMU={self.data.binnmu.changelog}")
cmd.append(f"--append-to-version={self.data.binnmu.suffix}")
cmd.append("--binNMU=0")
if self.data.binnmu.timestamp is not None:
rfc5322date = email.utils.format_datetime(
self.data.binnmu.timestamp
)
cmd.append(f"--binNMU-timestamp={rfc5322date}")
maintainer: str | None
if self.data.binnmu.maintainer is not None:
maintainer = str(self.data.binnmu.maintainer)
else:
assert self.dynamic_data
maintainer = self.dynamic_data.binnmu_maintainer
cmd.append(f"--maintainer={maintainer}")
if self.data.build_profiles:
cmd.append(f"--profiles={','.join(self.data.build_profiles)}")
# BD-Uninstallable
cmd.append("--bd-uninstallable-explainer=dose3")
for package in self._extra_packages:
cmd.append(f"--extra-package={package}")
cmd.append(str(self._dsc_file))
return cmd
def _cmd_env(self) -> dict[str, str] | None:
"""Set DEB_BUILD_OPTIONS for build profiles."""
build_options = [
profile
for profile in self.data.build_profiles or []
if profile in ("nocheck", "nodoc")
]
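        # Append the profile-derived options to any DEB_BUILD_OPTIONS
        # already present in the worker's environment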
if build_options:
env = dict(os.environ)
env["DEB_BUILD_OPTIONS"] = " ".join(
os.environ.get("DEB_BUILD_OPTIONS", "").split() + build_options
)
return env
return None
def execute(self) -> bool:
"""
Verify task can be executed and super().execute().
:raises: TaskConfigError.
""" # noqa: D402
if self.backend == "schroot" and not self._verify_schroot():
raise TaskConfigError(
"No suitable schroot for"
f" {self.data.distribution}-{self.data.host_architecture}"
)
return super().execute()
@staticmethod
def _extract_dose3_explanation(
build_log_path: Path,
) -> DoseDistCheck | None:
"""Isolate and parse dose3 output in BD-Uninstallable scenario."""
        output = ""
        with open(build_log_path, errors="replace") as f:
            # find the start of the dose-debcheck output
            while (line := f.readline()) != "":
                if line == "(I)Dose_applications: Solving...\n":
                    break
            # exit if there is no (or invalid) output
            if not (line := f.readline()).startswith("output-version"):
                return None
            output += line
            # grab everything until the next section
            while (line := f.readline()) != "":
                if line.startswith("+----"):
                    break
                output += line
        # convert into, and validate as, a pydantic structure.
        # Use yaml.CBaseLoader to work around malformed YAML
        # https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=834059#22
        # https://gitlab.com/irill/dose3/-/issues/18
        parsed_dict = yaml.load(output, yaml.CBaseLoader)
try:
return DoseDistCheck.parse_obj(parsed_dict)
except pydantic.ValidationError:
return None
def _upload_package_build_log(
self,
build_directory: Path,
source: str,
version: str,
execution_success: bool,
) -> RemoteArtifact | None:
if not self.debusine:
raise AssertionError("self.debusine not set")
build_log_path = debusine.utils.find_file_suffixes(
build_directory, [".build"]
)
if build_log_path is None:
return None
explanation = None
if not execution_success:
# BD-Uninstallable: look for the dose3 output
explanation = self._extract_dose3_explanation(build_log_path)
package_build_log = PackageBuildLog.create(
source=source,
version=version,
file=build_log_path,
bd_uninstallable=explanation,
)
return self.debusine.upload_artifact(
package_build_log,
workspace=self.workspace_name,
work_request=self.work_request_id,
)
def _upload_binary_upload(
self, build_directory: Path
) -> RemoteArtifact | None:
if not self.debusine:
raise AssertionError("self.debusine not set")
changes_path = debusine.utils.find_file_suffixes(
build_directory, [".changes"]
)
if changes_path is None:
return None
artifact_binary_upload = Upload.create(
changes_file=changes_path,
)
return self.debusine.upload_artifact(
artifact_binary_upload,
workspace=self.workspace_name,
work_request=self.work_request_id,
)
def _upload_signing_input(
self, build_directory: Path
) -> RemoteArtifact | None:
if not self.debusine:
raise AssertionError("self.debusine not set")
changes_path = debusine.utils.find_file_suffixes(
build_directory, [".changes"]
)
if changes_path is None:
return None
artifact_signing_input = SigningInputArtifact.create(
[changes_path], build_directory
)
return self.debusine.upload_artifact(
artifact_signing_input,
workspace=self.workspace_name,
work_request=self.work_request_id,
)
def _create_binary_package_local_artifacts(
self,
build_directory: Path,
dsc: deb822.Dsc,
architecture: str,
suffixes: list[str],
) -> list[BinaryPackage | BinaryPackages]:
deb_paths = debusine.utils.find_files_suffixes(
build_directory, suffixes
)
artifacts: list[BinaryPackage | BinaryPackages] = []
for deb_path in deb_paths:
artifacts.append(BinaryPackage.create(file=deb_path))
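        # In addition to one BinaryPackage artifact per file, group all the
        # packages into a single BinaryPackages artifact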
artifacts.append(
BinaryPackages.create(
srcpkg_name=dsc["source"],
srcpkg_version=dsc["version"],
version=dsc["version"],
architecture=architecture,
files=deb_paths,
)
)
return artifacts
def _upload_binary_packages(
self, build_directory: Path, dsc: deb822.Dsc
) -> list[RemoteArtifact]:
r"""Upload \*.deb and \*.udeb files."""
if not self.debusine:
raise AssertionError("self.debusine not set")
host_arch = self.data.host_architecture
packages = []
if "any" in self.data.build_components:
prefix = "_" + host_arch
packages.extend(
self._create_binary_package_local_artifacts(
build_directory,
dsc,
host_arch,
[prefix + ".deb", prefix + ".udeb"],
)
)
if "all" in self.data.build_components:
prefix = "_all"
packages.extend(
self._create_binary_package_local_artifacts(
build_directory,
dsc,
"all",
[prefix + ".deb", prefix + ".udeb"],
)
)
remote_artifacts: list[RemoteArtifact] = []
for package in packages:
if package.files:
remote_artifacts.append(
self.debusine.upload_artifact(
package,
workspace=self.workspace_name,
work_request=self.work_request_id,
)
)
return remote_artifacts
def _create_remote_binary_packages_relations(
self,
remote_build_log: RemoteArtifact | None,
remote_binary_upload: RemoteArtifact | None,
remote_binary_packages: list[RemoteArtifact],
remote_signing_input: RemoteArtifact | None,
):
if not self.debusine:
raise AssertionError("self.debusine not set")
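        # Link each binary package to the source artifacts it was built
        # from, to the build log, and to the .changes upload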
for remote_binary_package in remote_binary_packages:
for source_artifact_id in self._source_artifacts_ids:
self.debusine.relation_create(
remote_binary_package.id,
source_artifact_id,
"built-using",
)
if remote_build_log is not None:
self.debusine.relation_create(
remote_build_log.id, remote_binary_package.id, "relates-to"
)
if remote_binary_upload is not None:
self.debusine.relation_create(
remote_binary_upload.id,
remote_binary_package.id,
"extends",
)
self.debusine.relation_create(
remote_binary_upload.id,
remote_binary_package.id,
"relates-to",
)
if (
remote_binary_upload is not None
and remote_signing_input is not None
):
self.debusine.relation_create(
remote_signing_input.id, remote_binary_upload.id, "relates-to"
)
def upload_artifacts(
self, directory: Path, *, execution_success: bool
) -> None:
"""
Upload the artifacts from directory.
:param directory: directory containing the files that
will be uploaded.
:param execution_success: if False skip uploading .changes and
*.deb/*.udeb
"""
if not self.debusine:
raise AssertionError("self.debusine not set")
dsc = read_dsc(self._dsc_file)
if dsc is not None:
# Upload the .build file (PackageBuildLog)
remote_build_log = self._upload_package_build_log(
directory, dsc["source"], dsc["version"], execution_success
)
if remote_build_log is not None:
for source_artifact_id in self._source_artifacts_ids:
self.debusine.relation_create(
remote_build_log.id,
source_artifact_id,
"relates-to",
)
if execution_success:
# Upload the *.deb/*.udeb files (BinaryPackages)
remote_binary_packages = self._upload_binary_packages(
directory, dsc
)
# Upload the .changes and the rest of the files
remote_binary_changes = self._upload_binary_upload(directory)
# Upload the .changes on its own as signing input
remote_signing_input = self._upload_signing_input(directory)
# Create the relations
self._create_remote_binary_packages_relations(
remote_build_log,
remote_binary_changes,
remote_binary_packages,
remote_signing_input,
)
def get_label(self) -> str:
"""Return the task label."""
        # TODO: copy the source package information into dynamic task data
        # and use it here if available
return "build a package"