Python os 模块,replace() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.replace()。
def __save(self):
    """Write the workspace state to disk, or only flag it as dirty.

    When the asynchronous counter is non-zero nothing is written and the
    dirty flag is set instead. Otherwise the state dictionary is pickled
    to ``<path>.new``, flushed and fsync'ed, and then moved over the real
    path with ``os.replace``; afterwards the dirty flag is cleared.

    Raises ``ParseError`` if any step fails with an ``OSError``.
    """
    if self.__asynchronous != 0:
        self.__dirty = True
        return

    state = {
        "version" : _BobState.CUR_VERSION,
        "byNameDirs" : self.__byNameDirs,
        "results" : self.__results,
        "inputs" : self.__inputs,
        "jenkins" : self.__jenkins,
        "dirStates" : self.__dirStates,
        "buildState" : self.__buildState,
    }
    tmpFile = self.__path+".new"
    try:
        with open(tmpFile, "wb") as f:
            pickle.dump(state, f)
            # Make sure the bytes reached the disk before the rename.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmpFile, self.__path)
    except OSError as e:
        raise ParseError("Error saving workspace state: " + str(e))
    self.__dirty = False
def save_json(self, filename, data):
    """Save ``data`` to ``filename`` as JSON by way of a temporary file.

    The data is written with ``_save_json`` to ``<stem>-<4 digits>.tmp``
    next to ``filename`` and checked with ``_read_json``. Only when that
    check does not raise ``JSONDecodeError`` is the temporary file moved
    over ``filename`` with ``os.replace``.

    Returns ``True`` on success; logs the failure and returns ``False``
    when the integrity check fails.
    """
    stem = os.path.splitext(filename)[0]
    tmp_file = "{}-{}.tmp".format(stem, randint(1000, 9999))
    self._save_json(tmp_file, data)
    try:
        self._read_json(tmp_file)
    except json.decoder.JSONDecodeError:
        self.logger.exception("Attempted to write file {} but JSON "
                              "integrity check on tmp file has failed. "
                              "The original file is unaltered."
                              "".format(filename))
        return False
    else:
        os.replace(tmp_file, filename)
        return True
def zip_file(self):
    """Zip the pack directory into ``<directory>.zip`` with a progress window.

    Every file found below ``self.parent.directory`` is written to the
    archive under its path relative to that directory. The progress
    window is updated after each file, destroyed once the archive is
    written, and an information box is shown at the end.
    """
    directory = self.parent.directory.replace("\\", "/")
    # Take the last path component only after normalising separators;
    # with a backslash-separated directory the un-normalised split would
    # return the whole path and the index() lookup below would raise.
    base_name = directory.split("/")[-1]

    amount = functions.folder_files(self.parent.directory)
    progress = dialog.ProgressWindow(self.parent, title="Zipping Pack", maximum=amount)
    count = 0
    # The ``with`` block closes the archive; no explicit close() is needed.
    with zipfile.ZipFile(self.parent.directory + ".zip", "w") as z:
        for root, dirs, files in os.walk(directory, topdown=True):
            new_root = root.replace("\\", "/").split("/")
            for name in files:
                z.write(os.path.join(root, name),
                        "/".join(new_root[new_root.index(base_name) + 1:]) + "/" + name)
                count += 1
                progress.variable_name.set("Current File: " + name)
                progress.variable_percent.set("{}% Complete".format(round(100 * float(count) / float(amount))))
                progress.variable_progress.set(progress.variable_progress.get() + 1)
    progress.destroy()
    messagebox.showinfo(title="Information", message="Zipping complete.")
def __exit__(self, exc_type, exc_value, tback):
    """Close the temporary file, then commit or discard it.

    Without an exception the temporary file replaces ``self.filename``.
    With one, the temporary file is removed (a missing file is ignored).
    Always returns ``False`` so an in-flight exception keeps propagating.
    """
    temp_path = self.temp.name
    # Delegating to the temp file's own __exit__ also closes it.
    self.temp.__exit__(exc_type, exc_value, tback)
    self.temp = None

    if exc_type is None:
        # No exception, commit changes.
        _os.replace(temp_path, self.filename)
        return False

    # An exception occurred, clean up.
    try:
        _os.remove(temp_path)
    except FileNotFoundError:
        pass
    return False  # Don't cancel the exception.
