From 6d622d5481126f88f72f747c7a0a300e75b1ccf2 Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Sun, 29 Dec 2024 20:34:05 -0500 Subject: [PATCH] extract phantomjs --- yt_dlp/jsinterp/common.py | 9 +- yt_dlp/jsinterp/external.py | 239 +++++++++++++++++++----------------- 2 files changed, 133 insertions(+), 115 deletions(-) diff --git a/yt_dlp/jsinterp/common.py b/yt_dlp/jsinterp/common.py index 4a6ba65d28..aae8ea69d4 100644 --- a/yt_dlp/jsinterp/common.py +++ b/yt_dlp/jsinterp/common.py @@ -62,6 +62,7 @@ class JSInterp: @param preferred_order: list of JSI to use. First in list is tested first. @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback @param timeout: timeout parameter for all chosen JSI + @param user_agent: override user-agent to use for supported JSI """ def __init__( @@ -74,6 +75,7 @@ def __init__( preferred_order: typing.Iterable[str | type[JSI]] = [], fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [], timeout: float | int = 10, + user_agent: str | None = None, ): self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie self._features = set(features) @@ -88,8 +90,8 @@ def __init__( self.write_debug(f'Selected JSI classes for given features: {get_jsi_keys(handler_classes)}, ' f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') - self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout=timeout, **jsi_params.get(cls.JSI_KEY, {})) - for cls in handler_classes} + self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout=timeout, user_agent=user_agent, + **jsi_params.get(cls.JSI_KEY, {})) for cls in handler_classes} self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi) self._is_test = self._downloader.params.get('test', False) @@ -187,9 +189,10 @@ class JSI(abc.ABC): _SUPPORT_FEATURES: set[str] = set() _BASE_PREFERENCE: int = 0 - def __init__(self, downloader: YoutubeDL, timeout: float | int): + def __init__(self, downloader: YoutubeDL, timeout: float | int, user_agent=None): self._downloader = downloader self.timeout = timeout + self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent'] @abc.abstractmethod def is_available(self) -> bool: diff --git a/yt_dlp/jsinterp/external.py b/yt_dlp/jsinterp/external.py index be09f31315..a944261f4e 100644 --- a/yt_dlp/jsinterp/external.py +++ b/yt_dlp/jsinterp/external.py @@ -1,7 +1,6 @@ from __future__ import annotations import abc -import collections import contextlib import json import os @@ -99,6 +98,12 @@ def cleanup(self): def __del__(self): self.cleanup() + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.cleanup() + class ExternalJSI(JSI, abc.ABC): _EXE_NAME: str @@ -119,8 +124,9 @@ def is_available(cls): @register_jsi class DenoJSI(ExternalJSI): """JS interpreter class using Deno binary""" - _EXE_NAME = 'deno' _SUPPORTED_FEATURES = {'js', 'wasm', 'location'} + _BASE_PREFERENCE = 5 + _EXE_NAME = 'deno' _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] _INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n' @@ -144,16 +150,17 @@ def _run_deno(self, cmd): def execute(self, jscode, video_id=None, note='Executing JS in Deno', location=None): self.report_note(video_id, note) - js_file = TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js') location_args = ['--location', location] if location else [] - cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] - return self._run_deno(cmd) + with TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js') as js_file: + cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] + return self._run_deno(cmd) @register_jsi class DenoJITlessJSI(DenoJSI): - _EXE_NAME = DenoJSI._EXE_NAME _SUPPORTED_FEATURES = {'js', 'location'} + _BASE_PREFERENCE = 6 + _EXE_NAME = DenoJSI._EXE_NAME _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check', '--v8-flags=--jitless,--noexpose-wasm'] @classproperty @@ -163,8 +170,9 @@ def exe_version(cls): class DenoJSDomJSI(DenoJSI): _SUPPORTED_FEATURES = {'js', 'wasm', 'location', 'dom', 'cookies'} + _BASE_PREFERENCE = 4 _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] - _JSDOM_IMPORT = False + _JSDOM_IMPORT_CHECKED = False @staticmethod def serialize_cookie(cookiejar: YoutubeDLCookieJar | None, url: str): @@ -210,12 +218,12 @@ def apply_cookies(cookiejar: YoutubeDLCookieJar | None, cookies: list[dict]): False, None, None, {})) def _ensure_jsdom(self): - if self._JSDOM_IMPORT: + if self._JSDOM_IMPORT_CHECKED: return - js_file = TempFileWrapper('import jsdom from "https://cdn.esm.sh/jsdom"', suffix='.js') - cmd = [self.exe, 'run', js_file.name] - self._run_deno(cmd) - self._JSDOM_IMPORT = True + with TempFileWrapper('import jsdom from "https://cdn.esm.sh/jsdom"', suffix='.js') as js_file: + cmd = [self.exe, 'run', js_file.name] + self._run_deno(cmd) + self._JSDOM_IMPORT_CHECKED = True def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='', html='', cookiejar=None): self.report_note(video_id, note) @@ -241,11 +249,10 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno', location=' }})().finally(callback); ''' - js_file = TempFileWrapper(script, suffix='.js') - location_args = ['--location', location] if location else [] - cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] - data = json.loads(self._run_deno(cmd)) + with TempFileWrapper(script, suffix='.js') as js_file: + cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] + data = json.loads(self._run_deno(cmd)) self.apply_cookies(cookiejar, data['cookies']) return data['stdout'] @@ -327,13 +334,11 @@ def execute(self, jscode, video_id=None, note='Executing JS in Puppeteer', url=' ''') -class PhantomJSwrapper(ExternalJSI): - """PhantomJS wrapper class - - This class is experimental. - """ +@register_jsi +class PhantomJSJSI(ExternalJSI): _EXE_NAME = 'phantomjs' - INSTALL_HINT = 'Please download PhantomJS from https://phantomjs.org/download.html' + _SUPPORTED_FEATURES = {'js', 'location', 'cookies'} + _BASE_PREFERENCE = 3 _BASE_JS = R''' phantom.onError = function(msg, trace) {{ @@ -355,11 +360,11 @@ class PhantomJSwrapper(ExternalJSI): var fs = require('fs'); var read = {{ mode: 'r', charset: 'utf-8' }}; var write = {{ mode: 'w', charset: 'utf-8' }}; - JSON.parse(fs.read("{cookies}", read)).forEach(function(x) {{ + JSON.parse(fs.read({cookies_fn}, read)).forEach(function(x) {{ phantom.addCookie(x); }}); page.settings.resourceTimeout = {timeout}; - page.settings.userAgent = "{ua}"; + page.settings.userAgent = {ua}; page.onLoadStarted = function() {{ page.evaluate(function() {{ delete window._phantom; @@ -367,13 +372,13 @@ class PhantomJSwrapper(ExternalJSI): }}); }}; var saveAndExit = function() {{ - fs.write("{html}", page.content, write); - fs.write("{cookies}", JSON.stringify(phantom.cookies), write); + fs.write({html_fn}, page.content, write); + fs.write({cookies_fn}, JSON.stringify(phantom.cookies), write); phantom.exit(); }}; page.onLoadFinished = function(status) {{ if(page.url === "") {{ - page.setContent(fs.read("{html}", read), "{url}"); + page.setContent(fs.read({html_fn}, read), {url}); }} else {{ {jscode} @@ -382,62 +387,105 @@ class PhantomJSwrapper(ExternalJSI): page.open(""); ''' - _TMP_FILE_NAMES = ['script', 'html', 'cookies'] - - @classmethod - def _version(cls): - return cls.exe_version - - def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): - self._TMP_FILES = {} - - if not self.exe: - raise ExtractorError(f'PhantomJS not found, {self.INSTALL_HINT}', expected=True) - - self.extractor = extractor - - if required_version: - if is_outdated_version(self.exe_version, required_version): - self.extractor._downloader.report_warning( - 'Your copy of PhantomJS is outdated, update it to version ' - f'{required_version} or newer if you encounter any errors.') - - for name in self._TMP_FILE_NAMES: - tmp = tempfile.NamedTemporaryFile(delete=False) - tmp.close() - self._TMP_FILES[name] = tmp - - self.options = collections.ChainMap({ - 'timeout': timeout, - }, { - x: self._TMP_FILES[x].name.replace('\\', '\\\\').replace('"', '\\"') - for x in self._TMP_FILE_NAMES - }) - - def __del__(self): - for name in self._TMP_FILE_NAMES: - with contextlib.suppress(OSError, KeyError): - os.remove(self._TMP_FILES[name].name) - - def _save_cookies(self, url): - cookies = cookie_jar_to_list(self.extractor.cookiejar) + def _save_cookies(self, url, cookiejar): + cookies = cookie_jar_to_list(cookiejar) if cookiejar else [] for cookie in cookies: if 'path' not in cookie: cookie['path'] = '/' if 'domain' not in cookie: cookie['domain'] = urllib.parse.urlparse(url).netloc - with open(self._TMP_FILES['cookies'].name, 'wb') as f: - f.write(json.dumps(cookies).encode()) + return json.dumps(cookies) - def _load_cookies(self): - with open(self._TMP_FILES['cookies'].name, 'rb') as f: - cookies = json.loads(f.read().decode('utf-8')) + def _load_cookies(self, cookies_json: str, cookiejar): + if not cookiejar: + return + cookies = json.loads(cookies_json) for cookie in cookies: - if cookie['httponly'] is True: - cookie['rest'] = {'httpOnly': None} - if 'expiry' in cookie: - cookie['expire_time'] = cookie['expiry'] - self.extractor._set_cookie(**cookie) + cookiejar.set_cookie(http.cookiejar.Cookie( + 0, cookie['name'], cookie['value'], cookie.get('port'), cookie.get('port') is not None, + cookie['domain'], True, cookie['domain'].startswith('.'), + cookie.get('path', '/'), True, + cookie.get('secure', False), cookie.get('expiry'), + cookie.get('discard', False), None, None, + {'httpOnly': None} if cookie.get('httponly') is True else {} + )) + + def _execute(self, jscode: str, video_id=None, *, note='Executing JS in PhantomJS'): + """Execute JS and return stdout""" + if 'phantom.exit();' not in jscode: + jscode += ';\nphantom.exit();' + jscode = self._BASE_JS + jscode + + self.report_note(video_id, note) + with TempFileWrapper(jscode, suffix='.js') as js_file: + cmd = [self.exe, '--ssl-protocol=any', js_file.name] + self.write_debug(f'PhantomJS command line: {shell_quote(cmd)}') + try: + stdout, stderr, returncode = Popen.run( + cmd, timeout=self.timeout, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + except Exception as e: + raise ExtractorError(f'{note} failed: Unable to run PhantomJS binary', cause=e) + if returncode: + raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}') + return stdout + + def _execute_html(self, jscode: str, url: str, html: str, cookiejar, video_id=None, note='Executing JS on webpage'): + if 'saveAndExit();' not in jscode: + raise ExtractorError('`saveAndExit();` not found in `jscode`') + + html_file = TempFileWrapper(html, suffix='.html') + cookie_file = TempFileWrapper(self._save_cookies(url, cookiejar), suffix='.json') + + jscode = self._TEMPLATE.format(**{ + 'url': json.dumps(str(url)), + 'ua': json.dumps(str(self.user_agent)), + 'jscode': jscode, + 'html_fn': json.dumps(html_file.name), + 'cookies_fn': json.dumps(cookie_file.name), + 'timeout': int(self.timeout * 1000), + }) + + stdout = self._execute(jscode, video_id, note=note) + self._load_cookies(cookie_file.read(), cookiejar) + new_html = html_file.read() + + return new_html, stdout + + def execute(self, jscode, video_id=None, + note='Executing JS in PhantomJS', location=None, html='', cookiejar=None): + if location: + return self._execute_html(jscode, location, html, cookiejar, video_id=video_id, note=note)[1] + if html: + self.report_warning('`location` is required to use `html`') + if cookiejar: + self.report_warning('`location` and `html` are required to use `cookiejar`') + return self._execute(jscode, video_id, note=note) + + +class PhantomJSwrapper: + """PhantomJS wrapper class + + This class is experimental. + """ + INSTALL_HINT = 'Please download PhantomJS from https://phantomjs.org/download.html' + + @classmethod + def _version(cls): + return PhantomJSJSI.exe_version + + def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): + self._jsi = PhantomJSJSI(extractor._downloader, timeout=timeout / 1000) + + if not self._jsi.is_available(): + raise ExtractorError(f'PhantomJS not found, {self.INSTALL_HINT}', expected=True) + + self.extractor = extractor + + if required_version: + if is_outdated_version(self._jsi.exe_version, required_version): + self._jsi.report_warning( + 'Your copy of PhantomJS is outdated, update it to version ' + f'{required_version} or newer if you encounter any errors.') def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on webpage', headers={}, jscode='saveAndExit();'): """ @@ -479,47 +527,14 @@ def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on w raise ExtractorError('`saveAndExit();` not found in `jscode`') if not html: html = self.extractor._download_webpage(url, video_id, note=note, headers=headers) - with open(self._TMP_FILES['html'].name, 'wb') as f: - f.write(html.encode()) - self._save_cookies(url) + self._jsi.user_agent = headers.get('User-Agent') or self.extractor.get_param('http_headers')['User-Agent'] - user_agent = headers.get('User-Agent') or self.extractor.get_param('http_headers')['User-Agent'] - jscode = self._TEMPLATE.format_map(self.options.new_child({ - 'url': url, - 'ua': user_agent.replace('"', '\\"'), - 'jscode': jscode, - })) - - stdout = self.execute(jscode, video_id, note=note2) - - with open(self._TMP_FILES['html'].name, 'rb') as f: - html = f.read().decode('utf-8') - self._load_cookies() - - return html, stdout + return self._jsi._execute_html(jscode, url, html, self.extractor.cookiejar, video_id=video_id, note=note2) def execute(self, jscode, video_id=None, *, note='Executing JS in PhantomJS'): """Execute JS and return stdout""" - if 'phantom.exit();' not in jscode: - jscode += ';\nphantom.exit();' - jscode = self._BASE_JS + jscode - - with open(self._TMP_FILES['script'].name, 'w', encoding='utf-8') as f: - f.write(jscode) - self.extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') - - cmd = [self.exe, '--ssl-protocol=any', self._TMP_FILES['script'].name] - self.extractor.write_debug(f'PhantomJS command line: {shell_quote(cmd)}') - try: - stdout, stderr, returncode = Popen.run(cmd, timeout=self.options['timeout'] / 1000, - text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - except Exception as e: - raise ExtractorError(f'{note} failed: Unable to run PhantomJS binary', cause=e) - if returncode: - raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}') - - return stdout + return self._jsi.execute(jscode, video_id=video_id, note=note) if typing.TYPE_CHECKING: