From 567fb56a1ce7180c45af1912f7448422fe5835e8 Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Sun, 11 Aug 2024 16:23:58 -0400 Subject: [PATCH] temp file wrapper --- yt_dlp/jsinterp/external.py | 96 ++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/yt_dlp/jsinterp/external.py b/yt_dlp/jsinterp/external.py index ea67b6309..3751941c8 100644 --- a/yt_dlp/jsinterp/external.py +++ b/yt_dlp/jsinterp/external.py @@ -47,39 +47,45 @@ def cookie_jar_to_list(cookie_jar): return [cookie_to_dict(cookie) for cookie in cookie_jar] -@contextlib.contextmanager -def _temp_file(content, *, mode='wt', encoding='utf-8', suffix=None, close=True): - if 'b' in mode: - encoding = None - temp_file_handle = tempfile.NamedTemporaryFile(mode, encoding=encoding, suffix=suffix, delete=False) - try: - temp_file_handle.write(content) - if close: - temp_file_handle.close() - yield temp_file_handle - finally: +class TempFileWrapper: + def __init__(self, content=None, text=True, encoding='utf-8', suffix=None): + self.encoding = None if not text else encoding + self.text = text + self._file = tempfile.NamedTemporaryFile('wb', suffix=suffix, delete=False) + self._file.close() + if content: + self.write(content) + + @property + def name(self): + return self._file.name + + @contextlib.contextmanager + def opened_file(self, mode, *, seek=None, seek_whence=0): + mode = mode if (self.text or 'b' in mode) else mode + 'b' + with open(self._file.name, mode, encoding=self.encoding) as f: + if seek is not None: + self._file.seek(seek, seek_whence) + yield f + + def write(self, s, seek=None, seek_whence=0): + with self.opened_file('w', seek=seek, seek_whence=seek_whence) as f: + f.write(s) + + def append_write(self, s, seek=None, seek_whence=0): + with self.opened_file('a', seek=seek, seek_whence=seek_whence) as f: + f.write(s) + + def read(self, n=-1, seek=None, seek_whence=0): + with self.opened_file('r', seek=seek, seek_whence=seek_whence) as f: + return f.read(n) + + def cleanup(self): with contextlib.suppress(OSError): - os.remove(temp_file_handle.name) + os.remove(self._file.name) - -@contextlib.contextmanager -def _tempfile_context(): - handles = [] - - def _creater(content, *, mode='wt', encoding='utf-8', suffix=None, close=True): - encoding = None if 'b' in mode else encoding - handle = tempfile.NamedTemporaryFile(mode, encoding=encoding, suffix=suffix, delete=False) - handles.append(handle) - handle.write(content) - if close: - handle.close() - return handle - try: - yield _creater - finally: - for handle in handles: - with contextlib.suppress(OSError): - os.remove(handle.name) + def __del__(self): + self.cleanup() class ExternalJSI: @@ -117,20 +123,20 @@ def __init__(self, extractor: InfoExtractor, required_version=None, timeout=1000 @classmethod def _execute(cls, jscode, extractor=None, video_id=None, note='', flags=[], timeout=10000): - with _temp_file(jscode, suffix='.js') as js_file: - if note and extractor: - extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') - cmd = [cls.exe, 'run', *flags, js_file.name] - try: - stdout, stderr, returncode = Popen.run( - cmd, timeout=timeout / 1000, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except Exception as e: - raise ExtractorError('Unable to run Deno binary', cause=e) - if returncode: - raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}') - elif stderr and extractor: - extractor.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) - return stdout.strip() + js_file = TempFileWrapper(jscode) + if note and extractor: + extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') + cmd = [cls.exe, 'run', *flags, js_file.name] + try: + stdout, stderr, returncode = Popen.run( + cmd, timeout=timeout / 1000, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except Exception as e: + raise ExtractorError('Unable to run Deno binary', cause=e) + if returncode: + raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}') + elif stderr and extractor: + extractor.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) + return stdout.strip() def execute(self, jscode, video_id=None, *, note='Executing JS in Deno', flags=[], base_js=None): """Execute JS directly in Deno runtime and return stdout"""