1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00
This commit is contained in:
c-basalt 2024-12-29 02:56:58 -05:00
parent 65e238c45d
commit fdd98ba6e0
3 changed files with 101 additions and 71 deletions

View file

@ -4,7 +4,7 @@
import uuid import uuid
from .common import InfoExtractor from .common import InfoExtractor
from ..jsinterp import DenoJSI, PhantomJSwrapper from ..jsinterp import PhantomJSwrapper
from ..utils import ( from ..utils import (
ExtractorError, ExtractorError,
UserNotLive, UserNotLive,
@ -43,14 +43,9 @@ def _calc_sign(self, sign_func, video_id, a):
b = uuid.uuid4().hex b = uuid.uuid4().hex
c = round(time.time()) c = round(time.time())
js_script = f'{self._get_cryptojs_md5(video_id)};{sign_func};console.log(ub98484234("{a}","{b}","{c}"))' js_script = f'{self._get_cryptojs_md5(video_id)};{sign_func};console.log(ub98484234("{a}","{b}","{c}"))'
if DenoJSI.is_available: phantom = PhantomJSwrapper(self)
jsi = DenoJSI(self) result = phantom.execute(js_script, video_id,
elif PhantomJSwrapper.is_available: note='Executing JS signing script').strip()
jsi = PhantomJSwrapper(self)
else:
raise ExtractorError('You need to install either Deno or PhantomJS. '
f'{DenoJSI.INSTALL_HINT}. {PhantomJSwrapper.INSTALL_HINT}', expected=True)
result = jsi.execute(js_script, video_id, note='Executing JS signing script').strip()
return {i: v[0] for i, v in urllib.parse.parse_qs(result).items()} return {i: v[0] for i, v in urllib.parse.parse_qs(result).items()}
def _search_js_sign_func(self, webpage, fatal=True): def _search_js_sign_func(self, webpage, fatal=True):

View file

@ -4,16 +4,16 @@
import typing import typing
import functools import functools
from ..utils import classproperty, variadic, ExtractorError from ..utils import classproperty, format_field, variadic, ExtractorError
from ..extractor.common import InfoExtractor from ..extractor.common import InfoExtractor
DEFAULT_TIMEOUT = 10000
_JSI_HANDLERS: dict[str, type[JSI]] = {} _JSI_HANDLERS: dict[str, type[JSI]] = {}
_JSI_PREFERENCES: set[JSIPreference] = set() _JSI_PREFERENCES: set[JSIPreference] = set()
_ALL_FEATURES = { _ALL_FEATURES = {
'js', 'js',
'wasm', 'wasm',
'location',
'dom', 'dom',
} }
@ -60,8 +60,9 @@ class JSInterp:
@param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key. @param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using jsi key as dict key.
@param preferred_order: list of JSI to use. First in list is tested first. @param preferred_order: list of JSI to use. First in list is tested first.
@param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback @param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback
@param timeout: explicit timeout parameter in miliseconds for all chosen JSI @param timeout: timeout parameter for all chosen JSI
""" """
def __init__( def __init__(
self, self,
dl_or_ie: YoutubeDL | InfoExtractor, dl_or_ie: YoutubeDL | InfoExtractor,
@ -71,7 +72,7 @@ def __init__(
jsi_params: dict[str, dict] = {}, jsi_params: dict[str, dict] = {},
preferred_order: typing.Iterable[str | type[JSI]] = [], preferred_order: typing.Iterable[str | type[JSI]] = [],
fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [], fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [],
timeout: float | None = None, timeout: float | int = 10,
): ):
self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie
self._features = set(features) self._features = set(features)
@ -86,7 +87,7 @@ def __init__(
self.write_debug(f'Selected JSI classes for given features: {get_jsi_keys(handler_classes)}, ' self.write_debug(f'Selected JSI classes for given features: {get_jsi_keys(handler_classes)}, '
f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}') f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout, **jsi_params.get(cls.JSI_KEY, {})) self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout=timeout, **jsi_params.get(cls.JSI_KEY, {}))
for cls in handler_classes} for cls in handler_classes}
self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi) self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
@ -166,40 +167,56 @@ def _dispatch_request(self, method_name: str, *args, **kwargs):
msg = f'{msg}. You can try installing one of unavailable JSI: {join_jsi_name(unavailable)}' msg = f'{msg}. You can try installing one of unavailable JSI: {join_jsi_name(unavailable)}'
raise ExtractorError(msg) raise ExtractorError(msg)
@require_features({'html': 'dom'}) @require_features({'url': 'location', 'html': 'dom'})
def execute(self, jscode: str, url: str | None = None, html: str | None = None) -> str: def execute(self, jscode: str, video_id: str | None, **kwargs) -> str:
""" """
Execute JS code and return stdout from console.log Execute JS code and return stdout from console.log
`html` requires `dom` feature
"""
return self._dispatch_request('execute', jscode, url=url, html=html)
@require_features({'html': 'dom'}) @param {str} jscode: JS code to execute
def evaluate(self, jscode: str, url: str | None = None, html: str | None = None) -> typing.Any: @param video_id: video id
@param note: note
@param {str} url: url to set location to, requires `location` feature
@param {str} html: html to load as document, requires `dom` feature
"""
return self._dispatch_request('execute', jscode, video_id, **kwargs)
@require_features({'url': 'location', 'html': 'dom'})
def evaluate(self, jscode: str, video_id: str | None, **kwargs) -> typing.Any:
""" """
Evaluate JS code and return result Evaluate JS code and return result
`html` requires `dom` feature
@param {str} jscode: JS code to execute
@param video_id: video id
@param note: note
@param {str} url: url to set location to, requires `location` feature
@param {str} html: html to load as document, requires `dom` feature
""" """
return self._dispatch_request('evaluate', jscode, url=url, html=html) return self._dispatch_request('evaluate', jscode, video_id, **kwargs)
class JSI(abc.ABC): class JSI(abc.ABC):
_SUPPORT_FEATURES: set[str] = set() _SUPPORT_FEATURES: set[str] = set()
_BASE_PREFERENCE: int = 0 _BASE_PREFERENCE: int = 0
def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): def __init__(self, downloader: YoutubeDL, timeout: float | int):
self._downloader = downloader self._downloader = downloader
self.timeout = float(timeout or DEFAULT_TIMEOUT) self.timeout = timeout
@abc.abstractmethod @abc.abstractmethod
def is_available(self) -> bool: def is_available(self) -> bool:
raise NotImplementedError raise NotImplementedError
def write_debug(self, message, only_once=False): def write_debug(self, message, *args, **kwargs):
return self._downloader.write_debug(f'[{self.JSI_KEY}] {message}', only_once=only_once) self._downloader.write_debug(f'[{self.JSI_KEY}] {message}', *args, **kwargs)
def report_warning(self, message, only_once=False): def report_warning(self, message, *args, **kwargs):
return self._downloader.report_warning(f'[{self.JSI_KEY}] {message}', only_once=only_once) self._downloader.report_warning(f'[{self.JSI_KEY}] {message}', *args, **kwargs)
def to_screen(self, msg, *args, **kwargs):
    """
    Print a message through the downloader, tagged with this JSI's key

    @param msg: text to print; it is prefixed with `[<JSI_KEY>] `
    Any further positional/keyword arguments are handed to the downloader's `to_screen` untouched.
    """
    emit = self._downloader.to_screen
    emit(f'[{self.JSI_KEY}] {msg}', *args, **kwargs)
def report_note(self, video_id, note):
    """
    Show a progress note for a video via `to_screen`

    @param video_id: video id, rendered in front of the note by `format_field` with the template "%s: "
                     (presumably left out when it is None - confirm against `format_field`)
    @param note: text of the note
    """
    show = self.to_screen
    prefix = format_field(video_id, None, "%s: ")
    show(f'{prefix}{note}')
@classproperty @classproperty
def JSI_NAME(cls) -> str: def JSI_NAME(cls) -> str:

View file

@ -98,21 +98,17 @@ def __del__(self):
class ExternalJSI(JSI, abc.ABC): class ExternalJSI(JSI, abc.ABC):
_EXE_NAME: str = None _EXE_NAME: str
@classproperty(cache=True) @classproperty(cache=True)
def version(cls): def exe_version(cls):
return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)') return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)')
@classproperty
def full_version(cls):
return cls.version
@classproperty @classproperty
def exe(cls): def exe(cls):
return cls._EXE_NAME if cls.version else None return cls._EXE_NAME if cls.exe_version else None
@classproperty @classmethod
def is_available(cls): def is_available(cls):
return bool(cls.exe) return bool(cls.exe)
@ -121,55 +117,78 @@ def is_available(cls):
class DenoJSI(ExternalJSI): class DenoJSI(ExternalJSI):
"""JS interpreter class using Deno binary""" """JS interpreter class using Deno binary"""
_EXE_NAME = 'deno' _EXE_NAME = 'deno'
INSTALL_HINT = 'Please install Deno from https://docs.deno.com/runtime/manual/getting_started/installation/ or download binary from https://github.com/denoland/deno/releases' _SUPPORTED_FEATURES = {'js', 'wasm', 'location'}
_SUPPORTED_FEATURES = {'js', 'wasm'} _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n'
def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None, required_version=None): def __init__(self, downloader: YoutubeDL, timeout=None, flags=[], replace_flags=False, init_script=None):
super().__init__(downloader, timeout) super().__init__(downloader, timeout)
self._flags = flags if replace_flags else [*self._DENO_FLAGS, *flags]
self._init_script = self._INIT_SCRIPT if init_script is None else init_script
@classmethod def _run_deno(self, cmd, video_id=None):
def _execute(cls, jscode, downloader: YoutubeDL | None = None, video_id=None, note='', flags=[], timeout=10000): self.write_debug(f'Deno command line: {shell_quote(cmd)}')
js_file = TempFileWrapper(jscode, suffix='.js')
if note and downloader:
downloader.to_screen(f'{format_field(video_id, None, "%s: ")}{note}')
cmd = [cls.exe, 'run', *flags, js_file.name]
try: try:
stdout, stderr, returncode = Popen.run( stdout, stderr, returncode = Popen.run(
cmd, timeout=timeout / 1000, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) cmd, timeout=self.timeout, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e: except Exception as e:
raise ExtractorError('Unable to run Deno binary', cause=e) raise ExtractorError('Unable to run Deno binary', cause=e)
if returncode: if returncode:
raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}') raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}')
elif stderr and downloader: elif stderr:
downloader.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id) self.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id)
return stdout.strip() return stdout.strip()
def execute(self, jscode, video_id=None, note='Executing JS in Deno', flags=[], base_js=None): def execute(self, jscode, video_id=None, note='Executing JS in Deno', url=None):
"""Execute JS directly in Deno runtime and return stdout""" self.report_note(video_id, note)
js_file = TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js')
base_js = 'delete window.Deno; global = window;\n' if base_js is None else base_js location_args = ['--location', url] if url else []
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
return self._execute(base_js + jscode, downloader=self._downloader, video_id=video_id, note=note, return self._run_deno(cmd, video_id=video_id)
flags=flags, timeout=self.timeout)
@register_jsi @register_jsi
class DenoJITlessJSI(DenoJSI): class DenoJITlessJSI(DenoJSI):
_EXE_NAME = DenoJSI._EXE_NAME _EXE_NAME = DenoJSI._EXE_NAME
INSTALL_HINT = DenoJSI.INSTALL_HINT _SUPPORTED_FEATURES = {'js', 'location'}
_SUPPORTED_FEATURES = {'js'} _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check', '--v8-flags=--jitless,--noexpose-wasm']
@classproperty @classproperty
def version(cls): def exe_version(cls):
return DenoJSI.version return DenoJSI.exe_version
def execute(self, jscode, video_id=None, note='Executing JS in Deno', flags=[], base_js=None):
# JIT-less mode does not support Wasm class DenoJSDomJSI(DenoJSI):
return super().execute(jscode, video_id, note=note, _SUPPORTED_FEATURES = {'js', 'wasm', 'dom'}
flags=[*flags, '--v8-flags=--jitless,--noexpose-wasm'], base_js=base_js) _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_JSDOM_IMPORT = False
def _ensure_jsdom(self):
    """
    Make sure Deno has imported jsdom once before a script relies on it

    Runs a one-line script importing jsdom from https://cdn.esm.sh/jsdom through `_run_deno`.
    The command is built without `self._flags`, so the default `--cached-only` flag is not applied here.
    NOTE(review): presumably this is what lets Deno download and cache the module - confirm.

    The outcome is remembered in the class attribute `_JSDOM_IMPORT`, so the warm-up run happens
    once per class rather than once per instance. If `_run_deno` raises, the flag stays unset
    and the next call tries again.
    """
    if self._JSDOM_IMPORT:
        return
    js_file = TempFileWrapper('import { JSDOM } from "https://cdn.esm.sh/jsdom"', suffix='.js')
    self._run_deno([self.exe, 'run', js_file.name])
    # Assigning through `self` would only shadow the class-level flag on this one
    # instance, and every newly created instance would repeat the import run
    type(self)._JSDOM_IMPORT = True
