import base64 import functools import hashlib import json import random import re import string import urllib.parse from .common import InfoExtractor from ..networking.exceptions import HTTPError from ..utils import ( ExtractorError, OnDemandPagedList, filter_dict, get_domain, int_or_none, parse_qs, unified_timestamp, update_url_query, url_or_none, urlencode_postdata, urljoin, ) from ..utils.traversal import traverse_obj class AuthManager: _AUTH_INFO = {} def __init__(self, ie: 'SheetaEmbedIE'): self._ie = ie self._auth_info = {} @property def _auth_info(self): if not self._AUTH_INFO.get(self._ie._DOMAIN): self._AUTH_INFO[self._ie._DOMAIN] = {} return self._AUTH_INFO.get(self._ie._DOMAIN) @_auth_info.setter def _auth_info(self, value): if not self._AUTH_INFO.get(self._ie._DOMAIN): self._AUTH_INFO[self._ie._DOMAIN] = {} self._AUTH_INFO[self._ie._DOMAIN].update(value) def _get_authed_info(self, query_path, item_id, dict_path, expected_code_msg, **query_kwargs): try: res = self._ie._call_api(query_path, item_id, **query_kwargs) return traverse_obj(res, dict_path) except ExtractorError as e: if not isinstance(e.cause, HTTPError) or e.cause.status not in expected_code_msg: raise e self._ie.raise_login_required( expected_code_msg[e.cause.status], metadata_available=True, method=self._auth_info.get('login_method')) return None def _get_auth_token(self): if not self._auth_info.get('auth_token'): try: self._login() return self._auth_info.get('auth_token') except Exception as e: raise ExtractorError('Unable to login due to unknown reasons') from e if self._auth_info.get('auth_token'): try: self._refresh_token() return self._auth_info.get('auth_token') except Exception as e: raise ExtractorError('Unable to refresh token due to unknown reasons') from e return None def _refresh_token(self): if not (refresh_func := self._auth_info.get('refresh_func')): return False res = self._ie._download_json( **refresh_func(self._auth_info), expected_status=(400, 403, 404), note='Refreshing token', errnote='Unable to refresh token') if error := traverse_obj( res, ('error', 'message', {lambda x: base64.b64decode(x).decode()}), ('error', 'message')): self._ie.report_warning(f'Unable to refresh token: {error!r}') elif token := traverse_obj(res, ('data', 'access_token', {str})): # niconico self._auth_info = {'auth_token': f'Bearer {token}'} return True elif token := traverse_obj(res, ('access_token', {str})): # auth0 self._auth_info = {'auth_token': f'Bearer {token}'} if refresh_token := traverse_obj(res, ('refresh_token', {str})): self._auth_info = {'refresh_token': refresh_token} self._ie.cache.store( self._ie._NETRC_MACHINE, self._auth_info['cache_key'], {self._auth_info['cache_name']: refresh_token}) return True self._ie.report_warning('Unable to find new refresh_token') else: self._ie.report_warning('Unable to refresh token') return False def _login(self): social_login_providers = traverse_obj(self._ie._call_api( f'fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/login', None), ('data', 'fanclub_group', 'fanclub_social_login_providers', ..., {dict})) or [] self._ie.write_debug(f'social_login_providers = {social_login_providers!r}') for provider in social_login_providers: provider_name = traverse_obj(provider, ('social_login_provider', 'provider_name', {str})) if provider_name == 'ニコニコ': redirect_url = update_url_query(provider['url'], { 'client_id': 'FCS{:05d}'.format(provider['id']), 'redirect_uri': f'https://{self._ie._DOMAIN}/login', }) refresh_url = f'{self._ie._API_BASE_URL}/fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/auth/refresh' return self._niconico_sns_login(redirect_url, refresh_url) else: raise ExtractorError(f'Unsupported social login provider: {provider_name}') return self._auth0_login() def _niconico_sns_login(self, redirect_url, refresh_url): self._auth_info = {'login_method': 'any'} mail_tel, password = self._ie._get_login_info() if not mail_tel: return cache_key = hashlib.sha1(f'{self._ie._DOMAIN}:{mail_tel}:{password}'.encode()).hexdigest() self._auth_info = {'cache_key': cache_key} cache_name = 'niconico_sns' if cached_cookies := traverse_obj(self._ie.cache.load( self._ie._NETRC_MACHINE, cache_key), (cache_name, {dict})): for name, value in cached_cookies.items(): self._ie._set_cookie(get_domain(redirect_url), name, value) if not (auth_token := self._niconico_get_token_by_cookies(redirect_url)): if cached_cookies: self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, None) self._niconico_login(mail_tel, password) if not (auth_token := self._niconico_get_token_by_cookies(redirect_url)): self._ie.report_warning('Unable to get token after login, please check if ' 'niconico channel plus is authorized to use your niconico account') return self._auth_info = { 'refresh_func': lambda data: { 'url_or_request': data['refresh_url'], 'video_id': None, 'headers': {'Authorization': data['auth_token']}, 'data': b'', }, 'refresh_url': refresh_url, 'auth_token': auth_token, } cookies = dict(traverse_obj(self._ie.cookiejar.get_cookies_for_url( redirect_url), (..., {lambda item: (item.name, item.value)}))) self._ie.cache.store(self._ie._NETRC_MACHINE, cache_key, {cache_name: cookies}) def _niconico_get_token_by_cookies(self, redirect_url): urlh = self._ie._request_webpage( redirect_url, None, note='Getting niconico auth status', expected_status=404, errnote='Unable to get niconico auth status') if not urlh.url.startswith(f'https://{self._DOMAIN}/login'): return None if not (sns_login_code := traverse_obj(parse_qs(urlh.url), ('code', 0))): self._ie.report_warning('Unable to get sns login code') return None token = traverse_obj(self._ie._call_api( f'fanclub_groups/{self._ie._FANCLUB_GROUP_ID}/sns_login', None, fatal=False, note='Fetching sns login info', errnote='Unable to fetch sns login info', data=json.dumps({ 'key_cloak_user': { 'code': sns_login_code, 'redirect_uri': f'https://{self._ie._DOMAIN}/login', }, 'fanclub_site': {'id': int(self._ie._FANCLUB_SITE_ID_AUTH)}, }).encode(), headers={ 'Content-Type': 'application/json', 'fc_use_device': 'null', 'Referer': f'https://{self._ie._DOMAIN}', }), ('data', 'access_token', {str})) if token: return f'Bearer {token}' self._ie.report_warning('Unable to get token from sns login info') return None def _niconico_login(self, mail_tel, password): login_form_strs = { 'mail_tel': mail_tel, 'password': password, } page, urlh = self._ie._download_webpage_handle( 'https://account.nicovideo.jp/login/redirector', None, note='Logging into niconico', errnote='Unable to log into niconico', data=urlencode_postdata(login_form_strs), headers={ 'Referer': 'https://account.nicovideo.jp/login', 'Content-Type': 'application/x-www-form-urlencoded', }) if urlh.url.startswith('https://account.nicovideo.jp/login'): self._ie.report_warning('Unable to log in: bad username or password') return False elif urlh.url.startswith('https://account.nicovideo.jp/mfa'): post_url = self._ie._search_regex( r'