# Import these, so people can reference 'srctools.Vec' instead of
# 'srctools.vec.Vec'.
# Should be done after other code, so everything's initialised.
# Not all classes are imported, just most-used ones.
def save_json(self, filename, data):
    """Save ``data`` to ``filename`` as JSON by way of a temporary file.

    The data is written with ``_save_json`` to ``<stem>-<4 digits>.tmp``
    next to ``filename`` and checked with ``_read_json``. Only when that
    check does not raise ``JSONDecodeError`` is the temporary file moved
    over ``filename`` with ``os.replace``.

    Returns ``True`` on success; logs the failure and returns ``False``
    when the integrity check fails.
    """
    stem = os.path.splitext(filename)[0]
    tmp_file = "{}-{}.tmp".format(stem, randint(1000, 9999))
    self._save_json(tmp_file, data)
    try:
        self._read_json(tmp_file)
    except json.decoder.JSONDecodeError:
        self.logger.exception("Attempted to write file {} but JSON "
                              "integrity check on tmp file has failed. "
                              "The original file is unaltered."
                              "".format(filename))
        return False
    else:
        os.replace(tmp_file, filename)
        return True
def temp_move_path(path, d):
    """Generator that moves ``path`` into ``d`` around a single ``yield``.

    If ``path`` exists it is moved into ``d`` and the new location is
    yielded; afterwards it is moved back, first with ``os.replace`` and,
    if that raises ``OSError``, with ``shutil.move``.

    If ``path`` does not exist, nothing is yielded (``None``) and
    ``remove_path(path)`` is called afterwards.
    """
    if not os.path.exists(path):
        try:
            yield
        finally:
            remove_path(path)
        return

    moved_to = shutil.move(path, d)
    try:
        yield moved_to
    finally:
        try:
            os.replace(moved_to, path)
        except OSError:  # no cov
            shutil.move(moved_to, path)
def _run_fileset(self, env, file_mapper):
    """Move the mapped source files into the stash directory.

    Each regular file yielded by ``file_mapper()`` (except names ending
    in ``.stash``) is moved to ``<root_dir>/.nimp/stash/<md5 of path>``
    and recorded as ``"<md5> <path>"`` in the stash index file named
    after ``env.fileset``. Always returns ``True``.
    """
    stash_dir = env.format('{root_dir}/.nimp/stash')
    nimp.system.safe_makedirs(stash_dir)
    stash_file = os.path.join(stash_dir, env.fileset)
    # Start from a fresh index file.
    nimp.system.safe_delete(stash_file)

    with open(stash_file, 'w') as stash:
        for original, _ in file_mapper():
            src = nimp.system.sanitize_path(original)
            if not os.path.isfile(src) or src.endswith('.stash'):
                continue
            # The stashed file is named after the hash of its source path.
            md5 = hashlib.md5(src.encode('utf8')).hexdigest()
            os.replace(src, os.path.join(stash_dir, md5))
            logging.info('Stashing %s as %s', src, md5)
            stash.write('%s %s\n' % (md5, src))
    return True
def _run_fileset(self, env, file_mapper):
    """Move stashed files back to the locations recorded in the stash index.

    The index file ``<root_dir>/.nimp/stash/<env.fileset>`` is read line
    by line; each line holds ``"<md5> <destination path>"``. For every
    entry the destination is deleted and the stashed file is moved there.
    Failures are logged and do not stop the loop. The index file is
    deleted at the end.

    Returns ``True`` if every entry was restored, ``False`` otherwise.
    """
    stash_dir = env.format('{root_dir}/.nimp/stash')
    stash_file = os.path.join(stash_dir, env.fileset)
    success = True
    with open(stash_file, 'r') as stash:
        for line in stash:
            try:
                # Split on the first whitespace run only: the hash never
                # contains whitespace, but the destination path may
                # contain spaces and must be kept in one piece.
                md5, dst = line.strip().split(None, 1)
                src = os.path.join(stash_dir, md5)
                logging.info('Unstashing %s as %s', md5, dst)
                nimp.system.safe_delete(dst)
                os.replace(src, dst)
            except Exception as ex: #pylint: disable=broad-except
                logging.error(ex)
                success = False
    nimp.system.safe_delete(stash_file)
    return success
def atomic_write(filepath, binary=False, fsync=False):
    """Yield a writable file whose content replaces ``filepath`` at the end.

    Data is written to a temporary sibling file (``filepath`` followed by
    as many ``~`` as needed to find an unused name) which is then moved
    over ``filepath`` with ``replace``. The temporary file is removed
    afterwards if it is still present.

    In some cases (namely Python < 3.3 on Windows), this could result in
    an existing file being temporarily unlinked.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """
    tilde_count = 1
    while os.path.isfile(filepath + '~' * tilde_count):
        tilde_count += 1
    tmppath = filepath + '~' * tilde_count
    mode = 'wb' if binary else 'w'
    try:
        with open(tmppath, mode) as handle:
            yield handle
            if fsync:
                handle.flush()
                os.fsync(handle.fileno())
        replace(tmppath, filepath)
    finally:
        # Best-effort cleanup; after a successful replace the temporary
        # file no longer exists and the resulting error is ignored.
        try:
            os.remove(tmppath)
        except (IOError, OSError):
            pass
