1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00
This commit is contained in:
c-basalt 2024-08-02 16:53:30 -04:00
parent dd294fc10e
commit b0ee898da9
2 changed files with 97 additions and 43 deletions

View file

@ -6,6 +6,7 @@
import tempfile
import urllib.parse
from .common import InfoExtractor
from ..utils import (
ExtractorError,
Popen,
@ -46,6 +47,95 @@ def cookie_jar_to_list(cookie_jar):
return [cookie_to_dict(cookie) for cookie in cookie_jar]
class DenoWrapper:
"""Deno wrapper class
This class is experimental.
"""
INSTALL_HINT = 'Please install deno following https://docs.deno.com/runtime/manual/getting_started/installation/ or download its binary from https://github.com/denoland/deno/releases'
_BASE_JS = '''
delete window.Deno;
global = window;
const navProxy = new Proxy(window.navigator, { get: (target, prop, receiver) => ({
appCodeName: 'Mozilla',
appName: 'Netscape',
appVersion: '5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.6533.17 Safari/537.36',
language: 'en',
languages: ['en'],
userAgent: 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/127.0.6533.17 Safari/537.36',
webdriver: false,
}[prop])});
Object.defineProperty(window, "navigator", {get: () => navProxy})
'''
@staticmethod
def _version():
return get_exe_version('deno', version_re=r'([0-9.]+)')
def __init__(self, extractor: InfoExtractor, required_version=None, timeout=10000):
self.extractor = extractor
self.timeout = timeout
self.exe = check_executable('deno', ['-V'])
if not self.exe:
raise ExtractorError(f'Deno not found, {self.INSTALL_HINT}', expected=True)
if required_version:
if is_outdated_version(self._version(), required_version):
self.extractor.report_warning(
f'Deno is outdated, update it to version {required_version} or newer if you encounter any errors.')
@contextlib.contextmanager
def _create_temp_js(self, jscode):
js_file = tempfile.NamedTemporaryFile('wt', encoding='utf-8', suffix='.js', delete=False)
try:
js_file.write(jscode)
js_file.close()
yield js_file
finally:
with contextlib.suppress(OSError):
os.remove(js_file.name)
@staticmethod
def _location_js(location: str):
parsed = urllib.parse.urlparse(location)
return f'''
window.location = {{
href: "{location}",
origin: "{parsed.scheme}://{parsed.netloc}",
host: "{parsed.netloc}",
hostname: "{parsed.netloc.split(':')[0]}",
hash: "{parsed.fragment}",
protocol: "{parsed.scheme}:",
}};
'''
def execute(self, jscode, video_id=None, *, note='Executing JS', allow_net=None, location=None):
"""Execute JS and return stdout"""
if location:
jscode = self._location_js(location) + jscode
with self._create_temp_js(self._BASE_JS + jscode) as js_file:
self.extractor.to_screen(f'{format_field(video_id, None, "%s: ")}{note}')
cmd = [self.exe, 'run', js_file.name]
if allow_net:
cmd.append('--allow-net' if isinstance(allow_net, bool) else f'--allow-net={allow_net}')
self.extractor.write_debug(f'Deno command line: {shell_quote(cmd)}')
try:
stdout, stderr, returncode = Popen.run(cmd, timeout=self.timeout / 1000, text=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
except Exception as e:
raise ExtractorError(f'{note} failed: Unable to run Deno binary', cause=e)
if returncode:
raise ExtractorError(f'{note} failed with returncode {returncode}:\n{stderr}')
elif stderr:
self.extractor.report_warning(f'JS console error msg:\n{stderr.strip()}', video_id=video_id)
return stdout.strip()
class PhantomJSwrapper:
"""PhantomJS wrapper class

View file

@ -1,4 +1,3 @@
import asyncio
import base64
import datetime as dt
import hashlib
@ -8,9 +7,8 @@
import re
import time
from playwright.async_api import async_playwright
from .common import InfoExtractor
from .openload import DenoWrapper
from ..utils import (
ExtractorError,
UserNotLive,
@ -53,11 +51,11 @@ def requestor_query(self):
def jwt_header(self):
return {
'Referer': 'https://rplay.live/',
'Authorization': self.jwt_token or 'null'
'Authorization': self.jwt_token or 'null',
}
def _jwt_encode_hs256(self, payload: dict, key: str):
# ..utils.jwt_encode_hs256() uses slightly different details that would fails
# yt_dlp.utils.jwt_encode_hs256() uses slightly different details that would fails
# and we need to re-implement it with minor changes
b64encode = lambda x: base64.urlsafe_b64encode(
json.dumps(x, separators=(',', ':')).encode()).strip(b'=')
@ -75,7 +73,6 @@ def _perform_login(self, username, password):
'iat': int(time.time()),
}
key = hashlib.sha256(password.encode()).hexdigest()
self._login_by_token(self._jwt_encode_hs256(payload, key).decode())
def _login_by_token(self, jwt_token):
@ -103,55 +100,22 @@ def _get_butter_files(self):
self.cache.store('rplay', 'butter-code', {'js': butter_js, 'wasm': butter_wasm_array, 'date': time.time()})
return butter_js, butter_wasm_array
def _playwright_eval(self, jscode, location='about:blank', body=''):
async def __aeval():
async with async_playwright() as p:
browser = await p.chromium.launch(chromium_sandbox=True)
page = await browser.new_page()
# use page.route to skip network request while allowing changing window.location
await page.route('**', lambda route: route.fulfill(status=200, body=body))
# mock navigator to mimic regular browser
await page.add_init_script('''const proxy = new Proxy(window.navigator, {get(target, prop, receiver) {
if (prop === "webdriver") return false;
if (prop === "appVersion" || prop === "userAgent") return target[prop].replace(/Headless/g, '');
return target[prop];
}});
Object.defineProperty(window, "navigator", {get: ()=> proxy});''')
def _page_eval_js(exp, timeout=10):
return asyncio.wait_for(page.evaluate(exp), timeout=timeout)
try:
await page.goto(location) # always navigate once to trigger init script
start = time.time()
value = await _page_eval_js(jscode)
self.write_debug(f'JS execution finished in {time.time() - start:.3f}s')
return value
except asyncio.TimeoutError:
self.report_warning('PlayWright JS evaluation timed out')
finally:
await browser.close()
try:
return asyncio.run(__aeval())
except asyncio.InvalidStateError:
self.report_warning('PlayWright failed to evaluate JS')
def _calc_butter_token(self):
butter_js, butter_wasm_array = self._get_butter_files()
butter_js = re.sub(r'export(?:\s+default)?([\s{])', r'\1', butter_js)
butter_js = butter_js.replace('import.meta', '{}')
butter_js += '''__new_init = async () => {
butter_js += '''const __new_init = async () => {
const t = __wbg_get_imports();
__wbg_init_memory(t);
const {module, instance} = await WebAssembly.instantiate(Uint8Array.from(%s), t);
__wbg_finalize_init(instance, module);
};''' % butter_wasm_array # noqa: UP031
butter_js += '__new_init().then(() => (new ButterFactory()).generate_butter())'
butter_js += '__new_init().then(() => console.log((new ButterFactory()).generate_butter()));'
# The script checks `navigator.webdriver` and `location.origin` to generate correct token
return self._playwright_eval(butter_js, location='https://rplay.live')
jsi = DenoWrapper(self)
return jsi.execute(butter_js, location='https://rplay.live/')
def get_butter_token(self):
cache = self.cache.load('rplay', 'butter-token') or {}