From 61c9a938b390b8334ee3a879fe2d93f714e30138 Mon Sep 17 00:00:00 2001 From: bashonly <88596187+bashonly@users.noreply.github.com> Date: Tue, 29 Apr 2025 20:15:17 -0500 Subject: [PATCH] [ie/youtube] Cache signature timestamps (#13047) Closes #12825 Authored by: bashonly --- yt_dlp/extractor/youtube/_video.py | 64 ++++++++++++++++-------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/yt_dlp/extractor/youtube/_video.py b/yt_dlp/extractor/youtube/_video.py index 45ad62c133..693f9e9c3b 100644 --- a/yt_dlp/extractor/youtube/_video.py +++ b/yt_dlp/extractor/youtube/_video.py @@ -2122,23 +2122,23 @@ def inner(*args, **kwargs): return ret return inner - def _load_nsig_code_from_cache(self, player_url): - cache_id = ('youtube-nsig', self._player_js_cache_key(player_url)) + def _load_player_data_from_cache(self, name, player_url): + cache_id = (f'youtube-{name}', self._player_js_cache_key(player_url)) - if func_code := self._player_cache.get(cache_id): - return func_code + if data := self._player_cache.get(cache_id): + return data - func_code = self.cache.load(*cache_id, min_ver='2025.03.31') - if func_code: - self._player_cache[cache_id] = func_code + data = self.cache.load(*cache_id, min_ver='2025.03.31') + if data: + self._player_cache[cache_id] = data - return func_code + return data - def _store_nsig_code_to_cache(self, player_url, func_code): - cache_id = ('youtube-nsig', self._player_js_cache_key(player_url)) + def _store_player_data_to_cache(self, name, player_url, data): + cache_id = (f'youtube-{name}', self._player_js_cache_key(player_url)) if cache_id not in self._player_cache: - self.cache.store(*cache_id, func_code) - self._player_cache[cache_id] = func_code + self.cache.store(*cache_id, data) + self._player_cache[cache_id] = data def _decrypt_signature(self, s, video_id, player_url): """Turn the encrypted s field into a working signature""" @@ -2181,7 +2181,7 @@ def _decrypt_nsig(self, s, video_id, player_url): self.write_debug(f'Decrypted nsig {s} => {ret}') # Only cache nsig func JS code to disk if successful, and only once - self._store_nsig_code_to_cache(player_url, func_code) + self._store_player_data_to_cache('nsig', player_url, func_code) return ret def _extract_n_function_name(self, jscode, player_url=None): @@ -2300,7 +2300,7 @@ def _fixup_n_function_code(self, argnames, nsig_code, jscode, player_url): def _extract_n_function_code(self, video_id, player_url): player_id = self._extract_player_info(player_url) - func_code = self._load_nsig_code_from_cache(player_url) + func_code = self._load_player_data_from_cache('nsig', player_url) jscode = func_code or self._load_player(video_id, player_url) jsi = JSInterpreter(jscode) @@ -2336,23 +2336,27 @@ def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=F Extract signatureTimestamp (sts) Required to tell API what sig/player version is in use. """ - sts = None - if isinstance(ytcfg, dict): - sts = int_or_none(ytcfg.get('STS')) + if sts := traverse_obj(ytcfg, ('STS', {int_or_none})): + return sts + + if not player_url: + error_msg = 'Cannot extract signature timestamp without player url' + if fatal: + raise ExtractorError(error_msg) + self.report_warning(error_msg) + return None + + sts = self._load_player_data_from_cache('sts', player_url) + if sts: + return sts + + if code := self._load_player(video_id, player_url, fatal=fatal): + sts = int_or_none(self._search_regex( + r'(?:signatureTimestamp|sts)\s*:\s*(?P[0-9]{5})', code, + 'JS player signature timestamp', group='sts', fatal=fatal)) + if sts: + self._store_player_data_to_cache('sts', player_url, sts) - if not sts: - # Attempt to extract from player - if player_url is None: - error_msg = 'Cannot extract signature timestamp without player_url.' - if fatal: - raise ExtractorError(error_msg) - self.report_warning(error_msg) - return - code = self._load_player(video_id, player_url, fatal=fatal) - if code: - sts = int_or_none(self._search_regex( - r'(?:signatureTimestamp|sts)\s*:\s*(?P[0-9]{5})', code, - 'JS player signature timestamp', group='sts', fatal=fatal)) return sts def _mark_watched(self, video_id, player_responses):