From fdd98ba6e0f430f89d41293a3c779b4f3d5a796c Mon Sep 17 00:00:00 2001 From: c-basalt <117849907+c-basalt@users.noreply.github.com> Date: Sun, 29 Dec 2024 02:56:58 -0500 Subject: [PATCH] update --- yt_dlp/extractor/douyutv.py | 13 ++--- yt_dlp/jsinterp/common.py | 57 +++++++++++++------- yt_dlp/jsinterp/external.py | 102 +++++++++++++++++++++--------------- 3 files changed, 101 insertions(+), 71 deletions(-) diff --git a/yt_dlp/extractor/douyutv.py b/yt_dlp/extractor/douyutv.py index c6f816bc6..b413f33d2 100644 --- a/yt_dlp/extractor/douyutv.py +++ b/yt_dlp/extractor/douyutv.py @@ -4,7 +4,7 @@ import uuid from .common import InfoExtractor -from ..jsinterp import DenoJSI, PhantomJSwrapper +from ..jsinterp import PhantomJSwrapper from ..utils import ( ExtractorError, UserNotLive, @@ -43,14 +43,9 @@ def _calc_sign(self, sign_func, video_id, a): b = uuid.uuid4().hex c = round(time.time()) js_script = f'{self._get_cryptojs_md5(video_id)};{sign_func};console.log(ub98484234("{a}","{b}","{c}"))' - if DenoJSI.is_available: - jsi = DenoJSI(self) - elif PhantomJSwrapper.is_available: - jsi = PhantomJSwrapper(self) - else: - raise ExtractorError('You need to install either Deno or PhantomJS. ' - f'{DenoJSI.INSTALL_HINT}. {PhantomJSwrapper.INSTALL_HINT}', expected=True) - result = jsi.execute(js_script, video_id, note='Executing JS signing script').strip() + phantom = PhantomJSwrapper(self) + result = phantom.execute(js_script, video_id, + note='Executing JS signing script').strip() return {i: v[0] for i, v in urllib.parse.parse_qs(result).items()} def _search_js_sign_func(self, webpage, fatal=True): diff --git a/yt_dlp/jsinterp/common.py b/yt_dlp/jsinterp/common.py index d6594fd42..8f49fb2a3 100644 --- a/yt_dlp/jsinterp/common.py +++ b/yt_dlp/jsinterp/common.py @@ -4,16 +4,16 @@ import typing import functools -from ..utils import classproperty, variadic, ExtractorError +from ..utils import classproperty, format_field, variadic, ExtractorError from ..extractor.common import InfoExtractor -DEFAULT_TIMEOUT = 10000 _JSI_HANDLERS: dict[str, type[JSI]] = {} _JSI_PREFERENCES: set[JSIPreference] = set() _ALL_FEATURES = { 'js', 'wasm', + 'location', 'dom', } @@ -60,8 +60,9 @@ class JSInterp: @param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key. @param preferred_order: list of JSI to use. First in list is tested first. @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback - @param timeout: explicit timeout parameter in miliseconds for all chosen JSI + @param timeout: timeout parameter for all chosen JSI """ + def __init__( self, dl_or_ie: YoutubeDL | InfoExtractor, @@ -71,7 +72,7 @@ def __init__( jsi_params: dict[str, dict] = {}, preferred_order: typing.Iterable[str | type[JSI]] = [], fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [], - timeout: float | None = None, + timeout: float | int = 10, ): self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie self._features = set(features) @@ -86,7 +87,7 @@ def __init__( self.write_debug(f'Selected JSI classes for given features: {get_jsi_keys(handler_classes)}, ' f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') - self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout, **jsi_params.get(cls.JSI_KEY, {})) + self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout=timeout, **jsi_params.get(cls.JSI_KEY, {})) for cls in handler_classes} self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi) @@ -166,40 +167,56 @@ def _dispatch_request(self, method_name: str, *args, **kwargs): msg = f'{msg}. You can try installing one of unavailable JSI: {join_jsi_name(unavailable)}' raise ExtractorError(msg) - @require_features({'html': 'dom'}) - def execute(self, jscode: str, url: str | None = None, html: str | None = None) -> str: + @require_features({'url': 'location', 'html': 'dom'}) + def execute(self, jscode: str, video_id: str | None, **kwargs) -> str: """ Execute JS code and return stdout from console.log - `html` requires `dom` feature - """ - return self._dispatch_request('execute', jscode, url=url, html=html) - @require_features({'html': 'dom'}) - def evaluate(self, jscode: str, url: str | None = None, html: str | None = None) -> typing.Any: + @param {str} jscode: JS code to execute + @param video_id: video id + @param note: note + @param {str} url: url to set location to, requires `location` feature + @param {str} html: html to load as document, requires `dom` feature + """ + return self._dispatch_request('execute', jscode, video_id, **kwargs) + + @require_features({'url': 'location', 'html': 'dom'}) + def evaluate(self, jscode: str, video_id: str | None, **kwargs) -> typing.Any: """ Evaluate JS code and return result - `html` requires `dom` feature + + @param {str} jscode: JS code to execute + @param video_id: video id + @param note: note + @param {str} url: url to set location to, requires `location` feature + @param {str} html: html to load as document, requires `dom` feature """ - return self._dispatch_request('evaluate', jscode, url=url, html=html) + return self._dispatch_request('evaluate', jscode, video_id, **kwargs) class JSI(abc.ABC): _SUPPORT_FEATURES: set[str] = set() _BASE_PREFERENCE: int = 0 - def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): + def __init__(self, downloader: YoutubeDL, timeout: float | int): self._downloader = downloader - self.timeout = float(timeout or DEFAULT_TIMEOUT) + self.timeout = timeout @abc.abstractmethod def is_available(self) -> bool: raise NotImplementedError - def write_debug(self, message, only_once=False): - return self._downloader.write_debug(f'[{self.JSI_KEY}] {message}', only_once=only_once) + def write_debug(self, message, *args, **kwargs): + self._downloader.write_debug(f'[{self.JSI_KEY}] {message}', *args, **kwargs) - def report_warning(self, message, only_once=False): - return self._downloader.report_warning(f'[{self.JSI_KEY}] {message}', only_once=only_once) + def report_warning(self, message, *args, **kwargs): + self._downloader.report_warning(f'[{self.JSI_KEY}] {message}', *args, **kwargs) + + def to_screen(self, msg, *args, **kwargs): + self._downloader.to_screen(f'[{self.JSI_KEY}] {msg}', *args, **kwargs) + + def report_note(self, video_id, note): + self.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') @classproperty def JSI_NAME(cls) -> str: diff --git a/yt_dlp/jsinterp/external.py b/yt_dlp/jsinterp/external.py index 36c668938..f0ddf74ea 100644 --- a/yt_dlp/jsinterp/external.py +++ b/yt_dlp/jsinterp/external.py @@ -98,21 +98,17 @@ def __del__(self): class ExternalJSI(JSI, abc.ABC): - _EXE_NAME: str = None + _EXE_NAME: str @classproperty(cache=True) - def version(cls): + def exe_version(cls): return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)') - @classproperty - def full_version(cls): - return cls.version - @classproperty def exe(cls): - return cls._EXE_NAME if cls.version else None + return cls._EXE_NAME if cls.exe_version else None - @classproperty + @classmethod def is_available(cls): return bool(cls.exe) @@ -121,55 +117,78 @@ def is_available(cls): class DenoJSI(ExternalJSI): """JS interpreter class using Deno binary""" _EXE_NAME = 'deno' - INSTALL_HINT = 'Please install Deno from https://docs.deno.com/runtime/manual/getting_started/installation/ or download binary from https://github.com/denoland/deno/releases' - _SUPPORTED_FEATURES = {'js', 'wasm'} + _SUPPORTED_FEATURES = {'js', 'wasm', 'location'} + _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] + _INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n' - def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None, required_version=None): + def __init__(self, downloader: YoutubeDL, timeout=None, flags=[], replace_flags=False, init_script=None): super().__init__(downloader, timeout) + self._flags = flags if replace_flags else [*self._DENO_FLAGS, *flags] + self._init_script = self._INIT_SCRIPT if init_script is None else init_script - @classmethod - def _execute(cls, jscode, downloader: YoutubeDL | None = None, video_id=None, note='', flags=[], timeout=10000): - js_file = TempFileWrapper(jscode, suffix='.js') - if note and downloader: - downloader.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') - cmd = [cls.exe, 'run', *flags, js_file.name] + def _run_deno(self, cmd, video_id=None): + self.write_debug(f'Deno command line: {shell_quote(cmd)}') try: stdout, stderr, returncode = Popen.run( - cmd, timeout=timeout / 1000, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + cmd, timeout=self.timeout, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except Exception as e: raise ExtractorError('Unable to run Deno binary', cause=e) if returncode: raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}') - elif stderr and downloader: - downloader.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) + elif stderr: + self.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) return stdout.strip() - def execute(self, jscode, video_id=None, note='Executing JS in Deno', flags=[], base_js=None): - """Execute JS directly in Deno runtime and return stdout""" - - base_js = 'delete window.Deno; global = window;\n' if base_js is None else base_js - - return self._execute(base_js + jscode, downloader=self._downloader, video_id=video_id, note=note, - flags=flags, timeout=self.timeout) + def execute(self, jscode, video_id=None, note='Executing JS in Deno', url=None): + self.report_note(video_id, note) + js_file = TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js') + location_args = ['--location', url] if url else [] + cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] + return self._run_deno(cmd, video_id=video_id) @register_jsi class DenoJITlessJSI(DenoJSI): _EXE_NAME = DenoJSI._EXE_NAME - INSTALL_HINT = DenoJSI.INSTALL_HINT - _SUPPORTED_FEATURES = {'js'} + _SUPPORTED_FEATURES = {'js', 'location'} + _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check', '--v8-flags=--jitless,--noexpose-wasm'] @classproperty - def version(cls): - return DenoJSI.version - - def execute(self, jscode, video_id=None, note='Executing JS in Deno', flags=[], base_js=None): - # JIT-less mode does not support Wasm - return super().execute(jscode, video_id, note=note, - flags=[*flags, '--v8-flags=--jitless,--noexpose-wasm'], base_js=base_js) + def exe_version(cls): + return DenoJSI.exe_version + + +class DenoJSDomJSI(DenoJSI): + _SUPPORTED_FEATURES = {'js', 'wasm', 'dom'} + _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] + _JSDOM_IMPORT = False + + def _ensure_jsdom(self): + if self._JSDOM_IMPORT: + return + js_file = TempFileWrapper('import { JSDOM } from "https://cdn.esm.sh/jsdom"', suffix='.js') + cmd = [self.exe, 'run', js_file.name] + self._run_deno(cmd) + self._JSDOM_IMPORT = True + + def execute(self, jscode, video_id=None, note='Executing JS in Deno', url=None, html=None): + self.report_note(video_id, note) + if html: + self._ensure_jsdom() + init_script = '''%s; + import { JSDOM } from "https://cdn.esm.sh/jsdom"; + const dom = new JSDOM(%s); + Object.keys(dom.window).forEach((key) => {try {window[key] = dom.window[key]} catch (e) {}}); + ''' % (self._init_script, json.dumps(html)) + else: + init_script = self._init_script + js_file = TempFileWrapper(f'{init_script};\n{jscode}', suffix='.js') + + location_args = ['--location', url] if url else [] + cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] + return self._run_deno(cmd, video_id=video_id) -@register_jsi class PuppeteerJSI(ExternalJSI): _PACKAGE_VERSION = '16.2.0' _HEADLESS = False @@ -200,8 +219,8 @@ def full_version(cls): return None @classproperty - def version(cls): - return DenoJSI.version if cls.full_version else None + def exe_version(cls): + return DenoJSI.exe_version if cls.full_version else None def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): super().__init__(downloader, timeout) @@ -306,7 +325,7 @@ class PhantomJSwrapper(ExternalJSI): @classmethod def _version(cls): - return cls.version + return cls.exe_version def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): self._TMP_FILES = {} @@ -317,7 +336,7 @@ def __init__(self, extractor: InfoExtractor, required_version=None, timeout=1000 self.extractor = extractor if required_version: - if is_outdated_version(self.version, required_version): + if is_outdated_version(self.exe_version, required_version): self.extractor._downloader.report_warning( 'Your copy of PhantomJS is outdated, update it to version ' f'{required_version} or newer if you encounter any errors.') @@ -444,5 +463,4 @@ def execute(self, jscode, video_id=None, *, note='Executing JS in PhantomJS'): if typing.TYPE_CHECKING: from ..YoutubeDL import YoutubeDL - # from .common import JSIRequest, JSIResponse from ..extractor.common import InfoExtractor