mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-03-09 12:50:23 -05:00
204 lines
8.1 KiB
Python
204 lines
8.1 KiB
Python
![]() |
from __future__ import annotations
|
||
|
|
||
|
import abc
|
||
|
import typing
|
||
|
# import dataclasses
|
||
|
|
||
|
from ..utils import classproperty
|
||
|
|
||
|
|
||
|
DEFAULT_TIMEOUT = 10000
|
||
|
_JSI_HANDLERS: dict[str, type[JSI]] = {}
|
||
|
_JSI_PREFERENCES: set[JSIPreference] = set()
|
||
|
_ALL_FEATURES = {
|
||
|
'js',
|
||
|
'wasm',
|
||
|
'dom',
|
||
|
}
|
||
|
|
||
|
|
||
|
def get_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]:
|
||
|
return [jok if isinstance(jok, str) else jok.JSI_KEY for jok in jsi_or_keys]
|
||
|
|
||
|
|
||
|
def order_to_pref(jsi_order: typing.Iterable[str | type[JSI] | JSI], multiplier: int) -> JSIPreference:
|
||
|
jsi_order = reversed(get_jsi_keys(jsi_order))
|
||
|
pref_score = {jsi_cls: (i + 1) * multiplier for i, jsi_cls in enumerate(jsi_order)}
|
||
|
|
||
|
def _pref(jsi: JSI, *args):
|
||
|
return pref_score.get(jsi.JSI_KEY, 0)
|
||
|
return _pref
|
||
|
|
||
|
|
||
|
def join_jsi_name(jsi_list: typing.Iterable[str | type[JSI] | JSI], sep=', '):
|
||
|
return sep.join(get_jsi_keys(jok if isinstance(jok, str) else jok.JSI_NAME for jok in jsi_list))
|
||
|
|
||
|
|
||
|
class JSIExec(typing.Protocol):
|
||
|
@abc.abstractmethod
|
||
|
def execute(self, jscode: str) -> str:
|
||
|
"""Execute JS code and return console.log contents, using `html` requires `dom` feature"""
|
||
|
|
||
|
|
||
|
class JSIDirector(JSIExec):
|
||
|
"""JSIDirector class
|
||
|
|
||
|
Helper class to forward JS interpretation need to a JSI that supports it.
|
||
|
|
||
|
@param downloader: downloader instance.
|
||
|
@param features: list of features that JSI must support.
|
||
|
@param only_include: list of JSI to choose from.
|
||
|
@param exclude: list of JSI to avoid using.
|
||
|
@param jsi_params: extra parameters to pass to `JSI.__init__()`.
|
||
|
@param preferred_order: list of JSI to use. First in list is tested first.
|
||
|
@param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback
|
||
|
@param timeout: timeout in miliseconds for JS interpretation
|
||
|
"""
|
||
|
def __init__(
|
||
|
self,
|
||
|
downloader: YoutubeDL,
|
||
|
features: typing.Iterable[str] = [],
|
||
|
only_include: typing.Iterable[str | type[JSI]] = [],
|
||
|
exclude: typing.Iterable[str | type[JSI]] = [],
|
||
|
jsi_params: dict[str, dict] = {},
|
||
|
preferred_order: typing.Iterable[str | type[JSI]] = [],
|
||
|
fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [],
|
||
|
timeout: float | None = None,
|
||
|
verbose=False,
|
||
|
):
|
||
|
self._downloader = downloader
|
||
|
self._verbose = verbose
|
||
|
|
||
|
jsi_keys = set(get_jsi_keys(only_include or _JSI_HANDLERS)) - set(get_jsi_keys(exclude))
|
||
|
handler_classes = [_JSI_HANDLERS[key] for key in jsi_keys
|
||
|
if _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
|
||
|
if not handler_classes:
|
||
|
raise Exception(f'No JSI can be selected for features: {features}, '
|
||
|
f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
|
||
|
|
||
|
self._handler_dict = {cls.JSI_KEY: cls(downloader, timeout, **jsi_params.get(cls.JSI_KEY, {}))
|
||
|
for cls in handler_classes}
|
||
|
self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
|
||
|
self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
|
||
|
|
||
|
def add_handler(self, handler: JSI):
|
||
|
"""Add a handler. If a handler of the same JSI_KEY exists, it will overwrite it"""
|
||
|
assert isinstance(handler, JSI), 'handler must be a JSI instance'
|
||
|
self._handler_dict[handler.JSI_KEY] = handler
|
||
|
|
||
|
@property
|
||
|
def write_debug(self):
|
||
|
return self._downloader.write_debug
|
||
|
|
||
|
def _get_handlers(self, method: str, *args, **kwargs) -> list[JSI]:
|
||
|
handlers = [h for h in self._handler_dict.values() if getattr(h, method, None)]
|
||
|
self.write_debug(f'JSIDirector has handlers for `{method}`: {handlers}')
|
||
|
if not handlers:
|
||
|
raise Exception(f'No JSI supports method `{method}`, '
|
||
|
f'included handlers: {[handler.JSI_KEY for handler in self._handler_dict.values()]}')
|
||
|
|
||
|
preferences = {
|
||
|
handler: sum(pref_func(handler, method, args, kwargs) for pref_func in self.preferences)
|
||
|
for handler in handlers
|
||
|
}
|
||
|
self._downloader.write_debug('JSI preferences for this request: {}'.format(', '.join(
|
||
|
f'{jsi.JSI_NAME}={pref}' for jsi, pref in preferences.items())))
|
||
|
|
||
|
return sorted(self._handler_dict.values(), key=preferences.get, reverse=True)
|
||
|
|
||
|
# def _send(self, request: JSIRequest):
|
||
|
# unavailable_handlers = []
|
||
|
# exec_errors = []
|
||
|
# for handler in self._get_handlers(request):
|
||
|
# if not handler.is_available:
|
||
|
# unavailable_handlers.append(handler)
|
||
|
# continue
|
||
|
# try:
|
||
|
# return handler.handle(request)
|
||
|
# except Exception as e:
|
||
|
# exec_errors.append(e)
|
||
|
# if not request.fallback:
|
||
|
# raise
|
||
|
# raise EvaluationError
|
||
|
|
||
|
def _get_handler_method(method_name: str):
|
||
|
def handler(self: JSIDirector, *args, **kwargs):
|
||
|
unavailable: list[JSI] = []
|
||
|
exceptions: list[tuple[JSI, Exception]] = []
|
||
|
for handler in self._get_handlers(method_name, *args, **kwargs):
|
||
|
if not handler.is_available:
|
||
|
self.write_debug(f'{handler.JSI_NAME} is not available')
|
||
|
unavailable.append(handler)
|
||
|
continue
|
||
|
try:
|
||
|
self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}')
|
||
|
return getattr(handler, method_name)(*args, **kwargs)
|
||
|
except Exception as e:
|
||
|
if handler.JSI_KEY not in self._fallback_jsi:
|
||
|
raise
|
||
|
else:
|
||
|
exceptions.append((handler, e))
|
||
|
if not exceptions:
|
||
|
raise Exception(f'No available JSI installed, please install one of: {join_jsi_name(unavailable)}')
|
||
|
raise Exception(f'Failed to perform {method_name}, total {len(exceptions)} errors. Following JSI have been skipped and you can try installing one of them: {join_jsi_name(unavailable)}')
|
||
|
return handler
|
||
|
|
||
|
execute = _get_handler_method('execute')
|
||
|
evaluate = _get_handler_method('evaluate')
|
||
|
|
||
|
|
||
|
class JSI(abc.ABC):
|
||
|
_SUPPORTED_FEATURES: set[str] = set()
|
||
|
_BASE_PREFERENCE: int = 0
|
||
|
|
||
|
def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None):
|
||
|
self._downloader = downloader
|
||
|
self.timeout = float(timeout or DEFAULT_TIMEOUT)
|
||
|
|
||
|
@property
|
||
|
@abc.abstractmethod
|
||
|
def is_available(self) -> bool:
|
||
|
raise NotImplementedError
|
||
|
|
||
|
@classproperty
|
||
|
def JSI_NAME(cls) -> str:
|
||
|
return cls.__name__[:-3]
|
||
|
|
||
|
@classproperty
|
||
|
def JSI_KEY(cls) -> str:
|
||
|
assert cls.__name__.endswith('JSI'), 'JSI class names must end with "JSI"'
|
||
|
return cls.__name__[:-3]
|
||
|
|
||
|
|
||
|
def register_jsi(handler_cls: TYPE_JSI) -> TYPE_JSI:
|
||
|
"""Register a JS interpreter class"""
|
||
|
assert issubclass(handler_cls, JSI), f'{handler_cls} must be a subclass of JSI'
|
||
|
assert handler_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {handler_cls.JSI_KEY} already registered'
|
||
|
assert handler_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{handler_cls._SUPPORTED_FEATURES - _ALL_FEATURES} is not declared in `_All_FEATURES`'
|
||
|
_JSI_HANDLERS[handler_cls.JSI_KEY] = handler_cls
|
||
|
return handler_cls
|
||
|
|
||
|
|
||
|
def register_jsi_preference(*handlers: type[JSI]):
|
||
|
assert all(issubclass(handler, JSI) for handler in handlers), f'{handlers} must all be a subclass of JSI'
|
||
|
|
||
|
def outer(pref_func: JSIPreference) -> JSIPreference:
|
||
|
def inner(handler: JSI, *args):
|
||
|
if not handlers or isinstance(handler, handlers):
|
||
|
return pref_func(handler, *args)
|
||
|
return 0
|
||
|
_JSI_PREFERENCES.add(inner)
|
||
|
return inner
|
||
|
return outer
|
||
|
|
||
|
|
||
|
@register_jsi_preference()
|
||
|
def _base_preference(handler: JSI, *args):
|
||
|
return getattr(handler, '_BASE_PREFERENCE', 0)
|
||
|
|
||
|
|
||
|
if typing.TYPE_CHECKING:
|
||
|
from ..YoutubeDL import YoutubeDL
|
||
|
JSIPreference = typing.Callable[[JSI, str, list, dict], int]
|
||
|
TYPE_JSI = typing.TypeVar('TYPE_JSI')
|