def save_json(self, filename, data):
    """Save ``data`` to ``filename`` as JSON by way of a temporary file.

    The data is written with ``_save_json`` to ``<stem>-<4 digits>.tmp``
    next to ``filename`` and checked with ``_read_json``. Only when that
    check does not raise ``JSONDecodeError`` is the temporary file moved
    over ``filename`` with ``os.replace``.

    Returns ``True`` on success; logs the failure and returns ``False``
    when the integrity check fails.
    """
    stem = os.path.splitext(filename)[0]
    tmp_file = "{}-{}.tmp".format(stem, randint(1000, 9999))
    self._save_json(tmp_file, data)
    try:
        self._read_json(tmp_file)
    except json.decoder.JSONDecodeError:
        self.logger.exception("Attempted to write file {} but JSON "
                              "integrity check on tmp file has failed. "
                              "The original file is unaltered."
                              "".format(filename))
        return False
    else:
        os.replace(tmp_file, filename)
        return True
def save_json(self, filename, data):
    """Save ``data`` to ``filename`` as JSON by way of a temporary file.

    The data is written with ``_save_json`` to ``<stem>-<4 digits>.tmp``
    next to ``filename`` and checked with ``_read_json``. Only when that
    check does not raise ``JSONDecodeError`` is the temporary file moved
    over ``filename`` with ``os.replace``.

    Returns ``True`` on success; logs the failure and returns ``False``
    when the integrity check fails.
    """
    stem = os.path.splitext(filename)[0]
    tmp_file = "{}-{}.tmp".format(stem, randint(1000, 9999))
    self._save_json(tmp_file, data)
    try:
        self._read_json(tmp_file)
    except json.decoder.JSONDecodeError:
        self.logger.exception("Attempted to write file {} but JSON "
                              "integrity check on tmp file has failed. "
                              "The original file is unaltered."
                              "".format(filename))
        return False
    else:
        os.replace(tmp_file, filename)
        return True
def rename(srcpath, destpath, data):
    """Move ``*.mp4`` files from ``srcpath`` to ``destpath`` under episode names.

    The six-digit ``date`` group matched by ``file_re`` in each file name
    is expanded to ``20YY-MM-DD``; that key's position in the sorted keys
    of ``data`` gives the episode number and ``data[key]['engName']`` the
    name used in the new file name.

    Files whose name does not match ``file_re`` are skipped instead of
    aborting the whole run with an ``AttributeError``.

    :param srcpath: directory searched for ``*.mp4`` files
    :param destpath: directory the renamed files are moved to
    :param data: mapping from ``20YY-MM-DD`` strings to dicts with an
        ``engName`` entry
    """
    import glob
    import os
    episodes = sorted(data.keys())
    name_pattern = '{date} Showroom - AKB48 no Myonichi Yoroshiku! #{ep} ({name}).mp4'
    long_date_pattern = '20{}-{}-{}'
    for path in glob.glob('{}/*.mp4'.format(srcpath)):
        match = file_re.match(os.path.basename(path))
        if match is None:
            # Not a recording this naming scheme applies to; leave it alone.
            continue
        date = match.groupdict()['date']
        long_date = long_date_pattern.format(*[date[i:i+2] for i in range(0, 6, 2)])
        new_file = name_pattern.format(
            date=date,
            ep=episodes.index(long_date)+1,
            name=data[long_date]['engName'],
        )
        os.replace(
            path,
            '{}/{}'.format(destpath, new_file)
        )
def prune_folder(folder, needed_list):
    """Move every ``*.mp4`` in ``folder`` not listed in ``needed_list`` aside.

    Unneeded files are moved into the ``unneeded`` sub-directory of
    ``folder`` (created if missing). The process working directory is
    changed to ``folder`` while this runs and is always restored, even
    when an error interrupts the loop.

    :param folder: directory to prune
    :param needed_list: container of file names (without directory) to keep
    """
    oldcwd = os.getcwd()
    os.chdir(folder)
    try:
        os.makedirs('unneeded', exist_ok=True)
        for name in glob.glob('*.mp4'):
            if name not in needed_list:
                os.replace(name, 'unneeded/{}'.format(name))
    finally:
        # Never leave the caller stranded in ``folder``.
        os.chdir(oldcwd)
def update_streaming_url_web(self):
    """Updates streaming urls from the showroom website.

    Fallback if api changes again. The room page is fetched and searched
    with ``hls_url_re1``; when the request fails or nothing matches, the
    stored urls are left untouched.
    """
    r = self._session.get(self._room.long_url)
    if not r.ok:
        return

    match = hls_url_re1.search(r.text)
    if not match:
        # no url found in the page
        # probably the stream has ended but is_live returned true
        # just don't update the urls
        # except what happens if they are still "" ?
        return

    groups = match.groups()
    hls_url = match.group(0)
    rtmps_url = groups[0].replace('https', 'rtmps')
    rtmp_url = "rtmp://{}.{}.{}.{}:1935/liveedge/{}".format(*groups[1:])
    # Publish all three urls together under the lock.
    with self._lock:
        self._rtmp_url = rtmp_url
        self._hls_url = hls_url
        self._rtmps_url = rtmps_url
