1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00

split and fixes for phantom

This commit is contained in:
c-basalt 2024-12-30 05:41:09 -05:00
parent 6d622d5481
commit 8c6d01f757
3 changed files with 147 additions and 118 deletions

View file

@ -0,0 +1,71 @@
from __future__ import annotations
import contextlib
import os
import random
import string
import tempfile
class TempFileWrapper:
"""
Wrapper for NamedTemporaryFile, auto closes file after io and deletes file upon wrapper object gc
@param {str | bytes | None} content: content to write to file upon creation
@param {bool} text: whether to open file in text mode
@param {str} encoding: encoding to use for text mode
@param {str | None} suffix: suffix for filename of temporary file
"""
def __init__(self, content: str | bytes | None = None, text: bool = True,
encoding='utf-8', suffix: str | None = None):
self.encoding = None if not text else encoding
self.text = text
self._file = tempfile.NamedTemporaryFile('w' if text else 'wb', encoding=self.encoding,
suffix=suffix, delete=False)
if content:
self._file.write(content)
self._file.close()
@property
def name(self):
return self._file.name
@contextlib.contextmanager
def opened_file(self, mode, *, seek=None, seek_whence=0):
mode = mode if (self.text or 'b' in mode) else mode + 'b'
with open(self._file.name, mode, encoding=self.encoding) as f:
if seek is not None:
self._file.seek(seek, seek_whence)
yield f
def write(self, s, seek=None, seek_whence=0):
"""re-open file in write mode and write, optionally seek to position first"""
with self.opened_file('w', seek=seek, seek_whence=seek_whence) as f:
return f.write(s)
def append_write(self, s, seek=None, seek_whence=0):
"""re-open file in append mode and write, optionally seek to position first"""
with self.opened_file('a', seek=seek, seek_whence=seek_whence) as f:
return f.write(s)
def read(self, n=-1, seek=None, seek_whence=0):
"""re-open file and read, optionally seek to position first"""
with self.opened_file('r', seek=seek, seek_whence=seek_whence) as f:
return f.read(n)
def cleanup(self):
with contextlib.suppress(OSError):
os.remove(self._file.name)
def __del__(self):
self.cleanup()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
def random_string(length: int = 10) -> str:
return ''.join(random.choices(string.ascii_letters, k=length))

View file

@ -4,8 +4,14 @@
import typing
import functools
from ..utils import classproperty, format_field, variadic, ExtractorError
from ..extractor.common import InfoExtractor
from ..utils import (
classproperty,
format_field,
get_exe_version,
variadic,
ExtractorError,
)
_JSI_HANDLERS: dict[str, type[JSI]] = {}
@ -220,6 +226,22 @@ def JSI_KEY(cls) -> str:
return cls.__name__[:-3]
class ExternalJSI(JSI, abc.ABC):
_EXE_NAME: str
@classproperty(cache=True)
def exe_version(cls):
return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)')
@classproperty
def exe(cls):
return cls._EXE_NAME if cls.exe_version else None
@classmethod
def is_available(cls):
return bool(cls.exe)
def register_jsi(jsi_cls: JsiClass) -> JsiClass:
"""Register a JS interpreter class"""
assert issubclass(jsi_cls, JSI), f'{jsi_cls} must be a subclass of JSI'

View file

