diff --git a/yt_dlp/extractor/sheeta.py b/yt_dlp/extractor/sheeta.py index 77fa940a3..1221d50f9 100644 --- a/yt_dlp/extractor/sheeta.py +++ b/yt_dlp/extractor/sheeta.py @@ -25,6 +25,327 @@ from ..utils.traversal import traverse_obj +class AuthManager: + _AUTH_INFO = {} + + def __init__(self, ie: 'SheetaEmbedIE'): + self._ie = ie + self._auth_info = {} + + @property + def _auth_info(self): + if not self._AUTH_INFO.get(self._ie._DOMAIN): + self._AUTH_INFO[self._ie._DOMAIN] = {} + return self._AUTH_INFO.get(self._ie._DOMAIN) + + @_auth_info.setter + def _auth_info(self, value): + if not self._AUTH_INFO.get(self._ie._DOMAIN): + self._AUTH_INFO[self._ie._DOMAIN] = {} + self._AUTH_INFO[self._ie._DOMAIN].update(value) + + def _get_authed_info(self, query_path, item_id, dict_path, expected_code_msg, **query_kwargs): + try: + res = self._ie._call_api(query_path, item_id, **query_kwargs) + return traverse_obj(res, dict_path) + except ExtractorError as e: + if not isinstance(e.cause, HTTPError) or e.cause.status not in expected_code_msg: + raise e + self._ie.raise_login_required( + expected_code_msg[e.cause.status], metadata_available=True, + method=self._auth_info.get('login_method')) + return None + + def _get_auth_token(self): + if not self._auth_info.get('auth_token'): + try: + self._login() + return self._auth_info.get('auth_token') + except Exception as e: + raise ExtractorError('Unable to login due to unknown reasons') from e + + if self._auth_info.get('auth_token'): + try: + self._refresh_token() + return self._auth_info.get('auth_token') + except Exception as e: + raise ExtractorError('Unable to refresh token due to unknown reasons') from e + + return None + + def _refresh_token(self): + if not (refresh_func := self._auth_info.get('refresh_func')): + return False + + res = self._ie._download_json( + **refresh_func(self._auth_info), expected_status=(400, 403, 404), + note='Refreshing token', errnote='Unable to refresh token') + if error := traverse_obj( + res, ('error', 'message', {lambda x: base64.b64decode(x).decode()}), ('error', 'message')): + self._ie.report_warning(f'Unable to refresh token: {error!r}') + elif token := traverse_obj(res, ('data', 'access_token', {str})): + # niconico + self._auth_info = {'auth_token': f'Bearer {token}'} + return True + elif token := traverse_obj(res, ('access_token', {str})): + # auth0 + self._auth_info = {'auth_token': f'Bearer {token}'} + if refresh_token := traverse_obj(res, ('refresh_token', {str})): + self._auth_info = {'refresh_token': refresh_token} + self._ie.cache.store( + self._ie._NETRC_MACHINE, self._auth_info['cache_key'], {self._auth_info['cache_name']: refresh_token}) + return True + self._ie.report_warning('Unable to find new refresh_token') + else: + self._ie.report_warning('Unable to refresh token') + + return False + + def _login(self): + social_login_providers = traverse_obj(self._ie._call_api( + f'fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/login', None), + ('data', 'fanclub_group', 'fanclub_social_login_providers', ..., {dict})) or [] + self._ie.write_debug(f'social_login_providers = {social_login_providers!r}') + + for provider in social_login_providers: + provider_name = traverse_obj(provider, ('social_login_provider', 'provider_name', {str})) + if provider_name == 'ニコニコ': + redirect_url = update_url_query(provider['url'], { + 'client_id': 'FCS{:05d}'.format(provider['id']), + 'redirect_uri': f'https://{self._ie._DOMAIN}/login', + }) + refresh_url = f'{self._ie._API_BASE_URL}/fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/auth/refresh' + return self._niconico_sns_login(redirect_url, refresh_url) + else: + raise ExtractorError(f'Unsupported social login provider: {provider_name}') + + return self._auth0_login() + + def _niconico_sns_login(self, redirect_url, refresh_url): + self._auth_info = {'login_method': 'any'} + mail_tel, password = self._ie._get_login_info() + if not mail_tel: + return + + cache_key = hashlib.sha1(f'{self._ie._DOMAIN}:{mail_tel}:{password}'.encode()).hexdigest() + self._auth_info = {'cache_key': cache_key} + cache_name = 'niconico_sns' + + if cached_cookies := traverse_obj(self._ie.cache.load( + self._ie._NETRC_MACHINE, cache_key), (cache_name, {dict})): + for name, value in cached_cookies.items(): + self._ie._set_cookie(get_domain(redirect_url), name, value) + + if not (auth_token := self._niconico_get_token_by_cookies(redirect_url)): + if cached_cookies: + self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, None) + + self._niconico_login(mail_tel, password) + + if not (auth_token := self._niconico_get_token_by_cookies(redirect_url)): + self._ie.report_warning('Unable to get token after login, please check if ' + 'niconico channel plus is authorized to use your niconico account') + return + + self._auth_info = { + 'refresh_func': lambda data: { + 'url_or_request': data['refresh_url'], + 'video_id': None, + 'headers': {'Authorization': data['auth_token']}, + 'data': b'', + }, + 'refresh_url': refresh_url, + 'auth_token': auth_token, + } + + cookies = dict(traverse_obj(self._ie.cookiejar.get_cookies_for_url( + redirect_url), (..., {lambda item: (item.name, item.value)}))) + self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, {cache_name: cookies}) + + def _niconico_get_token_by_cookies(self, redirect_url): + urlh = self._ie._request_webpage( + redirect_url, None, note='Getting niconico auth status', + expected_status=404, errnote='Unable to get niconico auth status') + if not urlh.url.startswith(f'https://{self._DOMAIN}/login'): + return None + + if not (sns_login_code := traverse_obj(parse_qs(urlh.url), ('code', 0))): + self._ie.report_warning('Unable to get sns login code') + return None + + token = traverse_obj(self._ie._call_api( + f'fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/sns_login', None, fatal=False, + note='Fetching sns login info', errnote='Unable to fetch sns login info', + data=json.dumps({ + 'key_cloak_user': { + 'code': sns_login_code, + 'redirect_uri': f'https://{self._ie._DOMAIN}/login', + }, + 'fanclub_site': {'id': int(self._ie._FANCLUB_SITE_ID_AUTH)}, + }).encode(), headers={ + 'Content-Type': 'application/json', + 'fc_use_device': 'null', + 'Referer': f'https://{self._ie._DOMAIN}', + }), ('data', 'access_token', {str})) + if token: + return f'Bearer {token}' + + self._ie.report_warning('Unable to get token from sns login info') + return None + + def _niconico_login(self, mail_tel, password): + login_form_strs = { + 'mail_tel': mail_tel, + 'password': password, + } + page, urlh = self._ie._download_webpage_handle( + 'https://account.nicovideo.jp/login/redirector', None, + note='Logging into niconico', errnote='Unable to log into niconico', + data=urlencode_postdata(login_form_strs), + headers={ + 'Referer': 'https://account.nicovideo.jp/login', + 'Content-Type': 'application/x-www-form-urlencoded', + }) + if urlh.url.startswith('https://account.nicovideo.jp/login'): + self._ie.report_warning('Unable to log in: bad username or password') + return False + elif urlh.url.startswith('https://account.nicovideo.jp/mfa'): + post_url = self._ie._search_regex( + r'