import image procurement code into project

This commit is contained in:
ulinja
2022-04-20 22:45:44 +02:00
parent ffa78ff335
commit 058051f1e9
3 changed files with 355 additions and 0 deletions

126
udib/gpgverify.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
""" This module validates GPG signatures for downloaded files. """
from pathlib import Path
import gnupg
import userinput
from printmsg import perror, pfailure, pinfo, pok, psuccess, pwarning
def gpg_signature_is_valid(
path_to_signature_file,
path_to_data_file,
fallback_keyserver_name
):
""" Validates a PGP signature for a data file. The detached signature is
provided as plaintext (UTF8) in the specified file.
If the discovered signing key is unknown to gpg on this system for the
invoking user, and if 'fallback_keyserver_name' is not None, an attempt
is made to import the key from the specified keyserver using its ID
after prompting the user for permission to import the key.
Parameters
----------
path_to_signature_file : str or pathlike object
Path to a detached PGP signature stored in a standalone file.
Example value: "/path/to/SHA256SUMS.sig"
path_to_data_file : str or pathlike object
Path to a signed data file, for which the signature is to be verified.
Example value: "/path/to/SHA256SUMS"
fallback_keyserver_name : str
FQDN of a keyserver from which to import unknown public keys.
Example value: "keyring.debian.org"
Returns
-------
True : bool
If the signature is valid.
False : bool
If the signature is invalid or could not be validated.
"""
path_to_signature_file = Path(path_to_signature_file)
path_to_data_file = Path(path_to_data_file)
gpg = gnupg.GPG()
gpg.encoding = "utf-8"
pinfo("Validating signature...")
with open(path_to_signature_file, "rb") as signature_file:
verification = gpg.verify_file(
signature_file,
path_to_data_file,
close_file=False
)
# check if a key and fingerprint were found in the signature file
if verification.key_id is None or verification.fingerprint is None:
raise ValueError(
f"Not a valid PGP signature file: '{path_to_signature_file}'."
)
else:
pinfo(f"Signature mentions a key with ID "\
f"{verification.key_id} and fingerprint "\
f"{verification.fingerprint}."
)
if verification.valid:
pok(f"GPG signature is valid with trustlevel "\
f"'{verification.trust_level}'."
)
return True
# this case commonly occurrs when the GPG key has not been imported
if verification.status == "no public key":
pwarning("Could not find the public GPG key locally!")
# prompt user until answer is unambiguous
key_will_be_imported = None
while key_will_be_imported is None:
key_will_be_imported = userinput.prompt_yes_or_no(
f"[PROMPT] Do you want to import the GPG key from "\
f"'{fallback_keyserver_name}'?"
)
if key_will_be_imported is None:
perror("Unrecognized input. Please try again.")
if not key_will_be_imported:
pwarning("Aborting without importing key.")
return False
# import missing key
pinfo("Importing key...")
import_result = gpg.recv_keys(
fallback_keyserver_name, verification.key_id
)
if import_result.count < 1:
perror("Failed to import key.")
return False
# display some of gpg's output
gpg_output = import_result.stderr.split('\n')
for line in gpg_output:
if line.startswith("gpg: "):
pinfo(f"{line}")
# validate signature again
pinfo("Validating signature...")
with open(path_to_signature_file, "rb") as signature_file:
verification = gpg.verify_file(
signature_file,
path_to_data_file,
close_file=False
)
if verification.valid:
pok(f"GPG signature is valid with trustlevel "\
f"'{verification.trust_level}'."
)
return True
else:
perror("GPG signature is not valid!!!")
return False

48
udib/userinput.py Normal file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/env python3
""" This module is responsible for (interactive) user input handling. """
import re
from printmsg import pinput
def prompt_yes_or_no(question, ask_until_valid = False):
""" Prompts the user with the specified yes/no question.
If 'ask_until_valid' is True, keeps repeating the prompt until the user
provides recognizable input to answer the question.
Parameters
----------
question : str
The question to ask the user while prompting. The string is suffixed
with " (Yes/No) ".
ask_until_valid : bool
If this is set to True and the user provides an ambiguous answer, keeps
prompting indefinitely until an unambiguous answer is read.
Returns
-------
True : bool
If the user has answered 'yes'.
False : bool
If the user has answered 'no'.
None : None
If 'ask_until_valid' is False and the user has provided an ambiguous
response to the prompt.
"""
user_input_is_valid = False
regex_yes = re.compile(r"^(y)$|^(Y)$|^(YES)$|^(Yes)$|^(yes)$")
regex_no = re.compile(r"^(n)$|^(N)$|^(NO)$|^(No)$|^(no)$")
while(not user_input_is_valid):
user_input = pinput(f"{question} (Yes/No): ")
if (regex_yes.match(user_input)):
return True
elif (regex_no.match(user_input)):
return False
elif (not ask_until_valid):
return None

181
udib/webdownload.py Normal file
View File

