Mirror of https://github.com/yt-dlp/yt-dlp.git
c-basalt 2025-03-08 14:54:18 +01:00 committed by GitHub
commit 7f6210b18b
3 changed files with 404 additions and 114 deletions

README.md

@@ -1817,6 +1817,8 @@ #### instagram
 #### niconicochannelplus
 * `max_comments`: Maximum number of comments to extract - default is `120`
+* `jwt_token`: JWT token saved in browser localStorage, used for login and for authorizing paid content; note that the JWT token's lifetime is only 5 minutes (see the usage sketch below)
+* `refresh_token`: Refresh token saved in browser localStorage, used to obtain a new JWT token; note that using this argument invalidates the token and the browser login

 #### tiktok
 * `api_hostname`: Hostname to use for mobile API calls, e.g. `api22-normal-c-alisg.tiktokv.com`
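
The new arguments go through the standard extractor-args plumbing, so they can also be supplied via yt-dlp's Python API. A minimal sketch, assuming placeholder token values and a placeholder channel URL:

# Sketch: passing the new niconicochannelplus auth tokens through the Python API.
# The token values and the channel/video IDs below are placeholders, not real credentials.
import yt_dlp

ydl_opts = {
    'extractor_args': {
        'niconicochannelplus': {
            # short-lived (about 5 minutes), copied from browser localStorage
            'jwt_token': ['eyJhbGciOiJSUzI1NiJ9.PLACEHOLDER.SIGNATURE'],
            # alternatively a refresh token; note it is invalidated once used
            # 'refresh_token': ['PLACEHOLDER'],
        },
    },
}

with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download(['https://nicochannel.jp/CHANNEL/video/smXXXXXXXXXXXXXXXXXXXX'])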

yt_dlp/extractor/generic.py

@@ -2406,7 +2406,7 @@ def _real_extract(self, url):
             full_response = self._request_webpage(url, video_id, headers=filter_dict({
                 'Accept-Encoding': 'identity',
                 'Referer': smuggled_data.get('referer'),
-            }), impersonate=impersonate)
+            }), impersonate=impersonate, expected_status=404)
         except ExtractorError as e:
             if not (isinstance(e.cause, HTTPError) and e.cause.status == 403
                     and e.cause.response.get_header('cf-mitigated') == 'challenge'
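
The one functional change is `expected_status=404`: the request helper then treats a 404 response as success and hands the page back instead of raising, which lets the extractor inspect such pages itself (single-page apps often serve a usable shell with a 404 status). A minimal sketch of the same pattern, assuming a hypothetical extractor class and URL:

# Sketch: accepting a 404 response in an extractor, mirroring the change above.
# MySPAIE and its URL pattern are hypothetical, for illustration only.
from yt_dlp.extractor.common import InfoExtractor


class MySPAIE(InfoExtractor):
    _VALID_URL = r'https?://spa\.example\.com/(?P<id>\w+)'

    def _real_extract(self, url):
        video_id = self._match_id(url)
        # expected_status=404 stops a 404 from raising; the page is returned anyway
        webpage, urlh = self._download_webpage_handle(url, video_id, expected_status=404)
        if urlh.status == 404:
            self.write_debug('Server returned 404; parsing the SPA shell regardless')
        return {
            'id': video_id,
            'title': self._html_extract_title(webpage),
            'url': self._og_search_video_url(webpage),
        }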

yt_dlp/extractor/niconicochannelplus.py

@@ -1,5 +1,11 @@
+import base64
 import functools
+import hashlib
 import json
+import random
+import re
+import time
+import urllib.parse

 from .common import InfoExtractor
 from ..utils import (
@@ -7,45 +13,289 @@
     OnDemandPagedList,
     filter_dict,
     int_or_none,
+    jwt_decode_hs256,
     parse_qs,
     str_or_none,
     traverse_obj,
     unified_timestamp,
     url_or_none,
+    urlencode_postdata,
+    urljoin,
 )

+_SUITABLE_NICOCHANNEL_PLUS_DOMAINS = set()

-class NiconicoChannelPlusBaseIE(InfoExtractor):
-    _WEBPAGE_BASE_URL = 'https://nicochannel.jp'
-
-    def _call_api(self, path, item_id, **kwargs):
-        return self._download_json(
-            f'https://nfc-api.nicochannel.jp/fc/{path}', video_id=item_id, **kwargs)
-
-    def _find_fanclub_site_id(self, channel_name):
-        fanclub_list_json = self._call_api(
-            'content_providers/channels', item_id=f'channels/{channel_name}',
-            note='Fetching channel list', errnote='Unable to fetch channel list',
-        )['data']['content_providers']
-        fanclub_id = traverse_obj(fanclub_list_json, (
-            lambda _, v: v['domain'] == f'{self._WEBPAGE_BASE_URL}/{channel_name}', 'id'),
-            get_all=False)
-        if not fanclub_id:
-            raise ExtractorError(f'Channel {channel_name} does not exist', expected=True)
-        return fanclub_id
-
-    def _get_channel_base_info(self, fanclub_site_id):
-        return traverse_obj(self._call_api(
-            f'fanclub_sites/{fanclub_site_id}/page_base_info', item_id=f'fanclub_sites/{fanclub_site_id}',
-            note='Fetching channel base info', errnote='Unable to fetch channel base info', fatal=False,
-        ), ('data', 'fanclub_site', {dict})) or {}
-
-    def _get_channel_user_info(self, fanclub_site_id):
-        return traverse_obj(self._call_api(
-            f'fanclub_sites/{fanclub_site_id}/user_info', item_id=f'fanclub_sites/{fanclub_site_id}',
-            note='Fetching channel user info', errnote='Unable to fetch channel user info', fatal=False,
-            data=json.dumps('null').encode('ascii'),
-        ), ('data', 'fanclub_site', {dict})) or {}
+class NicoChannelCommonBaseIE(InfoExtractor):
+    _SITE_SETTINGS = {}
+
+    def _get_jwt_token(self, url):
+        pass
+
+    def _get_settings(self, url, video_id=None):
+        base_url = urljoin(url, '/')
+        if base_url not in self._SITE_SETTINGS:
+            site_settings = self._download_json(
+                urljoin(base_url, '/site/settings.json'), video_id, note='Downloading site settings')
+            if 'api_base_url' not in site_settings or 'fanclub_site_id' not in site_settings:
+                raise ExtractorError('Unable to get site settings')
+            self._SITE_SETTINGS[base_url] = site_settings
+            _SUITABLE_NICOCHANNEL_PLUS_DOMAINS.add(urllib.parse.urlparse(url).netloc)
+
+        if self._SITE_SETTINGS[base_url].get('platform_id') not in ['CHPL', 'SHTA', 'JOQR', 'TKFM']:
+            self.report_warning(f'Unknown platform type: {self._SITE_SETTINGS[base_url].get("platform_id")}')
+        return self._SITE_SETTINGS[base_url]
+
+    def _download_api_json(self, site_url, path, video_id, headers={}, **kwargs):
+        path = f'/{path}' if path[0] != '/' else path
+        settings = self._get_settings(site_url, video_id)
+        headers = {
+            'origin': urljoin(site_url, '/').strip('/'),
+            'referer': urljoin(site_url, '/'),
+            'fc_site_id': settings['fanclub_site_id'],
+            'fc_use_device': 'null',
+            **headers,
+        }
+        if jwt_token := self._get_jwt_token(site_url):
+            headers['Authorization'] = f'Bearer {jwt_token}'
+        data, handle = self._download_json_handle(
+            f'{settings["api_base_url"]}{path}', video_id, headers=headers, expected_status=403, **kwargs)
+        if handle.status == 403:
+            if not self._get_jwt_token(site_url):
+                self.raise_login_required()
+            raise ExtractorError('You may have no access to this video')
+        return data

+class NicoChannelAuthBaseIE(NicoChannelCommonBaseIE):
+    _NETRC_MACHINE = False
+    _AUTH_SETTINGS = {}
+    _AUTH_TOKENS = {}
+    _ARG_REFRESH_USED = False
+    _REFRESH_TIMEOUT_THRES = 15
+
+    _netrc_domain: str
+
+    def _get_auth(self, url) -> dict:
+        return self._AUTH_TOKENS.get(urljoin(url, '/'), {})
+
+    def _set_auth(self, url, auth):
+        self._AUTH_TOKENS[urljoin(url, '/')] = auth
+
+    def _login_hint(self, *args, **kwargs):
+        return (super()._login_hint('password', netrc=getattr(self, '_netrc_domain', None))
+                + ', or --extractor-args "niconicochannelplus:jwt_token=xxx" or --extractor-args '
+                '"niconicochannelplus:refresh_token=xxx" to directly provide the auth token')
+
+    def _get_auth_settings(self, url):
+        fanclub_site_id = self._get_settings(url)['fanclub_site_id']
+        if fanclub_site_id not in self._AUTH_SETTINGS:
+            self._AUTH_SETTINGS[fanclub_site_id] = traverse_obj(self._download_api_json(
+                url, f'/fanclub_sites/{fanclub_site_id}/login', f'site/{fanclub_site_id}',
+                note='Downloading auth settings'), ('data', 'fanclub_site', {
+                    'auth0_web_client': ('auth0_web_client_id', {str}),
+                    'auth0_domain': ('fanclub_group', 'auth0_domain', {str}),
+                }))
+        return self._AUTH_SETTINGS[fanclub_site_id]
+
+    def _get_jwt_token(self, url):
+        def _load_access_token():
+            if access_token := self._get_auth(url).get('access_token'):
+                if time.time() < jwt_decode_hs256(access_token)['exp'] - self._REFRESH_TIMEOUT_THRES:
+                    return access_token
+
+        def _try_then_load(func, error_msg, *args, **kwargs):
+            try:
+                func(*args, **kwargs)
+                return _load_access_token()
+            except Exception as e:
+                self.report_warning(f'{error_msg}: {e}')
+
+        if access_token := _load_access_token():
+            return access_token
+        if access_token := _try_then_load(self._refresh_sheeta_token, 'Failed to refresh token', url):
+            return access_token
+        if not self._has_attempted_login:
+            if access_token := _try_then_load(self._perform_sheeta_login, 'Failed to login', url):
+                return access_token
+        if jwt_args := self._configuration_arg('jwt_token', ie_key='niconicochannelplus', casesense=True):
+            jwt_token = jwt_args[0]
+            try:
+                if time.time() < jwt_decode_hs256(jwt_token)['exp']:
+                    return jwt_token
+                else:
+                    self.report_warning('JWT token expired, continuing without login.')
+            except Exception:
+                self.report_warning('Invalid JWT token, continuing without login.')
+
+    @property
+    def _auth0_client(self):
+        return base64.b64encode(json.dumps({  # index.js: btoa(JSON.stringify(s || Aq))
+            'name': 'auth0-spa-js',  # index.js: Aq = ...
+            'version': '2.0.6',
+        }, separators=(',', ':')).encode()).decode()
+
+    @property
+    def _has_attempted_login(self):
+        return getattr(self, '_netrc_domain', None) is not None
+
+    def _perform_sheeta_login(self, url):
+        def _random_string():
+            return ''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_~.', k=43))
+
+        self._netrc_domain = urllib.parse.urlparse(url).netloc
+        username, password = self._get_login_info(netrc_machine=self._netrc_domain)
+        if not username or not password:
+            return
+
+        auth_settings = self._get_auth_settings(url)
+        site_settings = self._get_settings(url)
+        code_verifier = _random_string()
+        code_challenge = base64.urlsafe_b64encode(hashlib.sha256(code_verifier.encode()).digest()).decode().rstrip('=')
+
+        preauth_query = {
+            'client_id': auth_settings['auth0_web_client'],
+            'scope': 'openid profile email offline_access',
+            'redirect_uri': urljoin(url, '/login/login-redirect'),
+            'audience': urllib.parse.urlparse(site_settings['api_base_url']).hostname,
+            'ext-group_id': site_settings['fanclub_group_id'],
+            'ext-platform_id': site_settings['platform_id'],
+            'ext-terms': urljoin(url, '/terms__content_type___nfc_terms_of_services'),
+            'prompt': 'login',
+            'response_type': 'code',
+            'response_mode': 'query',
+            'state': base64.b64encode(_random_string().encode()).decode(),
+            'nonce': base64.b64encode(_random_string().encode()).decode(),
+            'code_challenge': code_challenge,
+            'code_challenge_method': 'S256',
+            'auth0Client': self._auth0_client,
+        }
+
+        _, handler = self._download_webpage_handle(
+            f'https://{auth_settings["auth0_domain"]}/authorize', 'preauth', query=preauth_query)
+        _, handler = self._download_webpage_handle(handler.url, 'login', data=urlencode_postdata({
+            'state': parse_qs(handler.url)['state'][0],
+            'username': username,
+            'password': password,
+            'action': 'default',
+        }), headers={
+            'Content-Type': 'application/x-www-form-urlencoded',
+            'Origin': urljoin(handler.url, '/').rstrip('/'),
+            'Referer': handler.url,
+        }, expected_status=404)
+
+        data = self._download_json(
+            f'https://{auth_settings["auth0_domain"]}/oauth/token', 'login-token',
+            data=urlencode_postdata({
+                'client_id': auth_settings['auth0_web_client'],
+                'code_verifier': code_verifier,
+                'grant_type': 'authorization_code',
+                'code': parse_qs(handler.url)['code'][0],
+                'redirect_uri': urljoin(url, '/login/login-redirect'),
+            }), headers={
+                'Content-Type': 'application/x-www-form-urlencoded',
+                'Origin': urljoin(handler.url, '/').rstrip('/'),
+                'Referer': handler.url,
+            })
+
+        self._set_auth(url, {
+            'access_token': data['access_token'],
+            'refresh_token': data['refresh_token'],
+        })
+
+    def _load_args_refresh_token(self):
+        if self._ARG_REFRESH_USED:
+            return
+        if refresh_token_args := self._configuration_arg('refresh_token', ie_key='niconicochannelplus', casesense=True):
+            self._ARG_REFRESH_USED = True
+            return refresh_token_args[0]
+
+    def _refresh_sheeta_token(self, url):
+        if not (refresh_token := self._get_auth(url).get('refresh_token') or self._load_args_refresh_token()):
+            return
+
+        auth_settings = self._get_auth_settings(url)
+        data = self._download_json(
+            f'https://{auth_settings["auth0_domain"]}/oauth/token', 'refresh',
+            data=urlencode_postdata({
+                'client_id': auth_settings['auth0_web_client'],
+                'redirect_uri': urljoin(url, '/login/login-redirect'),
+                'grant_type': 'refresh_token',
+                'refresh_token': refresh_token,
+            }), headers={
+                'Content-Type': 'application/x-www-form-urlencoded',
+                'Auth0-Client': self._auth0_client,
+                'Origin': urljoin(url, '/').rstrip('/'),
+                'Referer': urljoin(url, '/'),
+            }, note='Refreshing auth token')
+
+        self._set_auth(url, {
+            'access_token': data['access_token'],
+            'refresh_token': data['refresh_token'],
+        })
+
+
+class NiconicoChannelPlusBaseIE(NicoChannelAuthBaseIE):
+    _CHANNEL_NAMES = {}
+    _CHANNEL_AGE_LIMIT = {}
+    _DOMAIN_SITE_ID = {}
+
+    def _get_channel_id(self, url):
+        parsed = urllib.parse.urlparse(url)
+        if self._get_settings(url)['platform_id'] == 'SHTA':
+            return parsed.hostname.replace('.', '_')
+        elif self._get_settings(url)['platform_id'] == 'CHPL':
+            return parsed.path.split('/')[1]
+        else:
+            return f'{parsed.hostname.replace(".", "_")}_{parsed.path.split("/")[1]}'
+
+    def _get_fanclub_site_id(self, url):
+        settings = self._get_settings(url)
+        if settings['platform_id'] == 'SHTA':
+            return str(settings['fanclub_site_id'])
+        else:
+            parsed = urllib.parse.urlparse(url)
+            # parsed.path starts with '/', so index 0 is an empty string
+            domain_url = f'{parsed.scheme}://{parsed.netloc}/{parsed.path.split("/")[1].lower()}'
+            if domain_url not in self._DOMAIN_SITE_ID:
+                self._DOMAIN_SITE_ID[domain_url] = str(self._download_api_json(
+                    url, '/content_providers/channel_domain', domain_url,
+                    query={'current_site_domain': domain_url})['data']['content_providers']['id'])
+            return self._DOMAIN_SITE_ID[domain_url]
+
+    def _get_channel_url(self, url):
+        parsed = urllib.parse.urlparse(url)
+        if self._get_settings(url)['platform_id'] == 'SHTA':
+            return f'{parsed.scheme}://{parsed.netloc}'
+        else:
+            return f'{parsed.scheme}://{parsed.netloc}/{parsed.path.split("/")[1]}'
+
+    def _get_channel_name(self, url):
+        fanclub_site_id = self._get_fanclub_site_id(url)
+        if fanclub_site_id not in self._CHANNEL_NAMES:
+            self._CHANNEL_NAMES[fanclub_site_id] = traverse_obj(self._download_api_json(
+                url, f'/fanclub_sites/{fanclub_site_id}/page_base_info', video_id=str(fanclub_site_id),
+                note='Downloading channel name', fatal=False,
+            ), ('data', 'fanclub_site', 'fanclub_site_name', {str}))
+        return self._CHANNEL_NAMES[fanclub_site_id]
+
+    def _get_age_limit(self, url):
+        fanclub_site_id = self._get_fanclub_site_id(url)
+        if fanclub_site_id not in self._CHANNEL_AGE_LIMIT:
+            self._CHANNEL_AGE_LIMIT[fanclub_site_id] = traverse_obj(self._download_api_json(
+                url, f'/fanclub_sites/{fanclub_site_id}/user_info', video_id=str(fanclub_site_id), data=b'',
+                note='Downloading channel age limit', fatal=False,
+            ), ('data', 'fanclub_site', 'content_provider', 'age_limit', {int}))
+        return self._CHANNEL_AGE_LIMIT[fanclub_site_id]
+
+    def _is_channel_plus_webpage(self, webpage):
+        return 'GTM-KXT7G5G' in webpage or 'NicoGoogleTagManagerDataLayer' in webpage

 class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
@@ -53,6 +303,25 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
     IE_DESC = 'ニコニコチャンネルプラス'
     _VALID_URL = r'https?://nicochannel\.jp/(?P<channel>[\w.-]+)/(?:video|live)/(?P<code>sm\w+)'
     _TESTS = [{
+        'url': 'https://nicochannel.jp/renge/video/smjHSEPCxd4ohY4zg8iyGKnX',
+        'info_dict': {
+            'id': 'smjHSEPCxd4ohY4zg8iyGKnX',
+            'title': '【両耳舐め】あまいちゃトロらぶ両耳舐め【本多ぽこちゃんと耳舐めASMR②】',
+            'ext': 'mp4',
+            'channel': '狐月れんげのあまとろASMR',
+            'channel_id': 'renge',
+            'channel_url': 'https://nicochannel.jp/renge',
+            'live_status': 'not_live',
+            'thumbnail': 'https://nicochannel.jp/public_html/contents/video_pages/35690/thumbnail_path?time=1722439868',
+            'description': 'お耳が癒されて疲れもヌケる♡\n本多ぽこちゃんとの2024年7月24日の耳舐めコラボアーカイブです。',
+            'timestamp': 1722439866,
+            'duration': 2698,
+            'comment_count': int,
+            'view_count': int,
+            'tags': list,
+            'upload_date': '20240731',
+        },
+    }, {
         'url': 'https://nicochannel.jp/kaorin/video/smsDd8EdFLcVZk9yyAhD6H7H',
         'info_dict': {
             'id': 'smsDd8EdFLcVZk9yyAhD6H7H',
@@ -71,9 +340,7 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
             'tags': [],
             'upload_date': '20220105',
         },
-        'params': {
-            'skip_download': True,
-        },
+        'skip': 'subscriber only',
     }, {
         # age limited video; test purpose channel.
         'url': 'https://nicochannel.jp/testman/video/smDXbcrtyPNxLx9jc4BW69Ve',
@@ -93,49 +360,66 @@ class NiconicoChannelPlusIE(NiconicoChannelPlusBaseIE):
             'tags': [],
             'upload_date': '20221021',
         },
-        'params': {
-            'skip_download': True,
-        },
+        'skip': 'subscriber only',
     }]
+    @staticmethod
+    def _match_video_id(url):
+        return re.search(r'/(?:video|audio|live)/(?P<id>sm\w+)', urllib.parse.urlparse(url).path)
+
+    @classmethod
+    def suitable(cls, url):
+        try:
+            return super().suitable(url) or (
+                urllib.parse.urlparse(url).netloc in _SUITABLE_NICOCHANNEL_PLUS_DOMAINS
+                and cls._match_video_id(url))
+        except NameError:
+            # fallback for lazy extractor
+            return super().suitable(url)
+
+    def _extract_from_webpage(self, url, webpage):
+        if self._match_video_id(url) and self._is_channel_plus_webpage(webpage):
+            yield self._real_extract(url)
     def _real_extract(self, url):
-        content_code, channel_id = self._match_valid_url(url).group('code', 'channel')
-        fanclub_site_id = self._find_fanclub_site_id(channel_id)
+        video_id = self._match_video_id(url).group('id')

-        data_json = self._call_api(
-            f'video_pages/{content_code}', item_id=content_code,
-            note='Fetching video page info', errnote='Unable to fetch video page info',
-        )['data']['video_page']
+        video_info = self._download_api_json(url, f'/video_pages/{video_id}', video_id,
+                                             headers={'fc_use_device': 'null'}, note='Downloading video info')['data']['video_page']

-        live_status, session_id = self._get_live_status_and_session_id(content_code, data_json)
-
-        release_timestamp_str = data_json.get('live_scheduled_start_at')
-
-        formats = []
-        if live_status == 'is_upcoming':
-            if release_timestamp_str:
-                msg = f'This live event will begin at {release_timestamp_str} UTC'
-            else:
-                msg = 'This event has not started yet'
-            self.raise_no_formats(msg, expected=True, video_id=content_code)
-        else:
-            formats = self._extract_m3u8_formats(
-                # "authenticated_url" is a format string that contains "{session_id}".
-                m3u8_url=data_json['video_stream']['authenticated_url'].format(session_id=session_id),
-                video_id=content_code)
+        live_status, session_payload, timestamp = self._parse_live_status(video_id, video_info)
+        if video_info.get('video'):
+            session_id = self._download_api_json(
+                url, f'/video_pages/{video_id}/session_ids', video_id, data=json.dumps(session_payload).encode(),
+                headers={'content-type': 'application/json'}, note='Downloading video session')['data']['session_id']
+            formats = self._extract_m3u8_formats(
+                video_info['video_stream']['authenticated_url'].format(session_id=session_id), video_id)
+        elif video_info.get('audio'):
+            audio_url = self._download_api_json(
+                url, f'/video_pages/{video_id}/content_access', video_id)['data']['resource']
+            format_id = traverse_obj(video_info, ('audio_filename_transcoded_list', lambda _, v: v['url'] == audio_url, 'video_filename_type', 'value', any))
+            if format_id != 'audio_paid':
+                self.report_warning('The audio may be empty, or incomplete and contains only trial parts.')
+            formats = [{
+                'url': audio_url,
+                'ext': 'm4a',
+                'protocol': 'm3u8_native',
+                'format_id': format_id,
+            }]
+        else:
+            raise ExtractorError('Unknown media type', video_id=video_id)

         return {
-            'id': content_code,
+            'id': video_id,
             'formats': formats,
             '_format_sort_fields': ('tbr', 'vcodec', 'acodec'),
-            'channel': self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name'),
-            'channel_id': channel_id,
-            'channel_url': f'{self._WEBPAGE_BASE_URL}/{channel_id}',
-            'age_limit': traverse_obj(self._get_channel_user_info(fanclub_site_id), ('content_provider', 'age_limit')),
+            'channel': self._get_channel_name(url),
+            'channel_id': self._get_channel_id(url),
+            'channel_url': self._get_channel_url(url),
+            'age_limit': self._get_age_limit(url),
             'live_status': live_status,
-            'release_timestamp': unified_timestamp(release_timestamp_str),
-            **traverse_obj(data_json, {
+            'release_timestamp': timestamp,
+            **traverse_obj(video_info, {
                 'title': ('title', {str}),
                 'thumbnail': ('thumbnail_url', {url_or_none}),
                 'description': ('description', {str}),
@@ -146,23 +430,22 @@ def _real_extract(self, url):
                 'tags': ('video_tags', ..., 'tag', {str}),
             }),
             '__post_extractor': self.extract_comments(
-                content_code=content_code,
-                comment_group_id=traverse_obj(data_json, ('video_comment_setting', 'comment_group_id'))),
+                url=url,
+                comment_group_id=traverse_obj(video_info, ('video_comment_setting', 'comment_group_id'))),
         }
-    def _get_comments(self, content_code, comment_group_id):
-        item_id = f'{content_code}/comments'
-
+    def _get_comments(self, url, comment_group_id):
         if not comment_group_id:
             return None
+        video_id = self._match_video_id(url).group('id')

-        comment_access_token = self._call_api(
-            f'video_pages/{content_code}/comments_user_token', item_id,
+        comment_access_token = self._download_api_json(
+            url, f'video_pages/{video_id}/comments_user_token', f'{video_id}/comments',
             note='Getting comment token', errnote='Unable to get comment token',
         )['data']['access_token']

         comment_list = self._download_json(
-            'https://comm-api.sheeta.com/messages.history', video_id=item_id,
+            'https://comm-api.sheeta.com/messages.history', video_id=f'{video_id}/comments',
             note='Fetching comments', errnote='Unable to fetch comments',
             headers={'Content-Type': 'application/json'},
             query={
@@ -184,9 +467,10 @@ def _get_comments(self, content_code, comment_group_id):
             'author_is_uploader': ('sender_id', {lambda x: x == '-1'}),
         }, get_all=False)
-    def _get_live_status_and_session_id(self, content_code, data_json):
-        video_type = data_json.get('type')
-        live_finished_at = data_json.get('live_finished_at')
+    def _parse_live_status(self, video_id, video_info):
+        video_type = video_info.get('type')
+        live_finished_at = video_info.get('live_finished_at')
+        release_timestamp_str = video_info.get('live_scheduled_start_at')

         payload = {}
         if video_type == 'vod':
@@ -195,8 +479,13 @@ def _get_live_status_and_session_id(self, content_code, data_json):
             else:
                 live_status = 'not_live'
         elif video_type == 'live':
-            if not data_json.get('live_started_at'):
-                return 'is_upcoming', ''
+            if not video_info.get('live_started_at'):
+                live_status = 'is_upcoming'
+                if release_timestamp_str:
+                    msg = f'This live event will begin at {release_timestamp_str} UTC'
+                else:
+                    msg = 'This event has not started yet'
+                self.raise_no_formats(msg, expected=True, video_id=video_id)

             if not live_finished_at:
                 live_status = 'is_live'
@@ -204,50 +493,43 @@ def _get_live_status_and_session_id(self, content_code, data_json):
                 live_status = 'was_live'
                 payload = {'broadcast_type': 'dvr'}

-                video_allow_dvr_flg = traverse_obj(data_json, ('video', 'allow_dvr_flg'))
-                video_convert_to_vod_flg = traverse_obj(data_json, ('video', 'convert_to_vod_flg'))
+                video_allow_dvr_flg = traverse_obj(video_info, ('video', 'allow_dvr_flg'))
+                video_convert_to_vod_flg = traverse_obj(video_info, ('video', 'convert_to_vod_flg'))

                 self.write_debug(f'allow_dvr_flg = {video_allow_dvr_flg}, convert_to_vod_flg = {video_convert_to_vod_flg}.')

                 if not (video_allow_dvr_flg and video_convert_to_vod_flg):
                     raise ExtractorError(
-                        'Live was ended, there is no video for download.', video_id=content_code, expected=True)
+                        'Live was ended, there is no video for download.', video_id=video_id, expected=True)
         else:
-            raise ExtractorError(f'Unknown type: {video_type}', video_id=content_code, expected=False)
+            raise ExtractorError(f'Unknown type: {video_type}', video_id=video_id, expected=False)

-        self.write_debug(f'{content_code}: video_type={video_type}, live_status={live_status}')
+        self.write_debug(f'{video_id}: video_type={video_type}, live_status={live_status}')

-        session_id = self._call_api(
-            f'video_pages/{content_code}/session_ids', item_id=f'{content_code}/session',
-            data=json.dumps(payload).encode('ascii'), headers={
-                'Content-Type': 'application/json',
-                'fc_use_device': 'null',
-                'origin': 'https://nicochannel.jp',
-            },
-            note='Getting session id', errnote='Unable to get session id',
-        )['data']['session_id']
-
-        return live_status, session_id
+        return live_status, payload, unified_timestamp(release_timestamp_str)

 class NiconicoChannelPlusChannelBaseIE(NiconicoChannelPlusBaseIE):
     _PAGE_SIZE = 12

-    def _fetch_paged_channel_video_list(self, path, query, channel_name, item_id, page):
-        response = self._call_api(
-            path, item_id, query={
+    def _fetch_paged_channel_video_list(self, site_url, path, query, video_id, page):
+        response = self._download_api_json(
+            site_url, path, video_id, query={
                 **query,
                 'page': (page + 1),
                 'per_page': self._PAGE_SIZE,
             },
-            headers={'fc_use_device': 'null'},
             note=f'Getting channel info (page {page + 1})',
             errnote=f'Unable to get channel info (page {page + 1})')

-        for content_code in traverse_obj(response, ('data', 'video_pages', 'list', ..., 'content_code')):
+        # ensure that real extractor is instantiated over lazy extractor
+        self._downloader.get_info_extractor(NiconicoChannelPlusIE.ie_key())
+        for entry in traverse_obj(response, ('data', 'video_pages', 'list', lambda _, v: v['content_code'])):
             # "video/{content_code}" works for both VOD and live, but "live/{content_code}" doesn't work for VOD
             yield self.url_result(
-                f'{self._WEBPAGE_BASE_URL}/{channel_name}/video/{content_code}', NiconicoChannelPlusIE)
+                f'{self._get_channel_url(site_url)}/video/{entry["content_code"]}', ie=NiconicoChannelPlusIE,
+                video_id=entry['content_code'], video_title=entry.get('title'))

 class NiconicoChannelPlusChannelVideosIE(NiconicoChannelPlusChannelBaseIE):
@@ -275,7 +557,7 @@ class NiconicoChannelPlusChannelVideosIE(NiconicoChannelPlusChannelBaseIE):
         'url': 'https://nicochannel.jp/testjirou/videos',
         'info_dict': {
             'id': 'testjirou-videos',
-            'title': 'チャンネルプラステスト二郎-videos',
+            'title': 'チャンネルプラステスト"二郎21-videos',
         },
         'playlist_mincount': 12,
     }, {
@@ -336,6 +618,10 @@ class NiconicoChannelPlusChannelVideosIE(NiconicoChannelPlusChannelBaseIE):
         'playlist_mincount': 6,
     }]

+    def _extract_from_webpage(self, url, webpage):
+        if re.search(r'/videos/?(?:[\?#]|$)', url) and self._is_channel_plus_webpage(webpage):
+            yield self._real_extract(url)
+
     def _real_extract(self, url):
         """
         API parameters:
@@ -353,23 +639,23 @@ def _real_extract(self, url):
             5 アップロード動画 (uploaded videos)
         """
-        channel_id = self._match_id(url)
-        fanclub_site_id = self._find_fanclub_site_id(channel_id)
-        channel_name = self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name')
+        channel_id = self._get_channel_id(url)
         qs = parse_qs(url)

         return self.playlist_result(
             OnDemandPagedList(
                 functools.partial(
-                    self._fetch_paged_channel_video_list, f'fanclub_sites/{fanclub_site_id}/video_pages',
+                    self._fetch_paged_channel_video_list,
+                    url,
+                    f'fanclub_sites/{self._get_fanclub_site_id(url)}/video_pages',
                     filter_dict({
                         'tag': traverse_obj(qs, ('tag', 0)),
                         'sort': traverse_obj(qs, ('sort', 0), default='-released_at'),
                         'vod_type': traverse_obj(qs, ('vodType', 0), default='0'),
                     }),
-                    channel_id, f'{channel_id}/videos'),
+                    f'{channel_id}/videos'),
                 self._PAGE_SIZE),
-            playlist_id=f'{channel_id}-videos', playlist_title=f'{channel_name}-videos')
+            playlist_id=f'{channel_id}-videos', playlist_title=f'{self._get_channel_name(url)}-videos')

 class NiconicoChannelPlusChannelLivesIE(NiconicoChannelPlusChannelBaseIE):
@@ -399,6 +685,10 @@ class NiconicoChannelPlusChannelLivesIE(NiconicoChannelPlusChannelBaseIE):
         'playlist_mincount': 6,
     }]

+    def _extract_from_webpage(self, url, webpage):
+        if re.search(r'/lives/?(?:[\?#]|$)', url) and self._is_channel_plus_webpage(webpage):
+            yield self._real_extract(url)
+
     def _real_extract(self, url):
         """
         API parameters:
@@ -410,17 +700,15 @@ def _real_extract(self, url):
         We use "4" instead of "3" because some recently ended live streams could not be downloaded.
         """
-        channel_id = self._match_id(url)
-        fanclub_site_id = self._find_fanclub_site_id(channel_id)
-        channel_name = self._get_channel_base_info(fanclub_site_id).get('fanclub_site_name')
+        channel_id = self._get_channel_id(url)

         return self.playlist_result(
             OnDemandPagedList(
                 functools.partial(
-                    self._fetch_paged_channel_video_list, f'fanclub_sites/{fanclub_site_id}/live_pages',
-                    {
-                        'live_type': 4,
-                    },
-                    channel_id, f'{channel_id}/lives'),
+                    self._fetch_paged_channel_video_list,
+                    url,
+                    f'fanclub_sites/{self._get_fanclub_site_id(url)}/live_pages',
+                    {'live_type': 4},
+                    f'{channel_id}/lives'),
                 self._PAGE_SIZE),
-            playlist_id=f'{channel_id}-lives', playlist_title=f'{channel_name}-lives')
+            playlist_id=f'{channel_id}-lives', playlist_title=f'{self._get_channel_name(url)}-lives')
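
For reference, the `_perform_sheeta_login` flow above is a standard OAuth 2.0 authorization-code grant with PKCE, and the verifier/challenge pair it sends can be reproduced in isolation. A self-contained sketch mirroring the extractor's `_random_string` and S256 derivation (values are freshly random, not tied to any real Auth0 tenant):

# Sketch: the PKCE S256 derivation used by _perform_sheeta_login.
import base64
import hashlib
import random

# same 66-character alphabet and 43-character length as the extractor's _random_string
ALPHABET = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_~.'


def make_pkce_pair():
    verifier = ''.join(random.choices(ALPHABET, k=43))
    # S256: challenge = base64url(sha256(verifier)), with '=' padding stripped
    challenge = base64.urlsafe_b64encode(
        hashlib.sha256(verifier.encode()).digest()).decode().rstrip('=')
    return verifier, challenge


code_verifier, code_challenge = make_pkce_pair()
print(f'code_verifier={code_verifier}')
print(f'code_challenge={code_challenge} (code_challenge_method=S256)')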