mirror of
https://github.com/yt-dlp/yt-dlp.git
synced 2025-03-09 12:50:23 -05:00
wip: use traverse_obj instead
This commit is contained in:
parent
a9ba89fd03
commit
68232ed11c
1 changed files with 21 additions and 25 deletions
|
@ -26,7 +26,12 @@
|
|||
|
||||
|
||||
class AuthManager:
|
||||
_AUTH_INFO = {}
|
||||
_AUTH_INFO_CACHE = {}
|
||||
_AUTH0_BASE64_TRANS = str.maketrans({
|
||||
'+': '-',
|
||||
'/': '_',
|
||||
'=': None,
|
||||
})
|
||||
|
||||
def __init__(self, ie: 'SheetaEmbedIE'):
|
||||
self._ie = ie
|
||||
|
@ -34,15 +39,15 @@ def __init__(self, ie: 'SheetaEmbedIE'):
|
|||
|
||||
@property
|
||||
def _auth_info(self):
|
||||
if not self._AUTH_INFO.get(self._ie._DOMAIN):
|
||||
self._AUTH_INFO[self._ie._DOMAIN] = {}
|
||||
return self._AUTH_INFO.get(self._ie._DOMAIN)
|
||||
if not self._AUTH_INFO_CACHE.get(self._ie._DOMAIN):
|
||||
self._AUTH_INFO_CACHE[self._ie._DOMAIN] = {}
|
||||
return self._AUTH_INFO_CACHE.get(self._ie._DOMAIN)
|
||||
|
||||
@_auth_info.setter
|
||||
def _auth_info(self, value):
|
||||
if not self._AUTH_INFO.get(self._ie._DOMAIN):
|
||||
self._AUTH_INFO[self._ie._DOMAIN] = {}
|
||||
self._AUTH_INFO[self._ie._DOMAIN].update(value)
|
||||
if not self._AUTH_INFO_CACHE.get(self._ie._DOMAIN):
|
||||
self._AUTH_INFO_CACHE[self._ie._DOMAIN] = {}
|
||||
self._AUTH_INFO_CACHE[self._ie._DOMAIN].update(value)
|
||||
|
||||
def _get_authed_info(self, query_path, item_id, dict_path, expected_code_msg, **query_kwargs):
|
||||
try:
|
||||
|
@ -242,10 +247,12 @@ def _auth0_login(self):
|
|||
'cache_name': cache_name,
|
||||
}
|
||||
|
||||
login_info = self._ie._call_api(f'fanclub_sites/{self._ie._FANCLUB_SITE_ID_AUTH}/login', None)['data']['fanclub_site']
|
||||
login_info = traverse_obj(
|
||||
self._ie._call_api(f'fanclub_sites/{self._ie._FANCLUB_SITE_ID_AUTH}/login', None),
|
||||
('data', 'fanclub_site'))
|
||||
self._ie.write_debug(f'login_info = {login_info}')
|
||||
auth0_web_client_id = login_info['auth0_web_client_id']
|
||||
auth0_domain = login_info['fanclub_group']['auth0_domain']
|
||||
auth0_web_client_id = login_info.get('auth0_web_client_id')
|
||||
auth0_domain = traverse_obj(login_info, ('fanclub_group', 'auth0_domain'))
|
||||
|
||||
token_url = f'https://{auth0_domain}/oauth/token'
|
||||
redirect_url = f'https://{self._ie._DOMAIN}/login/login-redirect'
|
||||
|
@ -274,7 +281,7 @@ def random_str():
|
|||
nonce = base64.b64encode(random_str().encode())
|
||||
code_verifier = random_str().encode()
|
||||
code_challenge = base64.b64encode(
|
||||
hashlib.sha256(code_verifier).digest()).decode().translate(self._ie._AUTH0_BASE64_TRANS)
|
||||
hashlib.sha256(code_verifier).digest()).decode().translate(self._AUTH0_BASE64_TRANS)
|
||||
|
||||
authorize_url = update_url_query(f'https://{auth0_domain}/authorize', {
|
||||
'client_id': auth0_web_client_id,
|
||||
|
@ -320,7 +327,7 @@ def random_str():
|
|||
self._ie.report_warning('Unable to log in: Unknown login status')
|
||||
return
|
||||
|
||||
code = parse_qs(urlh.url)['code'][0]
|
||||
code = traverse_obj(parse_qs(urlh.url), ('code', 0))
|
||||
|
||||
token_json = self._ie._download_json(
|
||||
token_url, None, headers={'Auth0-Client': auth0_client},
|
||||
|
@ -333,8 +340,8 @@ def random_str():
|
|||
'redirect_uri': redirect_url,
|
||||
}))
|
||||
|
||||
access_token = token_json['access_token']
|
||||
refresh_token = token_json['refresh_token']
|
||||
access_token = token_json.get('access_token')
|
||||
refresh_token = token_json.get('refresh_token')
|
||||
|
||||
auth_token = f'Bearer {access_token}'
|
||||
|
||||
|
@ -499,21 +506,10 @@ class SheetaEmbedIE(InfoExtractor):
|
|||
_FANCLUB_SITE_ID_AUTH = None
|
||||
_FANCLUB_SITE_ID_INFO = None
|
||||
|
||||
_AUTH0_BASE64_TRANS = str.maketrans({
|
||||
'+': '-',
|
||||
'/': '_',
|
||||
'=': None,
|
||||
})
|
||||
_LIST_PAGE_SIZE = 12
|
||||
|
||||
auth_manager: AuthManager = None
|
||||
|
||||
# @classmethod
|
||||
# def suitable(cls, url):
|
||||
# return (
|
||||
# not any(ie.suitable(url) for ie in (ArteTVIE, ArteTVPlaylistIE))
|
||||
# and super().suitable(url))
|
||||
|
||||
def _extract_from_url(self, url):
|
||||
parsed_url = urllib.parse.urlparse(url)
|
||||
if not self.auth_manager:
|
||||
|
|
Loading…
Reference in a new issue