Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix 更新虎牙 2024 #472

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 121 additions & 46 deletions huya.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,133 @@
# 获取虎牙直播的真实流媒体地址。

import json
import requests
import re
import base64
import urllib.parse
import hashlib
from urllib.parse import parse_qs, urlencode, unquote
from datetime import datetime
import random
import time
import traceback


class HuYa:
def __init__(self, room_id: str):
self.room_id = room_id

def get_anonymous_uid(self):
data = {
"appId": 5002,
"byPass": 3,
"context": "",
"version": "2.4",
"data": {}
}
url = "https://udblgn.huya.com/web/anonymousLogin"
try:
resp = requests.post(url, json=data, timeout=5)
resp.raise_for_status()
return resp.json()["data"]["uid"]
except requests.RequestException as e:
raise ConnectionError(f"获取匿名UID失败: {e}")

@staticmethod
def get_uuid():
now = datetime.now().timestamp() * 1000
rand = random.randint(0, 1000) | 0
return int((now % 10000000000 * 1000 + rand) % 4294967295)

def process_anticode(self, anticode, uid, streamname):
url_query = dict(parse_qs(anticode))
platform_id = 102 # web = 100, mobile = 103
url_query['ctype'][0] = 'tars_mp'
uid = int(uid)
convert_uid = (uid << 8 | uid >> (32 - 8)) & 0xFFFFFFFF
ws_time = url_query['wsTime'][0]
seq_id = uid + int(time.time() * 1000)
ws_secret_prefix = base64.b64decode(unquote(url_query['fm'][0]).encode()).decode().split('_')[0]
ws_secret_hash = hashlib.md5(f"{seq_id}|{url_query['ctype'][0]}|{platform_id}".encode()).hexdigest()
ws_secret = hashlib.md5(f'{ws_secret_prefix}_{convert_uid}_{streamname}_{ws_secret_hash}_{ws_time}'.encode()).hexdigest()

query_dict = {
"ctype": url_query['ctype'][0],
"fs": url_query['fs'][0],
"sv": 2401090219,
"ver": 1,
"seqid": seq_id,
"uid": convert_uid,
"uuid": self.get_uuid(),
"wsSecret": ws_secret,
"wsTime": ws_time,
"t": platform_id,
# -- Attributes below are optional
"sdk_sid": int(time.time() * 1000),
"codec": "264",
"sphdDC": "huya",
"exsphd": url_query['exsphd'][0],
"sphd": url_query['sphd'][0],
"sphdcdn": url_query['sphdcdn'][0],
}

def live(e):
i, b = e.split('?')
r = i.split('/')
s = re.sub(r'.(flv|m3u8)', '', r[-1])
c = b.split('&', 3)
c = [i for i in c if i != '']
n = {i.split('=')[0]: i.split('=')[1] for i in c}
fm = urllib.parse.unquote(n['fm'])
u = base64.b64decode(fm).decode('utf-8')
p = u.split('_')[0]
f = str(int(time.time() * 1e7))
l = n['wsTime']
t = '0'
h = '_'.join([p, t, s, f, l])
m = hashlib.md5(h.encode('utf-8')).hexdigest()
y = c[-1]
url = "{}?wsSecret={}&wsTime={}&u={}&seqid={}&{}".format(i, m, l, t, f, y)
return url


def get_real_url(room_id):
try:
room_url = 'https://m.huya.com/' + str(room_id)
return urlencode(query_dict)

def get_stream_info(self, info):
stream_info = dict({'flv': {}, 'hls': {}})
cdn_map = dict({'AL': '阿里', 'TX': '腾讯', 'HW': '华为', 'HS': '火山', 'WS': '网宿', 'HY': '虎牙'})
uid = self.get_anonymous_uid()

streams = info.get("roomInfo", {}).get("tLiveInfo", {}).get("tLiveStreamInfo", {}).get("vStreamInfo", {}).get("value", [])
for s in streams:
cdn_type = cdn_map.get(s["sCdnType"], s["sCdnType"])
if s["sFlvUrl"]:
url = s["sFlvUrl"].replace('http://', 'https://')
stream_info["flv"][cdn_type] = "{}/{}.{}?{}".format(
url, s["sStreamName"], s["sFlvUrlSuffix"], self.process_anticode(s["sFlvAntiCode"], uid, s["sStreamName"])
)
if s["sHlsUrl"]:
url = s["sHlsUrl"].replace('http://', 'https://')
stream_info["hls"][cdn_type] = "{}/{}.{}?{}".format(
url, s["sStreamName"], s["sHlsUrlSuffix"], self.process_anticode(s["sHlsAntiCode"], uid, s["sStreamName"]))
return stream_info

def get_real_url(self):
room_url = 'https://m.huya.com/' + str(self.room_id)
header = {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': 'Mozilla/5.0 (Linux; Android 5.0; SM-G900P Build/LRX21T) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/75.0.3770.100 Mobile Safari/537.36 '
'User-Agent':
'Mozilla/5.0 (Linux; Android 5.0; SM-G900P Build/LRX21T) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/75.0.3770.100 Mobile Safari/537.36 '
}
response = requests.get(url=room_url, headers=header).text
liveLineUrl = re.findall(r'"liveLineUrl":"([\s\S]*?)",', response)[0]
liveline = base64.b64decode(liveLineUrl).decode('utf-8')
if liveline:
if 'replay' in liveline:
return '直播录像:' + liveline
try:
response = requests.get(url=room_url, headers=header, timeout=10).text
match = re.search(r'\s*<script>\s*window.HNF_GLOBAL_INIT\s*=\s*(\{.*?\})\s*</script>', response, re.DOTALL)
if not match:
return {"errors": "虎牙响应格式已更改"}

room_info_str = match.group(1)
# Remove js functions
room_info_str = re.sub(r'function\s*\([^{]*\{[^}]*\}', '""', room_info_str)
room_info = json.loads(room_info_str)
if "roomInfo" not in room_info:
return {"errors": f"直播间({self.room_id})不存在"}

live_status = room_info["roomInfo"].get("eLiveStatus")
if live_status == 2:
return self.get_stream_info(room_info)
elif live_status == 3:
print('该直播间正在回放历史直播,低清晰度源地址为:')
return "https:{}".format(base64.b64decode(room_info["roomProfile"]["liveLineUrl"]).decode('utf-8'))
else:
liveline = live(liveline)
real_url = ("https:" + liveline).replace("hls", "flv").replace("m3u8", "flv")
else:
real_url = '未开播或直播间不存在'
except:
real_url = '未开播或直播间不存在'
return real_url


rid = input('输入虎牙直播房间号:\n')
real_url = get_real_url(rid)
print('该直播间源地址为:')
print(real_url)
return {"errors": f"直播间({self.room_id})未开播"}
except requests.RequestException as e:
return {"errors": f"获取虎牙real url时出错: {e}"}
except Exception:
return {"errors": f"处理响应时出错: {traceback.format_exc()}"}


if __name__ == '__main__':
room_id = input('输入虎牙直播房间号:\n')
huya = HuYa(room_id)
real_url = huya.get_real_url()
print('该直播间源地址为:')
print(json.dumps(real_url, ensure_ascii=False))