Implementing compat function codecs_open

This commit is contained in:
Miroslav Stampar
2025-12-26 00:04:31 +01:00
parent 137687e120
commit 2b44aa1fbb
4 changed files with 119 additions and 3 deletions

View File

@@ -7,8 +7,10 @@ See the file 'LICENSE' for copying permission
from __future__ import division
import codecs
import binascii
import functools
import io
import math
import os
import random
@@ -312,3 +314,116 @@ def LooseVersion(version):
result = float("NaN")
return result
# NOTE: codecs.open re-implementation (deprecated in Python 3.14)
try:
# Py2
_text_type = unicode
_bytes_types = (str, bytearray)
except NameError:
# Py3
_text_type = str
_bytes_types = (bytes, bytearray, memoryview)
_WRITE_CHARS = ("w", "a", "x", "+")
def _is_write_mode(mode):
return any(ch in mode for ch in _WRITE_CHARS)
class MixedWriteTextIO(object):
"""
Text-ish stream wrapper that accepts both text and bytes in write().
Bytes are decoded using the file's (encoding, errors) before writing.
Optionally approximates line-buffering by flushing when a newline is written.
"""
def __init__(self, fh, encoding, errors, line_buffered=False):
self._fh = fh
self._encoding = encoding
self._errors = errors
self._line_buffered = line_buffered
def write(self, data):
# bytes-like but not text -> decode
if isinstance(data, _bytes_types) and not isinstance(data, _text_type):
data = bytes(data).decode(self._encoding, self._errors)
elif not isinstance(data, _text_type):
data = _text_type(data)
n = self._fh.write(data)
# Approximate "line buffering" behavior if requested
if self._line_buffered and u"\n" in data:
try:
self._fh.flush()
except Exception:
pass
return n
def writelines(self, lines):
for x in lines:
self.write(x)
def __iter__(self):
return iter(self._fh)
def __next__(self):
return next(self._fh)
def next(self): # Py2
return self.__next__()
def __getattr__(self, name):
return getattr(self._fh, name)
def __enter__(self):
self._fh.__enter__()
return self
def __exit__(self, exc_type, exc, tb):
return self._fh.__exit__(exc_type, exc, tb)
def _codecs_open(filename, mode="r", encoding=None, errors="strict", buffering=-1):
"""
Replacement for deprecated codecs.open() entry point with sqlmap-friendly behavior.
- If encoding is None: return io.open(...) as-is.
- If encoding is set: force underlying binary mode and wrap via StreamReaderWriter
(like codecs.open()).
- For write-ish modes: return a wrapper that also accepts bytes on .write().
- Handles buffering=1 in binary mode by downgrading underlying buffering to -1,
while optionally preserving "flush on newline" behavior in the wrapper.
"""
if encoding is None:
return io.open(filename, mode, buffering=buffering)
bmode = mode
if "b" not in bmode:
bmode += "b"
# Avoid line-buffering warnings/errors on binary streams
line_buffered = (buffering == 1)
if line_buffered:
buffering = -1
f = io.open(filename, bmode, buffering=buffering)
try:
info = codecs.lookup(encoding)
srw = codecs.StreamReaderWriter(f, info.streamreader, info.streamwriter, errors)
srw.encoding = encoding
if _is_write_mode(mode):
return MixedWriteTextIO(srw, encoding, errors, line_buffered=line_buffered)
return srw
except Exception:
try:
f.close()
finally:
raise
codecs_open = _codecs_open if sys.version_info >= (3, 14) else codecs.open