def concurrency_safe_rename(src, dst):
    """Renames ``src`` into ``dst`` overwriting ``dst`` if it exists.

    On Windows os.replace (or for Python 2.7 its implementation
    through MoveFileExW) can yield permission errors if executed by
    two different processes.

    The rename is retried with exponentially growing pauses (starting at
    1 ms) while it fails with the access-denied ``winerror``, until about
    one second has been spent sleeping. Any other exception is re-raised
    immediately; if the retries are exhausted the last access-denied
    error is raised.
    """
    max_sleep_time = 1
    total_sleep_time = 0
    sleep_time = 0.001
    # Remember the most recent failure: once the ``except`` block has
    # finished there is no active exception left for a bare ``raise``.
    last_error = None
    while total_sleep_time < max_sleep_time:
        try:
            replace(src, dst)
            return
        except Exception as exc:
            if getattr(exc, 'winerror', None) != error_access_denied:
                raise
            last_error = exc
            time.sleep(sleep_time)
            total_sleep_time += sleep_time
            sleep_time *= 2
    raise last_error
def replace(src, dst):
    # argument names match stdlib docs, docstring below
    #
    # Strategy: try a plain rename first and return if it works; only
    # when that fails because the destination exists, call ReplaceFile.
    try:
        # ReplaceFile fails if the dest file does not exist, so
        # first try to rename it into position
        os.rename(src, dst)
    except WindowsError as we:
        if we.errno != errno.EEXIST:
            raise
        # destination exists: continue with the ReplaceFile logic below
    else:
        return

    src = path_to_unicode(src)
    dst = path_to_unicode(dst)
    # Note the argument order: replaced file first, replacement second.
    succeeded = _ReplaceFile(c_wchar_p(dst), c_wchar_p(src),
                             None, 0, None, None)
    if not succeeded:
        raise OSError('failed to replace %r with %r' % (dst, src))
def atomic_write(filepath, binary=False, fsync=False):
    """Yield a writable file whose content replaces ``filepath`` at the end.

    Data is written to a temporary sibling file (``filepath`` followed by
    as many ``~`` as needed to find an unused name) which is then moved
    over ``filepath`` with ``replace``. The temporary file is removed
    afterwards if it is still present.

    In some cases (namely Python < 3.3 on Windows), this could result in
    an existing file being temporarily unlinked.

    :param filepath: the file path to be opened
    :param binary: whether to open the file in a binary mode instead of textual
    :param fsync: whether to force write the file to disk
    """
    tilde_count = 1
    while os.path.isfile(filepath + '~' * tilde_count):
        tilde_count += 1
    tmppath = filepath + '~' * tilde_count
    mode = 'wb' if binary else 'w'
    try:
        with open(tmppath, mode) as handle:
            yield handle
            if fsync:
                handle.flush()
                os.fsync(handle.fileno())
        replace(tmppath, filepath)
    finally:
        # Best-effort cleanup; after a successful replace the temporary
        # file no longer exists and the resulting error is ignored.
        try:
            os.remove(tmppath)
        except (IOError, OSError):
            pass
def close(self):
    """Close the cache files and move the new cache into place.

    The input file, if any, is closed. If an output file exists it is
    closed and then moved over the cache path with ``os.replace``.

    Raises ``BuildError`` if any of these steps fails with ``OSError``.
    """
    try:
        if self.__inFile:
            self.__inFile.close()
        if not self.__outFile:
            return
        # Close before the rename so all data is written out first.
        self.__outFile.close()
        os.replace(self.__outFile.name, self.__cachePath)
    except OSError as e:
        raise BuildError("Error closing hash cache: " + str(e))
