From 058051f1e9efc5e537872661b0653fc20fee5ae3 Mon Sep 17 00:00:00 2001 From: ulinja <56582668+ulinja@users.noreply.github.com> Date: Wed, 20 Apr 2022 22:45:44 +0200 Subject: [PATCH] import image procurement code into project --- udib/gpgverify.py | 126 ++++++++++++++++++++++++++++++ udib/userinput.py | 48 ++++++++++++ udib/webdownload.py | 181 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 355 insertions(+) create mode 100644 udib/gpgverify.py create mode 100644 udib/userinput.py create mode 100644 udib/webdownload.py diff --git a/udib/gpgverify.py b/udib/gpgverify.py new file mode 100644 index 0000000..992d0a4 --- /dev/null +++ b/udib/gpgverify.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 + +""" This module validates GPG signatures for downloaded files. """ + +from pathlib import Path + +import gnupg + +import userinput +from printmsg import perror, pfailure, pinfo, pok, psuccess, pwarning + +def gpg_signature_is_valid( + path_to_signature_file, + path_to_data_file, + fallback_keyserver_name +): + """ Validates a PGP signature for a data file. The detached signature is + provided as plaintext (UTF8) in the specified file. + + If the discovered signing key is unknown to gpg on this system for the + invoking user, and if 'fallback_keyserver_name' is not None, an attempt + is made to import the key from the specified keyserver using its ID + after prompting the user for permission to import the key. + + Parameters + ---------- + path_to_signature_file : str or pathlike object + Path to a detached PGP signature stored in a standalone file. + Example value: "/path/to/SHA256SUMS.sig" + path_to_data_file : str or pathlike object + Path to a signed data file, for which the signature is to be verified. + Example value: "/path/to/SHA256SUMS" + fallback_keyserver_name : str + FQDN of a keyserver from which to import unknown public keys. + Example value: "keyring.debian.org" + + Returns + ------- + True : bool + If the signature is valid. + False : bool + If the signature is invalid or could not be validated. + """ + path_to_signature_file = Path(path_to_signature_file) + path_to_data_file = Path(path_to_data_file) + + gpg = gnupg.GPG() + gpg.encoding = "utf-8" + + pinfo("Validating signature...") + with open(path_to_signature_file, "rb") as signature_file: + verification = gpg.verify_file( + signature_file, + path_to_data_file, + close_file=False + ) + + # check if a key and fingerprint were found in the signature file + if verification.key_id is None or verification.fingerprint is None: + raise ValueError( + f"Not a valid PGP signature file: '{path_to_signature_file}'." + ) + else: + pinfo(f"Signature mentions a key with ID "\ + f"{verification.key_id} and fingerprint "\ + f"{verification.fingerprint}." + ) + + if verification.valid: + pok(f"GPG signature is valid with trustlevel "\ + f"'{verification.trust_level}'." + ) + return True + + # this case commonly occurrs when the GPG key has not been imported + if verification.status == "no public key": + pwarning("Could not find the public GPG key locally!") + + # prompt user until answer is unambiguous + key_will_be_imported = None + while key_will_be_imported is None: + key_will_be_imported = userinput.prompt_yes_or_no( + f"[PROMPT] Do you want to import the GPG key from "\ + f"'{fallback_keyserver_name}'?" + ) + + if key_will_be_imported is None: + perror("Unrecognized input. Please try again.") + + if not key_will_be_imported: + pwarning("Aborting without importing key.") + return False + + # import missing key + pinfo("Importing key...") + import_result = gpg.recv_keys( + fallback_keyserver_name, verification.key_id + ) + + if import_result.count < 1: + perror("Failed to import key.") + return False + + # display some of gpg's output + gpg_output = import_result.stderr.split('\n') + for line in gpg_output: + if line.startswith("gpg: "): + pinfo(f"{line}") + + # validate signature again + pinfo("Validating signature...") + with open(path_to_signature_file, "rb") as signature_file: + verification = gpg.verify_file( + signature_file, + path_to_data_file, + close_file=False + ) + + if verification.valid: + pok(f"GPG signature is valid with trustlevel "\ + f"'{verification.trust_level}'." + ) + return True + else: + perror("GPG signature is not valid!!!") + return False diff --git a/udib/userinput.py b/udib/userinput.py new file mode 100644 index 0000000..1cbd626 --- /dev/null +++ b/udib/userinput.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 + +""" This module is responsible for (interactive) user input handling. """ + +import re + +from printmsg import pinput + +def prompt_yes_or_no(question, ask_until_valid = False): + """ Prompts the user with the specified yes/no question. + + If 'ask_until_valid' is True, keeps repeating the prompt until the user + provides recognizable input to answer the question. + + Parameters + ---------- + question : str + The question to ask the user while prompting. The string is suffixed + with " (Yes/No) ". + ask_until_valid : bool + If this is set to True and the user provides an ambiguous answer, keeps + prompting indefinitely until an unambiguous answer is read. + + Returns + ------- + True : bool + If the user has answered 'yes'. + False : bool + If the user has answered 'no'. + None : None + If 'ask_until_valid' is False and the user has provided an ambiguous + response to the prompt. + """ + user_input_is_valid = False + + regex_yes = re.compile(r"^(y)$|^(Y)$|^(YES)$|^(Yes)$|^(yes)$") + regex_no = re.compile(r"^(n)$|^(N)$|^(NO)$|^(No)$|^(no)$") + + while(not user_input_is_valid): + user_input = pinput(f"{question} (Yes/No): ") + + if (regex_yes.match(user_input)): + return True + elif (regex_no.match(user_input)): + return False + elif (not ask_until_valid): + return None + diff --git a/udib/webdownload.py b/udib/webdownload.py new file mode 100644 index 0000000..da55a1b --- /dev/null +++ b/udib/webdownload.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 + +"""This module downloads Debian Linux installation images from the web.""" + +import os, re, subprocess, zipfile +from pathlib import Path +from datetime import date + +import requests +from bs4 import BeautifulSoup +from tqdm import tqdm + +from printmsg import perror, pfailure, pinfo, pok, psuccess, pwarning +import gpgverify + +def download_file(path_to_output_file, url_to_file, show_progress = False): + """ Downloads the file at the specified URL via HTTP and saves it as the + specified output file. Optionally, displays a nice status bar. + + Parameters + ---------- + path_to_output_file : str or pathlike object + Path to a file as which the downloaded file is saved. + url_to_file : str + URL to the file to be downloaded. + show_progress : bool + When True, a progress bar is displayed on StdOut indicating the + progress of the download. + """ + + path_to_output_file = Path(path_to_output_file).resolve() + if not path_to_output_file.parent.is_dir(): + raise FileNotFoundError( + f"No such directory: '{path_to_output_file.parent}'." + ) + if path_to_output_file.exists(): + raise FileExistsError( + f"File already exists: '{path_to_output_file}'" + ) + + output_file_name = path_to_output_file.name + with open(path_to_output_file, "wb") as output_file: + pinfo(f"Downloading '{output_file_name}'...") + file_response = requests.get(url_to_file, stream=True) + total_length = file_response.headers.get('content-length') + + if total_length is None: # no content length header + output_file.write(response.content) + else: + if (show_progress): + total_length = int(total_length) + progress_bar = tqdm( + total=total_length, + unit="B", + unit_scale=True, + unit_divisor=1024 + ) + + for data in file_response.iter_content(chunk_size=4096): + output_file.write(data) + if (show_progress): + progress_bar.update(len(data)) + + if (show_progress): + progress_bar.close() + + pok(f"Received '{output_file_name}'.") + +def debian_obtain_image(path_to_output_dir): + """ Obtains the latest official debian installation image, the SHA512SUMS + file it is listed in, as well as the GPG signature for the SHA512SUMS + file. + + File are obtained from the debian.org HTTPS mirrors and stored in the + specified directory. The obtained image is the FOSS-only, stable x64 + build. + + First, the GPG signature of the hash is validated. Then, the hash of + the image file is checked. If either check fails, an exception is + raised. + + Parameters + ---------- + path_to_output_dir : str or pathlike object + Path to the directory to which all downloaded files will be saved. + + Returns + ------- + path_to_image_file : str + Full path to the obtained and validated image file. + """ + + pinfo("Obtaining and verifying the latest Debian stable image...") + + path_to_output_dir = Path(path_to_output_dir) + + if not path_to_output_dir.is_dir(): + raise ValueError(f"No such directory: '{path_to_output_dir}'") + + releases_url = "https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/" + releases_page = requests.get(releases_url) + soup = BeautifulSoup(releases_page.content, "html.parser") + + hash_file_name = "SHA512SUMS" + hash_file_url = releases_url + hash_file_name + signature_file_name = "SHA512SUMS.sign" + signature_file_url = releases_url + signature_file_name + + # find the URL to the latest stable x64 image + image_file_links = soup.find_all( + name="a", + string=re.compile(r"debian-[0-9.]*-amd64-netinst.iso")) + if len(image_file_links) != 1: + raise RuntimeError("Failed to find an exact match while looking for "\ + "a link to the latest debian image file.") + image_file_name = image_file_links[0]['href'] + image_file_url = releases_url + image_file_name + + # download the SHA512SUMS file + download_file(path_to_output_dir / hash_file_name, hash_file_url) + # download the GPG signature file + download_file(path_to_output_dir / signature_file_name, signature_file_url) + + # verify GPG signature of hash file + if not gpgverify.gpg_signature_is_valid( + path_to_output_dir / signature_file_name, + path_to_output_dir / hash_file_name, + "keyring.debian.org" + ): + raise RuntimeError("GPG signature verification failed!") + + # download the image file + download_file(path_to_output_dir / image_file_name, image_file_url, True) + + # remove unwanted lines from hash file + hash_file = open(path_to_output_dir / hash_file_name, "r") + hash_file_lines = hash_file.readlines() + hash_file_lines_to_keep = [] + for line in hash_file_lines: + if image_file_name in line: + hash_file_lines_to_keep.append(line) + hash_file.close() + if len(hash_file_lines_to_keep) != 1: + raise RuntimeError("Unexpected error while truncating hash file.") + os.remove(path_to_output_dir / hash_file_name) + with open(path_to_output_dir / hash_file_name, "w") as hash_file: + hash_file.writelines(hash_file_lines_to_keep) + + # validate SHA512 checksum + pinfo("Validating file integrity...") + hash_check_result = subprocess.run( + ["sha512sum", "--check", path_to_output_dir / hash_file_name], + capture_output = True, + cwd = path_to_output_dir + ) + + stdout_lines = hash_check_result.stdout.decode("utf-8").split('\n') + stderr_lines = hash_check_result.stderr.decode("utf-8").split('\n') + + if len(stdout_lines) > 0: + for line in stdout_lines: + if len(line) > 0: + pinfo(f"{line}") + + if hash_check_result.returncode != 0: + if len(stderr_lines) > 0: + for line in stderr_lines: + if len(line) > 0: + perror(f"{line}") + raise RuntimeError("SHA512 validation failed.") + + pok("File integrity checks passed.") + + # clean up obsolete files + pinfo("Cleaning up files...") + os.remove(path_to_output_dir / hash_file_name) + os.remove(path_to_output_dir / signature_file_name) + + psuccess("Debian image obtained.") + + return str(path_to_output_dir / image_file_name)