1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00

temp file wrapper

This commit is contained in:
c-basalt 2024-08-11 16:23:58 -04:00
parent 8e39820adc
commit 567fb56a1c

View file

@ -47,39 +47,45 @@ def cookie_jar_to_list(cookie_jar):
return [cookie_to_dict(cookie) for cookie in cookie_jar]
@contextlib.contextmanager
def _temp_file(content, *, mode='wt', encoding='utf-8', suffix=None, close=True):
if 'b' in mode:
encoding = None
temp_file_handle = tempfile.NamedTemporaryFile(mode, encoding=encoding, suffix=suffix, delete=False)
try:
temp_file_handle.write(content)
if close:
temp_file_handle.close()
yield temp_file_handle
finally:
class TempFileWrapper:
def __init__(self, content=None, text=True, encoding='utf-8', suffix=None):
self.encoding = None if not text else encoding
self.text = text
self._file = tempfile.NamedTemporaryFile('wb', suffix=suffix, delete=False)
self._file.close()
if content:
self.write(content)
@property
def name(self):
return self._file.name
@contextlib.contextmanager
def opened_file(self, mode, *, seek=None, seek_whence=0):
mode = mode if (self.text or 'b' in mode) else mode + 'b'
with open(self._file.name, mode, encoding=self.encoding) as f:
if seek is not None:
self._file.seek(seek, seek_whence)
yield f
def write(self, s, seek=None, seek_whence=0):
with self.opened_file('w', seek=seek, seek_whence=seek_whence) as f:
f.write(s)
def append_write(self, s, seek=None, seek_whence=0):
with self.opened_file('a', seek=seek, seek_whence=seek_whence) as f:
f.write(s)
def read(self, n=-1, seek=None, seek_whence=0):
with self.opened_file('r', seek=seek, seek_whence=seek_whence) as f:
return f.read(n)
def cleanup(self):
with contextlib.suppress(OSError):
os.remove(temp_file_handle.name)
os.remove(self._file.name)
@contextlib.contextmanager
def _tempfile_context():
handles = []
def _creater(content, *, mode='wt', encoding='utf-8', suffix=None, close=True):
encoding = None if 'b' in mode else encoding
handle = tempfile.NamedTemporaryFile(mode, encoding=encoding, suffix=suffix, delete=False)
handles.append(handle)
handle.write(content)
if close:
handle.close()
return handle
try:
yield _creater
finally:
for handle in handles:
with contextlib.suppress(OSError):
os.remove(handle.name)
def __del__(self):
self.cleanup()
class ExternalJSI:
@ -117,7 +123,7 @@ def __init__(self, extractor: InfoExtractor, required_version=None, timeout=1000
@classmethod
def _execute(cls, jscode, extractor=None, video_id=None, note='', flags=[], timeout=10000):
with _temp_file(jscode, suffix='.js') as js_file:
js_file = TempFileWrapper(jscode)
if note and extractor:
extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}')
cmd = [cls.exe, 'run', *flags, js_file.name]