""" Open compressed files transparently. """ __all__ = [ "xopen", "PipedGzipReader", "PipedGzipWriter", "PipedIGzipReader", "PipedIGzipWriter", "PipedPigzReader", "PipedPigzWriter", "PipedPBzip2Reader", "PipedPBzip2Writer", "PipedXzReader", "PipedXzWriter", "PipedZstdReader", "PipedZstdWriter", "PipedPythonIsalReader", "PipedPythonIsalWriter", "__version__", ] import gzip import sys import io import os import bz2 import lzma import stat import signal import pathlib import subprocess import tempfile import time from abc import ABC, abstractmethod from subprocess import Popen, PIPE, DEVNULL from typing import Optional, Union, TextIO, AnyStr, IO, List, Set, overload, BinaryIO if sys.version_info >= (3, 8): from typing import Literal else: from typing_extensions import Literal from ._version import version as __version__ # 128K buffer size also used by cat, pigz etc. It is faster than the 8K default. BUFFER_SIZE = max(io.DEFAULT_BUFFER_SIZE, 128 * 1024) try: from isal import igzip, isal_zlib # type: ignore except ImportError: igzip = None isal_zlib = None try: import zstandard # type: ignore except ImportError: zstandard = None try: import fcntl # fcntl.F_SETPIPE_SZ will be available in python 3.10. # https://github.com/python/cpython/pull/21921 # If not available: set it to the correct value for known platforms. if not hasattr(fcntl, "F_SETPIPE_SZ") and sys.platform == "linux": setattr(fcntl, "F_SETPIPE_SZ", 1031) except ImportError: fcntl = None # type: ignore _MAX_PIPE_SIZE_PATH = pathlib.Path("/proc/sys/fs/pipe-max-size") try: _MAX_PIPE_SIZE = int( _MAX_PIPE_SIZE_PATH.read_text(encoding="ascii") ) # type: Optional[int] except OSError: # Catches file not found and permission errors. Possible other errors too. _MAX_PIPE_SIZE = None FilePath = Union[str, bytes, os.PathLike] def _available_cpu_count() -> int: """ Number of available virtual or physical CPUs on this system Adapted from http://stackoverflow.com/a/1006301/715090 """ try: return len(os.sched_getaffinity(0)) except AttributeError: pass import re try: with open("/proc/self/status") as f: status = f.read() m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", status) if m: res = bin(int(m.group(1).replace(",", ""), 16)).count("1") if res > 0: return res except OSError: pass count = os.cpu_count() return 1 if count is None else count def _set_pipe_size_to_max(fd: int) -> None: """ Set pipe size to maximum on platforms that support it. :param fd: The file descriptor to increase the pipe size for. """ if not hasattr(fcntl, "F_SETPIPE_SZ") or not _MAX_PIPE_SIZE: return try: fcntl.fcntl(fd, fcntl.F_SETPIPE_SZ, _MAX_PIPE_SIZE) # type: ignore except OSError: pass def _can_read_concatenated_gz(program: str) -> bool: """ Check if a concatenated gzip file can be read properly. Not all deflate programs handle this properly. """ fd, temp_path = tempfile.mkstemp(suffix=".gz", prefix="xopen.") try: # Create a concatenated gzip file. gzip.compress recreates the contents # of a gzip file including header and trailer. with open(temp_path, "wb") as temp_file: temp_file.write(gzip.compress(b"AB") + gzip.compress(b"CD")) try: result = subprocess.run( [program, "-c", "-d", temp_path], check=True, stderr=PIPE, stdout=PIPE ) return result.stdout == b"ABCD" except subprocess.CalledProcessError: # Program can't read zip return False finally: os.close(fd) os.remove(temp_path) class Closing(ABC): """ Inherit from this class and implement a close() method to offer context manager functionality. """ def __enter__(self): return self def __exit__(self, *exc_info): self.close() def __del__(self): try: self.close() except Exception: pass @abstractmethod def close(self): """Called when exiting the context manager""" class PipedCompressionWriter(Closing): """ Write Compressed files by running an external process and piping into it. """ def __init__( self, path: FilePath, program_args: List[str], mode="wt", compresslevel: Optional[int] = None, threads_flag: Optional[str] = None, threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): """ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' compresslevel -- compression level threads_flag -- which flag is used to denote the number of threads in the program. If set to none, program will be called without threads flag. threads (int) -- number of threads. If this is set to None, a reasonable default is used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to use all available cores. """ if mode not in ("w", "wt", "wb", "a", "at", "ab"): raise ValueError( "Mode is '{}', but it must be 'w', 'wt', 'wb', 'a', 'at' or 'ab'".format( mode ) ) # TODO use a context manager self.outfile = open(path, mode[0] + "b") self.closed: bool = False self.name: str = str(os.fspath(path)) self._mode: str = mode self._program_args: List[str] = program_args self._threads_flag: Optional[str] = threads_flag if threads is None: threads = min(_available_cpu_count(), 4) self._threads = threads try: self.process = self._open_process( mode, compresslevel, threads, self.outfile ) except OSError: self.outfile.close() raise assert self.process.stdin is not None _set_pipe_size_to_max(self.process.stdin.fileno()) if "b" not in mode: self._file = io.TextIOWrapper( self.process.stdin, encoding=encoding, errors=errors, newline=newline ) # type: IO else: self._file = self.process.stdin def __repr__(self): return "{}('{}', mode='{}', program='{}', threads={})".format( self.__class__.__name__, self.name, self._mode, " ".join(self._program_args), self._threads, ) def _open_process( self, mode: str, compresslevel: Optional[int], threads: int, outfile: TextIO, ) -> Popen: program_args: List[str] = self._program_args[:] # prevent list aliasing if threads != 0 and self._threads_flag is not None: program_args += [f"{self._threads_flag}{threads}"] extra_args = [] if "w" in mode and compresslevel is not None: extra_args += ["-" + str(compresslevel)] kwargs = dict(stdin=PIPE, stdout=outfile, stderr=DEVNULL) # Setting close_fds to True in the Popen arguments is necessary due to # . # However, close_fds is not supported on Windows. See # . if sys.platform != "win32": kwargs["close_fds"] = True process = Popen(program_args + extra_args, **kwargs) # type: ignore return process def write(self, arg: AnyStr) -> None: self._file.write(arg) def close(self) -> None: if self.closed: return self.closed = True self._file.close() retcode = self.process.wait() self.outfile.close() if retcode != 0: try: cause = ( f". Possible cause: {os.strerror(retcode)}" if retcode > 1 else "" ) except ValueError: cause = "" raise OSError( "Output process '{}' terminated with exit code {}{}".format( " ".join(self._program_args), retcode, cause, ) ) def __iter__(self): # For compatibility with Pandas, which checks for an __iter__ method # to determine whether an object is file-like. return self def __next__(self): raise io.UnsupportedOperation("not readable") class PipedCompressionReader(Closing): """ Open a pipe to a process for reading a compressed file. """ # This exit code is not interpreted as an error when terminating the process _allowed_exit_code: Optional[int] = -signal.SIGTERM # If this message is printed on stderr on terminating the process, # it is not interpreted as an error _allowed_exit_message: Optional[bytes] = None def __init__( self, path: FilePath, program_args: List[Union[str, bytes]], mode: str = "r", threads_flag: Optional[str] = None, threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): """ Raise an OSError when pigz could not be found. """ if mode not in ("r", "rt", "rb"): raise ValueError( "Mode is '{}', but it must be 'r', 'rt' or 'rb'".format(mode) ) self._program_args = program_args path = os.fspath(path) if isinstance(path, bytes) and sys.platform == "win32": path = path.decode() program_args = program_args + ["-cd", path] if threads_flag is not None: if threads is None: # Single threaded behaviour by default because: # - Using a single thread to read a file is the least unexpected # behaviour. (For users of xopen, who do not know which backend is used.) # - There is quite a substantial overhead (+25% CPU time) when # using multiple threads while there is only a 10% gain in wall # clock time. threads = 1 program_args += [f"{threads_flag}{threads}"] self._threads = threads self.process = Popen(program_args, stdout=PIPE, stderr=PIPE) self.name = path assert self.process.stdout is not None _set_pipe_size_to_max(self.process.stdout.fileno()) self._mode = mode if "b" not in mode: self._file: IO = io.TextIOWrapper( self.process.stdout, encoding=encoding, errors=errors, newline=newline ) else: self._file = self.process.stdout self.closed = False self._wait_for_output_or_process_exit() self._raise_if_error() def __repr__(self): return "{}('{}', mode='{}', program='{}', threads={})".format( self.__class__.__name__, self.name, self._mode, " ".join(self._program_args), self._threads, ) def close(self) -> None: if self.closed: return self.closed = True retcode = self.process.poll() check_allowed_code_and_message = False if retcode is None: # still running self.process.terminate() check_allowed_code_and_message = True _, stderr_message = self.process.communicate() self._file.close() self._raise_if_error(check_allowed_code_and_message, stderr_message) def __iter__(self): return self def __next__(self) -> AnyStr: return self._file.__next__() def _wait_for_output_or_process_exit(self): """ Wait for the process to produce at least some output, or has exited. """ # The program may crash due to a non-existing file, internal error etc. # In that case we need to check. However the 'time-to-crash' differs # between programs. Some crash faster than others. # Therefore we peek the first character(s) of stdout. Peek will return at # least one byte of data, unless the buffer is empty or at EOF. If at EOF, # we should wait for the program to exit. This way we ensure the program # has at least decompressed some output, or stopped before we continue. # stdout is io.BufferedReader if set to PIPE while True: first_output = self.process.stdout.peek(1) if first_output or self.process.poll() is not None: break time.sleep(0.01) def _raise_if_error( self, check_allowed_code_and_message: bool = False, stderr_message: bytes = b"" ) -> None: """ Raise OSError if process is not running anymore and the exit code is nonzero. If check_allowed_code_and_message is set, OSError is not raised when (1) the exit value of the process is equal to the value of the allowed_exit_code attribute or (2) the allowed_exit_message attribute is set and it matches with stderr_message. """ retcode = self.process.poll() if sys.platform == "win32" and retcode == 1 and stderr_message == b"": # Special case for Windows. Winapi terminates processes with exit code 1 # and an empty error message. return if retcode is None: # process still running return if retcode == 0: # process terminated successfully return if check_allowed_code_and_message: if retcode == self._allowed_exit_code: # terminated with allowed exit code return if self._allowed_exit_message and stderr_message.startswith( self._allowed_exit_message ): # terminated with another exit code, but message is allowed return assert self.process.stderr is not None if not stderr_message: stderr_message = self.process.stderr.read() self._file.close() raise OSError("{!r} (exit code {})".format(stderr_message, retcode)) def read(self, *args) -> AnyStr: return self._file.read(*args) def readinto(self, *args): return self._file.readinto(*args) def readline(self, *args) -> AnyStr: return self._file.readline(*args) def seekable(self) -> bool: return self._file.seekable() def tell(self) -> int: return self._file.tell() def seek(self, offset, whence=0) -> int: return self._file.seek(offset, whence) def peek(self, n: int = None): if hasattr(self._file, "peek"): return self._file.peek(n) # type: ignore else: raise AttributeError("Peek is not available when 'b' not in mode") def readable(self) -> bool: return self._file.readable() def writable(self) -> bool: return self._file.writable() def flush(self) -> None: return None class PipedGzipReader(PipedCompressionReader): """ Open a pipe to gzip for reading a gzipped file. """ def __init__( self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None ): super().__init__( path, ["gzip"], mode, encoding=encoding, errors=errors, newline=newline ) class PipedGzipWriter(PipedCompressionWriter): """ Write gzip-compressed files by running an external gzip process and piping into it. On Python 3, gzip.GzipFile is on par with gzip itself, but running an external gzip can still reduce wall-clock time because the compression happens in a separate process. """ def __init__( self, path, mode: str = "wt", compresslevel: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): """ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' compresslevel -- compression level threads (int) -- number of pigz threads. If this is set to None, a reasonable default is used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to let pigz use all available cores. """ if compresslevel is not None and compresslevel not in range(1, 10): raise ValueError("compresslevel must be between 1 and 9") super().__init__( path, ["gzip", "--no-name"], mode, compresslevel, None, encoding=encoding, errors=errors, newline=newline, ) class PipedPigzReader(PipedCompressionReader): """ Open a pipe to pigz for reading a gzipped file. Even though pigz is mostly used to speed up writing by using many compression threads, it is also faster when reading, even when forced to use a single thread (ca. 2x speedup). """ def __init__( self, path, mode: str = "r", threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): super().__init__( path, ["pigz"], mode, "-p", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedPigzWriter(PipedCompressionWriter): """ Write gzip-compressed files by running an external pigz process and piping into it. pigz can compress using multiple cores. It is also more efficient than gzip on only one core. (But then igzip is even faster and should be preferred if the compression level allows it.) """ _accepted_compression_levels: Set[int] = set(list(range(10)) + [11]) def __init__( self, path, mode: str = "wt", compresslevel: Optional[int] = None, threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): """ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' compresslevel -- compression level threads (int) -- number of pigz threads. If this is set to None, a reasonable default is used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to let pigz use all available cores. """ if ( compresslevel is not None and compresslevel not in self._accepted_compression_levels ): raise ValueError("compresslevel must be between 0 and 9 or 11") super().__init__( path, ["pigz", "--no-name"], mode, compresslevel, "-p", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedPBzip2Reader(PipedCompressionReader): """ Open a pipe to pbzip2 for reading a bzipped file. """ _allowed_exit_code = None _allowed_exit_message = b"\n *Control-C or similar caught [sig=15], quitting..." def __init__( self, path, mode: str = "r", threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): super().__init__( path, ["pbzip2"], mode, "-p", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedPBzip2Writer(PipedCompressionWriter): """ Write bzip2-compressed files by running an external pbzip2 process and piping into it. pbzip2 can compress using multiple cores. """ def __init__( self, path, mode: str = "wt", threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): # Use default compression level for pbzip2: 9 super().__init__( path, ["pbzip2"], mode, 9, "-p", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedXzReader(PipedCompressionReader): """ Open a pipe to xz for reading an xz-compressed file. A future version of xz will be able to decompress using multiple cores. (N.B. As of 21 March 2022, this feature is only implemented in xz's master branch.) """ def __init__( self, path, mode: str = "r", threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): super().__init__( path, ["xz"], mode, "-T", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedXzWriter(PipedCompressionWriter): """ Write xz-compressed files by running an external xz process and piping into it. xz can compress using multiple cores. """ _accepted_compression_levels: Set[int] = set(range(10)) def __init__( self, path, mode: str = "wt", compresslevel: Optional[int] = None, threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): """ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' compresslevel -- compression level threads (int) -- number of xz threads. If this is set to None, a reasonable default is used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to let xz use all available cores. """ if ( compresslevel is not None and compresslevel not in self._accepted_compression_levels ): raise ValueError("compresslevel must be between 0 and 9") super().__init__( path, ["xz"], mode, compresslevel, "-T", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedIGzipReader(PipedCompressionReader): """ Uses igzip for reading of a gzipped file. This is much faster than either gzip or pigz which were written to run on a wide array of systems. igzip can only run on x86 and ARM architectures, but is able to use more architecture-specific optimizations as a result. """ def __init__( self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None ): if not _can_read_concatenated_gz("igzip"): # Instead of elaborate version string checking once the problem is # fixed, it is much easier to use this, "proof in the pudding" type # of evaluation. raise ValueError( "This version of igzip does not support reading " "concatenated gzip files and is therefore not " "safe to use. See: https://github.com/intel/isa-l/issues/143" ) super().__init__( path, ["igzip"], mode, encoding=encoding, errors=errors, newline=newline ) class PipedZstdReader(PipedCompressionReader): """ Open a pipe to zstd for reading a zstandard-compressed file (.zst). """ def __init__( self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None, ): super().__init__( path, ["zstd"], mode, encoding=encoding, errors=errors, newline=newline, ) class PipedZstdWriter(PipedCompressionWriter): """ Write Zstandard-compressed files by running an external xz process and piping into it. xz can compress using multiple cores. """ _accepted_compression_levels: Set[int] = set(range(1, 20)) def __init__( self, path, mode: str = "wt", compresslevel: Optional[int] = None, threads: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): """ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab' compresslevel -- compression level threads (int) -- number of zstd threads. If this is set to None, a reasonable default is used. At the moment, this means that the number of available CPU cores is used, capped at four to avoid creating too many threads. Use 0 to let zstd use all available cores. """ if ( compresslevel is not None and compresslevel not in self._accepted_compression_levels ): raise ValueError("compresslevel must be between 1 and 19") super().__init__( path, ["zstd"], mode, compresslevel, "-T", threads, encoding=encoding, errors=errors, newline=newline, ) class PipedIGzipWriter(PipedCompressionWriter): """ Uses igzip for writing a gzipped file. This is much faster than either gzip or pigz which were written to run on a wide array of systems. igzip can only run on x86 and ARM architectures, but is able to use more architecture-specific optimizations as a result. Threads are supported by a flag, but do not add any speed. Also on some distro version (isal package in debian buster) the thread flag is not present. For these reason threads are omitted from the interface. Only compresslevel 0-3 are supported and these output slightly different filesizes from their pigz/gzip counterparts. See: https://gist.github.com/rhpvorderman/4f1201c3f39518ff28dde45409eb696b """ def __init__( self, path, mode: str = "wt", compresslevel: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): if compresslevel is not None and compresslevel not in range(0, 4): raise ValueError("compresslevel must be between 0 and 3") super().__init__( path, ["igzip", "--no-name"], mode, compresslevel, encoding=encoding, errors=errors, newline=newline, ) class PipedPythonIsalReader(PipedCompressionReader): def __init__( self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None ): super().__init__( path, [sys.executable, "-m", "isal.igzip"], mode, encoding=encoding, errors=errors, newline=newline, ) class PipedPythonIsalWriter(PipedCompressionWriter): def __init__( self, path, mode: str = "wt", compresslevel: Optional[int] = None, *, encoding="utf-8", errors=None, newline=None, ): if compresslevel is not None and compresslevel not in range(0, 4): raise ValueError("compresslevel must be between 0 and 3") super().__init__( path, [sys.executable, "-m", "isal.igzip", "--no-name"], mode, compresslevel, encoding=encoding, errors=errors, newline=newline, ) def _open_stdin_or_out(mode: str, **text_mode_kwargs) -> IO: # Do not return sys.stdin or sys.stdout directly as we want the returned object # to be closable without closing sys.stdout. std = dict(r=sys.stdin, w=sys.stdout)[mode[0]] return open(std.fileno(), mode=mode, closefd=False, **text_mode_kwargs) def _open_bz2(filename, mode: str, threads: Optional[int], **text_mode_kwargs): if threads != 0: try: if "r" in mode: return PipedPBzip2Reader(filename, mode, threads, **text_mode_kwargs) else: return PipedPBzip2Writer(filename, mode, threads, **text_mode_kwargs) except OSError: pass # We try without threads. return bz2.open(filename, mode, **text_mode_kwargs) def _open_xz( filename, mode: str, compresslevel: Optional[int], threads: Optional[int], **text_mode_kwargs, ): if compresslevel is None: compresslevel = 6 if threads != 0: try: if "r" in mode: return PipedXzReader(filename, mode, threads, **text_mode_kwargs) else: return PipedXzWriter( filename, mode, compresslevel, threads, **text_mode_kwargs ) except OSError: pass # We try without threads. return lzma.open( filename, mode, preset=compresslevel if "w" in mode else None, **text_mode_kwargs, ) def _open_zst( # noqa: C901 filename, mode: str, compresslevel: Optional[int], threads: Optional[int], **text_mode_kwargs, ): assert compresslevel != 0 if compresslevel is None: compresslevel = 3 if threads != 0: try: if "r" in mode: return PipedZstdReader(filename, mode, **text_mode_kwargs) else: return PipedZstdWriter( filename, mode, compresslevel, threads, **text_mode_kwargs ) except OSError: if zstandard is None: # No fallback available raise if zstandard is None: raise ImportError("zstandard module (python-zstandard) not available") if compresslevel is not None and "w" in mode: cctx = zstandard.ZstdCompressor(level=compresslevel) else: cctx = None f = zstandard.open( filename, mode, cctx=cctx, **text_mode_kwargs, ) if mode == "rb": return io.BufferedReader(f) elif mode == "wb": return io.BufferedWriter(f) return f def _open_external_gzip_reader( filename, mode, compresslevel, threads, **text_mode_kwargs ): assert mode in ("rt", "rb") try: return PipedIGzipReader(filename, mode, **text_mode_kwargs) except (OSError, ValueError): # No igzip installed or version does not support reading # concatenated files. pass if igzip: return PipedPythonIsalReader(filename, mode, **text_mode_kwargs) try: return PipedPigzReader(filename, mode, threads=threads, **text_mode_kwargs) except OSError: return PipedGzipReader(filename, mode, **text_mode_kwargs) def _open_external_gzip_writer( filename, mode, compresslevel, threads, **text_mode_kwargs ): assert mode in ("wt", "wb", "at", "ab") try: return PipedIGzipWriter(filename, mode, compresslevel, **text_mode_kwargs) except (OSError, ValueError): # No igzip installed or compression level higher than 3 pass if igzip: # We can use the CLI from isal.igzip try: return PipedPythonIsalWriter( filename, mode, compresslevel, **text_mode_kwargs ) except ValueError: # Wrong compression level pass try: return PipedPigzWriter( filename, mode, compresslevel, threads=threads, **text_mode_kwargs ) except OSError: return PipedGzipWriter(filename, mode, compresslevel, **text_mode_kwargs) def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): assert mode in ("rt", "rb", "wt", "wb", "at", "ab") if threads != 0: try: if "r" in mode: return _open_external_gzip_reader( filename, mode, compresslevel, threads, **text_mode_kwargs ) else: return _open_external_gzip_writer( filename, mode, compresslevel, threads, **text_mode_kwargs ) except OSError: pass # We try without threads. if "r" in mode: if igzip is not None: return igzip.open(filename, mode, **text_mode_kwargs) return gzip.open(filename, mode, **text_mode_kwargs) g = _open_reproducible_gzip( filename, mode=mode[0] + "b", compresslevel=compresslevel, ) if "t" in mode: return io.TextIOWrapper(g, **text_mode_kwargs) return g def _open_reproducible_gzip(filename, mode, compresslevel): """ Open a gzip file for writing (without external processes) that has neither mtime nor the file name in the header (equivalent to gzip --no-name) """ assert mode in ("rb", "wb", "ab") # Neither gzip.open nor igzip.open have an mtime option, and they will # always write the file name, so we need to open the file separately # and pass it to gzip.GzipFile/igzip.IGzipFile. binary_file = open(filename, mode=mode) kwargs = dict( fileobj=binary_file, filename="", mode=mode, mtime=0, ) gzip_file = None if igzip is not None: try: gzip_file = igzip.IGzipFile( **kwargs, compresslevel=isal_zlib.ISAL_DEFAULT_COMPRESSION if compresslevel is None else compresslevel, ) except ValueError: # Compression level not supported, move to built-in gzip. pass if gzip_file is None: gzip_file = gzip.GzipFile( **kwargs, # Override gzip.open's default of 9 for consistency # with command-line gzip. compresslevel=6 if compresslevel is None else compresslevel, ) # When (I)GzipFile is created with a fileobj instead of a filename, # the passed file object is not closed when (I)GzipFile.close() # is called. This forces it to be closed. gzip_file.myfileobj = binary_file return gzip_file def _detect_format_from_content(filename: FilePath) -> Optional[str]: """ Attempts to detect file format from the content by reading the first 6 bytes. Returns None if no format could be detected. """ try: if stat.S_ISREG(os.stat(filename).st_mode): with open(filename, "rb") as fh: bs = fh.read(6) if bs[:2] == b"\x1f\x8b": # https://tools.ietf.org/html/rfc1952#page-6 return "gz" elif bs[:3] == b"\x42\x5a\x68": # https://en.wikipedia.org/wiki/List_of_file_signatures return "bz2" elif bs[:6] == b"\xfd\x37\x7a\x58\x5a\x00": # https://tukaani.org/xz/xz-file-format.txt return "xz" elif bs[:4] == b"\x28\xb5\x2f\xfd": # https://datatracker.ietf.org/doc/html/rfc8478#section-3.1.1 return "zst" except OSError: pass return None def _detect_format_from_extension(filename: Union[str, bytes]) -> Optional[str]: """ Attempt to detect file format from the filename extension. Return None if no format could be detected. """ for ext in ("bz2", "xz", "gz", "zst"): if isinstance(filename, bytes): if filename.endswith(b"." + ext.encode()): return ext else: if filename.endswith("." + ext): return ext return None @overload def xopen( filename: FilePath, mode: Literal["r", "w", "a", "rt", "wt", "at"] = ..., compresslevel: Optional[int] = ..., threads: Optional[int] = ..., *, encoding: str = ..., errors: Optional[str] = ..., newline: Optional[str] = ..., format: Optional[str] = ..., ) -> TextIO: ... @overload def xopen( filename: FilePath, mode: Literal["rb", "wb", "ab"], compresslevel: Optional[int] = ..., threads: Optional[int] = ..., *, encoding: str = ..., errors: None = ..., newline: None = ..., format: Optional[str] = ..., ) -> BinaryIO: ... def xopen( # noqa: C901 # The function is complex, but readable. filename: FilePath, mode: Literal["r", "w", "a", "rt", "rb", "wt", "wb", "at", "ab"] = "r", compresslevel: Optional[int] = None, threads: Optional[int] = None, *, encoding: str = "utf-8", errors: Optional[str] = None, newline: Optional[str] = None, format: Optional[str] = None, ) -> IO: """ A replacement for the "open" function that can also read and write compressed files transparently. The supported compression formats are gzip, bzip2, xz and zstandard. If the filename is '-', standard output (mode 'w') or standard input (mode 'r') is returned. When writing, the file format is chosen based on the file name extension: - .gz uses gzip compression - .bz2 uses bzip2 compression - .xz uses xz/lzma compression - .zst uses zstandard compression - otherwise, no compression is used When reading, if a file name extension is available, the format is detected using it, but if not, the format is detected from the contents. mode can be: 'rt', 'rb', 'at', 'ab', 'wt', or 'wb'. Also, the 't' can be omitted, so instead of 'rt', 'wt' and 'at', the abbreviations 'r', 'w' and 'a' can be used. compresslevel is the compression level for writing to gzip, xz and zst files. This parameter is ignored for the other compression formats. If set to None, a default depending on the format is used: gzip: 6, xz: 6, zstd: 3. When threads is None (the default), compressed file formats are read or written using a pipe to a subprocess running an external tool such as ``igzip``, ``pbzip2``, ``pigz`` etc., see PipedIGzipWriter, PipedIGzipReader etc. If the external tool supports multiple threads, *threads* can be set to an int specifying the number of threads to use. If no external tool supporting the compression format is available, the file is opened calling the appropriate Python function (that is, no subprocess is spawned). Set threads to 0 to force opening the file without using a subprocess. encoding, errors and newline are used when opening a file in text mode. The parameters have the same meaning as in the built-in open function, except that the default encoding is always UTF-8 instead of the preferred locale encoding. format overrides the autodetection of input and output formats. This can be useful when compressed output needs to be written to a file without an extension. Possible values are "gz", "xz", "bz2", "zst". """ if mode in ("r", "w", "a"): mode += "t" # type: ignore if mode not in ("rt", "rb", "wt", "wb", "at", "ab"): raise ValueError("Mode '{}' not supported".format(mode)) filename = os.fspath(filename) if "b" in mode: # Do not pass encoding etc. in binary mode as this raises errors. text_mode_kwargs = dict() else: text_mode_kwargs = dict(encoding=encoding, errors=errors, newline=newline) if filename == "-": return _open_stdin_or_out(mode, **text_mode_kwargs) if format not in (None, "gz", "xz", "bz2", "zst"): raise ValueError( f"Format not supported: {format}. " f"Choose one of: 'gz', 'xz', 'bz2', 'zst'" ) detected_format = format or _detect_format_from_extension(filename) if detected_format is None and "w" not in mode: detected_format = _detect_format_from_content(filename) if detected_format == "gz": opened_file = _open_gz( filename, mode, compresslevel, threads, **text_mode_kwargs ) elif detected_format == "xz": opened_file = _open_xz( filename, mode, compresslevel, threads, **text_mode_kwargs ) elif detected_format == "bz2": opened_file = _open_bz2(filename, mode, threads, **text_mode_kwargs) elif detected_format == "zst": opened_file = _open_zst( filename, mode, compresslevel, threads, **text_mode_kwargs ) else: opened_file = open(filename, mode, **text_mode_kwargs) # type: ignore # The "write" method for GzipFile is very costly. Lots of python calls are # made. To a lesser extent this is true for LzmaFile and BZ2File. By # putting a buffer in between, the expensive write method is called much # less. The effect is very noticeable when writing small units such as # lines or FASTQ records. if ( isinstance(opened_file, (gzip.GzipFile, bz2.BZ2File, lzma.LZMAFile)) # FIXME and "w" in mode ): opened_file = io.BufferedWriter( opened_file, buffer_size=BUFFER_SIZE # type: ignore ) return opened_file