def execute(self, jscode, video_id=None, note='Executing JS in Deno', url=None, html=None):
    """
    Run JS code in Deno, optionally with a jsdom document built from `html`

    @param jscode: JS code to execute
    @param video_id: video id, used for the note and passed on to `_run_deno`
    @param note: note reported before running
    @param url: if truthy, passed to Deno as `--location <url>`
    @param html: if truthy, jsdom is ensured first and the script is prefixed with code that
                 creates a `JSDOM` from the JSON-encoded html and copies the keys of `dom.window`
                 onto `window`, ignoring keys whose assignment throws
    @returns: whatever `_run_deno` returns for the assembled command
    """
    self.report_note(video_id, note)
    if not html:
        prelude = self._init_script
    else:
        self._ensure_jsdom()
        prelude = (
            '%s;\n'
            'import { JSDOM } from "https://cdn.esm.sh/jsdom";\n'
            'const dom = new JSDOM(%s);\n'
            'Object.keys(dom.window).forEach((key) => {try {window[key] = dom.window[key]} catch (e) {}});\n'
        ) % (self._init_script, json.dumps(html))

    script = TempFileWrapper(f'{prelude};\n{jscode}', suffix='.js')
    argv = [self.exe, 'run', *self._flags]
    if url:
        argv += ['--location', url]
    argv.append(script.name)
    return self._run_deno(argv, video_id=video_id)
