import base64
import enum
import functools
import hashlib
import json
import random
import re
import string
import urllib.parse
from .common import InfoExtractor
from ..networking.exceptions import HTTPError
from ..utils import (
ExtractorError,
OnDemandPagedList,
filter_dict,
get_domain,
int_or_none,
parse_qs,
unified_timestamp,
update_url_query,
url_or_none,
urlencode_postdata,
urljoin,
)
from ..utils.traversal import traverse_obj
class AuthType(enum.Enum):
AUTH0 = 'auth0'
NICONICO = 'niconico'
class AuthManager:
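    # Shared across AuthManager instances and keyed by site domain, so auth
    # state for a Sheeta site is reused within a single yt-dlp run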
_AUTH_INFO_CACHE = {}
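    # Translation table from standard base64 to URL-safe base64 (RFC 4648
    # section 5): '+' -> '-', '/' -> '_', and '=' padding stripped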
_AUTH0_BASE64_TRANS = str.maketrans({
'+': '-',
'/': '_',
'=': None,
})
def __init__(self, ie: 'SheetaEmbedIE'):
self._ie = ie
self._auth_info = {}
@property
def _auth_info(self):
if not self._AUTH_INFO_CACHE.get(self._ie._DOMAIN):
self._AUTH_INFO_CACHE[self._ie._DOMAIN] = {}
return self._AUTH_INFO_CACHE.get(self._ie._DOMAIN)
@_auth_info.setter
def _auth_info(self, value):
if not self._AUTH_INFO_CACHE.get(self._ie._DOMAIN):
self._AUTH_INFO_CACHE[self._ie._DOMAIN] = {}
self._AUTH_INFO_CACHE[self._ie._DOMAIN].update(value)
def _get_authed_info(self, query_path, item_id, dict_path, expected_code_msg, **query_kwargs):
try:
res = self._ie._call_api(query_path, item_id, **query_kwargs)
return traverse_obj(res, dict_path)
except ExtractorError as e:
if not isinstance(e.cause, HTTPError) or e.cause.status not in expected_code_msg:
                raise
self._ie.raise_login_required(
expected_code_msg[e.cause.status], metadata_available=True,
method=self._auth_info.get('login_method'))
return None
def _get_auth_token(self):
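        # Log in on first use; on subsequent calls, refresh the stored token
        # before handing it out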
if not self._auth_info.get('auth_token'):
try:
self._login()
return self._auth_info.get('auth_token')
except Exception as e:
                raise ExtractorError('Unable to log in due to unknown reasons') from e
if self._auth_info.get('auth_token'):
try:
self._refresh_token()
return self._auth_info.get('auth_token')
except Exception as e:
raise ExtractorError('Unable to refresh token due to unknown reasons') from e
return None
def _refresh_token(self):
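        # refresh_func_params is prepared by whichever login path succeeded:
        # the niconico flow refreshes against the fanclub auth/refresh endpoint,
        # the auth0 flow against /oauth/token with grant_type=refresh_token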
if not (refresh_func_params := self._auth_info.get('refresh_func_params')):
return False
if self._auth_info.get('auth_type') == AuthType.AUTH0:
refresh_func_params['data'] = urlencode_postdata({
**refresh_func_params['data'],
'refresh_token': self._auth_info.get('refresh_token'),
})
res = self._ie._download_json(
**refresh_func_params, expected_status=(400, 403, 404),
note='Refreshing token', errnote='Unable to refresh token')
if error := traverse_obj(
res, ('error', 'message', {lambda x: base64.b64decode(x).decode()}), ('error', 'message')):
self._ie.report_warning(f'Unable to refresh token: {error!r}')
elif token := traverse_obj(res, ('data', 'access_token', {str})):
# niconico
self._auth_info = {'auth_token': f'Bearer {token}'}
return True
elif token := traverse_obj(res, ('access_token', {str})):
# auth0
self._auth_info = {'auth_token': f'Bearer {token}'}
if refresh_token := traverse_obj(res, ('refresh_token', {str})):
self._auth_info = {'refresh_token': refresh_token}
self._ie.cache.store(
self._ie._NETRC_MACHINE, self._auth_info['cache_key'], {self._auth_info['cache_name']: refresh_token})
return True
self._ie.report_warning('Unable to find new refresh_token')
else:
self._ie.report_warning('Unable to refresh token')
return False
def _login(self):
social_login_providers = traverse_obj(self._ie._call_api(
f'fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/login', None),
('data', 'fanclub_group', 'fanclub_social_login_providers', ..., {dict})) or []
self._ie.write_debug(f'social_login_providers = {social_login_providers!r}')
for provider in social_login_providers:
provider_name = traverse_obj(provider, ('social_login_provider', 'provider_name', {str}))
if provider_name == 'ニコニコ':
redirect_url = update_url_query(provider['url'], {
'client_id': 'FCS{:05d}'.format(provider['id']),
'redirect_uri': f'https://{self._ie._DOMAIN}/login',
})
refresh_url = f'{self._ie._API_BASE_URL}/fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/auth/refresh'
return self._niconico_sns_login(redirect_url, refresh_url)
else:
raise ExtractorError(f'Unsupported social login provider: {provider_name}')
return self._auth0_login()
def _niconico_sns_login(self, redirect_url, refresh_url):
self._auth_info = {'login_method': 'any', 'auth_type': AuthType.NICONICO}
mail_tel, password = self._ie._get_login_info()
if not mail_tel:
return
cache_key = f'{self._ie._DOMAIN}:{mail_tel}'
self._auth_info = {'cache_key': cache_key}
cache_name = 'niconico_sns'
if cached_cookies := traverse_obj(self._ie.cache.load(
self._ie._NETRC_MACHINE, cache_key), (cache_name, {dict})):
for name, value in cached_cookies.items():
self._ie._set_cookie(get_domain(redirect_url), name, value)
if not (auth_token := self._niconico_get_token_by_cookies(redirect_url)):
if cached_cookies:
self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, None)
self._niconico_login(mail_tel, password)
if not (auth_token := self._niconico_get_token_by_cookies(redirect_url)):
self._ie.report_warning('Unable to get token after login, please check if '
'niconico channel plus is authorized to use your niconico account')
return
self._auth_info = {
'refresh_func_params': {
'url_or_request': refresh_url,
'video_id': None,
'headers': {'Authorization': auth_token},
'data': b'',
},
'auth_token': auth_token,
}
cookies = dict(traverse_obj(self._ie.cookiejar.get_cookies_for_url(
redirect_url), (..., {lambda item: (item.name, item.value)})))
self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, {cache_name: cookies})
def _niconico_get_token_by_cookies(self, redirect_url):
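        # With a valid niconico session, the SNS redirect lands back on
        # https://{domain}/login?code=..., and that one-time code is exchanged
        # for an access token through the sns_login API below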
urlh = self._ie._request_webpage(
redirect_url, None, note='Getting niconico auth status',
expected_status=404, errnote='Unable to get niconico auth status')
        if not urlh.url.startswith(f'https://{self._ie._DOMAIN}/login'):
return None
if not (sns_login_code := traverse_obj(parse_qs(urlh.url), ('code', 0))):
self._ie.report_warning('Unable to get sns login code')
return None
token = traverse_obj(self._ie._call_api(
f'fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/sns_login', None, fatal=False,
note='Fetching sns login info', errnote='Unable to fetch sns login info',
data=json.dumps({
'key_cloak_user': {
'code': sns_login_code,
'redirect_uri': f'https://{self._ie._DOMAIN}/login',
},
'fanclub_site': {'id': int(self._ie._FANCLUB_SITE_ID_AUTH)},
}).encode(), headers={
'Content-Type': 'application/json',
'fc_use_device': 'null',
'Referer': f'https://{self._ie._DOMAIN}',
}), ('data', 'access_token', {str}))
if token:
return f'Bearer {token}'
self._ie.report_warning('Unable to get token from sns login info')
return None
def _niconico_login(self, mail_tel, password):
login_form_strs = {
'mail_tel': mail_tel,
'password': password,
}
page, urlh = self._ie._download_webpage_handle(
'https://account.nicovideo.jp/login/redirector', None,
note='Logging into niconico', errnote='Unable to log into niconico',
data=urlencode_postdata(login_form_strs),
headers={
'Referer': 'https://account.nicovideo.jp/login',
'Content-Type': 'application/x-www-form-urlencoded',
})
if urlh.url.startswith('https://account.nicovideo.jp/login'):
self._ie.report_warning('Unable to log in: bad username or password')
return False
elif urlh.url.startswith('https://account.nicovideo.jp/mfa'):
post_url = self._ie._search_regex(
r'<form[^>]+action=(["\'])(?P<url>.+?)\1', page, 'mfa post url', group='url')
page, urlh = self._ie._download_webpage_handle(
urljoin('https://account.nicovideo.jp/', post_url), None,
note='Performing MFA', errnote='Unable to complete MFA',
data=urlencode_postdata({
'otp': self._ie._get_tfa_info('6 digits code'),
}), headers={
'Content-Type': 'application/x-www-form-urlencoded',
})
if urlh.url.startswith('https://account.nicovideo.jp/mfa') or 'formError' in page:
err_msg = self._ie._html_search_regex(
r'formError\b[^>]*>(.*?)</div>', page, 'form_error',
                    default='An error occurred, but its message could not be parsed.',
flags=re.DOTALL)
self._ie.report_warning(f'Unable to log in: MFA challenge failed, "{err_msg}"')
return False
return True
def _auth0_login(self):
self._auth_info = {'login_method': 'password', 'auth_type': AuthType.AUTH0}
username, password = self._ie._get_login_info()
if not username:
return
cache_key = f'{self._ie._DOMAIN}:{username}'
cache_name = 'refresh'
self._auth_info = {
'cache_key': cache_key,
'cache_name': cache_name,
}
login_info = self._ie._call_api(f'fanclub_sites/{self._ie._FANCLUB_SITE_ID_AUTH}/login', None)['data']['fanclub_site']
self._ie.write_debug(f'login_info = {login_info}')
auth0_web_client_id = login_info['auth0_web_client_id']
auth0_domain = login_info['fanclub_group']['auth0_domain']
token_url = f'https://{auth0_domain}/oauth/token'
redirect_url = f'https://{self._ie._DOMAIN}/login/login-redirect'
auth0_client = base64.b64encode(json.dumps({
'name': 'auth0-spa-js',
'version': '2.0.6',
}).encode()).decode()
self._auth_info = {'refresh_func_params': {
'url_or_request': token_url,
'video_id': None,
'headers': {'Auth0-Client': auth0_client},
'data': {
2025-01-29 06:18:44 -06:00
'client_id': auth0_web_client_id,
'grant_type': 'refresh_token',
'redirect_uri': redirect_url,
2025-01-30 02:30:35 -06:00
}}}
def random_str():
return ''.join(random.choices(string.digits + string.ascii_letters, k=43))
state = base64.b64encode(random_str().encode())
nonce = base64.b64encode(random_str().encode())
code_verifier = random_str().encode()
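        # PKCE S256 challenge (RFC 7636): BASE64URL(SHA256(code_verifier))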
code_challenge = base64.b64encode(
hashlib.sha256(code_verifier).digest()).decode().translate(self._AUTH0_BASE64_TRANS)
authorize_url = update_url_query(f'https://{auth0_domain}/authorize', {
'client_id': auth0_web_client_id,
'scope': 'openid profile email offline_access',
'redirect_uri': redirect_url,
'audience': f'api.{self._ie._DOMAIN}',
'prompt': 'login',
'response_type': 'code',
'response_mode': 'query',
'state': state,
'nonce': nonce,
'code_challenge': code_challenge,
'code_challenge_method': 'S256',
'auth0Client': auth0_client,
})
if cached_refresh_token := traverse_obj(self._ie.cache.load(
self._ie._NETRC_MACHINE, cache_key), (cache_name, {str})):
self._auth_info = {'refresh_token': cached_refresh_token}
if self._refresh_token():
self._ie.write_debug('cached tokens updated')
return
self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, None)
login_form = self._ie._hidden_inputs(self._ie._download_webpage(
authorize_url, None, note='Getting login form', errnote='Unable to get login form'))
state_obtained = login_form['state']
login_url = f'https://{auth0_domain}/u/login?state={state_obtained}'
login_form.update({
'username': username,
'password': password,
'action': 'default',
})
urlh = self._ie._request_webpage(
login_url, None, note='Logging in', errnote='Unable to log in',
data=urlencode_postdata(login_form), expected_status=(400, 404))
if urlh.status == 400:
self._ie.report_warning('Unable to log in: bad username or password')
return
if not (urlh.status == 404 and urlh.url.startswith(redirect_url)):
self._ie.report_warning('Unable to log in: Unknown login status')
return
code = parse_qs(urlh.url)['code'][0]
token_json = self._ie._download_json(
token_url, None, headers={'Auth0-Client': auth0_client},
note='Getting auth0 tokens', errnote='Unable to get auth0 tokens',
data=urlencode_postdata({
'client_id': auth0_web_client_id,
'code_verifier': code_verifier,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': redirect_url,
}))
access_token = token_json['access_token']
refresh_token = token_json['refresh_token']
auth_token = f'Bearer {access_token}'
self._auth_info = {
'auth_token': auth_token,
'refresh_token': refresh_token,
}
        self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, {cache_name: refresh_token})
class SheetaEmbedIE(InfoExtractor):
_NETRC_MACHINE = 'sheeta'
IE_NAME = 'sheeta'
IE_DESC = 'fan club system developed by DWANGO (ドワンゴ)'
_VALID_URL = False
_WEBPAGE_TESTS = [{
'url': 'https://qlover.jp/doku/video/smy4caVHR6trSddiG9uCDiy4',
'info_dict': {
'id': 'smy4caVHR6trSddiG9uCDiy4',
'title': '名取さなの毒にも薬にもならないラジオ#39',
'ext': 'mp4',
'channel': '名取さなの毒にも薬にもならないラジオ',
'channel_id': 'qlover.jp/doku',
'channel_url': 'https://qlover.jp/doku',
'age_limit': 0,
'live_status': 'not_live',
'thumbnail': str,
'description': 'md5:75c2143a59b4b70141b77ddb485991fd',
'timestamp': 1711933200,
'duration': 1872,
'comment_count': int,
'view_count': int,
'tags': ['名取さな', 'どくラジ', '文化放送', 'ラジオ'],
'upload_date': '20240401',
},
'params': {
'skip_download': True,
},
}, {
'url': 'https://itomiku-fc.jp/live/sm4P8x6oVPFBx59bNBGSgKoE',
'info_dict': {
'id': 'sm4P8x6oVPFBx59bNBGSgKoE',
'title': '【3/9(土)14:00】「all yours」美来の日SP♪',
'ext': 'mp4',
'channel': '伊藤美来 Official Fanclub 「all yours」',
'channel_id': 'itomiku-fc.jp',
'channel_url': 'https://itomiku-fc.jp',
'age_limit': 0,
'live_status': 'was_live',
'thumbnail': str,
'description': 'md5:80a6a14db30d8506f70bec6a28a6c4ad',
'timestamp': 1709964399,
'duration': 4542,
'comment_count': int,
'view_count': int,
'tags': ['生放送', '生放送アーカイブ'],
'upload_date': '20240309',
'release_timestamp': 1709959800,
'release_date': '20240309',
},
'params': {'skip_download': True},
}, {
'url': 'https://canan8181.com/video/smxar9atjfNBn27bHhcTFLyg',
'info_dict': {
'id': 'smxar9atjfNBn27bHhcTFLyg',
'title': '💛【7月】ドネートお礼しながら感想どきどきトーク【感想会】',
'ext': 'mp4',
'channel': 'Canan official fanclub',
'channel_id': 'canan8181.com',
'channel_url': 'https://canan8181.com',
'age_limit': 15,
'live_status': 'was_live',
'thumbnail': str,
'description': 'md5:0cd80e51da82dbb89deae5ea14aad24d',
'timestamp': 1659182206,
'duration': 6997,
'comment_count': int,
'view_count': int,
'tags': ['安眠など♡アーカイブ&動画(わらのおうちプラン以上)'],
'upload_date': '20220730',
'release_timestamp': 1659175200,
'release_date': '20220730',
},
'params': {'skip_download': True},
}, {
'url': 'https://11audee.jp/audio/smx3ebEZFRnHeaGzUzgi5A98',
'info_dict': {
'id': 'smx3ebEZFRnHeaGzUzgi5A98',
'title': '#相坂湯 第38回 ロコムジカちゃんの歌唱についてモノ申す!? ある意味レアな?鼻声坂くん!',
'ext': 'm4a',
'channel': '相坂優歌 湯上がり何飲む?',
'channel_id': '11audee.jp',
'channel_url': 'https://11audee.jp',
'age_limit': 0,
'live_status': 'not_live',
'thumbnail': str,
'description': 'md5:fdf881191f8057aa6af6042fc17fb94c',
'timestamp': 1710860400,
'duration': 631,
'comment_count': int,
'view_count': int,
'tags': ['RADIO'],
'upload_date': '20240319',
},
'params': {'skip_download': True},
}, {
'url': 'https://hololive-fc.com/videos',
'info_dict': {
'id': 'hololive-fc.com/videos',
'title': '旧ホロライブ公式ファンクラブ-videos',
'age_limit': 0,
'timestamp': 1715652389,
'upload_date': '20240514',
},
'playlist_mincount': 12,
}, {
'url': 'https://tokinosora-fc.com/videos?vodType=1',
'info_dict': {
'id': 'tokinosora-fc.com/videos',
'title': 'ときのそらオフィシャルファンクラブ-videos',
'age_limit': 0,
'timestamp': 1715652399,
'upload_date': '20240514',
},
'playlist_mincount': 18,
}, {
'url': 'https://01audee.jp/videos?tag=RADIO&vodType=1&sort=display_date',
'info_dict': {
'id': '01audee.jp/videos',
'title': '大熊和奏 朝のささやき-videos',
'age_limit': 0,
'timestamp': 1715652369,
'upload_date': '20240514',
},
'playlist_mincount': 6,
}, {
'url': 'https://qlover.jp/bokuao/lives',
'info_dict': {
'id': 'qlover.jp/bokuao/lives',
'title': '僕が見たかった青空の 「青天のヘキレキ!」-lives',
'age_limit': 0,
'timestamp': 1715652429,
'upload_date': '20240514',
},
'playlist_mincount': 1,
}, {
'url': 'https://06audee.jp/lives',
'info_dict': {
'id': '06audee.jp/lives',
'title': '田中ちえ美のたなかのカナタ!-lives',
'age_limit': 0,
'timestamp': 1715652369,
'upload_date': '20240514',
},
'playlist_mincount': 5,
}]
_DOMAIN = None
_API_BASE_URL = None
_FANCLUB_GROUP_ID = None
_FANCLUB_SITE_ID_AUTH = None
_FANCLUB_SITE_ID_INFO = None
_LIST_PAGE_SIZE = 12
auth_manager: AuthManager = None
def _extract_from_url(self, url):
parsed_url = urllib.parse.urlparse(url)
if not self.auth_manager:
self.auth_manager = AuthManager(self)
if '/videos' in parsed_url.path:
return self._extract_video_list_page(url)
elif '/lives' in parsed_url.path:
return self._extract_live_list_page(url)
else:
return self._extract_player_page(url)
def _extract_from_webpage(self, url, webpage):
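        # Sheeta-based sites are recognized by a Google Tag Manager container ID
        # they appear to share, or by the NicoGoogleTagManagerDataLayer object
        # embedded in their pages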
if 'GTM-KXT7G5G' in webpage or 'NicoGoogleTagManagerDataLayer' in webpage:
yield self._extract_from_url(url)
raise self.StopExtraction
def _call_api(self, path, item_id, *args, **kwargs):
return self._download_json(f'{self._API_BASE_URL}/{path}', item_id, *args, **kwargs)
def _find_fanclub_site_id(self, channel_id):
fanclub_list_json = self._call_api(
'content_providers/channel_domain', f'channels/{channel_id}',
query={'current_site_domain': urllib.parse.quote(f'https://{self._DOMAIN}/{channel_id}')},
note='Fetching channel list', errnote='Unable to fetch channel list',
)
if fanclub_id := traverse_obj(
fanclub_list_json, ('data', 'content_providers', 'id', {int_or_none}), get_all=False):
return fanclub_id
raise ExtractorError(f'Channel {channel_id} does not exist', expected=True)
def _extract_base_info(self, channel_id):
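        # Each Sheeta site serves /site/settings.json, which provides the API
        # base URL and the fanclub group/site IDs used by all later API calls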
site_settings = self._download_json(
f'https://{self._DOMAIN}/site/settings.json', None,
note='Fetching site settings', errnote='Unable to fetch site settings')
self.write_debug(f'site_settings = {site_settings!r}')
self._API_BASE_URL = site_settings['api_base_url']
self._FANCLUB_GROUP_ID = site_settings['fanclub_group_id']
self._FANCLUB_SITE_ID_AUTH = site_settings['fanclub_site_id']
if channel_id:
self._FANCLUB_SITE_ID_INFO = self._find_fanclub_site_id(channel_id)
else:
self._FANCLUB_SITE_ID_INFO = self._FANCLUB_SITE_ID_AUTH
@property
def _channel_base_info(self):
return traverse_obj(self._call_api(
f'fanclub_sites/{self._FANCLUB_SITE_ID_INFO}/page_base_info', f'fanclub_sites/{self._FANCLUB_SITE_ID_INFO}',
note='Fetching channel base info', errnote='Unable to fetch channel base info', fatal=False,
), ('data', 'fanclub_site', {dict})) or {}
@property
def _channel_user_info(self):
return traverse_obj(self._call_api(
f'fanclub_sites/{self._FANCLUB_SITE_ID_INFO}/user_info', f'fanclub_sites/{self._FANCLUB_SITE_ID_INFO}',
note='Fetching channel user info', errnote='Unable to fetch channel user info', fatal=False,
data=json.dumps('null').encode(),
), ('data', 'fanclub_site', {dict})) or {}
def _extract_channel_info(self, channel_id):
if channel_id:
full_channel_id = f'{self._DOMAIN}/{channel_id}'
channel_url = f'https://{self._DOMAIN}/{channel_id}'
else:
full_channel_id = self._DOMAIN
channel_url = f'https://{self._DOMAIN}'
return {
'channel': self._channel_base_info.get('fanclub_site_name'),
'channel_id': full_channel_id,
'channel_url': channel_url,
'age_limit': traverse_obj(self._channel_user_info, (
'content_provider', 'age_limit', {int_or_none})),
}
def _extract_player_page(self, url):
self._DOMAIN, channel_id, content_code = re.match(
r'https?://(?P<domain>[\w.-]+)(/(?P<channel>[\w.-]+))?/(?:live|video|audio)/(?P<code>sm\w+)', url,
).group('domain', 'channel', 'code')
self._extract_base_info(channel_id)
data_json = self._call_api(
f'video_pages/{content_code}', content_code, headers={'fc_use_device': 'null'},
note='Fetching video page info', errnote='Unable to fetch video page info',
)['data']['video_page']
live_status = self._get_live_status(data_json, content_code)
formats = self._get_formats(data_json, live_status, content_code)
release_timestamp_str = data_json.get('live_scheduled_start_at')
if live_status == 'is_upcoming':
if release_timestamp_str:
msg = f'This live event will begin at {release_timestamp_str} UTC'
else:
msg = 'This event has not started yet'
self.raise_no_formats(msg, expected=True, video_id=content_code)
return {
'id': content_code,
'formats': formats,
'live_status': live_status,
'release_timestamp': unified_timestamp(release_timestamp_str),
**self._extract_channel_info(channel_id),
**traverse_obj(data_json, {
'title': ('title', {str}),
'thumbnail': ('thumbnail_url', {url_or_none}),
'description': ('description', {str}),
'timestamp': ('display_date', {unified_timestamp}),
'duration': ('active_video_filename', 'length', {int_or_none}),
'comment_count': ('video_aggregate_info', 'number_of_comments', {int_or_none}),
'view_count': ('video_aggregate_info', 'total_views', {int_or_none}),
'tags': ('video_tags', ..., 'tag', {str}),
}),
'__post_extractor': self.extract_comments(
content_code=content_code,
comment_group_id=traverse_obj(data_json, ('video_comment_setting', 'comment_group_id'))),
}
def _get_comments(self, content_code, comment_group_id):
item_id = f'{content_code}/comments'
if not comment_group_id:
return None
comment_access_token = self._call_api(
f'video_pages/{content_code}/comments_user_token', item_id,
note='Getting comment token', errnote='Unable to get comment token',
)['data']['access_token']
comment_list, urlh = self._download_json_handle(
'https://comm-api.sheeta.com/messages.history', video_id=item_id,
note='Fetching comments', errnote='Unable to fetch comments',
headers={'Content-Type': 'application/json'}, expected_status=404,
query={
'sort_direction': 'asc',
'limit': int_or_none(self._configuration_arg('max_comments', [''])[0]) or 120,
},
data=json.dumps({
'token': comment_access_token,
'group_id': comment_group_id,
}).encode())
if urlh.status == 404:
self.report_warning('Unable to fetch comments due to rate limit', content_code)
return
for comment in traverse_obj(comment_list, ...):
yield traverse_obj(comment, {
'author': ('nickname', {str}),
'author_id': ('sender_id', {str}),
'id': ('id', {str}, {lambda x: x or None}),
'text': ('message', {str}),
'timestamp': (('updated_at', 'sent_at', 'created_at'), {unified_timestamp}),
'author_is_uploader': ('sender_id', {lambda x: x == '-1'}),
}, get_all=False)
def _get_live_status(self, data_json, content_code):
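        # 'vod'  -> 'was_live' if live_finished_at is set, otherwise 'not_live'
        # 'live' -> 'is_upcoming' until live_started_at appears, 'is_live' until
        #           live_finished_at; an ended live is downloadable only if both
        #           allow_dvr_flg and convert_to_vod_flg are set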
video_type = data_json.get('type')
live_finished_at = data_json.get('live_finished_at')
if video_type == 'vod':
if live_finished_at:
live_status = 'was_live'
else:
live_status = 'not_live'
elif video_type == 'live':
if not data_json.get('live_started_at'):
return 'is_upcoming'
if not live_finished_at:
live_status = 'is_live'
else:
live_status = 'was_live'
video_allow_dvr_flg = traverse_obj(data_json, ('video', 'allow_dvr_flg'))
video_convert_to_vod_flg = traverse_obj(data_json, ('video', 'convert_to_vod_flg'))
self.write_debug(
f'{content_code}: allow_dvr_flg = {video_allow_dvr_flg}, convert_to_vod_flg = {video_convert_to_vod_flg}.')
if not (video_allow_dvr_flg and video_convert_to_vod_flg):
raise ExtractorError(
                        'The live stream has ended, and there is no video available for download', video_id=content_code, expected=True)
else:
raise ExtractorError(f'Unknown type: {video_type!r}', video_id=content_code)
self.write_debug(f'{content_code}: video_type={video_type}, live_status={live_status}')
return live_status
def _get_formats(self, data_json, live_status, content_code):
headers = filter_dict({
'Content-Type': 'application/json',
'fc_use_device': 'null',
'origin': f'https://{self._DOMAIN}',
'Authorization': self.auth_manager._get_auth_token(),
})
formats = []
if data_json.get('video'):
payload = {}
if data_json.get('type') == 'live' and live_status == 'was_live':
payload = {'broadcast_type': 'dvr'}
session_id = self.auth_manager._get_authed_info(
f'video_pages/{content_code}/session_ids', f'{content_code}/session',
('data', 'session_id', {str}), {
401: 'Members-only content',
403: 'Login required',
408: 'Outdated token',
}, data=json.dumps(payload).encode(), headers=headers,
note='Getting session id', errnote='Unable to get session id')
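            # video_stream.authenticated_url is a template with a {session_id}
            # placeholder, filled in below with the session id just fetched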
if session_id:
m3u8_url = data_json['video_stream']['authenticated_url'].format(session_id=session_id)
formats = self._extract_m3u8_formats(m3u8_url, content_code)
elif data_json.get('audio'):
m3u8_url = self.auth_manager._get_authed_info(
f'video_pages/{content_code}/content_access', f'{content_code}/content_access',
('data', 'resource', {url_or_none}), {
403: 'Login required',
404: 'Members-only content',
408: 'Outdated token',
}, headers=headers, note='Getting content resource',
errnote='Unable to get content resource')
if m3u8_url:
audio_type = traverse_obj(data_json, (
'audio_filename_transcoded_list', lambda _, v: v['url'] == m3u8_url,
'video_filename_type', 'value', {str}), get_all=False)
if audio_type == 'audio_free':
                    # fully free audio is always of type "audio_paid", so "audio_free" implies restricted access
                    msg = 'You do not have access to the paid content. '
if traverse_obj(data_json, 'video_free_periods'):
msg += 'There may be some silent parts in this audio'
else:
msg += 'This audio may be completely blank'
self.raise_login_required(
msg, metadata_available=True, method=self.auth_manager._auth_info.get('login_method'))
formats = [{
'url': m3u8_url,
'format_id': 'audio',
'protocol': 'm3u8_native',
'ext': 'm4a',
'vcodec': 'none',
'acodec': 'aac',
'format_note': audio_type,
}]
else:
raise ExtractorError('Unknown media type', video_id=content_code)
return formats
def _fetch_paged_channel_video_list(self, path, query, channel, item_id, page):
response = self._call_api(
path, item_id, query={
**query,
'page': (page + 1),
'per_page': self._LIST_PAGE_SIZE,
},
headers={'fc_use_device': 'null'},
note=f'Fetching channel info (page {page + 1})',
errnote=f'Unable to fetch channel info (page {page + 1})')
for content_code in traverse_obj(
response, ('data', 'video_pages', 'list', ..., 'content_code', {str})):
yield self.url_result('/'.join(filter(
None, [f'https://{self._DOMAIN}', channel, 'video', content_code])))
def _extract_video_list_page(self, url):
"""
API parameters:
sort:
-display_date 公開日が新しい順 (newest to oldest)
display_date 公開日が古い順 (oldest to newest)
-number_of_vod_views 再生数が多い順 (most play count)
            number_of_vod_views 再生数が少ない順 (least play count)
vod_type (is "vodType" in "url"):
0 すべて (all)
1 会員限定 (members only)
2 一部無料 (partially free)
3 レンタル (rental)
4 生放送アーカイブ (live archives)
5 アップロード動画 (uploaded videos)
7 無料 (free)
"""
self._DOMAIN, channel_id = re.match(
r'https?://(?P<domain>[\w.-]+)(/(?P<channel>[\w.-]+))?/videos', url,
).group('domain', 'channel')
self._extract_base_info(channel_id)
channel_info = self._extract_channel_info(channel_id)
full_channel_id = channel_info['channel_id']
channel_name = channel_info['channel']
qs = parse_qs(url)
return self.playlist_result(
OnDemandPagedList(
functools.partial(
self._fetch_paged_channel_video_list, f'fanclub_sites/{self._FANCLUB_SITE_ID_INFO}/video_pages',
filter_dict({
'tag': traverse_obj(qs, ('tag', 0)),
'sort': traverse_obj(qs, ('sort', 0), default='-display_date'),
'vod_type': traverse_obj(qs, ('vodType', 0), default='0'),
}),
channel_id, f'{full_channel_id}/videos'),
self._LIST_PAGE_SIZE),
playlist_id=f'{full_channel_id}/videos', playlist_title=f'{channel_name}-videos')
def _extract_live_list_page(self, url):
"""
API parameters:
live_type:
1 放送中 (on air)
2 放送予定 (scheduled live streams, oldest to newest)
3 過去の放送 - すべて (all ended live streams, newest to oldest)
4 過去の放送 - 生放送アーカイブ (all archives for live streams, oldest to newest)
We use "4" instead of "3" because some recently ended live streams could not be downloaded.
"""
self._DOMAIN, channel_id = re.match(
r'https?://(?P<domain>[\w.-]+)(/(?P<channel>[\w.-]+))?/lives', url,
).group('domain', 'channel')
self._extract_base_info(channel_id)
channel_info = self._extract_channel_info(channel_id)
full_channel_id = channel_info['channel_id']
channel_name = channel_info['channel']
return self.playlist_result(
OnDemandPagedList(
functools.partial(
self._fetch_paged_channel_video_list, f'fanclub_sites/{self._FANCLUB_SITE_ID_INFO}/live_pages',
{'live_type': 4}, channel_id, f'{full_channel_id}/lives'),
self._LIST_PAGE_SIZE),
playlist_id=f'{full_channel_id}/lives', playlist_title=f'{channel_name}-lives')