1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00
yt-dlp/yt_dlp/extractor/rplay.py
2024-08-02 12:33:39 -04:00

394 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import base64
import datetime as dt
import hashlib
import hmac
import json
import random
import re
import time
from playwright.async_api import async_playwright
from .common import InfoExtractor
from ..utils import (
ExtractorError,
UserNotLive,
encode_data_uri,
float_or_none,
parse_iso8601,
parse_qs,
traverse_obj,
url_or_none,
)
class RPlayBaseIE(InfoExtractor):
_NETRC_MACHINE = 'rplaylive'
_TOKEN_CACHE = {}
_user_id = None
_login_type = None
_jwt_token = None
@property
def user_id(self):
return self._user_id
@property
def login_type(self):
return self._login_type
@property
def jwt_token(self):
return self._jwt_token
@property
def requestor_query(self):
return {
'requestorOid': self.user_id,
'loginType': self.login_type,
} if self.user_id else {}
@property
def jwt_header(self):
return {
'Referer': 'https://rplay.live/',
'Authorization': self.jwt_token or 'null'
}
def _jwt_encode_hs256(self, payload: dict, key: str):
# ..utils.jwt_encode_hs256() uses slightly different details that would fails
# and we need to re-implement it with minor changes
b64encode = lambda x: base64.urlsafe_b64encode(
json.dumps(x, separators=(',', ':')).encode()).strip(b'=')
header_b64 = b64encode({'alg': 'HS256', 'typ': 'JWT'})
payload_b64 = b64encode(payload)
h = hmac.new(key.encode(), header_b64 + b'.' + payload_b64, hashlib.sha256)
signature_b64 = base64.urlsafe_b64encode(h.digest()).strip(b'=')
return header_b64 + b'.' + payload_b64 + b'.' + signature_b64
def _perform_login(self, username, password):
payload = {
'eml': username,
'dat': dt.datetime.now(dt.timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z'),
'iat': int(time.time()),
}
key = hashlib.sha256(password.encode()).hexdigest()
self._login_by_token(self._jwt_encode_hs256(payload, key).decode())
def _login_by_token(self, jwt_token):
user_info = self._download_json(
'https://api.rplay.live/account/login', 'login', note='performing login', errnote='Failed to login',
data=f'{{"token":"{jwt_token}","loginType":null,"checkAdmin":null}}'.encode(),
headers={'Content-Type': 'application/json', 'Authorization': 'null'}, fatal=False)
if user_info:
self._user_id = traverse_obj(user_info, 'oid')
self._login_type = traverse_obj(user_info, 'accountType')
self._jwt_token = jwt_token if self._user_id else None
if not self._user_id:
self.report_warning('Failed to login, possibly due to wrong password or website change')
def _get_butter_files(self):
cache = self.cache.load('rplay', 'butter-code') or {}
if cache.get('date', 0) > time.time() - 86400:
return cache['js'], cache['wasm']
butter_js = self._download_webpage(
'https://pb.rplay.live/kr/public/smooth_like_butter.js', 'butter', 'getting butter-sign js')
urlh = self._request_webpage(
'https://pb.rplay.live/kr/public/smooth_like_butter_bg.wasm', 'butter', 'getting butter-sign wasm')
butter_wasm_array = list(urlh.read())
self.cache.store('rplay', 'butter-code', {'js': butter_js, 'wasm': butter_wasm_array, 'date': time.time()})
return butter_js, butter_wasm_array
def _playwright_eval(self, jscode, location='about:blank', body=''):
async def __aeval():
async with async_playwright() as p:
browser = await p.chromium.launch(chromium_sandbox=True)
page = await browser.new_page()
# use page.route to skip network request while allowing changing window.location
await page.route('**', lambda route: route.fulfill(status=200, body=body))
# mock navigator to mimic regular browser
await page.add_init_script('''const proxy = new Proxy(window.navigator, {get(target, prop, receiver) {
if (prop === "webdriver") return false;
if (prop === "appVersion" || prop === "userAgent") return target[prop].replace(/Headless/g, '');
return target[prop];
}});
Object.defineProperty(window, "navigator", {get: ()=> proxy});''')
def _page_eval_js(exp, timeout=10):
return asyncio.wait_for(page.evaluate(exp), timeout=timeout)
try:
await page.goto(location) # always navigate once to trigger init script
start = time.time()
value = await _page_eval_js(jscode)
self.write_debug(f'JS execution finished in {time.time() - start:.3f}s')
return value
except asyncio.TimeoutError:
self.report_warning('PlayWright JS evaluation timed out')
finally:
await browser.close()
try:
return asyncio.run(__aeval())
except asyncio.InvalidStateError:
self.report_warning('PlayWright failed to evaluate JS')
def _calc_butter_token(self):
butter_js, butter_wasm_array = self._get_butter_files()
butter_js = re.sub(r'export(?:\s+default)?([\s{])', r'\1', butter_js)
butter_js = butter_js.replace('import.meta', '{}')
butter_js += '''__new_init = async () => {
const t = __wbg_get_imports();
__wbg_init_memory(t);
const {module, instance} = await WebAssembly.instantiate(Uint8Array.from(%s), t);
__wbg_finalize_init(instance, module);
};''' % butter_wasm_array # noqa: UP031
butter_js += '__new_init().then(() => (new ButterFactory()).generate_butter())'
# The script checks `navigator.webdriver` and `location.origin` to generate correct token
return self._playwright_eval(butter_js, location='https://rplay.live')
def get_butter_token(self):
cache = self.cache.load('rplay', 'butter-token') or {}
timestamp = str(int(time.time() / 360))
if cache.get(timestamp):
return cache[timestamp]
token = self._calc_butter_token()
self.cache.store('rplay', 'butter-token', {timestamp: token})
return token
class RPlayVideoIE(RPlayBaseIE):
_VALID_URL = r'https://rplay.live/play/(?P<id>[\d\w]+)'
_TESTS = [{
'url': 'https://rplay.live/play/669203d25223214e67579dc3/',
'info_dict': {
'id': '669203d25223214e67579dc3',
'ext': 'mp4',
'title': 'md5:6ab0a76410b40b1f5fb48a2ad7571264',
'description': 'md5:d2fb2f74a623be439cf454df5ff3344a',
'timestamp': 1720845266,
'upload_date': '20240713',
'release_timestamp': 1720846360,
'release_date': '20240713',
'duration': 5349.0,
'thumbnail': r're:https://[\w\d]+.cloudfront.net/.*',
'uploader': '杏都める',
'uploader_id': '667adc9e9aa7f739a2158ff3',
'tags': ['杏都める', 'めいどるーちぇ', '無料', '耳舐め', 'ASMR'],
},
'params': {'cachedir': False},
}, {
'url': 'https://rplay.live/play/660bee4fd3c1d09d69db6870/',
'info_dict': {
'id': '660bee4fd3c1d09d69db6870',
'ext': 'mp4',
'title': 'md5:7de162a0f1c2266ec428234620a124fc',
'description': 'md5:c6d12cc8110b748d5588d5f00787cd35',
'timestamp': 1712057935,
'upload_date': '20240402',
'release_timestamp': 1712061900,
'release_date': '20240402',
'duration': 6791.0,
'thumbnail': r're:https://[\w\d]+.cloudfront.net/.*',
'uploader': '狐月れんげ',
'uploader_id': '65eeb4b237043dc0b5654f86',
'tags': 'count:10',
'age_limit': 18,
},
}]
def _real_extract(self, url):
video_id = self._match_id(url)
playlist_id = traverse_obj(parse_qs(url), ('playlist', ..., any))
if playlist_id and self._yes_playlist(playlist_id, video_id):
playlist_info = self._download_json(
'https://api.rplay.live/content/playlist', playlist_id,
query={'playlistOid': playlist_id, **self.requestor_query},
headers=self.jwt_header, fatal=False)
if playlist_info:
entries = traverse_obj(playlist_info, ('contentData', ..., '_id', {
lambda x: self.url_result(f'https://rplay.live/play/{x}/', ie=RPlayVideoIE, video_id=x)}))
return self.playlist_result(entries, playlist_id, playlist_info.get('name'))
else:
self.report_warning('Failed to get playlist, downloading video only')
video_info = self._download_json('https://api.rplay.live/content', video_id, query={
'contentOid': video_id,
'status': 'published',
'withComments': True,
'requestCanView': True,
**self.requestor_query,
}, headers=self.jwt_header)
if video_info.get('drm'):
raise ExtractorError('This video is DRM-protected')
metainfo = traverse_obj(video_info, {
'title': ('title', {str}),
'description': ('introText', {str}),
'release_timestamp': ('publishedAt', {parse_iso8601}),
'timestamp': ('createdAt', {parse_iso8601}),
'duration': ('length', {float_or_none}),
'uploader': ('nickname', {str}),
'uploader_id': ('creatorOid', {str}),
'tags': ('hashtags', lambda _, v: v[0] != '_'),
'age_limit': (('hideContent', 'isAdultContent'), {lambda x: 18 if x else None}, any),
})
m3u8_url = traverse_obj(video_info, ('canView', 'url', {url_or_none}))
if not m3u8_url:
msg = 'You do not have access to this video'
if traverse_obj(video_info, ('viewableTiers', 'free')):
msg = 'This video requires a free subscription to access'
if not self.user_id:
msg += f'. {self._login_hint(method="password")}'
raise ExtractorError(msg, expected=True)
thumbnail_key = traverse_obj(video_info, (
'streamables', lambda _, v: v['type'].startswith('image/'), 's3key', any))
if thumbnail_key:
metainfo['thumbnail'] = url_or_none(self._download_webpage(
'https://api.rplay.live/upload/privateasset', video_id, 'getting cover url', query={
'key': thumbnail_key,
'contentOid': video_id,
'creatorOid': metainfo.get('uploader_id'),
**self.requestor_query,
}, fatal=False))
formats = self._extract_m3u8_formats(m3u8_url, video_id, headers={
'Referer': 'https://rplay.live/', 'Butter': self.get_butter_token()})
for fmt in formats:
m3u8_doc = self._download_webpage(fmt['url'], video_id, 'getting m3u8 contents', headers={
'Referer': 'https://rplay.live/', 'Butter': self.get_butter_token()})
fmt['url'] = encode_data_uri(m3u8_doc.encode(), 'application/x-mpegurl')
match = re.search(r'^#EXT-X-KEY.*?URI="([^"]+)"', m3u8_doc, flags=re.M)
if match:
urlh = self._request_webpage(match[1], video_id, 'getting hls key', headers={
'Referer': 'https://rplay.live/',
'rplay-private-content-requestor': self.user_id or 'not-logged-in',
'age': random.randint(1, 4999),
})
fmt['hls_aes'] = {'key': urlh.read().hex()}
return {
'id': video_id,
'formats': formats,
**metainfo,
'http_headers': {'Referer': 'https://rplay.live/'},
}
class RPlayUserIE(InfoExtractor):
_VALID_URL = r'https://rplay.live/(?P<short>c|creatorhome)/(?P<id>[\d\w]+)/?(?:[#?]|$)'
_TESTS = [{
'url': 'https://rplay.live/creatorhome/667adc9e9aa7f739a2158ff3?page=contents',
'info_dict': {
'id': '667adc9e9aa7f739a2158ff3',
'title': '杏都める',
},
'playlist_mincount': 34,
}, {
'url': 'https://rplay.live/c/furachi?page=contents',
'info_dict': {
'id': '65e07e60850f4527aab74757',
'title': '逢瀬ふらち OuseFurachi',
},
'playlist_mincount': 77,
}]
def _real_extract(self, url):
user_id, short = self._match_valid_url(url).group('id', 'short')
key = 'customUrl' if short == 'c' else 'userOid'
user_info = self._download_json(
f'https://api.rplay.live/account/getuser?{key}={user_id}&filter[]=nickname&filter[]=published', user_id)
replays = self._download_json(
'https://api.rplay.live/live/replays?=667e4cd99aa7f739a2c91852', user_id, query={
'creatorOid': user_info.get('_id')})
entries = traverse_obj(user_info, ('published', ..., {
lambda x: self.url_result(f'https://rplay.live/play/{x}/', ie=RPlayVideoIE, video_id=x)}))
for entry_id in traverse_obj(replays, (..., '_id', {str})):
if entry_id in user_info.get('published', []):
continue
entries.append(self.url_result(f'https://rplay.live/play/{entry_id}/', ie=RPlayVideoIE, video_id=entry_id))
return self.playlist_result(entries, user_info.get('_id', user_id), user_info.get('nickname'))
class RPlayLiveIE(RPlayBaseIE):
_VALID_URL = [
r'https://rplay.live/(?P<short>c)/(?P<id>[\d\w]+)/live',
r'https://rplay.live/(?P<short>live)/(?P<id>[\d\w]+)',
]
_TESTS = [{
'url': 'https://rplay.live/c/chachamaru/live',
'info_dict': {
'id': '667e4cd99aa7f739a2c91852',
'ext': 'mp4',
'title': r're:【ASMR】んっやば//スキスキ耐久.*',
'description': 'md5:7f88ac0a7a3d5d0b926a0baecd1d40e1',
'timestamp': 1721739947,
'upload_date': '20240723',
'live_status': 'is_live',
'thumbnail': 'https://pb.rplay.live/liveChannelThumbnails/667e4cd99aa7f739a2c91852',
'uploader': '愛犬茶々丸',
'uploader_id': '667e4cd99aa7f739a2c91852',
'tags': 'count:9',
},
'skip': 'live',
}, {
'url': 'https://rplay.live/live/667adc9e9aa7f739a2158ff3',
'only_matching': True,
}]
def _real_extract(self, url):
user_id, short = self._match_valid_url(url).group('id', 'short')
if short == 'c':
user_info = self._download_json(f'https://api.rplay.live/account/getuser?customUrl={user_id}', user_id)
user_id = user_info['_id']
else:
user_info = self._download_json(f'https://api.rplay.live/account/getuser?userOid={user_id}', user_id)
live_info = self._download_json('https://api.rplay.live/live/play', user_id, query={'creatorOid': user_id})
stream_state = live_info['streamState']
if stream_state == 'youtube':
return self.url_result(f'https://www.youtube.com/watch?v={live_info["liveStreamId"]}')
elif stream_state == 'live':
if not self.user_id and not live_info.get('allowAnonymous'):
self.raise_login_required(method='password')
key2 = self._download_webpage(
'https://api.rplay.live/live/key2', user_id, 'getting live key',
headers=self.jwt_header, query=self.requestor_query) if self.user_id else ''
formats = self._extract_m3u8_formats(
'https://api.rplay.live/live/stream/playlist.m3u8', user_id,
query={'creatorOid': user_id, 'key2': key2})
return {
'id': user_id,
'formats': formats,
'is_live': True,
'http_headers': {'Referer': 'https://rplay.live'},
'thumbnail': f'https://pb.rplay.live/liveChannelThumbnails/{user_id}',
'uploader': traverse_obj(user_info, ('nickname', {str})),
'uploader_id': user_id,
**traverse_obj(live_info, {
'title': ('title', {str}),
'description': ('description', {str}),
'timestamp': ('streamStartTime', {parse_iso8601}),
'tags': ('hashtags', ..., {str}),
'age_limit': ('isAdultContent', {lambda x: 18 if x else None}),
}),
}
elif stream_state == 'offline':
raise UserNotLive
else:
raise ExtractorError(f'Unknow streamState: {stream_state}')