def __generatePackages(self, nameFormatter, env, cacheKey, sandboxEnabled):
    """Return the root package, using the on-disk package cache if valid.

    The cache file starts with ``cacheKey`` followed by the pickled
    package tree. When the stored key equals ``cacheKey`` the tree is
    unpickled and returned. Otherwise (or when reading fails) the
    packages are calculated from the root recipe, the cache is rewritten
    through a ``.new`` file plus ``os.replace``, and the calculated
    result is returned. A failure to save the cache is only reported on
    stderr.
    """
    # use separate caches with and without sandbox
    cacheName = ".bob-packages-sb.pickle" if sandboxEnabled else ".bob-packages.pickle"

    # try to load the persisted packages
    states = { name: factory() for (name, factory) in self.__states.items() }
    rootPkg = Package()
    rootPkg.construct("<root>", [], nameFormatter, None, [], [], states,
        {}, {}, None, None, [], {}, -1)
    try:
        with open(cacheName, "rb") as f:
            if f.read(len(cacheKey)) == cacheKey:
                loaded = PackageUnpickler(f, self.getRecipe, self.__plugins,
                                          nameFormatter).load()
                return loaded.toStep(nameFormatter, rootPkg).getPackage()
    except (EOFError, OSError, pickle.UnpicklingError):
        # Unreadable or truncated cache: fall through and recalculate.
        pass

    # not cached -> calculate packages
    prepared = self.__rootRecipe.prepare(nameFormatter, env, sandboxEnabled,
                                         states)
    result = prepared[0]

    # save package tree for next invocation
    stepRef = CoreStepRef(rootPkg, result.getPackageStep())
    newCacheName = cacheName + ".new"
    try:
        with open(newCacheName, "wb") as f:
            f.write(cacheKey)
            PackagePickler(f, nameFormatter).dump(stepRef)
        os.replace(newCacheName, cacheName)
    except OSError as e:
        print("Error saving internal state:", str(e), file=sys.stderr)

    return result
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A ``None`` encoding is replaced by ``get_best_encoding(stream)`` and
    a ``None`` error mode by ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    error_mode = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(stream, chosen_encoding, error_mode,
                                    line_buffering=True)
def filename_to_ui(value):
    """Return ``value`` as text, decoding bytes with the filesystem encoding.

    Undecodable bytes are substituted (``'replace'``); any non-bytes
    value is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), 'replace')
def _force_correct_text_reader(text_reader, encoding, errors):
    """Return a text reader for ``text_reader`` matching ``encoding``/``errors``.

    The given reader is returned unchanged when it is already suitable or
    when no underlying binary reader can be found; otherwise the binary
    reader is wrapped again through ``_make_text_stream``.
    """
    binary_reader = text_reader
    if not _is_binary_reader(text_reader, False):
        # Keep the reader as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_reader))
            or _is_compatible_text_stream(text_reader, encoding, errors)
        )
        if usable_as_is:
            return text_reader

        # Otherwise look for the binary reader underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same reader: mojibake beats exceptions here.
        binary_reader = _find_binary_reader(text_reader)
        if binary_reader is None:
            return text_reader

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_reader, encoding, errors)
def _force_correct_text_writer(text_writer, encoding, errors):
    """Return a text writer for ``text_writer`` matching ``encoding``/``errors``.

    The given writer is returned unchanged when it is already suitable or
    when no underlying binary writer can be found; otherwise the binary
    writer is wrapped again through ``_make_text_stream``.
    """
    binary_writer = text_writer
    if not _is_binary_writer(text_writer, False):
        # Keep the writer as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_writer))
            or _is_compatible_text_stream(text_writer, encoding, errors)
        )
        if usable_as_is:
            return text_writer

        # Otherwise look for the binary writer underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same writer: mojibake beats exceptions here.
        binary_writer = _find_binary_writer(text_writer)
        if binary_writer is None:
            return text_writer

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_writer, encoding, errors)
def filename_to_ui(value):
    """Return a displayable text version of the file name ``value``.

    Bytes are decoded with the filesystem encoding. Text is round-tripped
    through UTF-8 so that surrogate-escaped bytes turn into replacement
    characters instead of staying as lone surrogates.
    """
    if isinstance(value, bytes):
        return value.decode(get_filesystem_encoding(), 'replace')
    raw = value.encode('utf-8', 'surrogateescape')
    return raw.decode('utf-8', 'replace')
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A ``None`` encoding is replaced by ``get_best_encoding(stream)`` and
    a ``None`` error mode by ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    error_mode = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(stream, chosen_encoding, error_mode,
                                    line_buffering=True)
def filename_to_ui(value):
    """Return ``value`` as text, decoding bytes with the filesystem encoding.

    Undecodable bytes are substituted (``'replace'``); any non-bytes
    value is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), 'replace')
