1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00
This commit is contained in:
c-basalt 2024-12-30 17:09:47 -05:00
parent 96c6cdeec4
commit fb474064ee
4 changed files with 83 additions and 84 deletions

View file

@ -0,0 +1,53 @@
#!/usr/bin/env python3
# Allow direct execution
import os
import sys
import unittest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import (
FakeYDL,
)
from yt_dlp.jsinterp.common import ExternalJSI
from yt_dlp.jsinterp._deno import DenoJSI, DenoJITlessJSI, DenoJSDomJSI
from yt_dlp.jsinterp._phantomjs import PhantomJSJSI
class Base:
    # Plain (non-TestCase) wrapper: the shared test case below is nested so it
    # is not a module-level TestCase and is only run through its subclasses.
    class TestExternalJSI(unittest.TestCase):
        """Shared checks for external JS interpreter wrappers.

        Subclasses set ``_JSI_CLASS`` to the wrapper under test. Every test is
        skipped when that wrapper reports no executable version.
        """

        # Wrapper class under test; None on the shared base itself.
        _JSI_CLASS: type[ExternalJSI] = None

        def setUp(self):
            # Decide about skipping *before* building anything: the check only
            # needs class-level data, and instantiating first would call
            # ``None(...)`` on the base or construct an unavailable backend.
            if not self.jsi_available():
                self.skipTest('Not available')
            self.ydl = FakeYDL()
            # ``features`` is a set of feature names; an empty set requests none.
            self.jsi = self._JSI_CLASS(self.ydl, 19, set())

        def jsi_available(self):
            """Return a truthy value when a wrapper is configured and has an exe version."""
            return self._JSI_CLASS and self._JSI_CLASS.exe_version

        def test_execute(self):
            # Output written with console.log must come back as the result string.
            self.assertEqual(self.jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
# Runs the inherited TestExternalJSI tests with DenoJSI as the wrapper under test.
class TestDeno(Base.TestExternalJSI):
    _JSI_CLASS = DenoJSI
# Runs the inherited TestExternalJSI tests with DenoJITlessJSI as the wrapper under test.
class TestDenoJITless(Base.TestExternalJSI):
    _JSI_CLASS = DenoJITlessJSI
# Runs the inherited TestExternalJSI tests with DenoJSDomJSI as the wrapper under test.
class TestDenoDom(Base.TestExternalJSI):
    _JSI_CLASS = DenoJSDomJSI
# Runs the inherited TestExternalJSI tests with PhantomJSJSI as the wrapper under test.
class TestPhantomJS(Base.TestExternalJSI):
    _JSI_CLASS = PhantomJSJSI
# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()

View file

@ -2,6 +2,7 @@
import http.cookiejar import http.cookiejar
import json import json
import re
import subprocess import subprocess
import typing import typing
import urllib.parse import urllib.parse
@ -28,11 +29,23 @@ class DenoJSI(ExternalJSI):
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check'] _DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n' _INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n'
def __init__(self, downloader: YoutubeDL, timeout=None, features: set[str] = {}, flags=[], replace_flags=False, init_script=None): def __init__(self, *args, flags=[], replace_flags=False, init_script=None, **kwargs):
super().__init__(downloader, timeout, features) super().__init__(*args, **kwargs)
self._flags = flags if replace_flags else [*self._DENO_FLAGS, *flags] self._flags = flags if replace_flags else [*self._DENO_FLAGS, *flags]
self._init_script = self._INIT_SCRIPT if init_script is None else init_script self._init_script = self._INIT_SCRIPT if init_script is None else init_script
@property
def _override_navigator_js(self):
return '\n'.join([
'Object.defineProperty(navigator, "%s", { value: %s, configurable: true });' % (k, json.dumps(v))
for k, v in {
'userAgent': self.user_agent,
'language': 'en-US',
'languages': ['en-US'],
'webdriver': False,
}.items()
])
def _run_deno(self, cmd): def _run_deno(self, cmd):
self.write_debug(f'Deno command line: {shell_quote(cmd)}') self.write_debug(f'Deno command line: {shell_quote(cmd)}')
try: try:
@ -49,7 +62,7 @@ def _run_deno(self, cmd):
def execute(self, jscode, video_id=None, note='Executing JS in Deno', location=None): def execute(self, jscode, video_id=None, note='Executing JS in Deno', location=None):
self.report_note(video_id, note) self.report_note(video_id, note)
location_args = ['--location', location] if location else [] location_args = ['--location', location] if location else []
with TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js') as js_file: with TempFileWrapper(f'{self._init_script};\n{self._override_navigator_js}\n{jscode}', suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name] cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
return self._run_deno(cmd) return self._run_deno(cmd)
@ -128,14 +141,20 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='
self.report_note(video_id, note) self.report_note(video_id, note)
self._ensure_jsdom() self._ensure_jsdom()
callback_varname = f'__callback_{random_string()}' callback_varname = f'__callback_{random_string()}'
inline_scripts = '\n'.join([
'try { %s } catch (e) {}' % script
for script in re.findall(r'<script[^>]*>(.+?)</script>', html, re.DOTALL)
])
script = f'''{self._init_script}; script = f'''{self._init_script};
{self._override_navigator_js};
import jsdom from "{self._JSDOM_URL}"; import jsdom from "{self._JSDOM_URL}";
const {callback_varname} = (() => {{ const {callback_varname} = (() => {{
const jar = jsdom.CookieJar.deserializeSync({json.dumps(self.serialize_cookie(cookiejar, location))}); const jar = jsdom.CookieJar.deserializeSync({json.dumps(self.serialize_cookie(cookiejar, location))});
const dom = new jsdom.JSDOM({json.dumps(str(html))}, {{ const dom = new jsdom.JSDOM({json.dumps(str(html))}, {{
{'url: %s,' % json.dumps(str(location)) if location else ''} {'url: %s,' % json.dumps(str(location)) if location else ''}
cookieJar: jar, cookieJar: jar,
runScripts: 'dangerously',
}}); }});
Object.keys(dom.window).forEach((key) => {{try {{window[key] = dom.window[key]}} catch (e) {{}}}}); Object.keys(dom.window).forEach((key) => {{try {{window[key] = dom.window[key]}} catch (e) {{}}}});
delete window.jsdom; delete window.jsdom;
@ -145,6 +164,9 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='
return () => {{ origLog(JSON.stringify({{ return () => {{ origLog(JSON.stringify({{
stdout: stdout.join('\\n'), cookies: jar.serializeSync().cookies}})); }} stdout: stdout.join('\\n'), cookies: jar.serializeSync().cookies}})); }}
}})(); }})();
await (async () => {{
{inline_scripts}
}})();
await (async () => {{ await (async () => {{
{jscode} {jscode}
}})().finally({callback_varname}); }})().finally({callback_varname});
@ -158,83 +180,5 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='
return data['stdout'] return data['stdout']
# JS interpreter wrapper that evaluates code inside a browser page by running
# the deno.land/x/puppeteer module through Deno.
# NOTE(review): this class calls `DenoJSI._execute(...)` and passes `flags=` /
# `base_js=` to `DenoJSI.execute(...)`; the DenoJSI.execute signature visible in
# this file takes (jscode, video_id, note, location) only, and flags are a
# constructor argument there — confirm these calls still match DenoJSI.
class PuppeteerJSI(ExternalJSI):
    # Version of the puppeteer module interpolated into the deno.land URLs below.
    _PACKAGE_VERSION = '16.2.0'
    # Passed (as a JSON boolean) to puppeteer.launch({headless: ...}).
    _HEADLESS = False
    # Reuses the executable name configured on DenoJSI.
    _EXE_NAME = DenoJSI._EXE_NAME

    # Message telling the user how to install puppeteer; prefixed with
    # DenoJSI's own install hint when the Deno availability check is falsy.
    @classproperty
    def INSTALL_HINT(cls):
        msg = f'Run "deno run -A https://deno.land/x/puppeteer@{cls._PACKAGE_VERSION}/install.ts" to install puppeteer'
        # NOTE(review): `is_available` is read without being called; if it is a
        # method rather than a property this condition is never true — confirm.
        if not DenoJSI.is_available:
            msg = f'{DenoJSI.INSTALL_HINT}. Then {msg}'
        return msg

    # Cached. Launches a browser once to read its version and returns
    # 'puppeteer=<package version> browser=<browser version>'.
    # Returns None when the Deno check is falsy or when an ExtractorError is raised.
    @classproperty(cache=True)
    def full_version(cls):
        if not DenoJSI.is_available:
            return
        try:
            browser_version = DenoJSI._execute(f'''
import puppeteer from "https://deno.land/x/puppeteer@{cls._PACKAGE_VERSION}/mod.ts";
const browser = await puppeteer.launch({{headless: {json.dumps(bool(cls._HEADLESS))}}});
try {{
console.log(await browser.version())
}} finally {{
await browser.close();
}}''', flags=['--allow-all'])
            return f'puppeteer={cls._PACKAGE_VERSION} browser={browser_version}'
        except ExtractorError:
            return None

    # Reports DenoJSI's exe version, but only when full_version (the browser
    # launch probe above) produced a value; otherwise None.
    @classproperty
    def exe_version(cls):
        return DenoJSI.exe_version if cls.full_version else None

    def __init__(self, downloader: YoutubeDL, timeout: float | int | None = None):
        super().__init__(downloader, timeout)
        # The inner Deno run gets 30000 more than this wrapper's timeout.
        # NOTE(review): the same timeout value is handed to window.setTimeout in
        # execute(), so it is presumably in milliseconds; `self.timeout` being
        # None would make this addition fail — confirm the base class sets a default.
        self.deno = DenoJSI(downloader, timeout=(self.timeout + 30000))

    # Wraps `jscode` in a script that imports puppeteer, launches a browser
    # (web security disabled), runs the code and always closes the browser,
    # then executes that script through self.deno.
    def _deno_execute(self, jscode, note=None):
        return self.deno.execute(f'''
import puppeteer from "https://deno.land/x/puppeteer@{self._PACKAGE_VERSION}/mod.ts";
const browser = await puppeteer.launch({{
headless: {json.dumps(bool(self._HEADLESS))}, args: ["--disable-web-security"]}});
try {{
{jscode}
}} finally {{
await browser.close();
}}''', note=note, flags=['--allow-all'], base_js='')

    # Evaluates `jscode` in a fresh page and returns what _deno_execute returns.
    # The generated script: arms a timer that logs an error, closes the browser
    # and exits with status 1 after self.timeout; aborts every network request;
    # forwards the page's console log/error messages to Deno's console; rewrites
    # the page URL to `url` via history.replaceState; evaluates the code; then
    # closes the browser and exits with status 0.
    def execute(self, jscode, video_id=None, note='Executing JS in Puppeteer', url='about:blank'):
        self.report_note(video_id, note)
        return self._deno_execute(f'''
const page = await browser.newPage();
window.setTimeout(async () => {{
console.error('Puppeteer execution timed out');
await browser.close();
Deno.exit(1);
}}, {int(self.timeout)});
page.resourceTimeout = {int(self.timeout)};
// drop network requests
await page.setRequestInterception(true);
page.on("request", request => request.abort());
// capture console output
page.on("console", msg => {{
msg.type() === 'log' && console.log(msg.text());
msg.type() === 'error' && console.error(msg.text());
}});
const url = {json.dumps(str(url))};
await page.evaluate(`window.history.replaceState('', '', ${{JSON.stringify(url)}})`);
await page.evaluate({json.dumps(str(jscode))});
await browser.close();
Deno.exit(0);
''')
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from ..YoutubeDL import YoutubeDL
from ..cookies import YoutubeDLCookieJar from ..cookies import YoutubeDLCookieJar

View file

@ -173,12 +173,12 @@ def execute(self, jscode, video_id=None,
'std_var': f'__stdout__values_{random_string()}', 'std_var': f'__stdout__values_{random_string()}',
'jscode': jscode, 'jscode': jscode,
} }
return self._execute_html(jscode, location, html, cookiejar, video_id=video_id, note=note)[1] return self._execute_html(jscode, location, html, cookiejar, video_id=video_id, note=note)[1].strip()
if html: if html:
self.report_warning('`location` is required to use `html`') self.report_warning('`location` is required to use `html`')
if cookiejar: if cookiejar:
self.report_warning('`location` and `html` are required to use `cookiejar`') self.report_warning('`location` and `html` are required to use `cookiejar`')
return self._execute(jscode, video_id, note=note) return self._execute(jscode, video_id, note=note).strip()
class PhantomJSwrapper: class PhantomJSwrapper:

View file

@ -197,6 +197,8 @@ class JSI(abc.ABC):
_BASE_PREFERENCE: int = 0 _BASE_PREFERENCE: int = 0
def __init__(self, downloader: YoutubeDL, timeout: float | int, features: set[str], user_agent=None): def __init__(self, downloader: YoutubeDL, timeout: float | int, features: set[str], user_agent=None):
if not self._SUPPORT_FEATURES.issuperset(features):
raise ExtractorError(f'{self.JSI_NAME} does not support all required features: {features}')
self._downloader = downloader self._downloader = downloader
self.timeout = timeout self.timeout = timeout
self.features = features self.features = features