refactor: fix PEP violations
This commit is contained in:
@@ -5,6 +5,7 @@ import sys
|
|||||||
import colorama
|
import colorama
|
||||||
from colorama import Fore, Style
|
from colorama import Fore, Style
|
||||||
|
|
||||||
|
|
||||||
class _Prefix:
|
class _Prefix:
|
||||||
"""The Prefix class represents text which is prepended to text output.
|
"""The Prefix class represents text which is prepended to text output.
|
||||||
|
|
||||||
@@ -39,15 +40,17 @@ class _Prefix:
|
|||||||
|
|
||||||
return longest_prefix_length
|
return longest_prefix_length
|
||||||
|
|
||||||
_prefix_info = _Prefix("INFO", Fore.WHITE)
|
|
||||||
_prefix_ok = _Prefix("OK", Fore.GREEN)
|
_prefix_info = _Prefix("INFO", Fore.WHITE)
|
||||||
|
_prefix_ok = _Prefix("OK", Fore.GREEN)
|
||||||
_prefix_success = _Prefix("SUCCESS", Fore.GREEN)
|
_prefix_success = _Prefix("SUCCESS", Fore.GREEN)
|
||||||
_prefix_debug = _Prefix("DEBUG", Fore.BLUE)
|
_prefix_debug = _Prefix("DEBUG", Fore.BLUE)
|
||||||
_prefix_input = _Prefix("PROMPT", Fore.CYAN)
|
_prefix_input = _Prefix("PROMPT", Fore.CYAN)
|
||||||
_prefix_warning = _Prefix("WARNING", Fore.YELLOW)
|
_prefix_warning = _Prefix("WARNING", Fore.YELLOW)
|
||||||
_prefix_error = _Prefix("ERROR", Fore.RED)
|
_prefix_error = _Prefix("ERROR", Fore.RED)
|
||||||
_prefix_failure = _Prefix("FAILURE", Fore.RED)
|
_prefix_failure = _Prefix("FAILURE", Fore.RED)
|
||||||
|
|
||||||
|
|
||||||
class Printer:
|
class Printer:
|
||||||
"""Printer objects print prefixed output to the specified output stream."""
|
"""Printer objects print prefixed output to the specified output stream."""
|
||||||
|
|
||||||
@@ -98,7 +101,8 @@ class Printer:
|
|||||||
|
|
||||||
Printer._num_of_active_printers -= 1
|
Printer._num_of_active_printers -= 1
|
||||||
|
|
||||||
def _print_prefixed_output(self, prefix, *args, color_enabled=True, **kwargs):
|
def _print_prefixed_output(self, prefix, *args, color_enabled=True,
|
||||||
|
**kwargs):
|
||||||
"""Prints the output with the specified prefix prepended.
|
"""Prints the output with the specified prefix prepended.
|
||||||
|
|
||||||
This method works the same as the standard library print() function,
|
This method works the same as the standard library print() function,
|
||||||
|
|||||||
@@ -18,13 +18,14 @@ def gpg_signature_is_valid(
|
|||||||
path_to_data_file,
|
path_to_data_file,
|
||||||
fallback_keyserver_name
|
fallback_keyserver_name
|
||||||
):
|
):
|
||||||
""" Validates a PGP signature for a data file. The detached signature is
|
"""Validates a PGP signature for a data file.
|
||||||
provided as plaintext (UTF8) in the specified file.
|
|
||||||
|
|
||||||
If the discovered signing key is unknown to gpg on this system for the
|
The detached signature must be provided as plaintext (UTF8) in the
|
||||||
invoking user, and if 'fallback_keyserver_name' is not None, an attempt
|
specified signature file.
|
||||||
is made to import the key from the specified keyserver using its ID
|
If the discovered signing key is unknown to gpg on this system for the
|
||||||
after prompting the user for permission to import the key.
|
invoking user, and if 'fallback_keyserver_name' is not None, an attempt is
|
||||||
|
made to import the key from the specified keyserver using its ID after
|
||||||
|
prompting the user for permission to import the key.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -45,6 +46,7 @@ def gpg_signature_is_valid(
|
|||||||
False : bool
|
False : bool
|
||||||
If the signature is invalid or could not be validated.
|
If the signature is invalid or could not be validated.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
path_to_signature_file = Path(path_to_signature_file)
|
path_to_signature_file = Path(path_to_signature_file)
|
||||||
path_to_data_file = Path(path_to_data_file)
|
path_to_data_file = Path(path_to_data_file)
|
||||||
|
|
||||||
@@ -65,15 +67,15 @@ def gpg_signature_is_valid(
|
|||||||
f"Not a valid PGP signature file: '{path_to_signature_file}'."
|
f"Not a valid PGP signature file: '{path_to_signature_file}'."
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
p.info(f"Signature mentions a key with ID "\
|
p.info(f"Signature mentions a key with ID "
|
||||||
f"{verification.key_id} and fingerprint "\
|
f"{verification.key_id} and fingerprint "
|
||||||
f"{verification.fingerprint}."
|
f"{verification.fingerprint}."
|
||||||
)
|
)
|
||||||
|
|
||||||
if verification.valid:
|
if verification.valid:
|
||||||
p.ok(f"GPG signature is valid with trustlevel "\
|
p.ok(f"GPG signature is valid with trustlevel "
|
||||||
f"'{verification.trust_level}'."
|
f"'{verification.trust_level}'."
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# this case commonly occurrs when the GPG key has not been imported
|
# this case commonly occurrs when the GPG key has not been imported
|
||||||
@@ -84,7 +86,7 @@ def gpg_signature_is_valid(
|
|||||||
key_will_be_imported = None
|
key_will_be_imported = None
|
||||||
while key_will_be_imported is None:
|
while key_will_be_imported is None:
|
||||||
key_will_be_imported = userinput.prompt_yes_or_no(
|
key_will_be_imported = userinput.prompt_yes_or_no(
|
||||||
f"[PROMPT] Do you want to import the GPG key from "\
|
f"[PROMPT] Do you want to import the GPG key from "
|
||||||
f"'{fallback_keyserver_name}'?"
|
f"'{fallback_keyserver_name}'?"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -121,9 +123,9 @@ def gpg_signature_is_valid(
|
|||||||
)
|
)
|
||||||
|
|
||||||
if verification.valid:
|
if verification.valid:
|
||||||
p.ok(f"GPG signature is valid with trustlevel "\
|
p.ok(f"GPG signature is valid with trustlevel "
|
||||||
f"'{verification.trust_level}'."
|
f"'{verification.trust_level}'."
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
p.error("GPG signature is not valid!!!")
|
p.error("GPG signature is not valid!!!")
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
def extract_iso(path_to_output_dir, path_to_input_file):
|
def extract_iso(path_to_output_dir, path_to_input_file):
|
||||||
"""Extracts the input ISO-file into the specified directory using 'bsdtar'.
|
"""Extracts the input ISO-file into the specified directory using 'bsdtar'.
|
||||||
|
|
||||||
@@ -42,7 +43,7 @@ def extract_iso(path_to_output_dir, path_to_input_file):
|
|||||||
try:
|
try:
|
||||||
subprocess.run(["command", "-v", "bsdtar"], check=True)
|
subprocess.run(["command", "-v", "bsdtar"], check=True)
|
||||||
except subprocess.CalledProcessError:
|
except subprocess.CalledProcessError:
|
||||||
raise RuntimeError(f"Program not found in $PATH: 'bsdtar'.")
|
raise RuntimeError("Program not found in $PATH: 'bsdtar'.")
|
||||||
|
|
||||||
# extract file to destination
|
# extract file to destination
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
""" This module is responsible for (interactive) user input handling. """
|
"""This module provides functions for (interactive) user input handling."""
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
@@ -10,11 +10,11 @@ import clibella
|
|||||||
p = clibella.Printer()
|
p = clibella.Printer()
|
||||||
|
|
||||||
|
|
||||||
def prompt_yes_or_no(question, ask_until_valid = False):
|
def prompt_yes_or_no(question, ask_until_valid=False):
|
||||||
""" Prompts the user with the specified yes/no question.
|
"""Prompts the user for an answer to the specified yes/no question.
|
||||||
|
|
||||||
If 'ask_until_valid' is True, keeps repeating the prompt until the user
|
If 'ask_until_valid' is True, keeps repeating the prompt until the user
|
||||||
provides recognizable input to answer the question.
|
provides recognizable input to answer the question.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -35,10 +35,11 @@ def prompt_yes_or_no(question, ask_until_valid = False):
|
|||||||
If 'ask_until_valid' is False and the user has provided an ambiguous
|
If 'ask_until_valid' is False and the user has provided an ambiguous
|
||||||
response to the prompt.
|
response to the prompt.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
user_input_is_valid = False
|
user_input_is_valid = False
|
||||||
|
|
||||||
regex_yes = re.compile(r"^(y)$|^(Y)$|^(YES)$|^(Yes)$|^(yes)$")
|
regex_yes = re.compile(r"^(y)$|^(Y)$|^(YES)$|^(Yes)$|^(yes)$")
|
||||||
regex_no = re.compile(r"^(n)$|^(N)$|^(NO)$|^(No)$|^(no)$")
|
regex_no = re.compile(r"^(n)$|^(N)$|^(NO)$|^(No)$|^(no)$")
|
||||||
|
|
||||||
while(not user_input_is_valid):
|
while(not user_input_is_valid):
|
||||||
user_input = p.input(f"{question} (Yes/No): ")
|
user_input = p.input(f"{question} (Yes/No): ")
|
||||||
|
|||||||
@@ -2,9 +2,10 @@
|
|||||||
|
|
||||||
"""This module downloads Debian Linux installation images from the web."""
|
"""This module downloads Debian Linux installation images from the web."""
|
||||||
|
|
||||||
import os, re, subprocess, zipfile
|
import os
|
||||||
|
import re
|
||||||
|
import subprocess
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from datetime import date
|
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
@@ -17,9 +18,11 @@ import gpgverify
|
|||||||
p = clibella.Printer()
|
p = clibella.Printer()
|
||||||
|
|
||||||
|
|
||||||
def download_file(path_to_output_file, url_to_file, show_progress = False):
|
def download_file(path_to_output_file, url_to_file, show_progress=False):
|
||||||
""" Downloads the file at the specified URL via HTTP and saves it as the
|
"""Downloads the file at the input URL to the specified path.
|
||||||
specified output file. Optionally, displays a nice status bar.
|
|
||||||
|
The file is downloaded via HTTP/HTTPS and saved to the specified path.
|
||||||
|
Optionally, displays a nice status bar.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -70,18 +73,21 @@ def download_file(path_to_output_file, url_to_file, show_progress = False):
|
|||||||
|
|
||||||
p.ok(f"Received '{output_file_name}'.")
|
p.ok(f"Received '{output_file_name}'.")
|
||||||
|
|
||||||
|
|
||||||
def debian_obtain_image(path_to_output_dir):
|
def debian_obtain_image(path_to_output_dir):
|
||||||
""" Obtains the latest official debian installation image, the SHA512SUMS
|
"""Downloads the latest debian installation image and its hashes.
|
||||||
file it is listed in, as well as the GPG signature for the SHA512SUMS
|
|
||||||
file.
|
|
||||||
|
|
||||||
File are obtained from the debian.org HTTPS mirrors and stored in the
|
The image file, the SHA512SUMS file it is listed in, as well as the GPG
|
||||||
specified directory. The obtained image is the FOSS-only, stable x64
|
signature for the SHA512SUMS file are downloaded from the debian.org HTTPS
|
||||||
build.
|
mirrors and written into the specified output directory.
|
||||||
|
The obtained image is for the FOSS-only, stable x64 build.
|
||||||
|
|
||||||
First, the GPG signature of the hash is validated. Then, the hash of
|
Once the image file is downloaded, the GPG signature of the hash is
|
||||||
the image file is checked. If either check fails, an exception is
|
validated. Then, the hash of the image file is verified. If either check
|
||||||
raised.
|
fails, an exception is raised.
|
||||||
|
|
||||||
|
If the verification suceeds, the hash file and GPG signature file are
|
||||||
|
removed again.
|
||||||
|
|
||||||
Parameters
|
Parameters
|
||||||
----------
|
----------
|
||||||
@@ -115,29 +121,30 @@ def debian_obtain_image(path_to_output_dir):
|
|||||||
name="a",
|
name="a",
|
||||||
string=re.compile(r"debian-[0-9.]*-amd64-netinst.iso"))
|
string=re.compile(r"debian-[0-9.]*-amd64-netinst.iso"))
|
||||||
if len(image_file_links) != 1:
|
if len(image_file_links) != 1:
|
||||||
raise RuntimeError("Failed to find an exact match while looking for "\
|
raise RuntimeError("Failed to find an exact match while looking for "
|
||||||
"a link to the latest debian image file.")
|
"a link to the latest debian image file."
|
||||||
|
)
|
||||||
image_file_name = image_file_links[0]['href']
|
image_file_name = image_file_links[0]['href']
|
||||||
image_file_url = releases_url + image_file_name
|
image_file_url = releases_url + image_file_name
|
||||||
|
|
||||||
# download the SHA512SUMS file
|
# download the SHA512SUMS file
|
||||||
download_file(path_to_output_dir / hash_file_name, hash_file_url)
|
download_file(path_to_output_dir/hash_file_name, hash_file_url)
|
||||||
# download the GPG signature file
|
# download the GPG signature file
|
||||||
download_file(path_to_output_dir / signature_file_name, signature_file_url)
|
download_file(path_to_output_dir/signature_file_name, signature_file_url)
|
||||||
|
|
||||||
# verify GPG signature of hash file
|
# verify GPG signature of hash file
|
||||||
if not gpgverify.gpg_signature_is_valid(
|
if not gpgverify.gpg_signature_is_valid(
|
||||||
path_to_output_dir / signature_file_name,
|
path_to_output_dir/signature_file_name,
|
||||||
path_to_output_dir / hash_file_name,
|
path_to_output_dir/hash_file_name,
|
||||||
"keyring.debian.org"
|
"keyring.debian.org"
|
||||||
):
|
):
|
||||||
raise RuntimeError("GPG signature verification failed!")
|
raise RuntimeError("GPG signature verification failed!")
|
||||||
|
|
||||||
# download the image file
|
# download the image file
|
||||||
download_file(path_to_output_dir / image_file_name, image_file_url, True)
|
download_file(path_to_output_dir/image_file_name, image_file_url, True)
|
||||||
|
|
||||||
# remove unwanted lines from hash file
|
# remove unwanted lines from hash file
|
||||||
hash_file = open(path_to_output_dir / hash_file_name, "r")
|
hash_file = open(path_to_output_dir/hash_file_name, "r")
|
||||||
hash_file_lines = hash_file.readlines()
|
hash_file_lines = hash_file.readlines()
|
||||||
hash_file_lines_to_keep = []
|
hash_file_lines_to_keep = []
|
||||||
for line in hash_file_lines:
|
for line in hash_file_lines:
|
||||||
@@ -146,16 +153,16 @@ def debian_obtain_image(path_to_output_dir):
|
|||||||
hash_file.close()
|
hash_file.close()
|
||||||
if len(hash_file_lines_to_keep) != 1:
|
if len(hash_file_lines_to_keep) != 1:
|
||||||
raise RuntimeError("Unexpected error while truncating hash file.")
|
raise RuntimeError("Unexpected error while truncating hash file.")
|
||||||
os.remove(path_to_output_dir / hash_file_name)
|
os.remove(path_to_output_dir/hash_file_name)
|
||||||
with open(path_to_output_dir / hash_file_name, "w") as hash_file:
|
with open(path_to_output_dir/hash_file_name, "w") as hash_file:
|
||||||
hash_file.writelines(hash_file_lines_to_keep)
|
hash_file.writelines(hash_file_lines_to_keep)
|
||||||
|
|
||||||
# validate SHA512 checksum
|
# validate SHA512 checksum
|
||||||
p.info("Validating file integrity...")
|
p.info("Validating file integrity...")
|
||||||
hash_check_result = subprocess.run(
|
hash_check_result = subprocess.run(
|
||||||
["sha512sum", "--check", path_to_output_dir / hash_file_name],
|
["sha512sum", "--check", path_to_output_dir/hash_file_name],
|
||||||
capture_output = True,
|
capture_output=True,
|
||||||
cwd = path_to_output_dir
|
cwd=path_to_output_dir
|
||||||
)
|
)
|
||||||
|
|
||||||
stdout_lines = hash_check_result.stdout.decode("utf-8").split('\n')
|
stdout_lines = hash_check_result.stdout.decode("utf-8").split('\n')
|
||||||
@@ -177,9 +184,9 @@ def debian_obtain_image(path_to_output_dir):
|
|||||||
|
|
||||||
# clean up obsolete files
|
# clean up obsolete files
|
||||||
p.info("Cleaning up files...")
|
p.info("Cleaning up files...")
|
||||||
os.remove(path_to_output_dir / hash_file_name)
|
os.remove(path_to_output_dir/hash_file_name)
|
||||||
os.remove(path_to_output_dir / signature_file_name)
|
os.remove(path_to_output_dir/signature_file_name)
|
||||||
|
|
||||||
p.success("Debian image obtained.")
|
p.success("Debian image obtained.")
|
||||||
|
|
||||||
return str(path_to_output_dir / image_file_name)
|
return str(path_to_output_dir/image_file_name)
|
||||||
|
|||||||
Reference in New Issue
Block a user