@ -1,14 +1,11 @@
from __future__ import annotations
import abc
import contextlib
import json
import os
import subprocess
import tempfile
import urllib.parse
import typing
import http.cookiejar
import json
import subprocess
import typing
import urllib.parse
from ..utils import (
@ -16,109 +13,13 @@
Popen,
classproperty,
format_field,
get_exe_version,
int_or_none,
is_outdated_version,
shell_quote,
int_or_none,
unified_timestamp,
)
from .common import JSI, register_jsi
def cookie_to_dict(cookie):
cookie_dict = {
'name': cookie.name,
'value': cookie.value,
}
if cookie.port_specified:
cookie_dict['port'] = cookie.port
if cookie.domain_specified:
cookie_dict['domain'] = cookie.domain
if cookie.path_specified:
cookie_dict['path'] = cookie.path
if cookie.expires is not None:
cookie_dict['expires'] = cookie.expires
if cookie.secure is not None:
cookie_dict['secure'] = cookie.secure
if cookie.discard is not None:
cookie_dict['discard'] = cookie.discard
with contextlib.suppress(TypeError):
if (cookie.has_nonstandard_attr('httpOnly')
or cookie.has_nonstandard_attr('httponly')
or cookie.has_nonstandard_attr('HttpOnly')):
cookie_dict['httponly'] = True
return cookie_dict
def cookie_jar_to_list(cookie_jar):
return [cookie_to_dict(cookie) for cookie in cookie_jar]
class TempFileWrapper:
"""Wrapper for NamedTemporaryFile, auto closes file after io and deletes file upon wrapper object gc"""
def __init__(self, content: str | bytes | None = None, text: bool = True,
encoding='utf-8', suffix: str | None = None):
self.encoding = None if not text else encoding
self.text = text
self._file = tempfile.NamedTemporaryFile('w' if text else 'wb', encoding=self.encoding,
suffix=suffix, delete=False)
if content:
self._file.write(content)
self._file.close()
@property
def name(self):
return self._file.name
@contextlib.contextmanager
def opened_file(self, mode, *, seek=None, seek_whence=0):
mode = mode if (self.text or 'b' in mode) else mode + 'b'
with open(self._file.name, mode, encoding=self.encoding) as f:
if seek is not None:
self._file.seek(seek, seek_whence)
yield f
def write(self, s, seek=None, seek_whence=0):
with self.opened_file('w', seek=seek, seek_whence=seek_whence) as f:
return f.write(s)
def append_write(self, s, seek=None, seek_whence=0):
with self.opened_file('a', seek=seek, seek_whence=seek_whence) as f:
return f.write(s)
def read(self, n=-1, seek=None, seek_whence=0):
with self.opened_file('r', seek=seek, seek_whence=seek_whence) as f:
return f.read(n)
def cleanup(self):
with contextlib.suppress(OSError):
os.remove(self._file.name)
def __del__(self):
self.cleanup()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
class ExternalJSI(JSI, abc.ABC):
_EXE_NAME: str
@classproperty(cache=True)
def exe_version(cls):
return get_exe_version(cls._EXE_NAME, args=getattr(cls, 'V_ARGS', ['--version']), version_re=r'([0-9.]+)')
@classproperty
def exe(cls):
return cls._EXE_NAME if cls.exe_version else None
@classmethod
def is_available(cls):
return bool(cls.exe)
from ._helper import TempFileWrapper, random_string
from .common import ExternalJSI, register_jsi
@register_jsi
@ -360,9 +261,6 @@ class PhantomJSJSI(ExternalJSI):
var fs = require('fs');
var read = {{ mode: 'r', charset: 'utf-8' }};
var write = {{ mode: 'w', charset: 'utf-8' }};
JSON.parse(fs.read({cookies_fn}, read)).forEach(function(x) {{
phantom.addCookie(x);
}});
page.settings.resourceTimeout = {timeout};
page.settings.userAgent = {ua};
page.onLoadStarted = function() {{
@ -381,20 +279,41 @@ class PhantomJSJSI(ExternalJSI):
page.setContent(fs.read({html_fn}, read), {url});
}}
else {{
JSON.parse(fs.read({cookies_fn}, read)).forEach(function(x) {{
phantom.addCookie(x);
}});
{jscode}
}}
}};
page.open("");
'''
def _save_cookies(self, url, cookiejar):
cookies = cookie_jar_to_list(cookiejar) if cookiejar else []
for cookie in cookies:
if 'path' not in cookie:
cookie['path'] = '/'
if 'domain' not in cookie:
cookie['domain'] = urllib.parse.urlparse(url).netloc
return json.dumps(cookies)
def _save_cookies(self, url, cookiejar: YoutubeDLCookieJar | None):
def _cookie_to_dict(cookie: http.cookiejar.Cookie):
cookie_dict = {
'name': cookie.name,
'value': cookie.value,
'port': cookie.port,
'domain': cookie.domain,
'path': cookie.path or '/',
'expires': int_or_none(cookie.expires, invscale=1000),
'secure': cookie.secure,
'discard': cookie.discard,
}
if not cookie_dict['domain']:
cookie_dict['domain'] = urllib.parse.urlparse(url).hostname
cookie_dict['port'] = urllib.parse.urlparse(url).port
for key in [key for key, value in cookie_dict.items() if value is None]:
cookie_dict.pop(key)
with contextlib.suppress(TypeError):
if (cookie.has_nonstandard_attr('httpOnly')
or cookie.has_nonstandard_attr('httponly')
or cookie.has_nonstandard_attr('HttpOnly')):
cookie_dict['httponly'] = True
return cookie_dict
cookies = cookiejar.get_cookies_for_url(url) if cookiejar else []
return json.dumps([_cookie_to_dict(cookie) for cookie in cookies])
def _load_cookies(self, cookies_json: str, cookiejar):
if not cookiejar:
@ -454,6 +373,23 @@ def _execute_html(self, jscode: str, url: str, html: str, cookiejar, video_id=No
def execute(self, jscode, video_id=None,
note='Executing JS in PhantomJS', location=None, html='', cookiejar=None):
if location:
jscode = '''console.log(page.evaluate(function() {
var %(std_var)s = [];
console.log = function() {
var values = '';
for (var i = 0; i < arguments.length; i++) {
values += arguments[i] + ' ';
}
%(std_var)s.push(values);
}
%(jscode)s;
return %(std_var)s.join('\\n');
}));
saveAndExit();''' % {
'std_var': f'__stdout__values_{random_string()}',
'jscode': jscode,
}
return self._execute_html(jscode, location, html, cookiejar, video_id=video_id, note=note)[1]
if html:
self.report_warning('`location` is required to use `html`')