@@ -0,0 +1,181 @@
#!/usr/bin/env python3
"""This module downloads Debian Linux installation images from the web."""
import os, re, subprocess, zipfile
from pathlib import Path
from datetime import date
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
from printmsg import perror, pfailure, pinfo, pok, psuccess, pwarning
import gpgverify
def download_file(path_to_output_file, url_to_file, show_progress = False):
""" Downloads the file at the specified URL via HTTP and saves it as the
specified output file. Optionally, displays a nice status bar.
Parameters
----------
path_to_output_file : str or pathlike object
Path to a file as which the downloaded file is saved.
url_to_file : str
URL to the file to be downloaded.
show_progress : bool
When True, a progress bar is displayed on StdOut indicating the
progress of the download.
"""
path_to_output_file = Path(path_to_output_file).resolve()
if not path_to_output_file.parent.is_dir():
raise FileNotFoundError(
f"No such directory: '{path_to_output_file.parent}'."
)
if path_to_output_file.exists():
raise FileExistsError(
f"File already exists: '{path_to_output_file}'"
)
output_file_name = path_to_output_file.name
with open(path_to_output_file, "wb") as output_file:
pinfo(f"Downloading '{output_file_name}'...")
file_response = requests.get(url_to_file, stream=True)
total_length = file_response.headers.get('content-length')
if total_length is None: # no content length header
output_file.write(response.content)
else:
if (show_progress):
total_length = int(total_length)
progress_bar = tqdm(
total=total_length,
unit="B",
unit_scale=True,
unit_divisor=1024
)
for data in file_response.iter_content(chunk_size=4096):
output_file.write(data)
if (show_progress):
progress_bar.update(len(data))
if (show_progress):
progress_bar.close()
pok(f"Received '{output_file_name}'.")
def debian_obtain_image(path_to_output_dir):
""" Obtains the latest official debian installation image, the SHA512SUMS
file it is listed in, as well as the GPG signature for the SHA512SUMS
file.
File are obtained from the debian.org HTTPS mirrors and stored in the
specified directory. The obtained image is the FOSS-only, stable x64
build.
First, the GPG signature of the hash is validated. Then, the hash of
the image file is checked. If either check fails, an exception is
raised.
Parameters
----------
path_to_output_dir : str or pathlike object
Path to the directory to which all downloaded files will be saved.
Returns
-------
path_to_image_file : str
Full path to the obtained and validated image file.
"""
pinfo("Obtaining and verifying the latest Debian stable image...")
path_to_output_dir = Path(path_to_output_dir)
if not path_to_output_dir.is_dir():
raise ValueError(f"No such directory: '{path_to_output_dir}'")
releases_url = "https://cdimage.debian.org/debian-cd/current/amd64/iso-cd/"
releases_page = requests.get(releases_url)
soup = BeautifulSoup(releases_page.content, "html.parser")
hash_file_name = "SHA512SUMS"
hash_file_url = releases_url + hash_file_name
signature_file_name = "SHA512SUMS.sign"
signature_file_url = releases_url + signature_file_name
# find the URL to the latest stable x64 image
image_file_links = soup.find_all(
name="a",
string=re.compile(r"debian-[0-9.]*-amd64-netinst.iso"))
if len(image_file_links) != 1:
raise RuntimeError("Failed to find an exact match while looking for "\
"a link to the latest debian image file.")
image_file_name = image_file_links[0]['href']
image_file_url = releases_url + image_file_name
# download the SHA512SUMS file
download_file(path_to_output_dir / hash_file_name, hash_file_url)
# download the GPG signature file
download_file(path_to_output_dir / signature_file_name, signature_file_url)
# verify GPG signature of hash file
if not gpgverify.gpg_signature_is_valid(
path_to_output_dir / signature_file_name,
path_to_output_dir / hash_file_name,
"keyring.debian.org"
):
raise RuntimeError("GPG signature verification failed!")
# download the image file
download_file(path_to_output_dir / image_file_name, image_file_url, True)
# remove unwanted lines from hash file
hash_file = open(path_to_output_dir / hash_file_name, "r")
hash_file_lines = hash_file.readlines()
hash_file_lines_to_keep = []
for line in hash_file_lines:
if image_file_name in line:
hash_file_lines_to_keep.append(line)
hash_file.close()
if len(hash_file_lines_to_keep) != 1:
raise RuntimeError("Unexpected error while truncating hash file.")
os.remove(path_to_output_dir / hash_file_name)
with open(path_to_output_dir / hash_file_name, "w") as hash_file:
hash_file.writelines(hash_file_lines_to_keep)
# validate SHA512 checksum
pinfo("Validating file integrity...")
hash_check_result = subprocess.run(
["sha512sum", "--check", path_to_output_dir / hash_file_name],
capture_output = True,
cwd = path_to_output_dir
)
stdout_lines = hash_check_result.stdout.decode("utf-8").split('\n')
stderr_lines = hash_check_result.stderr.decode("utf-8").split('\n')
if len(stdout_lines) > 0:
for line in stdout_lines:
if len(line) > 0:
pinfo(f"{line}")
if hash_check_result.returncode != 0:
if len(stderr_lines) > 0:
for line in stderr_lines:
if len(line) > 0:
perror(f"{line}")
raise RuntimeError("SHA512 validation failed.")
pok("File integrity checks passed.")
# clean up obsolete files
pinfo("Cleaning up files...")
os.remove(path_to_output_dir / hash_file_name)
os.remove(path_to_output_dir / signature_file_name)
psuccess("Debian image obtained.")
return str(path_to_output_dir / image_file_name)