def _force_correct_text_reader(text_reader, encoding, errors):
    """Return a text reader for ``text_reader`` matching ``encoding``/``errors``.

    The given reader is returned unchanged when it is already suitable or
    when no underlying binary reader can be found; otherwise the binary
    reader is wrapped again through ``_make_text_stream``.
    """
    binary_reader = text_reader
    if not _is_binary_reader(text_reader, False):
        # Keep the reader as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_reader))
            or _is_compatible_text_stream(text_reader, encoding, errors)
        )
        if usable_as_is:
            return text_reader

        # Otherwise look for the binary reader underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same reader: mojibake beats exceptions here.
        binary_reader = _find_binary_reader(text_reader)
        if binary_reader is None:
            return text_reader

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_reader, encoding, errors)
def _force_correct_text_writer(text_writer, encoding, errors):
    """Return a text writer for ``text_writer`` matching ``encoding``/``errors``.

    The given writer is returned unchanged when it is already suitable or
    when no underlying binary writer can be found; otherwise the binary
    writer is wrapped again through ``_make_text_stream``.
    """
    binary_writer = text_writer
    if not _is_binary_writer(text_writer, False):
        # Keep the writer as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_writer))
            or _is_compatible_text_stream(text_writer, encoding, errors)
        )
        if usable_as_is:
            return text_writer

        # Otherwise look for the binary writer underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same writer: mojibake beats exceptions here.
        binary_writer = _find_binary_writer(text_writer)
        if binary_writer is None:
            return text_writer

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_writer, encoding, errors)
def filename_to_ui(value):
    """Return a displayable text version of the file name ``value``.

    Bytes are decoded with the filesystem encoding. Text is round-tripped
    through UTF-8 so that surrogate-escaped bytes turn into replacement
    characters instead of staying as lone surrogates.
    """
    if isinstance(value, bytes):
        return value.decode(get_filesystem_encoding(), 'replace')
    raw = value.encode('utf-8', 'surrogateescape')
    return raw.decode('utf-8', 'replace')
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A ``None`` encoding is replaced by ``get_best_encoding(stream)`` and
    a ``None`` error mode by ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    error_mode = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(stream, chosen_encoding, error_mode,
                                    line_buffering=True)
