feat: ajouts get testing et corrections
This commit is contained in:
354
env/lib/python3.12/site-packages/tqdm/utils.py
vendored
Normal file
354
env/lib/python3.12/site-packages/tqdm/utils.py
vendored
Normal file
@@ -0,0 +1,354 @@
|
||||
"""
|
||||
General helpers required for `tqdm.std`.
|
||||
"""
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from functools import wraps
|
||||
from warnings import warn
|
||||
from weakref import proxy
|
||||
|
||||
# py2/3 compat
|
||||
try:
|
||||
_range = xrange
|
||||
except NameError:
|
||||
_range = range
|
||||
|
||||
try:
|
||||
_unich = unichr
|
||||
except NameError:
|
||||
_unich = chr
|
||||
|
||||
try:
|
||||
_unicode = unicode
|
||||
except NameError:
|
||||
_unicode = str
|
||||
|
||||
try:
|
||||
_basestring = basestring
|
||||
except NameError:
|
||||
_basestring = str
|
||||
|
||||
CUR_OS = sys.platform
|
||||
IS_WIN = any(CUR_OS.startswith(i) for i in ['win32', 'cygwin'])
|
||||
IS_NIX = any(CUR_OS.startswith(i) for i in ['aix', 'linux', 'darwin'])
|
||||
RE_ANSI = re.compile(r"\x1b\[[;\d]*[A-Za-z]")
|
||||
|
||||
try:
|
||||
if IS_WIN:
|
||||
import colorama
|
||||
else:
|
||||
raise ImportError
|
||||
except ImportError:
|
||||
colorama = None
|
||||
else:
|
||||
try:
|
||||
colorama.init(strip=False)
|
||||
except TypeError:
|
||||
colorama.init()
|
||||
|
||||
|
||||
class FormatReplace(object):
|
||||
"""
|
||||
>>> a = FormatReplace('something')
|
||||
>>> "{:5d}".format(a)
|
||||
'something'
|
||||
""" # NOQA: P102
|
||||
def __init__(self, replace=''):
|
||||
self.replace = replace
|
||||
self.format_called = 0
|
||||
|
||||
def __format__(self, _):
|
||||
self.format_called += 1
|
||||
return self.replace
|
||||
|
||||
|
||||
class Comparable(object):
|
||||
"""Assumes child has self._comparable attr/@property"""
|
||||
def __lt__(self, other):
|
||||
return self._comparable < other._comparable
|
||||
|
||||
def __le__(self, other):
|
||||
return (self < other) or (self == other)
|
||||
|
||||
def __eq__(self, other):
|
||||
return self._comparable == other._comparable
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
||||
|
||||
def __gt__(self, other):
|
||||
return not self <= other
|
||||
|
||||
def __ge__(self, other):
|
||||
return not self < other
|
||||
|
||||
|
||||
class ObjectWrapper(object):
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._wrapped, name)
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
return setattr(self._wrapped, name, value)
|
||||
|
||||
def wrapper_getattr(self, name):
|
||||
"""Actual `self.getattr` rather than self._wrapped.getattr"""
|
||||
try:
|
||||
return object.__getattr__(self, name)
|
||||
except AttributeError: # py2
|
||||
return getattr(self, name)
|
||||
|
||||
def wrapper_setattr(self, name, value):
|
||||
"""Actual `self.setattr` rather than self._wrapped.setattr"""
|
||||
return object.__setattr__(self, name, value)
|
||||
|
||||
def __init__(self, wrapped):
|
||||
"""
|
||||
Thin wrapper around a given object
|
||||
"""
|
||||
self.wrapper_setattr('_wrapped', wrapped)
|
||||
|
||||
|
||||
class SimpleTextIOWrapper(ObjectWrapper):
|
||||
"""
|
||||
Change only `.write()` of the wrapped object by encoding the passed
|
||||
value and passing the result to the wrapped object's `.write()` method.
|
||||
"""
|
||||
# pylint: disable=too-few-public-methods
|
||||
def __init__(self, wrapped, encoding):
|
||||
super(SimpleTextIOWrapper, self).__init__(wrapped)
|
||||
self.wrapper_setattr('encoding', encoding)
|
||||
|
||||
def write(self, s):
|
||||
"""
|
||||
Encode `s` and pass to the wrapped object's `.write()` method.
|
||||
"""
|
||||
return self._wrapped.write(s.encode(self.wrapper_getattr('encoding')))
|
||||
|
||||
def __eq__(self, other):
|
||||
return self._wrapped == getattr(other, '_wrapped', other)
|
||||
|
||||
|
||||
class DisableOnWriteError(ObjectWrapper):
|
||||
"""
|
||||
Disable the given `tqdm_instance` upon `write()` or `flush()` errors.
|
||||
"""
|
||||
@staticmethod
|
||||
def disable_on_exception(tqdm_instance, func):
|
||||
"""
|
||||
Quietly set `tqdm_instance.miniters=inf` if `func` raises `errno=5`.
|
||||
"""
|
||||
tqdm_instance = proxy(tqdm_instance)
|
||||
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except OSError as e:
|
||||
if e.errno != 5:
|
||||
raise
|
||||
try:
|
||||
tqdm_instance.miniters = float('inf')
|
||||
except ReferenceError:
|
||||
pass
|
||||
except ValueError as e:
|
||||
if 'closed' not in str(e):
|
||||
raise
|
||||
try:
|
||||
tqdm_instance.miniters = float('inf')
|
||||
except ReferenceError:
|
||||
pass
|
||||
return inner
|
||||
|
||||
def __init__(self, wrapped, tqdm_instance):
|
||||
super(DisableOnWriteError, self).__init__(wrapped)
|
||||
if hasattr(wrapped, 'write'):
|
||||
self.wrapper_setattr(
|
||||
'write', self.disable_on_exception(tqdm_instance, wrapped.write))
|
||||
if hasattr(wrapped, 'flush'):
|
||||
self.wrapper_setattr(
|
||||
'flush', self.disable_on_exception(tqdm_instance, wrapped.flush))
|
||||
|
||||
def __eq__(self, other):
|
||||
return self._wrapped == getattr(other, '_wrapped', other)
|
||||
|
||||
|
||||
class CallbackIOWrapper(ObjectWrapper):
|
||||
def __init__(self, callback, stream, method="read"):
|
||||
"""
|
||||
Wrap a given `file`-like object's `read()` or `write()` to report
|
||||
lengths to the given `callback`
|
||||
"""
|
||||
super(CallbackIOWrapper, self).__init__(stream)
|
||||
func = getattr(stream, method)
|
||||
if method == "write":
|
||||
@wraps(func)
|
||||
def write(data, *args, **kwargs):
|
||||
res = func(data, *args, **kwargs)
|
||||
callback(len(data))
|
||||
return res
|
||||
self.wrapper_setattr('write', write)
|
||||
elif method == "read":
|
||||
@wraps(func)
|
||||
def read(*args, **kwargs):
|
||||
data = func(*args, **kwargs)
|
||||
callback(len(data))
|
||||
return data
|
||||
self.wrapper_setattr('read', read)
|
||||
else:
|
||||
raise KeyError("Can only wrap read/write methods")
|
||||
|
||||
|
||||
def _is_utf(encoding):
|
||||
try:
|
||||
u'\u2588\u2589'.encode(encoding)
|
||||
except UnicodeEncodeError:
|
||||
return False
|
||||
except Exception:
|
||||
try:
|
||||
return encoding.lower().startswith('utf-') or ('U8' == encoding)
|
||||
except Exception:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
|
||||
def _supports_unicode(fp):
|
||||
try:
|
||||
return _is_utf(fp.encoding)
|
||||
except AttributeError:
|
||||
return False
|
||||
|
||||
|
||||
def _is_ascii(s):
|
||||
if isinstance(s, str):
|
||||
for c in s:
|
||||
if ord(c) > 255:
|
||||
return False
|
||||
return True
|
||||
return _supports_unicode(s)
|
||||
|
||||
|
||||
def _screen_shape_wrapper(): # pragma: no cover
|
||||
"""
|
||||
Return a function which returns console dimensions (width, height).
|
||||
Supported: linux, osx, windows, cygwin.
|
||||
"""
|
||||
_screen_shape = None
|
||||
if IS_WIN:
|
||||
_screen_shape = _screen_shape_windows
|
||||
if _screen_shape is None:
|
||||
_screen_shape = _screen_shape_tput
|
||||
if IS_NIX:
|
||||
_screen_shape = _screen_shape_linux
|
||||
return _screen_shape
|
||||
|
||||
|
||||
def _screen_shape_windows(fp): # pragma: no cover
|
||||
try:
|
||||
import struct
|
||||
from ctypes import create_string_buffer, windll
|
||||
from sys import stdin, stdout
|
||||
|
||||
io_handle = -12 # assume stderr
|
||||
if fp == stdin:
|
||||
io_handle = -10
|
||||
elif fp == stdout:
|
||||
io_handle = -11
|
||||
|
||||
h = windll.kernel32.GetStdHandle(io_handle)
|
||||
csbi = create_string_buffer(22)
|
||||
res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
|
||||
if res:
|
||||
(_bufx, _bufy, _curx, _cury, _wattr, left, top, right, bottom,
|
||||
_maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)
|
||||
return right - left, bottom - top # +1
|
||||
except Exception: # nosec
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def _screen_shape_tput(*_): # pragma: no cover
|
||||
"""cygwin xterm (windows)"""
|
||||
try:
|
||||
import shlex
|
||||
from subprocess import check_call # nosec
|
||||
return [int(check_call(shlex.split('tput ' + i))) - 1
|
||||
for i in ('cols', 'lines')]
|
||||
except Exception: # nosec
|
||||
pass
|
||||
return None, None
|
||||
|
||||
|
||||
def _screen_shape_linux(fp): # pragma: no cover
|
||||
|
||||
try:
|
||||
from array import array
|
||||
from fcntl import ioctl
|
||||
from termios import TIOCGWINSZ
|
||||
except ImportError:
|
||||
return None, None
|
||||
else:
|
||||
try:
|
||||
rows, cols = array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[:2]
|
||||
return cols, rows
|
||||
except Exception:
|
||||
try:
|
||||
return [int(os.environ[i]) - 1 for i in ("COLUMNS", "LINES")]
|
||||
except (KeyError, ValueError):
|
||||
return None, None
|
||||
|
||||
|
||||
def _environ_cols_wrapper(): # pragma: no cover
|
||||
"""
|
||||
Return a function which returns console width.
|
||||
Supported: linux, osx, windows, cygwin.
|
||||
"""
|
||||
warn("Use `_screen_shape_wrapper()(file)[0]` instead of"
|
||||
" `_environ_cols_wrapper()(file)`", DeprecationWarning, stacklevel=2)
|
||||
shape = _screen_shape_wrapper()
|
||||
if not shape:
|
||||
return None
|
||||
|
||||
@wraps(shape)
|
||||
def inner(fp):
|
||||
return shape(fp)[0]
|
||||
|
||||
return inner
|
||||
|
||||
|
||||
def _term_move_up(): # pragma: no cover
|
||||
return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A'
|
||||
|
||||
|
||||
try:
|
||||
# TODO consider using wcswidth third-party package for 0-width characters
|
||||
from unicodedata import east_asian_width
|
||||
except ImportError:
|
||||
_text_width = len
|
||||
else:
|
||||
def _text_width(s):
|
||||
return sum(2 if east_asian_width(ch) in 'FW' else 1 for ch in _unicode(s))
|
||||
|
||||
|
||||
def disp_len(data):
|
||||
"""
|
||||
Returns the real on-screen length of a string which may contain
|
||||
ANSI control codes and wide chars.
|
||||
"""
|
||||
return _text_width(RE_ANSI.sub('', data))
|
||||
|
||||
|
||||
def disp_trim(data, length):
|
||||
"""
|
||||
Trim a string which may contain ANSI control characters.
|
||||
"""
|
||||
if len(data) == disp_len(data):
|
||||
return data[:length]
|
||||
|
||||
ansi_present = bool(RE_ANSI.search(data))
|
||||
while disp_len(data) > length: # carefully delete one char at a time
|
||||
data = data[:-1]
|
||||
if ansi_present and bool(RE_ANSI.search(data)):
|
||||
# assume ANSI reset is required
|
||||
return data if data.endswith("\033[0m") else data + "\033[0m"
|
||||
return data
|
||||
Reference in New Issue
Block a user