1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-05-20 17:45:45 -05:00

extract phantomjs

This commit is contained in:
c-basalt 2024-12-29 20:34:05 -05:00
parent 03a99eefae
commit 6d622d5481
2 changed files with 133 additions and 115 deletions

View file

@ -62,6 +62,7 @@ class JSInterp:
@param preferred_order: list of JSI to use. First in list is tested first.
@param fallback_jsi: list of JSI that may fail and should act non-fatal and fallback to other JSI. Pass `"all"` to always fallback
@param timeout: timeout parameter for all chosen JSI
@param user_agent: override user-agent to use for supported JSI
"""
def __init__(
@ -74,6 +75,7 @@ def __init__(
preferred_order: typing.Iterable[str | type[JSI]] = [],
fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [],
timeout: float | int = 10,
user_agent: str | None = None,
):
self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie
self._features = set(features)
@ -88,8 +90,8 @@ def __init__(
self.write_debug(f'Selected JSI classes for given features: {get_jsi_keys(handler_classes)}, '
f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout=timeout, **jsi_params.get(cls.JSI_KEY, {}))
for cls in handler_classes}
self._handler_dict = {cls.JSI_KEY: cls(self._downloader, timeout=timeout, user_agent=user_agent,
**jsi_params.get(cls.JSI_KEY, {})) for cls in handler_classes}
self.preferences: set[JSIPreference] = {order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
self._is_test = self._downloader.params.get('test', False)
@ -187,9 +189,10 @@ class JSI(abc.ABC):
_SUPPORT_FEATURES: set[str] = set()
_BASE_PREFERENCE: int = 0
def __init__(self, downloader: YoutubeDL, timeout: float | int):
def __init__(self, downloader: YoutubeDL, timeout: float | int, user_agent=None):
self._downloader = downloader
self.timeout = timeout
self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent']
@abc.abstractmethod
def is_available(self) -> bool:

View file

@ -1,7 +1,6 @@
from __future__ import annotations
import abc
import collections
import contextlib
import json
import os
@ -99,6 +98,12 @@ def cleanup(self):
def __del__(self):
self.cleanup()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
class ExternalJSI(JSI, abc.ABC):
_EXE_NAME: str
@ -119,8 +124,9 @@ def is_available(cls):
@register_jsi
class DenoJSI(ExternalJSI):
"""JS interpreter class using Deno binary"""
_EXE_NAME = 'deno'
_SUPPORTED_FEATURES = {'js', 'wasm', 'location'}
_BASE_PREFERENCE = 5
_EXE_NAME = 'deno'
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n'
@ -144,16 +150,17 @@ def _run_deno(self, cmd):
def execute(self, jscode, video_id=None, note='Executing JS in Deno', location=None):
self.report_note(video_id, note)
js_file = TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js')
location_args = ['--location', location] if location else []
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
return self._run_deno(cmd)
with TempFileWrapper(f'{self._init_script};\n{jscode}', suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
return self._run_deno(cmd)
@register_jsi
class DenoJITlessJSI(DenoJSI):
_EXE_NAME = DenoJSI._EXE_NAME
_SUPPORTED_FEATURES = {'js', 'location'}
_BASE_PREFERENCE = 6
_EXE_NAME = DenoJSI._EXE_NAME
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check', '--v8-flags=--jitless,--noexpose-wasm']
@classproperty
@ -163,8 +170,9 @@ def exe_version(cls):
class DenoJSDomJSI(DenoJSI):
_SUPPORTED_FEATURES = {'js', 'wasm', 'location', 'dom', 'cookies'}
_BASE_PREFERENCE = 4
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_JSDOM_IMPORT = False
_JSDOM_IMPORT_CHECKED = False
@staticmethod
def serialize_cookie(cookiejar: YoutubeDLCookieJar | None, url: str):
@ -210,12 +218,12 @@ def apply_cookies(cookiejar: YoutubeDLCookieJar | None, cookies: list[dict]):
False, None, None, {}))
def _ensure_jsdom(self):
if self._JSDOM_IMPORT:
if self._JSDOM_IMPORT_CHECKED:
return
js_file = TempFileWrapper('import jsdom from "https://cdn.esm.sh/jsdom"', suffix='.js')
cmd = [self.exe, 'run', js_file.name]
self._run_deno(cmd)
self._JSDOM_IMPORT = True
with TempFileWrapper('import jsdom from "https://cdn.esm.sh/jsdom"', suffix='.js') as js_file:
cmd = [self.exe, 'run', js_file.name]
self._run_deno(cmd)
self._JSDOM_IMPORT_CHECKED = True
def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='', html='', cookiejar=None):
self.report_note(video_id, note)
@ -241,11 +249,10 @@ def execute(self, jscode, video_id=None, note='Executing JS in Deno', location='
}})().finally(callback);
'''
js_file = TempFileWrapper(script, suffix='.js')
location_args = ['--location', location] if location else []
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
data = json.loads(self._run_deno(cmd))
with TempFileWrapper(script, suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
data = json.loads(self._run_deno(cmd))
self.apply_cookies(cookiejar, data['cookies'])
return data['stdout']
@ -327,13 +334,11 @@ def execute(self, jscode, video_id=None, note='Executing JS in Puppeteer', url='
''')
class PhantomJSwrapper(ExternalJSI):
"""PhantomJS wrapper class
This class is experimental.
"""
@register_jsi
class PhantomJSJSI(ExternalJSI):
_EXE_NAME = 'phantomjs'
INSTALL_HINT = 'Please download PhantomJS from https://phantomjs.org/download.html'
_SUPPORTED_FEATURES = {'js', 'location', 'cookies'}
_BASE_PREFERENCE = 3
_BASE_JS = R'''
phantom.onError = function(msg, trace) {{
@ -355,11 +360,11 @@ class PhantomJSwrapper(ExternalJSI):
var fs = require('fs');
var read = {{ mode: 'r', charset: 'utf-8' }};
var write = {{ mode: 'w', charset: 'utf-8' }};
JSON.parse(fs.read("{cookies}", read)).forEach(function(x) {{
JSON.parse(fs.read({cookies_fn}, read)).forEach(function(x) {{
phantom.addCookie(x);
}});
page.settings.resourceTimeout = {timeout};
page.settings.userAgent = "{ua}";
page.settings.userAgent = {ua};
page.onLoadStarted = function() {{
page.evaluate(function() {{
delete window._phantom;
@ -367,13 +372,13 @@ class PhantomJSwrapper(ExternalJSI):
}});
}};
var saveAndExit = function() {{
fs.write("{html}", page.content, write);
fs.write("{cookies}", JSON.stringify(phantom.cookies), write);
fs.write({html_fn}, page.content, write);
fs.write({cookies_fn}, JSON.stringify(phantom.cookies), write);
phantom.exit();
}};
page.onLoadFinished = function(status) {{
if(page.url === "") {{
page.setContent(fs.read("{html}", read), "{url}");
page.setContent(fs.read({html_fn}, read), {url});
}}
else {{
{jscode}
@ -382,62 +387,105 @@ class PhantomJSwrapper(ExternalJSI):
page.open("");
'''
_TMP_FILE_NAMES = ['script', 'html', 'cookies']
@classmethod
def _version(cls):
return cls.exe_version
def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000):
self._TMP_FILES = {}
if not self.exe:
raise ExtractorError(f'PhantomJS not found, {self.INSTALL_HINT}', expected=True)
self.extractor = extractor
if required_version:
if is_outdated_version(self.exe_version, required_version):
self.extractor._downloader.report_warning(
'Your copy of PhantomJS is outdated, update it to version '
f'{required_version} or newer if you encounter any errors.')
for name in self._TMP_FILE_NAMES:
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
self._TMP_FILES[name] = tmp
self.options = collections.ChainMap({
'timeout': timeout,
}, {
x: self._TMP_FILES[x].name.replace('\\', '\\\\').replace('"', '\\"')
for x in self._TMP_FILE_NAMES
})
def __del__(self):
for name in self._TMP_FILE_NAMES:
with contextlib.suppress(OSError, KeyError):
os.remove(self._TMP_FILES[name].name)
def _save_cookies(self, url):
cookies = cookie_jar_to_list(self.extractor.cookiejar)
def _save_cookies(self, url, cookiejar):
cookies = cookie_jar_to_list(cookiejar) if cookiejar else []
for cookie in cookies:
if 'path' not in cookie:
cookie['path'] = '/'
if 'domain' not in cookie:
cookie['domain'] = urllib.parse.urlparse(url).netloc
with open(self._TMP_FILES['cookies'].name, 'wb') as f:
f.write(json.dumps(cookies).encode())
return json.dumps(cookies)
def _load_cookies(self):
with open(self._TMP_FILES['cookies'].name, 'rb') as f:
cookies = json.loads(f.read().decode('utf-8'))
def _load_cookies(self, cookies_json: str, cookiejar):
if not cookiejar:
return
cookies = json.loads(cookies_json)
for cookie in cookies:
if cookie['httponly'] is True:
cookie['rest'] = {'httpOnly': None}
if 'expiry' in cookie:
cookie['expire_time'] = cookie['expiry']
self.extractor._set_cookie(**cookie)
cookiejar.set_cookie(http.cookiejar.Cookie(
0, cookie['name'], cookie['value'], cookie.get('port'), cookie.get('port') is not None,
cookie['domain'], True, cookie['domain'].startswith('.'),
cookie.get('path', '/'), True,
cookie.get('secure', False), cookie.get('expiry'),
cookie.get('discard', False), None, None,
{'httpOnly': None} if cookie.get('httponly') is True else {}
))
def _execute(self, jscode: str, video_id=None, *, note='Executing JS in PhantomJS'):
"""Execute JS and return stdout"""
if 'phantom.exit();' not in jscode:
jscode += ';\nphantom.exit();'
jscode = self._BASE_JS + jscode
self.report_note(video_id, note)
with TempFileWrapper(jscode, suffix='.js') as js_file:
cmd = [self.exe, '--ssl-protocol=any', js_file.name]
self.write_debug(f'PhantomJS command line: {shell_quote(cmd)}')
try:
stdout, stderr, returncode = Popen.run(
cmd, timeout=self.timeout, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
raise ExtractorError(f'{note} failed: Unable to run PhantomJS binary', cause=e)
if returncode:
raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}')
return stdout
def _execute_html(self, jscode: str, url: str, html: str, cookiejar, video_id=None, note='Executing JS on webpage'):
if 'saveAndExit();' not in jscode:
raise ExtractorError('`saveAndExit();` not found in `jscode`')
html_file = TempFileWrapper(html, suffix='.html')
cookie_file = TempFileWrapper(self._save_cookies(url, cookiejar), suffix='.json')
jscode = self._TEMPLATE.format(**{
'url': json.dumps(str(url)),
'ua': json.dumps(str(self.user_agent)),
'jscode': jscode,
'html_fn': json.dumps(html_file.name),
'cookies_fn': json.dumps(cookie_file.name),
'timeout': int(self.timeout * 1000),
})
stdout = self._execute(jscode, video_id, note=note)
self._load_cookies(cookie_file.read(), cookiejar)
new_html = html_file.read()
return new_html, stdout
def execute(self, jscode, video_id=None,
note='Executing JS in PhantomJS', location=None, html='', cookiejar=None):
if location:
return self._execute_html(jscode, location, html, cookiejar, video_id=video_id, note=note)[1]
if html:
self.report_warning('`location` is required to use `html`')
if cookiejar:
self.report_warning('`location` and `html` are required to use `cookiejar`')
return self._execute(jscode, video_id, note=note)
class PhantomJSwrapper:
"""PhantomJS wrapper class
This class is experimental.
"""
INSTALL_HINT = 'Please download PhantomJS from https://phantomjs.org/download.html'
@classmethod
def _version(cls):
return PhantomJSJSI.exe_version
def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000):
self._jsi = PhantomJSJSI(extractor._downloader, timeout=timeout / 1000)
if not self._jsi.is_available():
raise ExtractorError(f'PhantomJS not found, {self.INSTALL_HINT}', expected=True)
self.extractor = extractor
if required_version:
if is_outdated_version(self._jsi.exe_version, required_version):
self._jsi.report_warning(
'Your copy of PhantomJS is outdated, update it to version '
f'{required_version} or newer if you encounter any errors.')
def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on webpage', headers={}, jscode='saveAndExit();'):
"""
@ -479,47 +527,14 @@ def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on w
raise ExtractorError('`saveAndExit();` not found in `jscode`')
if not html:
html = self.extractor._download_webpage(url, video_id, note=note, headers=headers)
with open(self._TMP_FILES['html'].name, 'wb') as f:
f.write(html.encode())
self._save_cookies(url)
self._jsi.user_agent = headers.get('User-Agent') or self.extractor.get_param('http_headers')['User-Agent']
user_agent = headers.get('User-Agent') or self.extractor.get_param('http_headers')['User-Agent']
jscode = self._TEMPLATE.format_map(self.options.new_child({
'url': url,
'ua': user_agent.replace('"', '\\"'),
'jscode': jscode,
}))
stdout = self.execute(jscode, video_id, note=note2)
with open(self._TMP_FILES['html'].name, 'rb') as f:
html = f.read().decode('utf-8')
self._load_cookies()
return html, stdout
return self._jsi._execute_html(jscode, url, html, self.extractor.cookiejar, video_id=video_id, note=note2)
def execute(self, jscode, video_id=None, *, note='Executing JS in PhantomJS'):
"""Execute JS and return stdout"""
if 'phantom.exit();' not in jscode:
jscode += ';\nphantom.exit();'
jscode = self._BASE_JS + jscode
with open(self._TMP_FILES['script'].name, 'w', encoding='utf-8') as f:
f.write(jscode)
self.extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}')
cmd = [self.exe, '--ssl-protocol=any', self._TMP_FILES['script'].name]
self.extractor.write_debug(f'PhantomJS command line: {shell_quote(cmd)}')
try:
stdout, stderr, returncode = Popen.run(cmd, timeout=self.options['timeout'] / 1000,
text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
raise ExtractorError(f'{note} failed: Unable to run PhantomJS binary', cause=e)
if returncode:
raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}')
return stdout
return self._jsi.execute(jscode, video_id=video_id, note=note)
if typing.TYPE_CHECKING: