1
0
Fork 0
mirror of https://github.com/yt-dlp/yt-dlp.git synced 2025-03-09 12:50:23 -05:00

Some improvements to the code, more to come.

This commit is contained in:
florty2 2025-03-09 18:11:54 +11:00
parent d4d26564fd
commit 6ec0b6cd61

View file

@ -1,4 +1,3 @@
import json
import random
import re
import urllib.parse
@ -105,24 +104,11 @@ def _websocket_data(self, username, chat_servers):
return message, php_message
def _get_camserver(self, servers, key):
server_type = None
value = None
h5video_servers = servers['h5video_servers']
ngvideo_servers = servers['ngvideo_servers']
wzobs_servers = servers['wzobs_servers']
if h5video_servers.get(str(key)):
value = h5video_servers[str(key)]
server_type = 'h5video_servers'
elif wzobs_servers.get(str(key)):
value = wzobs_servers[str(key)]
server_type = 'wzobs_servers'
elif ngvideo_servers.get(str(key)):
value = ngvideo_servers[str(key)]
server_type = 'ngvideo_servers'
return value, server_type
server_types = ['h5video_servers', 'wzobs_servers', 'ngvideo_servers']
for server_type in server_types:
if servers[server_type].get(str(key)):
return servers[server_type][str(key)], server_type
return None, None
def _get_streams(self, url):
self.video_id = self._match_id(url)
@ -133,36 +119,33 @@ def _get_streams(self, url):
message, php_message = self._websocket_data(self.video_id, chat_servers)
if self.video_id:
self.write_debug('Attempting to use WebSocket data')
data = self._dict_re.search(message)
if data is None:
raise ExtractorError('Could not find data in WebSocket message')
data = parse_json(data.group('data'))
self.write_debug('Attempting to use WebSocket data')
data = self._search_json(r'(\w+) (\w+) (\w+) (\w+) (\w+)', message, name='ws_data', video_id=self.video_id)
if not data:
raise ExtractorError('Could not find data in WebSocket message')
vs = data['vs']
ok_vs = [0, 90]
if vs not in ok_vs:
if vs == 2:
error = ('Model is currently away')
elif vs == 12:
error = ('Model is currently in a private show')
elif vs == 13:
error = ('Model is currently in a group show')
elif vs == 14:
error = ('Model is currently in a club show')
elif vs == 127:
error = ('Model is currently offline')
else:
error = (f'Stream status: {vs}')
raise ExtractorError(error, expected=True)
try:
vs = data['vs']
ok_vs = [0, 90]
if vs not in ok_vs:
error_messages = {
2: 'Model is currently away',
12: 'Model is currently in a private show',
13: 'Model is currently in a group show',
14: 'Model is currently in a club show',
127: 'Model is currently offline',
}
error = error_messages.get(vs, f'Stream status: {vs}')
raise ExtractorError(error, expected=True)
self.write_debug(f'VS: {vs}')
self.write_debug(f'VS: {vs}')
nm = data['nm']
uid = data['uid']
uid_video = uid + 100000000
camserver = data['u']['camserv']
nm = data['nm']
uid = data['uid']
uid_video = uid + 100000000
camserver = data['u']['camserv']
except KeyError:
raise ExtractorError('Could not find required data in WebSocket message')
server, server_type = self._get_camserver(servers, camserver)
@ -198,33 +181,7 @@ def _get_streams(self, url):
}
def _real_extract(self, url):
if not websockets:
raise ImportError('This extractor needs websockets installed')
self.write_debug(self._get_streams(url=url))
return self._get_streams(url=url)
def _parse(parser, data, name, *args, **kwargs):
try:
parsed = parser(data, *args, **kwargs)
except Exception as err:
snippet = repr(data)
if len(snippet) > 35:
snippet = f'{snippet[:35]} ...'
raise ExtractorError(f'Unable to parse {name}: {err} ({snippet})')
return parsed
def parse_json(
data,
name='JSON',
schema=None,
*args,
**kwargs,
):
"""Wrapper around json.loads.
Provides these extra features:
- Wraps errors in custom exception with a snippet of the data in the message
"""
return _parse(*args, **kwargs, parser=json.loads, data=data, name=name)