@register_jsi
class PuppeteerJSI(ExternalJSI): class PuppeteerJSI(ExternalJSI):
_PACKAGE_VERSION = '16.2.0' _PACKAGE_VERSION = '16.2.0'
_HEADLESS = False _HEADLESS = False
@ -200,8 +219,8 @@ def full_version(cls):
return None return None
@classproperty @classproperty
def version(cls): def exe_version(cls):
return DenoJSI.version if cls.full_version else None return DenoJSI.exe_version if cls.full_version else None
def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None): def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None):
super().__init__(downloader, timeout) super().__init__(downloader, timeout)
@ -306,7 +325,7 @@ class PhantomJSwrapper(ExternalJSI):
@classmethod @classmethod
def _version(cls): def _version(cls):
return cls.version return cls.exe_version
def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000): def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000):
self._TMP_FILES = {} self._TMP_FILES = {}
@ -317,7 +336,7 @@ def __init__(self, extractor: InfoExtractor, required_version=None, timeout=1000
self.extractor = extractor self.extractor = extractor
if required_version: if required_version:
if is_outdated_version(self.version, required_version): if is_outdated_version(self.exe_version, required_version):
self.extractor._downloader.report_warning( self.extractor._downloader.report_warning(
'Your copy of PhantomJS is outdated, update it to version ' 'Your copy of PhantomJS is outdated, update it to version '
f'{required_version} or newer if you encounter any errors.') f'{required_version} or newer if you encounter any errors.')
@ -444,5 +463,4 @@ def execute(self, jscode, video_id=None, *, note='Executing JS in PhantomJS'):
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from ..YoutubeDL import YoutubeDL from ..YoutubeDL import YoutubeDL
# from .common import JSIRequest, JSIResponse
from ..extractor.common import InfoExtractor from ..extractor.common import InfoExtractor