diff --git a/yt_dlp/extractor/douyutv.py b/yt_dlp/extractor/douyutv.py index 6417c63a5..c6f816bc6 100644 --- a/yt_dlp/extractor/douyutv.py +++ b/yt_dlp/extractor/douyutv.py @@ -4,7 +4,7 @@ import uuid from .common import InfoExtractor -from ..jsinterp import DenoWrapper, PhantomJSwrapper +from ..jsinterp import DenoJSI, PhantomJSwrapper from ..utils import ( ExtractorError, UserNotLive, @@ -43,13 +43,13 @@ def _calc_sign(self, sign_func, video_id, a): b = uuid.uuid4().hex c = round(time.time()) js_script = f'{self._get_cryptojs_md5(video_id)};{sign_func};console.log(ub98484234("{a}","{b}","{c}"))' - if DenoWrapper.is_available: - jsi = DenoWrapper(self) + if DenoJSI.is_available: + jsi = DenoJSI(self) elif PhantomJSwrapper.is_available: jsi = PhantomJSwrapper(self) else: raise ExtractorError('You need to install either Deno or PhantomJS. ' - f'{DenoWrapper.INSTALL_HINT}. {PhantomJSwrapper.INSTALL_HINT}', expected=True) + f'{DenoJSI.INSTALL_HINT}. {PhantomJSwrapper.INSTALL_HINT}', expected=True) result = jsi.execute(js_script, video_id, note='Executing JS signing script').strip() return {i: v[0] for i, v in urllib.parse.parse_qs(result).items()} diff --git a/yt_dlp/jsinterp/__init__.py b/yt_dlp/jsinterp/__init__.py index 282385f0c..3daaba894 100644 --- a/yt_dlp/jsinterp/__init__.py +++ b/yt_dlp/jsinterp/__init__.py @@ -1,10 +1,14 @@ from .native import JSInterpreter as NativeJSI -from .external import PhantomJSwrapper, DenoWrapper, PuppeteerWrapper +from .external import PhantomJSwrapper, DenoJSI, PuppeteerJSI +from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIDirector __all__ = [ NativeJSI, PhantomJSwrapper, - DenoWrapper, - PuppeteerWrapper, + DenoJSI, + PuppeteerJSI, + _JSI_HANDLERS, + _JSI_PREFERENCES, + JSIDirector, ] diff --git a/yt_dlp/jsinterp/common.py b/yt_dlp/jsinterp/common.py new file mode 100644 index 000000000..934dd5122 --- /dev/null +++ b/yt_dlp/jsinterp/common.py @@ -0,0 +1,203 @@ +from __future__ import annotations + +import abc +import typing +# import dataclasses + +from ..utils import classproperty + + +DEFAULT_TIMEOUT = 10000 +_JSI_HANDLERS: dict[str, type[JSI]] = {} +_JSI_PREFERENCES: set[JSIPreference] = set() +_ALL_FEATURES = { + 'js', + 'wasm', + 'dom', +} + + +def get_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]: + return [jok if isinstance(jok, str) else jok.JSI_KEY for jok in jsi_or_keys] + + +def order_to_pref(jsi_order: typing.Iterable[str | type[JSI] | JSI], multiplier: int) -> JSIPreference: + jsi_order = reversed(get_jsi_keys(jsi_order)) + pref_score = {jsi_cls: (i + 1) * multiplier for i, jsi_cls in enumerate(jsi_order)} + + def _pref(jsi: JSI, *args): + return pref_score.get(jsi.JSI_KEY, 0) + return _pref + + +def join_jsi_name(jsi_list: typing.Iterable[str | type[JSI] | JSI], sep=', '): + return sep.join(get_jsi_keys(jok if isinstance(jok, str) else jok.JSI_NAME for jok in jsi_list)) + + +class JSIExec(typing.Protocol): + @abc.abstractmethod + def execute(self, jscode: str) -> str: + """Execute JS code and return console.log contents, using `html` requires `dom` feature""" + + +class JSIDirector(JSIExec): + """JSIDirector class + + Helper class to forward JS interpretation need to a JSI that supports it. + + @param downloader: downloader instance. + @param features: list of features that JSI must support. + @param only_include: list of JSI to choose from. + @param exclude: list of JSI to avoid using. + @param jsi_params: extra parameters to pass to `JSI.__init__()`. + @param preferred_order: list of JSI to use. First in list is tested first. + @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback + @param timeout: timeout in miliseconds for JS interpretation + """ + def __init__( + self, + downloader: YoutubeDL, + features: typing.Iterable[str] = [], + only_include: typing.Iterable[str | type[JSI]] = [], + exclude: typing.Iterable[str | type[JSI]] = [], + jsi_params: dict[str, dict] = {}, + preferred_order: typing.Iterable[str | type[JSI]] = [], + fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [], + timeout: float | None = None, + verbose=False, + ): + self._downloader = downloader + self._verbose = verbose + + jsi_keys = set(get_jsi_keys(only_include or _JSI_HANDLERS)) - set(get_jsi_keys(exclude)) + handler_classes = [_JSI_HANDLERS[key] for key in jsi_keys + if _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)] + if not handler_classes: + raise Exception(f'No JSI can be selected for features: {features}, ' + f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') + + self._handler_dict = {cls.JSI_KEY: cls(downloader, timeout, **jsi_params.get(cls.JSI_KEY, {})) + for cls in handler_classes} + self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES + self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi) + + def add_handler(self, handler: JSI): + """Add a handler. If a handler of the same JSI_KEY exists, it will overwrite it""" + assert isinstance(handler, JSI), 'handler must be a JSI instance' + self._handler_dict[handler.JSI_KEY] = handler + + @property + def write_debug(self): + return self._downloader.write_debug + + def _get_handlers(self, method: str, *args, **kwargs) -> list[JSI]: + handlers = [h for h in self._handler_dict.values() if getattr(h, method, None)] + self.write_debug(f'JSIDirector has handlers for `{method}`: {handlers}') + if not handlers: + raise Exception(f'No JSI supports method `{method}`, ' + f'included handlers: {[handler.JSI_KEY for handler in self._handler_dict.values()]}') + + preferences = { + handler: sum(pref_func(handler, method, args, kwargs) for pref_func in self.preferences) + for handler in handlers + } + self._downloader.write_debug('JSI preferences for this request: {}'.format(', '.join( + f'{jsi.JSI_NAME}={pref}' for jsi, pref in preferences.items()))) + + return sorted(self._handler_dict.values(), key=preferences.get, reverse=True) + + # def _send(self, request: JSIRequest): + # unavailable_handlers = [] + # exec_errors = [] + # for handler in self._get_handlers(request): + # if not handler.is_available: + # unavailable_handlers.append(handler) + # continue + # try: + # return handler.handle(request) + # except Exception as e: + # exec_errors.append(e) + # if not request.fallback: + # raise + # raise EvaluationError + + def _get_handler_method(method_name: str): + def handler(self: JSIDirector, *args, **kwargs): + unavailable: list[JSI] = [] + exceptions: list[tuple[JSI, Exception]] = [] + for handler in self._get_handlers(method_name, *args, **kwargs): + if not handler.is_available: + self.write_debug(f'{handler.JSI_NAME} is not available') + unavailable.append(handler) + continue + try: + self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}') + return getattr(handler, method_name)(*args, **kwargs) + except Exception as e: + if handler.JSI_KEY not in self._fallback_jsi: + raise + else: + exceptions.append((handler, e)) + if not exceptions: + raise Exception(f'No available JSI installed, please install one of: {join_jsi_name(unavailable)}') + raise Exception(f'Failed to perform {method_name}, total {len(exceptions)} errors. Following JSI have been skipped and you can try installing one of them: {join_jsi_name(unavailable)}') + return handler + + execute = _get_handler_method('execute') + evaluate = _get_handler_method('evaluate') + + +class JSI(abc.ABC): + _SUPPORTED_FEATURES: set[str] = set() + _BASE_PREFERENCE: int = 0 + + def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): + self._downloader = downloader + self.timeout = float(timeout or DEFAULT_TIMEOUT) + + @property + @abc.abstractmethod + def is_available(self) -> bool: + raise NotImplementedError + + @classproperty + def JSI_NAME(cls) -> str: + return cls.__name__[:-3] + + @classproperty + def JSI_KEY(cls) -> str: + assert cls.__name__.endswith('JSI'), 'JSI class names must end with "JSI"' + return cls.__name__[:-3] + + +def register_jsi(handler_cls: TYPE_JSI) -> TYPE_JSI: + """Register a JS interpreter class""" + assert issubclass(handler_cls, JSI), f'{handler_cls} must be a subclass of JSI' + assert handler_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {handler_cls.JSI_KEY} already registered' + assert handler_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{handler_cls._SUPPORTED_FEATURES - _ALL_FEATURES} is not declared in `_All_FEATURES`' + _JSI_HANDLERS[handler_cls.JSI_KEY] = handler_cls + return handler_cls + + +def register_jsi_preference(*handlers: type[JSI]): + assert all(issubclass(handler, JSI) for handler in handlers), f'{handlers} must all be a subclass of JSI' + + def outer(pref_func: JSIPreference) -> JSIPreference: + def inner(handler: JSI, *args): + if not handlers or isinstance(handler, handlers): + return pref_func(handler, *args) + return 0 + _JSI_PREFERENCES.add(inner) + return inner + return outer + + +@register_jsi_preference() +def _base_preference(handler: JSI, *args): + return getattr(handler, '_BASE_PREFERENCE', 0) + + +if typing.TYPE_CHECKING: + from ..YoutubeDL import YoutubeDL + JSIPreference = typing.Callable[[JSI, str, list, dict], int] + TYPE_JSI = typing.TypeVar('TYPE_JSI') diff --git a/yt_dlp/jsinterp/external.py b/yt_dlp/jsinterp/external.py index bd022bc56..b02b5e25c 100644 --- a/yt_dlp/jsinterp/external.py +++ b/yt_dlp/jsinterp/external.py @@ -1,3 +1,6 @@ +from __future__ import annotations + +import abc import collections import contextlib import json @@ -5,8 +8,9 @@ import subprocess import tempfile import urllib.parse +import typing + -from ..extractor.common import InfoExtractor from ..utils import ( ExtractorError, Popen, @@ -16,6 +20,7 @@ is_outdated_version, shell_quote, ) +from .common import JSI, register_jsi def cookie_to_dict(cookie): @@ -50,13 +55,15 @@ def cookie_jar_to_list(cookie_jar): class TempFileWrapper: """Wrapper for NamedTemporaryFile, auto closes file after io and deletes file upon wrapper object gc""" - def __init__(self, content=None, text=True, encoding='utf-8', suffix=None): + def __init__(self, content: str | bytes | None = None, text: bool = True, + encoding='utf-8', suffix: str | None = None): self.encoding = None if not text else encoding self.text = text - self._file = tempfile.NamedTemporaryFile('wb', suffix=suffix, delete=False) - self._file.close() + self._file = tempfile.NamedTemporaryFile('w' if text else 'wb', encoding=self.encoding, + suffix=suffix, delete=False) if content: - self.write(content) + self._file.write(content) + self._file.close() @property def name(self): @@ -90,7 +97,9 @@ def __del__(self): self.cleanup() -class ExternalJSI: +class ExternalJSI(JSI, abc.ABC): + _EXE_NAME: str = None + @classproperty(cache=True) def version(cls): return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)') @@ -104,30 +113,25 @@ def exe(cls): return cls._EXE_NAME if cls.version else None @classproperty - def is_available(cls): - return bool(cls.exe) + def is_available(self): + return bool(self.exe) -class DenoWrapper(ExternalJSI): +@register_jsi +class DenoJSI(ExternalJSI): + """JS interpreter class using Deno binary""" _EXE_NAME = 'deno' INSTALL_HINT = 'Please install Deno from https://docs.deno.com/runtime/manual/getting_started/installation/ or download binary from https://github.com/denoland/deno/releases' + _SUPPORTED_FEATURES = {'js', 'wasm'} - def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): - self.extractor = extractor - self.timeout = timeout - - if not self.exe: - raise ExtractorError(f'Deno not found, {self.INSTALL_HINT}', expected=True) - if required_version: - if is_outdated_version(self.version, required_version): - self.extractor.report_warning( - f'Deno is outdated, update it to version {required_version} or newer if you encounter any errors.') + def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None, required_version=None): + super().__init__(downloader, timeout) @classmethod - def _execute(cls, jscode, extractor=None, video_id=None, note='', flags=[], timeout=10000): + def _execute(cls, jscode, downloader: YoutubeDL | None = None, video_id=None, note='', flags=[], timeout=10000): js_file = TempFileWrapper(jscode, suffix='.js') - if note and extractor: - extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') + if note and downloader: + downloader.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') cmd = [cls.exe, 'run', *flags, js_file.name] try: stdout, stderr, returncode = Popen.run( @@ -136,46 +140,57 @@ def _execute(cls, jscode, extractor=None, video_id=None, note='', flags=[], time raise ExtractorError('Unable to run Deno binary', cause=e) if returncode: raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}') - elif stderr and extractor: - extractor.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) + elif stderr and downloader: + downloader.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) return stdout.strip() - def execute(self, jscode, video_id=None, *, note='Executing JS in Deno', flags=[], base_js=None): + def execute(self, jscode, video_id=None, note='Executing JS in Deno', flags=[], base_js=None): """Execute JS directly in Deno runtime and return stdout""" - base_js = base_js if base_js is not None else 'delete window.Deno; global = window;' + base_js = 'delete window.Deno; global = window;\n' if base_js is None else base_js - return self._execute(base_js + jscode, extractor=self.extractor, video_id=video_id, note=note, + return self._execute(base_js + jscode, downloader=self._downloader, video_id=video_id, note=note, flags=flags, timeout=self.timeout) -class DenoJITlessJSI(DenoWrapper): - def execute(self, jscode, video_id=None, *, note='Executing JS in Deno', flags=[], base_js=None): - return super().execute(jscode, video_id, note=note, base_js=base_js, - flags=[*flags, '--v8-flags=--jitless,--noexpose-wasm']) +@register_jsi +class DenoJITlessJSI(DenoJSI): + _EXE_NAME = DenoJSI._EXE_NAME + INSTALL_HINT = DenoJSI.INSTALL_HINT + _SUPPORTED_FEATURES = {'js'} + + @classproperty + def version(cls): + return DenoJSI.version + + def execute(self, jscode, video_id=None, note='Executing JS in Deno', flags=[], base_js=None): + # JIT-less mode does not support Wasm + return super().execute(jscode, video_id, note=note, + flags=[*flags, '--v8-flags=--jitless,--noexpose-wasm'], base_js=base_js) -class PuppeteerWrapper: +@register_jsi +class PuppeteerJSI(ExternalJSI): _PACKAGE_VERSION = '16.2.0' _HEADLESS = False + _EXE_NAME = DenoJSI._EXE_NAME @classproperty def INSTALL_HINT(cls): msg = f'Run "deno run -A https://deno.land/x/puppeteer@{cls._PACKAGE_VERSION}/install.ts" to install puppeteer' - if not DenoWrapper.is_available: - msg = f'{DenoWrapper.INSTALL_HINT}. Then {msg}' + if not DenoJSI.is_available: + msg = f'{DenoJSI.INSTALL_HINT}. Then {msg}' return msg @classproperty(cache=True) def full_version(cls): - if not DenoWrapper.is_available: + if not DenoJSI.is_available: return try: - browser_version = DenoWrapper._execute(f''' - import puppeteer from "https://deno.land/x/puppeteer@16.2.0/mod.ts"; + browser_version = DenoJSI._execute(f''' + import puppeteer from "https://deno.land/x/puppeteer@{cls._PACKAGE_VERSION}/mod.ts"; const browser = await puppeteer.launch({{headless: {json.dumps(bool(cls._HEADLESS))}}}); try {{ - //await (new ) console.log(await browser.version()) }} finally {{ await browser.close(); @@ -186,15 +201,11 @@ def full_version(cls): @classproperty def version(cls): - return cls._PACKAGE_VERSION if cls.full_version else None + return DenoJSI.version if cls.full_version else None - def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): - self.deno = DenoWrapper(extractor, timeout=(timeout + 30000)) - self.timeout = timeout - self.extractor = extractor - - if required_version: - self.extractor.report_warning(f'required_version is not supported on {self.__class__.__name__}') + def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): + super().__init__(downloader, timeout) + self.deno = DenoJSI(downloader, timeout=(self.timeout + 30000)) def _deno_execute(self, jscode, note=None): return self.deno.execute(f''' @@ -208,7 +219,7 @@ def _deno_execute(self, jscode, note=None): }}''', note=note, flags=['--allow-all'], base_js='') def execute(self, jscode, video_id=None, note='Executing JS in Puppeteer', url='about:blank'): - self.extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') + self._downloader.to_screen(f'{format_field(video_id, None, "%s: ")}{note}') return self._deno_execute(f''' const page = await browser.newPage(); window.setTimeout(async () => {{ @@ -297,7 +308,7 @@ class PhantomJSwrapper(ExternalJSI): def _version(cls): return cls.version - def __init__(self, extractor, required_version=None, timeout=10000): + def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): self._TMP_FILES = {} if not self.exe: @@ -429,3 +440,9 @@ def execute(self, jscode, video_id=None, *, note='Executing JS in PhantomJS'): raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}') return stdout + + +if typing.TYPE_CHECKING: + from ..YoutubeDL import YoutubeDL + # from .common import JSIRequest, JSIResponse + from ..extractor.common import InfoExtractor