def filename_to_ui(value):
    """Return ``value`` as text, decoding bytes with the filesystem encoding.

    Undecodable bytes are substituted (``'replace'``); any non-bytes
    value is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), 'replace')
def _force_correct_text_reader(text_reader, encoding, errors):
    """Return a text reader for ``text_reader`` matching ``encoding``/``errors``.

    The given reader is returned unchanged when it is already suitable or
    when no underlying binary reader can be found; otherwise the binary
    reader is wrapped again through ``_make_text_stream``.
    """
    binary_reader = text_reader
    if not _is_binary_reader(text_reader, False):
        # Keep the reader as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_reader))
            or _is_compatible_text_stream(text_reader, encoding, errors)
        )
        if usable_as_is:
            return text_reader

        # Otherwise look for the binary reader underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same reader: mojibake beats exceptions here.
        binary_reader = _find_binary_reader(text_reader)
        if binary_reader is None:
            return text_reader

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_reader, encoding, errors)
def _force_correct_text_writer(text_writer, encoding, errors):
    """Return a text writer for ``text_writer`` matching ``encoding``/``errors``.

    The given writer is returned unchanged when it is already suitable or
    when no underlying binary writer can be found; otherwise the binary
    writer is wrapped again through ``_make_text_stream``.
    """
    binary_writer = text_writer
    if not _is_binary_writer(text_writer, False):
        # Keep the writer as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_writer))
            or _is_compatible_text_stream(text_writer, encoding, errors)
        )
        if usable_as_is:
            return text_writer

        # Otherwise look for the binary writer underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same writer: mojibake beats exceptions here.
        binary_writer = _find_binary_writer(text_writer)
        if binary_writer is None:
            return text_writer

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_writer, encoding, errors)
def filename_to_ui(value):
    """Return a displayable text version of the file name ``value``.

    Bytes are decoded with the filesystem encoding. Text is round-tripped
    through UTF-8 so that surrogate-escaped bytes turn into replacement
    characters instead of staying as lone surrogates.
    """
    if isinstance(value, bytes):
        return value.decode(get_filesystem_encoding(), 'replace')
    raw = value.encode('utf-8', 'surrogateescape')
    return raw.decode('utf-8', 'replace')
def close(self):
    """Write the working directory back to its zip file, then destroy self.

    When ``self.directory_real`` is set, every file below
    ``self.directory`` is written to a new zip at that path, stored under
    its path relative to ``self.directory``, and ``self.d.cleanup()`` is
    called. ``self.destroy()`` is called in every case.
    """
    # NOTE(review): the nesting of cleanup()/destroy() was reconstructed
    # from a copy without indentation; confirm against the original file.
    if self.directory_real:
        with zipfile.ZipFile(self.directory_real, "w") as archive:
            source_dir = self.directory.replace("\\", "/")
            for root, dirs, files in os.walk(source_dir, topdown=True):
                segments = root.replace("\\", "/").split("/")
                for name in files:
                    # Everything after the directory's own name is the
                    # location of the file inside the archive.
                    anchor = self.directory.replace("\\", "/").split("/")[-1]
                    inner_dir = "/".join(segments[segments.index(anchor) + 1:])
                    archive.write(os.path.join(root, name), inner_dir + "/" + name)
        self.d.cleanup()
    self.destroy()
def replace_file(self):
    """Overwrite the file of the focused tree item with a user-chosen file.

    The target path is the first tag of the focused tree item. A file
    dialog asks for the replacement; if one is chosen it is copied over
    the target with ``shutil.copy2`` and the tree is refreshed. Nothing
    happens when the dialog is cancelled.
    """
    old_file = self.widget_tree.item(self.widget_tree.focus())["tags"][0]
    file = filedialog.askopenfile()
    if file:
        # askopenfile() hands back an *open* file object. Only its path
        # is needed, so close it right away instead of leaking the handle.
        with file:
            new_file = file.name
        shutil.copy2(new_file, old_file)
        self.cmd.tree_refresh()
def force_rename(oldpath, newpath):
    """
    Move the file at ``oldpath`` to ``newpath``, deleting ``newpath``
    beforehand if necessary.

    Uses ``os.replace`` where available. Otherwise falls back to
    ``os.rename``, first calling ``try_unlink(newpath)`` on Windows.
    """
    if not hasattr(os, 'replace'):
        # Before Python 3.3: no os.replace available.
        if sys.platform.startswith('win'):
            try_unlink(newpath)
        os.rename(oldpath, newpath)
        return
    os.replace(oldpath, newpath)  # Python 3.3+
def parse_parts(self, parts):
    """Split a sequence of path segments into drive, root and components.

    ``parts`` is walked from the last segment to the first. Segments are
    split on ``self.sep`` (after mapping ``self.altsep`` to it) and their
    non-empty, non-``'.'`` components are collected until a segment with
    a drive or root is met.

    Returns a ``(drv, root, parsed)`` tuple where ``parsed`` lists the
    components in original order, starting with ``drv + root`` when
    either is non-empty.
    """
    if six.PY2:
        parts = _py2_fsencode(parts)
    parsed = []
    sep = self.sep
    altsep = self.altsep
    drv = root = ''
    # A single iterator is shared by the outer and the inner loop below,
    # so the inner loop continues where the outer one stopped.
    it = reversed(parts)
    for part in it:
        if not part:
            continue
        if altsep:
            part = part.replace(altsep, sep)
        drv, root, rel = self.splitroot(part)
        if sep in rel:
            # Components are appended in reverse; the list is flipped at
            # the very end.
            for x in reversed(rel.split(sep)):
                if x and x != '.':
                    parsed.append(intern(x))
        else:
            if rel and rel != '.':
                parsed.append(intern(rel))
        if drv or root:
            if not drv:
                # If no drive is present, try to find one in the previous
                # parts. This makes the result of parsing e.g.
                # ("C:", "/", "a") reasonably intuitive.
                for part in it:
                    if not part:
                        continue
                    if altsep:
                        part = part.replace(altsep, sep)
                    drv = self.splitroot(part)[0]
                    if drv:
                        break
            # A drive or root anchors the path; earlier segments are
            # not added to the components.
            break
    if drv or root:
        parsed.append(drv + root)
    parsed.reverse()
    return drv, root, parsed
def as_posix(self):
    """Return ``str(self)`` with the flavour's separator turned into
    forward (/) slashes."""
    separator = self._flavour.sep
    text = str(self)
    return text.replace(separator, '/')
def replace(self, target):
    """
    Rename this path to the given path, clobbering the existing
    destination if it exists.

    Raises ``NotImplementedError`` on Python older than 3.3. The actual
    work is delegated to ``self._accessor.replace``.
    """
    supported = sys.version_info >= (3, 3)
    if not supported:
        raise NotImplementedError("replace() is only available "
                                  "with Python 3.3 and later")
    if self._closed:
        self._raise_closed()
    self._accessor.replace(self, target)
def _dump(self):
    """Serialise a copy of the database to ``self.name`` via a temp file.

    The JSON is written to ``<uuid>-<basename>.tmp`` in the same
    directory as ``self.name`` and then moved over ``self.name`` with
    ``os.replace``. If writing or moving fails, the temporary file is
    removed and the error is re-raised.
    """
    # Build the temp name from the base name and keep it in the target's
    # directory: prefixing the whole of ``self.name`` produces a path in a
    # non-existent directory as soon as ``self.name`` has a directory part.
    directory, base = os.path.split(self.name)
    temp = os.path.join(directory, '%s-%s.tmp' % (uuid.uuid4(), base))
    try:
        with open(temp, 'w', encoding='utf-8') as tmp:
            json.dump(self._db.copy(), tmp, ensure_ascii=True, cls=self.encoder, separators=(',', ':'))

        # atomically move the file
        os.replace(temp, self.name)
    except BaseException:
        # Don't leave a partial temp file behind.
        try:
            os.remove(temp)
        except OSError:
            pass
        raise
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A ``None`` encoding is replaced by ``get_best_encoding(stream)`` and
    a ``None`` error mode by ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    error_mode = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(stream, chosen_encoding, error_mode,
                                    line_buffering=True)
def filename_to_ui(value):
    """Return ``value`` as text, decoding bytes with the filesystem encoding.

    Undecodable bytes are substituted (``'replace'``); any non-bytes
    value is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), 'replace')
