mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00
Commit e61d717560 by c-basalt, 2025-03-07 22:15:20 +01:00 (committed by GitHub)
21 changed files with 1461 additions and 272 deletions

README.md

@ -212,7 +212,7 @@ ### Metadata
### Misc
* [**pycryptodomex**](https://github.com/Legrandin/pycryptodome)\* - For decrypting AES-128 HLS streams and various other data. Licensed under [BSD-2-Clause](https://github.com/Legrandin/pycryptodome/blob/master/LICENSE.rst)
-* [**phantomjs**](https://github.com/ariya/phantomjs) - Used in extractors where javascript needs to be run. Licensed under [BSD-3-Clause](https://github.com/ariya/phantomjs/blob/master/LICENSE.BSD)
+* [**phantomjs**](https://github.com/ariya/phantomjs), [**deno**](https://github.com/denoland/deno/) - Used in extractors where javascript needs to be run. Licensed under [BSD-3-Clause](https://github.com/ariya/phantomjs/blob/master/LICENSE.BSD) and [MIT](https://github.com/xattr/xattr/blob/master/LICENSE.txt) respectively
* [**secretstorage**](https://github.com/mitya57/secretstorage)\* - For `--cookies-from-browser` to access the **Gnome** keyring while decrypting cookies of **Chromium**-based browsers on **Linux**. Licensed under [BSD-3-Clause](https://github.com/mitya57/secretstorage/blob/master/LICENSE)
* Any external downloader that you want to use with `--downloader`
@ -791,6 +791,9 @@ ## Workarounds:
be used along with --min-sleep-interval
--sleep-subtitles SECONDS Number of seconds to sleep before each
subtitle download
--jsi-preference JSI Preferred JS interpreters to use during
extraction. Can be given as comma-separated
values
## Video Format Options:
-f, --format FORMAT Video format code, see "FORMAT SELECTION"
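The new `--jsi-preference` option is threaded through to a `jsi_preference` entry in the `YoutubeDL` params (see the `yt_dlp/__init__.py` and `YoutubeDL.py` hunks below). A minimal sketch of the equivalent Python API usage, assuming the JSI keys registered by this commit (`Deno`, `DenoJSDom`, `PhantomJS`):

import yt_dlp

# Prefer Deno, then PhantomJS, whenever an extractor needs an external JS interpreter.
# 'jsi_preference' takes a list of JSI keys, mirroring the comma-separated CLI value.
ydl_opts = {'jsi_preference': ['Deno', 'PhantomJS']}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download(['https://www.iq.com/play/one-piece-episode-1000-1ma1i6ferf4'])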

test/test_download.py

@ -25,12 +25,14 @@
import yt_dlp.YoutubeDL # isort: split
from yt_dlp.extractor import get_info_extractor
from yt_dlp.jsinterp.common import filter_jsi_keys
from yt_dlp.networking.exceptions import HTTPError, TransportError
from yt_dlp.utils import (
DownloadError,
ExtractorError,
UnavailableVideoError,
YoutubeDLError,
filter_dict,
format_bytes,
join_nonempty,
)
@ -82,6 +84,28 @@ def __str__(self):
# Dynamically generate tests
def generator(test_case, tname):
# Setting `jsi_matrix` to True, `jsi_matrix_features` to a list, or
# `jsi_matrix_only_include`/`jsi_matrix_exclude` to a non-empty list
# triggers matrix behavior: the test case is run once per matching JSI
if isinstance(test_case.get('jsi_matrix_features'), list) or any(test_case.get(key) for key in [
'jsi_matrix', 'jsi_matrix_only_include', 'jsi_matrix_exclude',
]):
jsi_keys = filter_jsi_keys(
test_case.get('jsi_matrix_features'), test_case.get('jsi_matrix_only_include'),
test_case.get('jsi_matrix_exclude'))
def generate_jsi_sub_case(jsi_key):
sub_case = filter_dict(test_case, lambda k, _: not k.startswith('jsi_matrix'))
sub_case['params'] = {**test_case.get('params', {}), 'jsi_preference': [jsi_key]}
return generator(sub_case, f'{tname}_{jsi_key}')
def run_sub_cases(self):
for i, jsi_key in enumerate(jsi_keys):
print(f'Running case {tname} using JSI: {jsi_key} ({i + 1}/{len(jsi_keys)})')
generate_jsi_sub_case(jsi_key)(self)
return run_sub_cases
def test_template(self):
if self.COMPLETED_TESTS.get(tname):
return
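For reference, a hypothetical extractor test entry that would trigger this matrix behavior (illustrative only, not part of the commit; the keys shown are the ones the generator above inspects):

_TESTS = [{
    'url': 'https://example.com/watch/123',
    'info_dict': {'id': '123', 'ext': 'mp4', 'title': 'Example'},
    # run this test once per registered JSI that supports the 'dom' feature;
    # each sub-case forces that JSI via params['jsi_preference']
    'jsi_matrix_features': ['dom'],
    # alternatives: 'jsi_matrix': True (all JSIs), or 'jsi_matrix_exclude': ['PhantomJS']
}]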

test/test_jsi_external.py (new file)

@ -0,0 +1,223 @@
#!/usr/bin/env python3
from __future__ import annotations
import os
import dataclasses
import datetime
import time
import sys
import unittest
import http.cookiejar
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from test.helper import (
FakeYDL,
)
from yt_dlp.utils import (
variadic,
)
from yt_dlp.cookies import YoutubeDLCookieJar
from yt_dlp.jsinterp import _JSI_HANDLERS
assert set(_JSI_HANDLERS) == {'Deno', 'DenoJSDom', 'PhantomJS'}
from yt_dlp.jsinterp.common import ExternalJSI, _ALL_FEATURES
from yt_dlp.jsinterp._deno import DenoJSI, DenoJSDomJSI
from yt_dlp.jsinterp._phantomjs import PhantomJSJSI
from yt_dlp.jsinterp._helper import prepare_wasm_jsmodule
@dataclasses.dataclass
class NetscapeFields:
name: str
value: str
domain: str
path: str
secure: bool
expires: int | None
def to_cookie(self):
return http.cookiejar.Cookie(
0, self.name, self.value,
None, False,
self.domain, True, self.domain.startswith('.'),
self.path, True,
self.secure, self.expires, False,
None, None, {},
)
def expire_str(self):
return datetime.datetime.fromtimestamp(
self.expires, datetime.timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
def __eq__(self, other: NetscapeFields | http.cookiejar.Cookie):
return all(getattr(self, attr) == getattr(other, attr) for attr in ['name', 'value', 'domain', 'path', 'secure', 'expires'])
covered_features = set()
def requires_feature(features):
covered_features.update(variadic(features))
def outer(func):
def wrapper(self, *args, **kwargs):
if not self.jsi._SUPPORTED_FEATURES.issuperset(variadic(features)):
print(f'{self._JSI_CLASS.__name__} does not support {features!r}, skipping')
self.skipTest(f'{"&".join(variadic(features))} not supported')
return func(self, *args, **kwargs)
return wrapper
return outer
class Base:
class TestExternalJSI(unittest.TestCase):
_JSI_CLASS: type[ExternalJSI] = None
_TESTDATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'testdata', 'jsi_external')
maxDiff = 2000
def setUp(self):
print()
self.ydl = FakeYDL()
self.url_param = ''
if not self._JSI_CLASS.exe_version:
print(f'{self._JSI_CLASS.__name__} is not installed, skipping')
self.skipTest('Not available')
@property
def jsi(self):
return self._JSI_CLASS(self.ydl, self.url_param, 10, {})
def test_execute(self):
self.assertEqual(self.jsi.execute('console.log("Hello, world!");'), 'Hello, world!')
def test_user_agent(self):
ua = self.ydl.params['http_headers']['User-Agent']
self.assertEqual(self.jsi.execute('console.log(navigator.userAgent);'), ua)
self.assertNotEqual(self.jsi.execute('console.log(JSON.stringify(navigator.webdriver));'), 'true')
jsi = self._JSI_CLASS(self.ydl, self.url_param, 10, {}, user_agent='test/ua')
self.assertEqual(jsi.execute('console.log(navigator.userAgent);'), 'test/ua')
@requires_feature('location')
def test_location(self):
self.url_param = 'https://example.com/123/456'
self.assertEqual(self.jsi.execute('console.log(JSON.stringify([location.href, location.hostname]));'),
'["https://example.com/123/456","example.com"]')
@requires_feature('dom')
def test_execute_dom_parse(self):
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='<html><body><div id="test-div">Hello, world!</div></body></html>'),
'Hello, world!')
@requires_feature('dom')
def test_execute_dom_script(self):
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='''<html><head><title>Hello, world!</title><body>
<div id="test-div"></div>
<script src="https://example.com/script.js"></script>
<script type="text/javascript">
document.getElementById("test-div").innerHTML = document.title;
console.log('this should not show up');
a = b; // Errors should be ignored
</script>
</body></html>'''),
'Hello, world!')
@requires_feature(['dom', 'location'])
def test_dom_location(self):
self.url_param = 'https://example.com/123/456'
self.assertEqual(self.jsi.execute(
'console.log(document.getElementById("test-div").innerHTML);',
html='''<html><head><script>
document.querySelector("#test-div").innerHTML = document.domain</script></head>
<body><div id="test-div">Hello, world!</div></body></html>'''),
'example.com')
@requires_feature('cookies')
def test_execute_cookiejar(self):
cookiejar = YoutubeDLCookieJar()
ref_cookiejar = YoutubeDLCookieJar()
def _assert_expected_execute(cookie_str, ref_cookie_str):
self.assertEqual(set(cookie_str.split('; ')), set(ref_cookie_str.split('; ')))
for cookie in cookiejar:
ref_cookie = next((c for c in ref_cookiejar if c.name == cookie.name
and c.domain == cookie.domain), None)
self.assertEqual(repr(cookie), repr(ref_cookie))
for test_cookie in [
NetscapeFields('test1', 'test1', '.example.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'test2', '.example.com', '/', True, int(time.time()) + 1000),
NetscapeFields('test3', 'test3', '.example.com', '/123', False, int(time.time()) + 1000),
NetscapeFields('test4', 'test4', '.example.com', '/456', False, int(time.time()) + 1000),
NetscapeFields('test5', 'test5', '.example.com', '/123', True, int(time.time()) + 1000),
NetscapeFields('test6', 'test6', '.example.com', '/456', True, int(time.time()) + 1000),
NetscapeFields('test1', 'other1', '.other.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test2', 'other2', '.other.com', '/', False, int(time.time()) + 1000),
NetscapeFields('test7', 'other7', '.other.com', '/', False, int(time.time()) + 1000),
]:
cookiejar.set_cookie(test_cookie.to_cookie())
ref_cookiejar.set_cookie(test_cookie.to_cookie())
# test identity without modification from js
self.url_param = 'http://example.com/123/456'
_assert_expected_execute(self.jsi.execute(
'console.log(document.cookie);', cookiejar=cookiejar),
'test1=test1; test3=test3')
# test modification of existing cookie from js
new_cookie_1 = NetscapeFields('test1', 'new1', '.example.com', '/', True, int(time.time()) + 900)
new_cookie_2 = NetscapeFields('test2', 'new2', '.example.com', '/', True, int(time.time()) + 900)
ref_cookiejar.set_cookie(new_cookie_1.to_cookie())
ref_cookiejar.set_cookie(new_cookie_2.to_cookie())
self.url_param = 'https://example.com/123/456'
_assert_expected_execute(self.jsi.execute(
f'''document.cookie = "test1=new1; secure; expires={new_cookie_1.expire_str()}; domain=.example.com; path=/";
console.log(document.cookie);''',
html=f'''<html><body><div id="test-div">Hello, world!</div>
<script>
document.cookie = "test2=new2; secure; expires={new_cookie_2.expire_str()}; domain=.example.com; path=/";
</script>
</body></html>''',
cookiejar=cookiejar),
'test1=new1; test2=new2; test3=test3; test5=test5')
@requires_feature('wasm')
def test_wasm(self):
with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm.js')) as f:
js_mod = f.read()
with open(os.path.join(self._TESTDATA_DIR, 'hello_wasm_bg.wasm'), 'rb') as f:
wasm = f.read()
js_base = prepare_wasm_jsmodule(js_mod, wasm)
js_code = js_base + ''';
console.log(add(1, 2));
greet('world');
'''
self.assertEqual(self.jsi.execute(js_code), '3\nHello, world!')
class TestDeno(Base.TestExternalJSI):
_JSI_CLASS = DenoJSI
class TestDenoDom(Base.TestExternalJSI):
_JSI_CLASS = DenoJSDomJSI
class TestPhantomJS(Base.TestExternalJSI):
_JSI_CLASS = PhantomJSJSI
expect_covered_features = set(_ALL_FEATURES)
assert covered_features.issuperset(expect_covered_features), f'Missing tests for features: {expect_covered_features - covered_features}'
if __name__ == '__main__':
unittest.main()

test/test_jsinterp.py

@ -9,7 +9,7 @@
import math
-from yt_dlp.jsinterp import JS_Undefined, JSInterpreter, js_number_to_string
+from yt_dlp.jsinterp.native import JS_Undefined, JSInterpreter, js_number_to_string
class NaN:

test/testdata/jsi_external/hello_wasm.js (new vendored file)

@ -0,0 +1,234 @@
// wasm-pack build --target web
/* lib.rs
use wasm_bindgen::prelude::*;
#[wasm_bindgen]
extern "C" {
pub fn eval(s: &str);
}
#[wasm_bindgen]
pub fn greet(name: &str) {
eval(&format!("console.log('Hello, {}!')", name));
}
#[wasm_bindgen]
pub fn add(left: i32, right: i32) -> i32 {
left + right
}
*/
let wasm;
const cachedTextDecoder = (typeof TextDecoder !== 'undefined' ? new TextDecoder('utf-8', { ignoreBOM: true, fatal: true }) : { decode: () => { throw Error('TextDecoder not available') } } );
if (typeof TextDecoder !== 'undefined') { cachedTextDecoder.decode(); };
let cachedUint8ArrayMemory0 = null;
function getUint8ArrayMemory0() {
if (cachedUint8ArrayMemory0 === null || cachedUint8ArrayMemory0.byteLength === 0) {
cachedUint8ArrayMemory0 = new Uint8Array(wasm.memory.buffer);
}
return cachedUint8ArrayMemory0;
}
function getStringFromWasm0(ptr, len) {
ptr = ptr >>> 0;
return cachedTextDecoder.decode(getUint8ArrayMemory0().subarray(ptr, ptr + len));
}
let WASM_VECTOR_LEN = 0;
const cachedTextEncoder = (typeof TextEncoder !== 'undefined' ? new TextEncoder('utf-8') : { encode: () => { throw Error('TextEncoder not available') } } );
const encodeString = (typeof cachedTextEncoder.encodeInto === 'function'
? function (arg, view) {
return cachedTextEncoder.encodeInto(arg, view);
}
: function (arg, view) {
const buf = cachedTextEncoder.encode(arg);
view.set(buf);
return {
read: arg.length,
written: buf.length
};
});
function passStringToWasm0(arg, malloc, realloc) {
if (realloc === undefined) {
const buf = cachedTextEncoder.encode(arg);
const ptr = malloc(buf.length, 1) >>> 0;
getUint8ArrayMemory0().subarray(ptr, ptr + buf.length).set(buf);
WASM_VECTOR_LEN = buf.length;
return ptr;
}
let len = arg.length;
let ptr = malloc(len, 1) >>> 0;
const mem = getUint8ArrayMemory0();
let offset = 0;
for (; offset < len; offset++) {
const code = arg.charCodeAt(offset);
if (code > 0x7F) break;
mem[ptr + offset] = code;
}
if (offset !== len) {
if (offset !== 0) {
arg = arg.slice(offset);
}
ptr = realloc(ptr, len, len = offset + arg.length * 3, 1) >>> 0;
const view = getUint8ArrayMemory0().subarray(ptr + offset, ptr + len);
const ret = encodeString(arg, view);
offset += ret.written;
ptr = realloc(ptr, len, offset, 1) >>> 0;
}
WASM_VECTOR_LEN = offset;
return ptr;
}
/**
* @param {string} name
*/
export function greet(name) {
const ptr0 = passStringToWasm0(name, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
const len0 = WASM_VECTOR_LEN;
wasm.greet(ptr0, len0);
}
/**
* @param {number} left
* @param {number} right
* @returns {number}
*/
export function add(left, right) {
const ret = wasm.add(left, right);
return ret;
}
async function __wbg_load(module, imports) {
if (typeof Response === 'function' && module instanceof Response) {
if (typeof WebAssembly.instantiateStreaming === 'function') {
try {
return await WebAssembly.instantiateStreaming(module, imports);
} catch (e) {
if (module.headers.get('Content-Type') != 'application/wasm') {
console.warn("`WebAssembly.instantiateStreaming` failed because your server does not serve Wasm with `application/wasm` MIME type. Falling back to `WebAssembly.instantiate` which is slower. Original error:\n", e);
} else {
throw e;
}
}
}
const bytes = await module.arrayBuffer();
return await WebAssembly.instantiate(bytes, imports);
} else {
const instance = await WebAssembly.instantiate(module, imports);
if (instance instanceof WebAssembly.Instance) {
return { instance, module };
} else {
return instance;
}
}
}
function __wbg_get_imports() {
const imports = {};
imports.wbg = {};
imports.wbg.__wbg_eval_d1c6d8ede79fdfce = function(arg0, arg1) {
eval(getStringFromWasm0(arg0, arg1));
};
imports.wbg.__wbindgen_init_externref_table = function() {
const table = wasm.__wbindgen_export_0;
const offset = table.grow(4);
table.set(0, undefined);
table.set(offset + 0, undefined);
table.set(offset + 1, null);
table.set(offset + 2, true);
table.set(offset + 3, false);
;
};
return imports;
}
function __wbg_init_memory(imports, memory) {
}
function __wbg_finalize_init(instance, module) {
wasm = instance.exports;
__wbg_init.__wbindgen_wasm_module = module;
cachedUint8ArrayMemory0 = null;
wasm.__wbindgen_start();
return wasm;
}
function initSync(module) {
if (wasm !== undefined) return wasm;
if (typeof module !== 'undefined') {
if (Object.getPrototypeOf(module) === Object.prototype) {
({module} = module)
} else {
console.warn('using deprecated parameters for `initSync()`; pass a single object instead')
}
}
const imports = __wbg_get_imports();
__wbg_init_memory(imports);
if (!(module instanceof WebAssembly.Module)) {
module = new WebAssembly.Module(module);
}
const instance = new WebAssembly.Instance(module, imports);
return __wbg_finalize_init(instance, module);
}
async function __wbg_init(module_or_path) {
if (wasm !== undefined) return wasm;
if (typeof module_or_path !== 'undefined') {
if (Object.getPrototypeOf(module_or_path) === Object.prototype) {
({module_or_path} = module_or_path)
} else {
console.warn('using deprecated parameters for the initialization function; pass a single object instead')
}
}
if (typeof module_or_path === 'undefined') {
module_or_path = new URL('hello_wasm_bg.wasm', import.meta.url);
}
const imports = __wbg_get_imports();
if (typeof module_or_path === 'string' || (typeof Request === 'function' && module_or_path instanceof Request) || (typeof URL === 'function' && module_or_path instanceof URL)) {
module_or_path = fetch(module_or_path);
}
__wbg_init_memory(imports);
const { instance, module } = await __wbg_load(await module_or_path, imports);
return __wbg_finalize_init(instance, module);
}
export { initSync };
export default __wbg_init;

test/testdata/jsi_external/hello_wasm_bg.wasm (binary file not shown)

yt_dlp/YoutubeDL.py

@ -32,7 +32,7 @@
from .downloader.rtmp import rtmpdump_version
from .extractor import gen_extractor_classes, get_info_extractor, import_extractors
from .extractor.common import UnsupportedURLIE
-from .extractor.openload import PhantomJSwrapper
+from .jsinterp import PhantomJSwrapper
from .globals import (
IN_CLI,
LAZY_EXTRACTORS,
@ -445,6 +445,8 @@ class YoutubeDL:
Actual sleep time will be a random float from range
[sleep_interval; max_sleep_interval].
sleep_interval_subtitles: Number of seconds to sleep before each subtitle download
jsi_preference: Preferred JS interpreters to use during extraction. Can be
given as comma-separated values.
listformats: Print an overview of available video formats and exit.
list_thumbnails: Print a table of all thumbnails and exit.
match_filter: A function that gets called for every video with the signature

yt_dlp/__init__.py

@ -946,6 +946,7 @@ def parse_options(argv=None):
'sleep_interval': opts.sleep_interval,
'max_sleep_interval': opts.max_sleep_interval,
'sleep_interval_subtitles': opts.sleep_interval_subtitles,
'jsi_preference': opts.jsi_preference,
'external_downloader': opts.external_downloader,
'download_ranges': opts.download_ranges,
'force_keyframes_at_cuts': opts.force_keyframes_at_cuts,


@ -4,7 +4,7 @@
import uuid
from .common import InfoExtractor
-from .openload import PhantomJSwrapper
+from ..jsinterp import PhantomJSwrapper
from ..utils import (
ExtractorError,
UserNotLive,

yt_dlp/extractor/iq.py

@ -5,7 +5,7 @@
import urllib.parse
from .common import InfoExtractor
-from .openload import PhantomJSwrapper
+from ..jsinterp import JSIWrapper
from ..utils import (
ExtractorError,
clean_html,
@ -398,6 +398,27 @@ class IqIE(InfoExtractor):
IE_DESC = 'International version of iQiyi'
_VALID_URL = r'https?://(?:www\.)?iq\.com/play/(?:[\w%-]*-)?(?P<id>\w+)'
_TESTS = [{
'url': 'https://www.iq.com/play/sangmin-dinneaw-episode-1-xmk7546rfw',
'md5': '63fcb4b7d4863472fe0a9be75d9e9d60',
'info_dict': {
'ext': 'mp4',
'id': 'xmk7546rfw',
'title': '尚岷与丁尼奥 第1集',
'description': 'md5:e8fe4a8da25f4b8c86bc5506b1c3faaa',
'duration': 3092,
'timestamp': 1735520401,
'upload_date': '20241230',
'episode_number': 1,
'episode': 'Episode 1',
'series': 'Sangmin Dinneaw',
'age_limit': 18,
'average_rating': float,
'categories': [],
'cast': ['Sangmin Choi', 'Ratana Aiamsaart'],
},
'expected_warnings': ['format is restricted'],
'jsi_matrix_features': ['dom'],
}, {
'url': 'https://www.iq.com/play/one-piece-episode-1000-1ma1i6ferf4',
'md5': '2d7caf6eeca8a32b407094b33b757d39',
'info_dict': {
@ -418,6 +439,7 @@ class IqIE(InfoExtractor):
'format': '500',
},
'expected_warnings': ['format is restricted'],
'skip': 'geo-restricted',
}, {
# VIP-restricted video
'url': 'https://www.iq.com/play/mermaid-in-the-fog-2021-gbdpx13bs4',
@ -449,7 +471,6 @@ class IqIE(InfoExtractor):
}
_DASH_JS = '''
-console.log(page.evaluate(function() {
var tvid = "%(tvid)s"; var vid = "%(vid)s"; var src = "%(src)s";
var uid = "%(uid)s"; var dfp = "%(dfp)s"; var mode = "%(mode)s"; var lang = "%(lang)s";
var bid_list = %(bid_list)s; var ut_list = %(ut_list)s; var tm = new Date().getTime();
@ -515,9 +536,7 @@ class IqIE(InfoExtractor):
var dash_path = '/dash?' + enc_params.join('&'); dash_path += '&vf=' + cmd5x(dash_path);
dash_paths[bid] = dash_path;
});
-return JSON.stringify(dash_paths);
-}));
-saveAndExit();
+console.log(JSON.stringify(dash_paths));
'''
def _extract_vms_player_js(self, webpage, video_id):
@ -597,22 +616,22 @@ def _real_extract(self, url):
else:
ut_list = ['0']
jsi = JSIWrapper(self, url, ['dom'], timeout=120)
# bid 0 as an initial format checker
-dash_paths = self._parse_json(PhantomJSwrapper(self, timeout=120_000).get(
-url, note2='Executing signature code (this may take a couple minutes)',
-html='<!DOCTYPE html>', video_id=video_id, jscode=self._DASH_JS % {
-'tvid': video_info['tvId'],
-'vid': video_info['vid'],
-'src': traverse_obj(next_props, ('initialProps', 'pageProps', 'ptid'),
-expected_type=str, default='04022001010011000000'),
-'uid': uid,
-'dfp': self._get_cookie('dfp', ''),
-'mode': self._get_cookie('mod', 'intl'),
-'lang': self._get_cookie('lang', 'en_us'),
-'bid_list': '[' + ','.join(['0', *self._BID_TAGS.keys()]) + ']',
-'ut_list': '[' + ','.join(ut_list) + ']',
-'cmd5x_func': self._extract_cmd5x_function(webpage, video_id),
-})[1].strip(), video_id)
+dash_paths = self._parse_json(jsi.execute(self._DASH_JS % {
+'tvid': video_info['tvId'],
+'vid': video_info['vid'],
+'src': traverse_obj(next_props, ('initialProps', 'pageProps', 'ptid'),
+expected_type=str, default='04022001010011000000'),
+'uid': uid,
+'dfp': self._get_cookie('dfp', ''),
+'mode': self._get_cookie('mod', 'intl'),
+'lang': self._get_cookie('lang', 'en_us'),
+'bid_list': '[' + ','.join(['0', *self._BID_TAGS.keys()]) + ']',
+'ut_list': '[' + ','.join(ut_list) + ']',
+'cmd5x_func': self._extract_cmd5x_function(webpage, video_id),
+}, video_id, html='<!DOCTYPE html>'), video_id)
formats, subtitles = [], {}
initial_format_data = self._download_json(
yt_dlp/extractor/openload.py (deleted)

@ -1,243 +0,0 @@
import collections
import contextlib
import json
import os
import subprocess
import tempfile
import urllib.parse
from ..utils import (
ExtractorError,
Popen,
check_executable,
format_field,
get_exe_version,
is_outdated_version,
shell_quote,
)
def cookie_to_dict(cookie):
cookie_dict = {
'name': cookie.name,
'value': cookie.value,
}
if cookie.port_specified:
cookie_dict['port'] = cookie.port
if cookie.domain_specified:
cookie_dict['domain'] = cookie.domain
if cookie.path_specified:
cookie_dict['path'] = cookie.path
if cookie.expires is not None:
cookie_dict['expires'] = cookie.expires
if cookie.secure is not None:
cookie_dict['secure'] = cookie.secure
if cookie.discard is not None:
cookie_dict['discard'] = cookie.discard
with contextlib.suppress(TypeError):
if (cookie.has_nonstandard_attr('httpOnly')
or cookie.has_nonstandard_attr('httponly')
or cookie.has_nonstandard_attr('HttpOnly')):
cookie_dict['httponly'] = True
return cookie_dict
def cookie_jar_to_list(cookie_jar):
return [cookie_to_dict(cookie) for cookie in cookie_jar]
class PhantomJSwrapper:
"""PhantomJS wrapper class
This class is experimental.
"""
INSTALL_HINT = 'Please download it from https://phantomjs.org/download.html'
_BASE_JS = R'''
phantom.onError = function(msg, trace) {{
var msgStack = ['PHANTOM ERROR: ' + msg];
if(trace && trace.length) {{
msgStack.push('TRACE:');
trace.forEach(function(t) {{
msgStack.push(' -> ' + (t.file || t.sourceURL) + ': ' + t.line
+ (t.function ? ' (in function ' + t.function +')' : ''));
}});
}}
console.error(msgStack.join('\n'));
phantom.exit(1);
}};
'''
_TEMPLATE = R'''
var page = require('webpage').create();
var fs = require('fs');
var read = {{ mode: 'r', charset: 'utf-8' }};
var write = {{ mode: 'w', charset: 'utf-8' }};
JSON.parse(fs.read("{cookies}", read)).forEach(function(x) {{
phantom.addCookie(x);
}});
page.settings.resourceTimeout = {timeout};
page.settings.userAgent = "{ua}";
page.onLoadStarted = function() {{
page.evaluate(function() {{
delete window._phantom;
delete window.callPhantom;
}});
}};
var saveAndExit = function() {{
fs.write("{html}", page.content, write);
fs.write("{cookies}", JSON.stringify(phantom.cookies), write);
phantom.exit();
}};
page.onLoadFinished = function(status) {{
if(page.url === "") {{
page.setContent(fs.read("{html}", read), "{url}");
}}
else {{
{jscode}
}}
}};
page.open("");
'''
_TMP_FILE_NAMES = ['script', 'html', 'cookies']
@staticmethod
def _version():
return get_exe_version('phantomjs', version_re=r'([0-9.]+)')
def __init__(self, extractor, required_version=None, timeout=10000):
self._TMP_FILES = {}
self.exe = check_executable('phantomjs', ['-v'])
if not self.exe:
raise ExtractorError(f'PhantomJS not found, {self.INSTALL_HINT}', expected=True)
self.extractor = extractor
if required_version:
version = self._version()
if is_outdated_version(version, required_version):
self.extractor._downloader.report_warning(
'Your copy of PhantomJS is outdated, update it to version '
f'{required_version} or newer if you encounter any errors.')
for name in self._TMP_FILE_NAMES:
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp.close()
self._TMP_FILES[name] = tmp
self.options = collections.ChainMap({
'timeout': timeout,
}, {
x: self._TMP_FILES[x].name.replace('\\', '\\\\').replace('"', '\\"')
for x in self._TMP_FILE_NAMES
})
def __del__(self):
for name in self._TMP_FILE_NAMES:
with contextlib.suppress(OSError, KeyError):
os.remove(self._TMP_FILES[name].name)
def _save_cookies(self, url):
cookies = cookie_jar_to_list(self.extractor.cookiejar)
for cookie in cookies:
if 'path' not in cookie:
cookie['path'] = '/'
if 'domain' not in cookie:
cookie['domain'] = urllib.parse.urlparse(url).netloc
with open(self._TMP_FILES['cookies'].name, 'wb') as f:
f.write(json.dumps(cookies).encode())
def _load_cookies(self):
with open(self._TMP_FILES['cookies'].name, 'rb') as f:
cookies = json.loads(f.read().decode('utf-8'))
for cookie in cookies:
if cookie['httponly'] is True:
cookie['rest'] = {'httpOnly': None}
if 'expiry' in cookie:
cookie['expire_time'] = cookie['expiry']
self.extractor._set_cookie(**cookie)
def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on webpage', headers={}, jscode='saveAndExit();'):
"""
Downloads webpage (if needed) and executes JS
Params:
url: website url
html: optional, html code of website
video_id: video id
note: optional, displayed when downloading webpage
note2: optional, displayed when executing JS
headers: custom http headers
jscode: code to be executed when page is loaded
Returns tuple with:
* downloaded website (after JS execution)
* anything you print with `console.log` (but not inside `page.execute`!)
In most cases you don't need to add any `jscode`.
It is executed in `page.onLoadFinished`.
`saveAndExit();` is mandatory, use it instead of `phantom.exit()`
It is possible to wait for some element on the webpage, e.g.
var check = function() {
var elementFound = page.evaluate(function() {
return document.querySelector('#b.done') !== null;
});
if(elementFound)
saveAndExit();
else
window.setTimeout(check, 500);
}
page.evaluate(function(){
document.querySelector('#a').click();
});
check();
"""
if 'saveAndExit();' not in jscode:
raise ExtractorError('`saveAndExit();` not found in `jscode`')
if not html:
html = self.extractor._download_webpage(url, video_id, note=note, headers=headers)
with open(self._TMP_FILES['html'].name, 'wb') as f:
f.write(html.encode())
self._save_cookies(url)
user_agent = headers.get('User-Agent') or self.extractor.get_param('http_headers')['User-Agent']
jscode = self._TEMPLATE.format_map(self.options.new_child({
'url': url,
'ua': user_agent.replace('"', '\\"'),
'jscode': jscode,
}))
stdout = self.execute(jscode, video_id, note=note2)
with open(self._TMP_FILES['html'].name, 'rb') as f:
html = f.read().decode('utf-8')
self._load_cookies()
return html, stdout
def execute(self, jscode, video_id=None, *, note='Executing JS'):
"""Execute JS and return stdout"""
if 'phantom.exit();' not in jscode:
jscode += ';\nphantom.exit();'
jscode = self._BASE_JS + jscode
with open(self._TMP_FILES['script'].name, 'w', encoding='utf-8') as f:
f.write(jscode)
self.extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}')
cmd = [self.exe, '--ssl-protocol=any', self._TMP_FILES['script'].name]
self.extractor.write_debug(f'PhantomJS command line: {shell_quote(cmd)}')
try:
stdout, stderr, returncode = Popen.run(cmd, timeout=self.options['timeout'] / 1000,
text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
raise ExtractorError(f'{note} failed: Unable to run PhantomJS binary', cause=e)
if returncode:
raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}')
return stdout


@ -5,7 +5,7 @@
import re
from .common import InfoExtractor
-from .openload import PhantomJSwrapper
+from ..jsinterp import PhantomJSwrapper
from ..networking import Request
from ..networking.exceptions import HTTPError
from ..utils import (

yt_dlp/extractor/twitter.py

@ -6,7 +6,7 @@
from .common import InfoExtractor
from .periscope import PeriscopeBaseIE, PeriscopeIE
-from ..jsinterp import js_number_to_string
+from ..jsinterp.native import js_number_to_string
from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,


@ -21,8 +21,7 @@
import urllib.parse
from .common import InfoExtractor, SearchInfoExtractor
-from .openload import PhantomJSwrapper
-from ..jsinterp import JSInterpreter
+from ..jsinterp import JSInterpreter, PhantomJSwrapper
from ..networking.exceptions import HTTPError, network_exceptions
from ..utils import (
NO_DEFAULT,

yt_dlp/jsinterp/__init__.py (new file)

@ -0,0 +1,14 @@
# flake8: noqa: F401
from .native import JSInterpreter
from .common import _JSI_PREFERENCES, _JSI_HANDLERS, JSIWrapper
from ._phantomjs import PhantomJSwrapper
from . import _deno # ensure jsi registration
__all__ = [
JSInterpreter,
PhantomJSwrapper,
_JSI_HANDLERS,
_JSI_PREFERENCES,
JSIWrapper,
]

yt_dlp/jsinterp/_deno.py (new file)

@ -0,0 +1,195 @@
from __future__ import annotations
import http.cookiejar
import json
import subprocess
import typing
import urllib.parse
from ..utils import (
ExtractorError,
Popen,
int_or_none,
shell_quote,
unified_timestamp,
)
from ._helper import TempFileWrapper, random_string, override_navigator_js, extract_script_tags
from .common import ExternalJSI, register_jsi
@register_jsi
class DenoJSI(ExternalJSI):
"""JS interpreter class using Deno binary"""
_SUPPORTED_FEATURES = {'wasm', 'location'}
_BASE_PREFERENCE = 5
_EXE_NAME = 'deno'
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_INIT_SCRIPT = 'localStorage.clear(); delete window.Deno; global = window;\n'
def __init__(self, *args, flags=[], replace_flags=False, init_script=None, **kwargs):
super().__init__(*args, **kwargs)
self._flags = flags if replace_flags else [*self._DENO_FLAGS, *flags]
self._init_script = self._INIT_SCRIPT if init_script is None else init_script
@property
def _override_navigator_js(self):
return override_navigator_js(self.user_agent)
def _run_deno(self, cmd):
self.write_debug(f'Deno command line: {shell_quote(cmd)}')
try:
stdout, stderr, returncode = Popen.run(
cmd, timeout=self.timeout, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
raise ExtractorError('Unable to run Deno binary', cause=e)
if returncode:
raise ExtractorError(f'Failed with returncode {returncode}:\n{stderr}')
elif stderr:
self.report_warning(f'JS console error msg:\n{stderr.strip()}')
return stdout.strip()
def execute(self, jscode, video_id=None, note='Executing JS in Deno'):
self.report_note(video_id, note)
location_args = ['--location', self._url] if self._url else []
with TempFileWrapper(f'{self._init_script};\n{self._override_navigator_js}\n{jscode}', suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
return self._run_deno(cmd)
@register_jsi
class DenoJSDomJSI(DenoJSI):
_SUPPORTED_FEATURES = {'wasm', 'location', 'dom', 'cookies'}
_BASE_PREFERENCE = 4
_DENO_FLAGS = ['--cached-only', '--no-prompt', '--no-check']
_JSDOM_IMPORT_CHECKED = False
_JSDOM_URL = 'https://cdn.esm.sh/jsdom'
@staticmethod
def serialize_cookie(cookiejar: YoutubeDLCookieJar | None, url: str):
"""serialize netscape-compatible fields from cookiejar for tough-cookie loading"""
# JSDOM uses tough-cookie as its CookieJar: https://github.com/jsdom/jsdom/blob/main/lib/api.js
# tough-cookie uses Cookie.fromJSON and Cookie.toJSON for cookie serialization:
# https://github.com/salesforce/tough-cookie/blob/master/lib/cookie/cookie.ts
if not cookiejar:
return json.dumps({'cookies': []})
cookies: list[http.cookiejar.Cookie] = list(cookiejar.get_cookies_for_url(url))
return json.dumps({'cookies': [{
'key': cookie.name,
'value': cookie.value,
# leading dot of domain must be removed, otherwise will fail to match
'domain': cookie.domain.lstrip('.') or urllib.parse.urlparse(url).hostname,
'expires': int_or_none(cookie.expires, invscale=1000),
'hostOnly': not cookie.domain_initial_dot,
'secure': bool(cookie.secure),
'path': cookie.path,
} for cookie in cookies if cookie.value]})
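# Editor's note (illustrative, not part of the commit): the payload serialized above for
# tough-cookie looks roughly like
#   {"cookies": [{"key": "test1", "value": "test1", "domain": "example.com", "path": "/",
#                 "expires": 1741400000000, "hostOnly": false, "secure": false}]}
# with "expires" in epoch milliseconds (hence int_or_none(..., invscale=1000)).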
@staticmethod
def apply_cookies(cookiejar: YoutubeDLCookieJar | None, cookies: list[dict]):
"""apply cookies from serialized tough-cookie"""
# see serialize_cookie
if not cookiejar:
return
for cookie_dict in cookies:
if not all(cookie_dict.get(k) for k in ('key', 'value', 'domain')):
continue
if cookie_dict.get('hostOnly'):
cookie_dict['domain'] = cookie_dict['domain'].lstrip('.')
else:
cookie_dict['domain'] = '.' + cookie_dict['domain'].lstrip('.')
cookiejar.set_cookie(http.cookiejar.Cookie(
0, cookie_dict['key'], cookie_dict['value'],
None, False,
cookie_dict['domain'], True, not cookie_dict.get('hostOnly'),
cookie_dict.get('path', '/'), True,
bool(cookie_dict.get('secure')),
unified_timestamp(cookie_dict.get('expires')),
False, None, None, {}))
def _ensure_jsdom(self):
if self._JSDOM_IMPORT_CHECKED:
return
with TempFileWrapper(f'import jsdom from "{self._JSDOM_URL}"', suffix='.js') as js_file:
cmd = [self.exe, 'run', js_file.name]
self._run_deno(cmd)
self._JSDOM_IMPORT_CHECKED = True
def execute(self, jscode, video_id=None, note='Executing JS in Deno with jsdom', html='', cookiejar=None):
self.report_note(video_id, note)
self._ensure_jsdom()
if cookiejar and not self._url:
self.report_warning('No valid url scope provided, cookiejar is not applied')
cookiejar = None
html, inline_scripts = extract_script_tags(html)
wrapper_scripts = '\n'.join(['try { %s } catch (e) {}' % script for script in inline_scripts])
callback_varname = f'__callback_{random_string()}'
script = f'''{self._init_script};
import jsdom from "{self._JSDOM_URL}";
let {callback_varname} = (() => {{
const jar = jsdom.CookieJar.deserializeSync({json.dumps(self.serialize_cookie(cookiejar, self._url))});
const dom = new jsdom.JSDOM({json.dumps(str(html))}, {{
{'url: %s,' % json.dumps(str(self._url)) if self._url else ''}
cookieJar: jar,
pretendToBeVisual: true,
}});
Object.keys(dom.window).filter(key => !['atob', 'btoa', 'crypto', 'location'].includes(key))
.filter(key => !(window.location? [] : ['sessionStorage', 'localStorage']).includes(key))
.forEach((key) => {{
try {{window[key] = dom.window[key]}} catch (e) {{ console.error(e) }}
}});
{self._override_navigator_js};
window.screen = {{
availWidth: 1920,
availHeight: 1040,
width: 1920,
height: 1080,
colorDepth: 24,
isExtended: true,
onchange: null,
orientation: {{angle: 0, type: 'landscape-primary', onchange: null}},
pixelDepth: 24,
}}
Object.defineProperty(document.body, 'clientWidth', {{value: 1903}});
Object.defineProperty(document.body, 'clientHeight', {{value: 2000}});
document.domain = location?.hostname;
delete window.jsdom;
const origLog = console.log;
console.log = () => {{}};
console.info = () => {{}};
return () => {{
const stdout = [];
console.log = (...msg) => stdout.push(msg.map(m => '' + m).join(' '));
return () => {{ origLog(JSON.stringify({{
stdout: stdout.join('\\n'), cookies: jar.serializeSync().cookies}})); }}
}}
}})();
{wrapper_scripts}
{callback_varname} = {callback_varname}(); // begin to capture console.log
try {{
{jscode}
}} finally {{
{callback_varname}();
}}
'''
location_args = ['--location', self._url] if self._url else []
with TempFileWrapper(script, suffix='.js') as js_file:
cmd = [self.exe, 'run', *self._flags, *location_args, js_file.name]
result = self._run_deno(cmd)
try:
data = json.loads(result)
except json.JSONDecodeError as e:
raise ExtractorError(f'Failed to parse JSON output from Deno: {result}', cause=e)
self.apply_cookies(cookiejar, data['cookies'])
return data['stdout']
if typing.TYPE_CHECKING:
from ..cookies import YoutubeDLCookieJar

yt_dlp/jsinterp/_helper.py (new file)

@ -0,0 +1,135 @@
from __future__ import annotations
import contextlib
import json
import os
import random
import re
import string
import tempfile
class TempFileWrapper:
"""
Wrapper around NamedTemporaryFile that closes the file after each read/write and deletes it when the wrapper object is garbage-collected
@param {str | bytes | None} content: content to write to file upon creation
@param {bool} text: whether to open file in text mode
@param {str} encoding: encoding to use for text mode
@param {str | None} suffix: suffix for filename of temporary file
"""
def __init__(self, content: str | bytes | None = None, text: bool = True,
encoding='utf-8', suffix: str | None = None):
self.encoding = None if not text else encoding
self.text = text
self._file = tempfile.NamedTemporaryFile('w' if text else 'wb', encoding=self.encoding,
suffix=suffix, delete=False)
if content:
self._file.write(content)
self._file.close()
@property
def name(self):
return self._file.name
@contextlib.contextmanager
def opened_file(self, mode, *, seek=None, seek_whence=0):
mode = mode if (self.text or 'b' in mode) else mode + 'b'
with open(self._file.name, mode, encoding=self.encoding) as f:
if seek is not None:
self._file.seek(seek, seek_whence)
yield f
def write(self, s, seek=None, seek_whence=0):
"""re-open file in write mode and write, optionally seek to position first"""
with self.opened_file('w', seek=seek, seek_whence=seek_whence) as f:
return f.write(s)
def append_write(self, s, seek=None, seek_whence=0):
"""re-open file in append mode and write, optionally seek to position first"""
with self.opened_file('a', seek=seek, seek_whence=seek_whence) as f:
return f.write(s)
def read(self, n=-1, seek=None, seek_whence=0):
"""re-open file and read, optionally seek to position first"""
with self.opened_file('r', seek=seek, seek_whence=seek_whence) as f:
return f.read(n)
def cleanup(self):
with contextlib.suppress(OSError):
os.remove(self._file.name)
def __del__(self):
self.cleanup()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
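# Editor's illustration (not part of the commit): typical TempFileWrapper usage.
#   with TempFileWrapper('hello', suffix='.txt') as tmp:
#       tmp.append_write(' world')
#       assert tmp.read() == 'hello world'
#   # the temporary file is deleted when the `with` block exits (or on garbage collection)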
def random_string(length: int = 10) -> str:
return ''.join(random.choices(string.ascii_letters, k=length))
def override_navigator_js(user_agent: str) -> str:
"""Generate js snippet to override navigator properties based on user_agent string"""
return '\n'.join([
'Object.defineProperty(navigator, "%s", { value: %s, configurable: true });' % (k, json.dumps(v))
for k, v in {
'userAgent': user_agent,
'language': 'en-US',
'languages': ['en-US'],
'webdriver': False,
'cookieEnabled': True,
'appCodeName': user_agent.split('/', maxsplit=1)[0],
'appName': 'Netscape',
'appVersion': user_agent.split('/', maxsplit=1)[-1],
'platform': 'Win32',
'product': 'Gecko',
'productSub': '20030107',
'vendor': 'Google Inc.',
'vendorSub': '',
'onLine': True,
}.items()
])
def extract_script_tags(html: str) -> tuple[str, list[str]]:
script_indicies = []
inline_scripts = []
for match_start in re.finditer(r'<script[^>]*>', html, re.DOTALL | re.IGNORECASE):
end = html.find('</script>', match_start.end())
if end > match_start.end():
script_indicies.append((match_start.start(), end + len('</script>')))
inline_scripts.append(html[match_start.end():end])
for start, end in script_indicies:
html = html[:start] + html[end:]
return html, inline_scripts
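# Editor's illustration (not part of the commit): for a single inline script,
#   extract_script_tags('<html><body><script>var a = 1;</script><p>hi</p></body></html>')
# returns ('<html><body><p>hi</p></body></html>', ['var a = 1;']), i.e. the <script>
# elements are stripped from the HTML and their bodies are returned separately.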
def prepare_wasm_jsmodule(js_mod: str, wasm: bytes) -> str:
"""
Sanitize the JS wrapper module generated by rust wasm-pack for wasm init:
removes `export` statements and `import.meta`, and inlines the wasm binary as a Uint8Array.
See test/testdata/jsi_external/hello_wasm.js for an example.
@param {str} js_mod: js wrapper module generated by rust wasm-pack
@param {bytes} wasm: wasm binary
"""
js_mod = re.sub(r'export(?:\s+default)?([\s{])', r'\1', js_mod)
js_mod = js_mod.replace('import.meta', '{}')
return js_mod + ''';
await (async () => {
const t = __wbg_get_imports();
__wbg_init_memory(t);
const {module, instance} = await WebAssembly.instantiate(Uint8Array.from(%s), t);
__wbg_finalize_init(instance, module);
})();
''' % list(wasm)

yt_dlp/jsinterp/_phantomjs.py (new file)

@ -0,0 +1,264 @@
from __future__ import annotations
import contextlib
import http.cookiejar
import json
import subprocess
import typing
import urllib.parse
from ..utils import (
ExtractorError,
Popen,
filter_dict,
int_or_none,
is_outdated_version,
shell_quote,
)
from ._helper import TempFileWrapper, random_string, extract_script_tags
from .common import ExternalJSI, register_jsi
@register_jsi
class PhantomJSJSI(ExternalJSI):
_EXE_NAME = 'phantomjs'
_SUPPORTED_FEATURES = {'location', 'cookies', 'dom'}
_BASE_PREFERENCE = 3
_BASE_JS = R'''
phantom.onError = function(msg, trace) {{
var msgStack = ['PHANTOM ERROR: ' + msg];
if(trace && trace.length) {{
msgStack.push('TRACE:');
trace.forEach(function(t) {{
msgStack.push(' -> ' + (t.file || t.sourceURL) + ': ' + t.line
+ (t.function ? ' (in function ' + t.function +')' : ''));
}});
}}
console.error(msgStack.join('\n'));
phantom.exit(1);
}};
'''
_TEMPLATE = R'''
var page = require('webpage').create();
var fs = require('fs');
var read = {{ mode: 'r', charset: 'utf-8' }};
var write = {{ mode: 'w', charset: 'utf-8' }};
page.settings.resourceTimeout = {timeout};
page.settings.userAgent = {ua};
page.onLoadStarted = function() {{
page.evaluate(function() {{
delete window._phantom;
delete window.callPhantom;
}});
}};
var saveAndExit = function() {{
fs.write({html_fn}, page.content, write);
fs.write({cookies_fn}, JSON.stringify(phantom.cookies), write);
phantom.exit();
}};
var loaded = false;
page.onLoadFinished = function(status) {{
if(page.url === "" && !loaded) {{
page.setContent(fs.read({html_fn}, read), {url});
loaded = true;
}}
else {{
JSON.parse(fs.read({cookies_fn}, read)).forEach(function(x) {{
phantom.addCookie(x);
}});
{jscode}
}}
}};
page.open("");
'''
def _save_cookies(self, url, cookiejar: YoutubeDLCookieJar | None):
def _cookie_to_dict(cookie: http.cookiejar.Cookie):
cookie_dict = {
'name': cookie.name,
'value': cookie.value,
'port': cookie.port,
'domain': cookie.domain,
'path': cookie.path or '/',
'expires': int_or_none(cookie.expires, invscale=1000),
'secure': cookie.secure,
'discard': cookie.discard,
}
if not cookie_dict['domain']:
cookie_dict['domain'] = urllib.parse.urlparse(url).hostname
cookie_dict['port'] = urllib.parse.urlparse(url).port
with contextlib.suppress(TypeError):
if (cookie.has_nonstandard_attr('httpOnly')
or cookie.has_nonstandard_attr('httponly')
or cookie.has_nonstandard_attr('HttpOnly')):
cookie_dict['httponly'] = True
return filter_dict(cookie_dict)
cookies = cookiejar.get_cookies_for_url(url) if cookiejar else []
return json.dumps([_cookie_to_dict(cookie) for cookie in cookies])
def _load_cookies(self, cookies_json: str, cookiejar: YoutubeDLCookieJar | None):
if not cookiejar:
return
cookies = json.loads(cookies_json)
for cookie in cookies:
cookiejar.set_cookie(http.cookiejar.Cookie(
0, cookie['name'], cookie['value'], cookie.get('port'), cookie.get('port') is not None,
cookie['domain'], True, cookie['domain'].startswith('.'),
cookie.get('path', '/'), True,
cookie.get('secure', False), cookie.get('expiry'),
cookie.get('discard', False), None, None,
{'httpOnly': None} if cookie.get('httponly') is True else {},
))
def _execute(self, jscode: str, video_id=None, *, note='Executing JS in PhantomJS'):
"""Execute JS and return stdout"""
if 'phantom.exit();' not in jscode:
jscode += ';\nphantom.exit();'
jscode = self._BASE_JS + jscode
self.report_note(video_id, note)
with TempFileWrapper(jscode, suffix='.js') as js_file:
cmd = [self.exe, '--ssl-protocol=any', js_file.name]
self.write_debug(f'PhantomJS command line: {shell_quote(cmd)}')
try:
stdout, stderr, returncode = Popen.run(
cmd, timeout=self.timeout, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
raise ExtractorError(f'{note} failed: Unable to run PhantomJS binary', cause=e)
if returncode:
raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr.strip()}')
return stdout
def _execute_html(self, jscode: str, url: str, html: str, cookiejar, video_id=None, note='Executing JS on webpage'):
if 'saveAndExit();' not in jscode:
raise ExtractorError('`saveAndExit();` not found in `jscode`')
if cookiejar and not url:
self.report_warning('No valid url scope provided, cookiejar is not applied')
cookiejar = None
html, inline_scripts = extract_script_tags(html)
wrapped_scripts = '\n'.join([
'page.evaluate(function() { try { %s } catch (e) {} });' % inline for inline in inline_scripts])
html_file = TempFileWrapper(html, suffix='.html')
cookie_file = TempFileWrapper(self._save_cookies(url, cookiejar), suffix='.json')
script = self._TEMPLATE.format_map({
'url': json.dumps(str(url)),
'ua': json.dumps(str(self.user_agent)),
'jscode': f'{wrapped_scripts}\n{jscode}',
'html_fn': json.dumps(html_file.name),
'cookies_fn': json.dumps(cookie_file.name),
'timeout': int(self.timeout * 1000),
})
stdout = self._execute(script, video_id, note=note)
self._load_cookies(cookie_file.read(), cookiejar)
new_html = html_file.read()
return new_html, stdout
def execute(self, jscode, video_id=None, note='Executing JS in PhantomJS', html='', cookiejar=None):
jscode = '''console.log(page.evaluate(function() {
var %(std_var)s = [];
console.log = function() {
var values = '';
for (var i = 0; i < arguments.length; i++) {
values += arguments[i] + ' ';
}
%(std_var)s.push(values);
}
%(jscode)s;
return %(std_var)s.join('\\n');
}));
saveAndExit();''' % {
'std_var': f'__stdout__values_{random_string()}',
'jscode': jscode,
}
return self._execute_html(jscode, self._url, html, cookiejar, video_id=video_id, note=note)[1].strip()
class PhantomJSwrapper:
"""PhantomJS wrapper class
This class is experimental.
"""
INSTALL_HINT = 'Please download PhantomJS from https://phantomjs.org/download.html'
@classmethod
def _version(cls):
return PhantomJSJSI.exe_version
def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000):
self._jsi = PhantomJSJSI(extractor._downloader, '', timeout / 1000, {})
if not self._jsi.is_available():
raise ExtractorError(f'PhantomJS not found, {self.INSTALL_HINT}', expected=True)
self.extractor = extractor
if required_version:
if is_outdated_version(self._jsi.exe_version, required_version):
self._jsi.report_warning(
'Your copy of PhantomJS is outdated, update it to version '
f'{required_version} or newer if you encounter any errors.')
def get(self, url, html=None, video_id=None, note=None, note2='Executing JS on webpage', headers={}, jscode='saveAndExit();'):
"""
Downloads webpage (if needed) and executes JS
Params:
url: website url
html: optional, html code of website
video_id: video id
note: optional, displayed when downloading webpage
note2: optional, displayed when executing JS
headers: custom http headers
jscode: code to be executed when page is loaded
Returns tuple with:
* downloaded website (after JS execution)
* anything you print with `console.log` (but not inside `page.execute`!)
In most cases you don't need to add any `jscode`.
It is executed in `page.onLoadFinished`.
`saveAndExit();` is mandatory, use it instead of `phantom.exit()`
It is possible to wait for some element on the webpage, e.g.
var check = function() {
var elementFound = page.evaluate(function() {
return document.querySelector('#b.done') !== null;
});
if(elementFound)
saveAndExit();
else
window.setTimeout(check, 500);
}
page.evaluate(function(){
document.querySelector('#a').click();
});
check();
"""
if 'saveAndExit();' not in jscode:
raise ExtractorError('`saveAndExit();` not found in `jscode`')
if not html:
html = self.extractor._download_webpage(url, video_id, note=note, headers=headers)
self._jsi.user_agent = headers.get('User-Agent') or self.extractor.get_param('http_headers')['User-Agent']
return self._jsi._execute_html(jscode, url, html, self.extractor.cookiejar, video_id=video_id, note=note2)
def execute(self, jscode, video_id=None, *, note='Executing JS in PhantomJS'):
"""Execute JS and return stdout"""
return self._jsi.execute(jscode, video_id=video_id, note=note)
if typing.TYPE_CHECKING:
from ..extractor.common import InfoExtractor
from ..cookies import YoutubeDLCookieJar

yt_dlp/jsinterp/common.py (new file)

@ -0,0 +1,314 @@
from __future__ import annotations
import abc
import typing
import functools
from ..extractor.common import InfoExtractor
from ..utils import (
classproperty,
format_field,
filter_dict,
get_exe_version,
variadic,
url_or_none,
sanitize_url,
ExtractorError,
)
_JSI_HANDLERS: dict[str, type[JSI]] = {}
_JSI_PREFERENCES: set[JSIPreference] = set()
_ALL_FEATURES = {
'wasm',
'location',
'dom',
'cookies',
}
def get_jsi_keys(jsi_or_keys: typing.Iterable[str | type[JSI] | JSI]) -> list[str]:
return [jok if isinstance(jok, str) else jok.JSI_KEY for jok in jsi_or_keys]
def filter_jsi_keys(features=None, only_include=None, exclude=None):
keys = list(_JSI_HANDLERS)
if features:
keys = [key for key in keys if key in _JSI_HANDLERS
and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
if only_include:
keys = [key for key in keys if key in get_jsi_keys(only_include)]
if exclude:
keys = [key for key in keys if key not in get_jsi_keys(exclude)]
return keys
def filter_jsi_include(only_include: typing.Iterable[str] | None, exclude: typing.Iterable[str] | None):
keys = get_jsi_keys(only_include) if only_include else _JSI_HANDLERS.keys()
return [key for key in keys if key not in (exclude or [])]
def filter_jsi_feature(features: typing.Iterable[str], keys=None):
keys = keys if keys is not None else _JSI_HANDLERS.keys()
return [key for key in keys if key in _JSI_HANDLERS
and _JSI_HANDLERS[key]._SUPPORTED_FEATURES.issuperset(features)]
def order_to_pref(jsi_order: typing.Iterable[str | type[JSI] | JSI], multiplier: int) -> JSIPreference:
jsi_order = reversed(get_jsi_keys(jsi_order))
pref_score = {jsi_cls: (i + 1) * multiplier for i, jsi_cls in enumerate(jsi_order)}
def _pref(jsi: JSI, *args):
return pref_score.get(jsi.JSI_KEY, 0)
return _pref
def require_features(param_features: dict[str, str | typing.Iterable[str]]):
assert all(_ALL_FEATURES.issuperset(variadic(kw_feature)) for kw_feature in param_features.values())
def outer(func):
@functools.wraps(func)
def inner(self: JSIWrapper, *args, **kwargs):
for kw_name, kw_feature in param_features.items():
if kw_name in kwargs and not self._features.issuperset(variadic(kw_feature)):
raise ExtractorError(f'feature {kw_feature} is required for `{kw_name}` param but not declared')
return func(self, *args, **kwargs)
return inner
return outer
class JSIWrapper:
"""
Helper class to forward JS interp request to a JSI that supports it.
Usage:
```
def _real_extract(self, url):
...
jsi = JSIWrapper(self, url, features=['js'])
result = jsi.execute(jscode, video_id)
...
```
Features:
- `wasm`: supports window.WebAssembly
- `location`: supports mocking window.location
- `dom`: supports DOM interface (not necessarily rendering)
- `cookies`: supports document.cookie read & write
@param dl_or_ie: `YoutubeDL` or `InfoExtractor` instance.
@param url: url context to set; used by JSIs that support the `location` feature
@param features: only JSIs that support all of these features will be selected
@param only_include: limit the JSIs to choose from.
@param exclude: JSIs to avoid using.
@param jsi_params: extra kwargs to pass to `JSI.__init__()` for each JSI, using the JSI key as dict key.
@param preferred_order: list of JSIs to prefer. The first in the list is tried first.
@param fallback_jsi: list of JSIs that are allowed to fail non-fatally, falling back to the next JSI. Pass `"all"` to always fall back.
@param timeout: timeout parameter for all chosen JSIs
@param user_agent: override the user-agent used by supporting JSIs
"""
def __init__(
self,
dl_or_ie: YoutubeDL | InfoExtractor,
url: str = '',
features: typing.Iterable[str] = [],
only_include: typing.Iterable[str | type[JSI]] = [],
exclude: typing.Iterable[str | type[JSI]] = [],
jsi_params: dict[str, dict] = {},
preferred_order: typing.Iterable[str | type[JSI]] = [],
fallback_jsi: typing.Iterable[str | type[JSI]] | typing.Literal['all'] = [],
timeout: float | int = 10,
user_agent: str | None = None,
):
self._downloader: YoutubeDL = dl_or_ie._downloader if isinstance(dl_or_ie, InfoExtractor) else dl_or_ie
self._url = sanitize_url(url_or_none(url)) or ''
self._features = set(features)
if url and not self._url:
self.report_warning(f'Invalid URL: "{url}", using empty string instead')
if unsupported_features := self._features - _ALL_FEATURES:
raise ExtractorError(f'Unsupported features: {unsupported_features}, allowed features: {_ALL_FEATURES}')
user_prefs = self._downloader.params.get('jsi_preference', [])
for invalid_key in [jsi_key for jsi_key in user_prefs if jsi_key not in _JSI_HANDLERS]:
self.report_warning(f'`{invalid_key}` is not a valid JSI, ignoring preference setting')
user_prefs.remove(invalid_key)
handler_classes = [_JSI_HANDLERS[key] for key in filter_jsi_keys(self._features, only_include, exclude)]
self.write_debug(f'Select JSI for features={self._features}: {get_jsi_keys(handler_classes)}, '
f'included: {get_jsi_keys(only_include) or "all"}, excluded: {get_jsi_keys(exclude)}')
if not handler_classes:
raise ExtractorError(f'No JSI supports features={self._features}')
self._handler_dict = {cls.JSI_KEY: cls(
self._downloader, url=self._url, timeout=timeout, features=self._features,
user_agent=user_agent, **jsi_params.get(cls.JSI_KEY, {}),
) for cls in handler_classes}
self.preferences: set[JSIPreference] = {
order_to_pref(user_prefs, 10000), order_to_pref(preferred_order, 100)} | _JSI_PREFERENCES
self._fallback_jsi = get_jsi_keys(handler_classes) if fallback_jsi == 'all' else get_jsi_keys(fallback_jsi)
self._is_test = self._downloader.params.get('test', False)
def write_debug(self, message, only_once=False):
        return self._downloader.write_debug(f'[JSIWrapper] {message}', only_once=only_once)
def report_warning(self, message, only_once=False):
        return self._downloader.report_warning(f'[JSIWrapper] {message}', only_once=only_once)
def _get_handlers(self, method_name: str, *args, **kwargs) -> list[JSI]:
handlers = [h for h in self._handler_dict.values() if callable(getattr(h, method_name, None))]
self.write_debug(f'Choosing handlers for method `{method_name}`: {get_jsi_keys(handlers)}')
if not handlers:
raise ExtractorError(f'No JSI supports method `{method_name}`, '
f'included handlers: {get_jsi_keys(self._handler_dict.values())}')
preferences = {
handler.JSI_KEY: sum(pref_func(handler, method_name, args, kwargs) for pref_func in self.preferences)
for handler in handlers
}
self.write_debug('JSI preferences for `{}` request: {}'.format(
method_name, ', '.join(f'{key}={pref}' for key, pref in preferences.items())))
return sorted(handlers, key=lambda h: preferences[h.JSI_KEY], reverse=True)
def _dispatch_request(self, method_name: str, *args, **kwargs):
handlers = self._get_handlers(method_name, *args, **kwargs)
unavailable: list[str] = []
exceptions: list[tuple[JSI, Exception]] = []
for handler in handlers:
if not handler.is_available():
if self._is_test:
raise ExtractorError(f'{handler.JSI_NAME} is not available for testing, '
f'add "{handler.JSI_KEY}" in `exclude` if it should not be used')
self.write_debug(f'{handler.JSI_KEY} is not available')
unavailable.append(handler.JSI_NAME)
continue
try:
self.write_debug(f'Dispatching `{method_name}` task to {handler.JSI_NAME}')
return getattr(handler, method_name)(*args, **kwargs)
except ExtractorError as e:
if handler.JSI_KEY not in self._fallback_jsi:
raise
else:
exceptions.append((handler, e))
self.write_debug(f'{handler.JSI_NAME} encountered error, fallback to next handler: {e}')
if not exceptions:
msg = f'No available JSI installed, please install one of: {", ".join(unavailable)}'
else:
msg = f'Failed to perform {method_name}, total {len(exceptions)} errors'
if unavailable:
msg = f'{msg}. You can try installing one of unavailable JSI: {", ".join(unavailable)}'
raise ExtractorError(msg)
@require_features({'location': 'location', 'html': 'dom', 'cookiejar': 'cookies'})
def execute(self, jscode: str, video_id: str | None, note: str | None = None,
html: str | None = None, cookiejar: YoutubeDLCookieJar | None = None) -> str:
"""
Execute JS code and return stdout from console.log
@param jscode: JS code to execute
@param video_id
@param note
@param html: html to load as document, requires `dom` feature
@param cookiejar: cookiejar to read and set cookies, requires `cookies` feature, pass `InfoExtractor.cookiejar` if you want to read and write cookies
"""
return self._dispatch_request('execute', jscode, video_id, **filter_dict({
'note': note, 'html': html, 'cookiejar': cookiejar}))
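# Hedged usage sketch inside an extractor (variable names are illustrative):
#     jsi = JSIWrapper(self, url, features=['dom', 'cookies'])
#     stdout = jsi.execute(jscode, video_id, html=webpage, cookiejar=self.cookiejar)
# `html` and `cookiejar` are accepted here only because the matching `dom` and
# `cookies` features were declared when constructing the wrapper.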
class JSI(abc.ABC):
_SUPPORTED_FEATURES: set[str] = set()
_BASE_PREFERENCE: int = 0
def __init__(self, downloader: YoutubeDL, url: str, timeout: float | int, features: set[str], user_agent=None):
if not self._SUPPORTED_FEATURES.issuperset(features):
raise ExtractorError(f'{self.JSI_NAME} does not support all required features: {features}')
self._downloader = downloader
self._url = url
self.timeout = timeout
self.features = features
self.user_agent: str = user_agent or self._downloader.params['http_headers']['User-Agent']
@abc.abstractmethod
def is_available(self) -> bool:
raise NotImplementedError
def write_debug(self, message, *args, **kwargs):
self._downloader.write_debug(f'[{self.JSI_KEY}] {message}', *args, **kwargs)
def report_warning(self, message, *args, **kwargs):
self._downloader.report_warning(f'[{self.JSI_KEY}] {message}', *args, **kwargs)
def to_screen(self, msg, *args, **kwargs):
self._downloader.to_screen(f'[{self.JSI_KEY}] {msg}', *args, **kwargs)
def report_note(self, video_id, note):
self.to_screen(f'{format_field(video_id, None, "%s: ")}{note}')
@classproperty
def JSI_NAME(cls) -> str:
return cls.__name__[:-3]
@classproperty
def JSI_KEY(cls) -> str:
assert cls.__name__.endswith('JSI'), 'JSI class names must end with "JSI"'
return cls.__name__[:-3]
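# Naming convention example (class name is hypothetical): a handler class named
# DenoJSI would expose JSI_KEY == JSI_NAME == 'Deno'.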
class ExternalJSI(JSI, abc.ABC):
_EXE_NAME: str
@classproperty(cache=True)
def exe_version(cls):
return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)')
@classproperty
def exe(cls):
return cls._EXE_NAME if cls.exe_version else None
@classmethod
def is_available(cls):
return bool(cls.exe)
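# Note: an ExternalJSI counts as available purely when the executable named by
# _EXE_NAME resolves to a version via get_exe_version(); no instance state is needed,
# which is why is_available() is a classmethod here.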
def register_jsi(jsi_cls: JsiClass) -> JsiClass:
"""Register a JS interpreter class"""
assert issubclass(jsi_cls, JSI), f'{jsi_cls} must be a subclass of JSI'
assert jsi_cls.JSI_KEY not in _JSI_HANDLERS, f'JSI {jsi_cls.JSI_KEY} already registered'
    assert jsi_cls._SUPPORTED_FEATURES.issubset(_ALL_FEATURES), f'{jsi_cls._SUPPORTED_FEATURES - _ALL_FEATURES} not declared in `_ALL_FEATURES`'
_JSI_HANDLERS[jsi_cls.JSI_KEY] = jsi_cls
return jsi_cls
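# Hedged sketch of defining and registering a new external interpreter
# (all names below are illustrative, not part of this patch):
# @register_jsi
# class MyRuntimeJSI(ExternalJSI):
#     _EXE_NAME = 'myruntime'
#     _SUPPORTED_FEATURES = {'wasm'}
#     _BASE_PREFERENCE = 1
#     def execute(self, jscode, video_id, note=None):
#         ...  # run the executable and return its stdout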
def register_jsi_preference(*handlers: type[JSI]):
assert all(issubclass(handler, JSI) for handler in handlers), f'{handlers} must all be a subclass of JSI'
def outer(pref_func: JSIPreference) -> JSIPreference:
def inner(handler: JSI, *args):
if not handlers or isinstance(handler, handlers):
return pref_func(handler, *args)
return 0
_JSI_PREFERENCES.add(inner)
return inner
return outer
@register_jsi_preference()
def _base_preference(handler: JSI, *args):
return getattr(handler, '_BASE_PREFERENCE', 0)
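# Hedged sketch of a scoped preference (the handler class name is hypothetical),
# boosting one interpreter for `execute` requests only:
# @register_jsi_preference(MyRuntimeJSI)
# def _prefer_myruntime_execute(handler, method_name, *args):
#     return 50 if method_name == 'execute' else 0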
if typing.TYPE_CHECKING:
from ..YoutubeDL import YoutubeDL
from ..cookies import YoutubeDLCookieJar
JsiClass = typing.TypeVar('JsiClass', bound=type[JSI])
class JSIPreference(typing.Protocol):
def __call__(self, handler: JSI, method_name: str, *args, **kwargs) -> int:
...

View file

@@ -6,7 +6,7 @@
import operator
import re
from ..utils import (
    NO_DEFAULT,
    ExtractorError,
    function_with_repr,

View file

@@ -1147,6 +1147,11 @@ def _alias_callback(option, opt_str, value, parser, opts, nargs):
        '--sleep-subtitles', metavar='SECONDS',
        dest='sleep_interval_subtitles', default=0, type=int,
        help='Number of seconds to sleep before each subtitle download')
workarounds.add_option(
'--jsi-preference',
metavar='JSI', dest='jsi_preference', default=[], type='str', action='callback',
callback=_list_from_options_callback,
help='Preferred JS interpreters to use during extraction. Can be given as comma-separated values.')
    verbosity = optparse.OptionGroup(parser, 'Verbosity and Simulation Options')
    verbosity.add_option(
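For context, the new flag would presumably be invoked as `yt-dlp --jsi-preference Deno,PhantomJS URL`; the interpreter key names shown are assumptions, and the comma-separated value is split by `_list_from_options_callback` like other list-valued options.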