def _force_correct_text_reader(text_reader, encoding, errors):
    """Return a text reader for ``text_reader`` matching ``encoding``/``errors``.

    The given reader is returned unchanged when it is already suitable or
    when no underlying binary reader can be found; otherwise the binary
    reader is wrapped again through ``_make_text_stream``.
    """
    binary_reader = text_reader
    if not _is_binary_reader(text_reader, False):
        # Keep the reader as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_reader))
            or _is_compatible_text_stream(text_reader, encoding, errors)
        )
        if usable_as_is:
            return text_reader

        # Otherwise look for the binary reader underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same reader: mojibake beats exceptions here.
        binary_reader = _find_binary_reader(text_reader)
        if binary_reader is None:
            return text_reader

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_reader, encoding, errors)
def _force_correct_text_writer(text_writer, encoding, errors):
    """Return a text writer for ``text_writer`` matching ``encoding``/``errors``.

    The given writer is returned unchanged when it is already suitable or
    when no underlying binary writer can be found; otherwise the binary
    writer is wrapped again through ``_make_text_stream``.
    """
    binary_writer = text_writer
    if not _is_binary_writer(text_writer, False):
        # Keep the writer as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_writer))
            or _is_compatible_text_stream(text_writer, encoding, errors)
        )
        if usable_as_is:
            return text_writer

        # Otherwise look for the binary writer underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same writer: mojibake beats exceptions here.
        binary_writer = _find_binary_writer(text_writer)
        if binary_writer is None:
            return text_writer

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_writer, encoding, errors)
def filename_to_ui(value):
    """Return a displayable text version of the file name ``value``.

    Bytes are decoded with the filesystem encoding. Text is round-tripped
    through UTF-8 so that surrogate-escaped bytes turn into replacement
    characters instead of staying as lone surrogates.
    """
    if isinstance(value, bytes):
        return value.decode(get_filesystem_encoding(), 'replace')
    raw = value.encode('utf-8', 'surrogateescape')
    return raw.decode('utf-8', 'replace')
def create_final_from_temp(self, temp_parfile_name):
    """Set mode 0755 on the temporary parfile and rename it to
    ``self.output_filename``."""
    final_mode = 0o755
    os.chmod(temp_parfile_name, final_mode)
    # Python 2 doesn't have os.replace, so use os.rename which is
    # not atomic in all cases.
    os.rename(temp_parfile_name, self.output_filename)
def _make_text_stream(stream, encoding, errors):
    """Wrap ``stream`` in a line-buffered ``_NonClosingTextIOWrapper``.

    A ``None`` encoding is replaced by ``get_best_encoding(stream)`` and
    a ``None`` error mode by ``'replace'``.
    """
    chosen_encoding = get_best_encoding(stream) if encoding is None else encoding
    error_mode = 'replace' if errors is None else errors
    return _NonClosingTextIOWrapper(stream, chosen_encoding, error_mode,
                                    line_buffering=True)
def filename_to_ui(value):
    """Return ``value`` as text, decoding bytes with the filesystem encoding.

    Undecodable bytes are substituted (``'replace'``); any non-bytes
    value is returned unchanged.
    """
    if not isinstance(value, bytes):
        return value
    return value.decode(get_filesystem_encoding(), 'replace')
def _force_correct_text_reader(text_reader, encoding, errors):
    """Return a text reader for ``text_reader`` matching ``encoding``/``errors``.

    The given reader is returned unchanged when it is already suitable or
    when no underlying binary reader can be found; otherwise the binary
    reader is wrapped again through ``_make_text_stream``.
    """
    binary_reader = text_reader
    if not _is_binary_reader(text_reader, False):
        # Keep the reader as it is when no target encoding is set and it
        # is not misconfigured, or when it already matches the request.
        usable_as_is = (
            (encoding is None and not _stream_is_misconfigured(text_reader))
            or _is_compatible_text_stream(text_reader, encoding, errors)
        )
        if usable_as_is:
            return text_reader

        # Otherwise look for the binary reader underneath. If none is
        # found (typically a misconfigured environment, which is common)
        # silently keep the same reader: mojibake beats exceptions here.
        binary_reader = _find_binary_reader(text_reader)
        if binary_reader is None:
            return text_reader

    # Default the errors to replace instead of strict: nobody handles
    # those errors anyway and nothing else can repair things by now.
    if errors is None:
        errors = 'replace'
    return _make_text_stream(binary_reader, encoding, errors)