Skip to content

Commit

Permalink
5.8.1 Improve performance.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-X-Net committed Oct 16, 2023
1 parent a210426 commit cd243e5
Show file tree
Hide file tree
Showing 30 changed files with 343 additions and 139 deletions.
41 changes: 0 additions & 41 deletions code/default/launcher/web_control.py
Original file line number Diff line number Diff line change
Expand Up @@ -896,47 +896,6 @@ def req_debug_handler(self):
self.send_response("text/plain", dat)

def req_log_files(self):
# collect debug info and save to folders
debug_infos = {
"system_info": "http://localhost:8085/debug",
"xtunnel_info": "http://127.0.0.1:8085/module/x_tunnel/control/debug",
"xtunnel_status": "http://127.0.0.1:8085/module/x_tunnel/control/status",
"cloudflare_info": "http://127.0.0.1:8085/module/x_tunnel/control/cloudflare_front/debug",
"tls_info": "http://127.0.0.1:8085/module/x_tunnel/control/tls_relay_front/debug",
"seley_info": "http://127.0.0.1:8085/module/x_tunnel/control/seley_front/debug",
"cloudflare_log": "http://localhost:8085/module/x_tunnel/control/cloudflare_front/log?cmd=get_new&last_no=1",
"tls_log": "http://localhost:8085/module/x_tunnel/control/tls_relay_front/log?cmd=get_new&last_no=1",
"seley_log": "http://localhost:8085/module/x_tunnel/control/seley_front/log?cmd=get_new&last_no=1",
"xtunnel_log": "http://localhost:8085/module/x_tunnel/control/log?cmd=get_new&last_no=1",
"smartroute_log": "http://localhost:8085/module/smart_router/control/log?cmd=get_new&last_no=1",
"launcher_log": "http://localhost:8085/log?cmd=get_new&last_no=1"
}

download_path = os.path.join(env_info.data_path, "downloads")
if not os.path.isdir(download_path):
os.mkdir(download_path)

for name, url in debug_infos.items():
# xlog.debug("fetch %s %s", name, url)
try:
res = simple_http_client.request("GET", url, timeout=1)
if name.endswith("log"):
dat = json.loads(res.text)
no_line = list(dat.items())
no_line = [[int(line[0]), line[1]] for line in no_line]
no_line = sorted(no_line, key=operator.itemgetter(0))
lines = [line[1] for line in no_line]
data = "".join(lines)
data = utils.to_bytes(data)
else:
data = res.text

fn = os.path.join(download_path, name + ".txt")
with open(fn, "wb") as fd:
fd.write(data)
except Exception as e:
xlog.exception("fetch info %s fail:%r", url, e)

# pack data folder and response
x_tunnel_local = os.path.abspath(os.path.join(default_path, 'x_tunnel', 'local'))
sys.path.append(x_tunnel_local)
Expand Down
7 changes: 7 additions & 0 deletions code/default/lib/noarch/front_base/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ def set_default(self):
self.set_var("dispather_max_idle_workers", 30)
self.set_var("dispather_worker_max_continue_fail", 8)
self.set_var("dispather_connect_all_workers_on_startup", 0)
self.set_var("dispather_ping_check_speed_interval", 300)
self.set_var("dispather_ping_upload_size", 1024)
self.set_var("dispather_ping_download_size", 10240)

self.set_var("max_task_num", 100)

Expand All @@ -30,6 +33,7 @@ def set_default(self):
self.set_var("http1_ping_interval", 300)
self.set_var("http1_idle_time", 360)
self.set_var("http1_max_process_tasks", 99999999)
self.set_var("http1_trace_size", 20)

# http 2 worker
self.set_var("http2_max_concurrent", 60)
Expand Down Expand Up @@ -92,6 +96,9 @@ def set_default(self):
self.set_var("long_fail_connect_interval", 180)
self.set_var("short_fail_connect_interval", 10)
self.set_var("shuffle_ip_on_first_load", 0)
self.set_var("ip_speed_history_size", 10)
self.set_var("ip_initial_speed", 1000000)
self.set_var("ip_speed_save_interval", 60)

# ip source
self.set_var("use_ipv6", "auto") #force_ipv4/force_ipv6
Expand Down
10 changes: 7 additions & 3 deletions code/default/lib/noarch/front_base/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,18 @@ def _create_more_connection_worker(self):
self.logger.warning("Connect creating process blocked, max connect thread increase to %d",
self.config.https_max_connect_thread)

while self.thread_num < self.config.https_max_connect_thread and self._need_more_ip():

for i in range(self.thread_num, self.config.https_max_connect_thread):
self.thread_num_lock.acquire()
self.thread_num += 1
self.thread_num_lock.release()

p = threading.Thread(target=self._connect_thread, name="%s_conn_manager__connect_th" % self.logger.name)
p.start()
time.sleep(self.config.connect_create_interval)
if self.config.connect_create_interval > 0.1:
time.sleep(self.config.connect_create_interval)

if not self._need_more_ip():
break

with self.thread_num_lock:
self.connecting_more_thread = None
Expand Down
27 changes: 22 additions & 5 deletions code/default/lib/noarch/front_base/http1.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb

def record_active(self, active=""):
self.trace_time.append([time.time(), active])
if len(self.trace_time) > self.config.http1_trace_size:
self.trace_time.pop(0)
# self.logger.debug("%s stat:%s", self.ip, active)

def get_trace(self):
Expand Down Expand Up @@ -141,7 +143,8 @@ def request_task(self, task):

task.headers[b'Host'] = self.get_host(task.host)

task.headers[b"Content-Length"] = len(task.body)
request_len = len(task.body)
task.headers[b"Content-Length"] = request_len
request_data = b'%s %s HTTP/1.1\r\n' % (task.method, task.path)
request_data += pack_headers(task.headers)
request_data += b'\r\n'
Expand Down Expand Up @@ -184,7 +187,11 @@ def request_task(self, task):
response.worker = self
task.content_length = response.content_length
task.responsed = True
task.queue.put(response)
if task.queue:
task.queue.put(response)
else:
if self.config.http2_show_debug:
self.logger.debug("got pong for %s status:%d", self.ip_str, response.status)

try:
read_target = int(response.content_length)
Expand Down Expand Up @@ -219,9 +226,19 @@ def request_task(self, task):
task.finish()

self.ssl_sock.received_size += data_len
time_cost = (time.time() - start_time)
if time_cost != 0:
speed = data_len / time_cost

time_now = time.time()
time_cost = (time_now - start_time)
xcost = float(response.headers.get(b"X-Cost", 0))
if isinstance(xcost, list):
xcost = float(xcost[0])

total_len = (request_len + data_len)
road_time = time_cost - xcost
if xcost and total_len > 10000 and road_time:
speed = total_len / road_time
self.update_speed(speed)

task.set_state("h1_finish[SP:%d]" % speed)

self.transfered_size += len(request_data) + data_len
Expand Down
4 changes: 2 additions & 2 deletions code/default/lib/noarch/front_base/http2_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,8 @@ def receive_frame(self, frame):
# code registry otherwise use the frame's additional data.
error_string = frame._extra_info()
time_cost = time.time() - self.last_recv_time
if frame.additional_data != b"session_timed_out":
self.logger.warn("goaway:%s, t:%d", error_string, time_cost)
# if frame.additional_data != b"session_timed_out":
# self.logger.warn("goaway:%s, t:%d", error_string, time_cost)

self.close("GoAway:%s inactive time:%d" % (error_string, time_cost))

Expand Down
17 changes: 11 additions & 6 deletions code/default/lib/noarch/front_base/http2_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,11 +316,13 @@ def receive_frame(self, frame):

time_now = time.time()
whole_cost = time_now - self.start_time
rtt = whole_cost - xcost
receive_cost = time_now - self.get_head_time
bytes_received = self.connection._sock.bytes_received - self.start_connection_point
if receive_cost > 0 and bytes_received > 10000 and not self.task.finished and receive_cost > 0.001 \
and xcost >= 0:
rtt = whole_cost - xcost
if b"ping" in self.task.path and self.config.http2_show_debug:
self.logger.debug("got pong for %s", self.connection.ip_str)

if rtt > 0 and bytes_received >= 10000 and not self.task.finished and xcost >= 0.0:
t_road = rtt
if t_road <= self.connection.handshake:
# adjust handshake
Expand Down Expand Up @@ -367,10 +369,13 @@ def send_response(self):
response.ssl_sock = self.connection.ssl_sock
response.worker = self.connection
response.task = self.task
if self.config.http2_show_debug:
self.logger.debug("self.task.queue.put(response)")

self.task.queue.put(response)
if self.task.queue:
self.task.queue.put(response)
else:
if self.config.http2_show_debug:
self.logger.debug("got pong for %s status:%d", self.connection.ip_str, status)

if status in self.config.http2_status_to_close:
self.connection.close("status %d" % status)

Expand Down
45 changes: 16 additions & 29 deletions code/default/lib/noarch/front_base/http_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ def response_fail(self, reason=""):
res = simple_http_client.BaseResponse(body=err_text)
res.task = self
res.worker = self.worker
self.queue.put(res)
if self.queue:
self.queue.put(res)
self.finish()

def finish(self):
Expand All @@ -198,7 +199,6 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.ssl_sock = ssl_sock
self.handshake = ssl_sock.handshake_time * 0.001
self.rtt = ssl_sock.handshake_time * 0.001
self.speed = 15000000
self.streams = []
self.ip_str = ssl_sock.ip_str
self.close_cb = close_cb
Expand All @@ -213,7 +213,6 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.continue_fail_tasks = 0
self.rtt_history = [self.rtt,]
self.adjust_history = []
self.speed_history = [self.speed, self.speed, self.speed]
self.last_recv_time = self.ssl_sock.create_time
self.last_send_time = self.ssl_sock.create_time
self.life_end_time = self.ssl_sock.create_time + \
Expand All @@ -228,12 +227,11 @@ def __str__(self):
o += " continue_fail_tasks: %s\r\n" % (self.continue_fail_tasks)
o += " handshake: %f \r\n" % self.handshake
o += " rtt_history: %s\r\n" % (self.rtt_history)
o += " speed_history: %s\r\n" % (self.speed_history)
o += " adjust_history: %s\r\n" % (self.adjust_history)
if self.version != "1.1":
o += "streams: %d\r\n" % len(self.streams)
o += " rtt: %f\r\n" % (self.rtt)
o += " speed: %f\r\n" % (self.speed)
o += " speed: %f\r\n" % (self.ip_manager.get_speed(self.ip_str))
o += " score: %f\r\n" % (self.get_score())
return o

Expand All @@ -250,20 +248,9 @@ def update_rtt(self, rtt, predict_rtt=None):
self.adjust_history.pop(0)

def update_speed(self, speed):
self.speed_history.append(speed)
if len(self.speed_history) > 10:
self.speed_history.pop(0)
self.speed = sum(self.speed_history) / len(self.speed_history)
self.ip_manager.update_speed(self.ip_str, speed)

def update_debug_data(self, rtt, sent, received, speed):
# if sent + received > 10000:
# self.speed_history.append(speed)
# if len(self.speed_history) > 10:
# self.speed_history.pop(0)
# self.speed = sum(self.speed_history) / len(self.speed_history)
# else:
# self.rtt = rtt

self.log_debug_data(rtt, sent, received)
return

Expand Down Expand Up @@ -296,23 +283,23 @@ def get_score(self):
if self.processed_tasks == 0 and len(self.streams) == 0:
score /= 3

speed = self.ip_manager.get_speed(self.ip_str)
if self.version == "1.1":
score += self.max_payload / self.speed
return score

response_body_len = self.max_payload
for _, stream in self.streams.items():
if stream.response_body_len == 0:
response_body_len += self.max_payload
else:
response_body_len += stream.response_body_len - stream.task.body_len
score += response_body_len / self.speed
score += self.max_payload / speed
else:
response_body_len = self.max_payload
for _, stream in self.streams.items():
if stream.response_body_len == 0:
response_body_len += self.max_payload
else:
response_body_len += stream.response_body_len - stream.task.body_len
score += response_body_len / speed

score += len(self.streams) * 0.06
score += len(self.streams) * 0.06

if self.config.show_state_debug:
self.logger.debug("get_score %s, speed:%f rtt:%d stream_num:%d score:%f", self.ip_str,
self.speed * 0.000001, self.rtt * 1000, len(self.streams), score)
speed * 0.000001, self.rtt * 1000, len(self.streams), score)

return score

Expand Down
Loading

0 comments on commit cd243e5

Please sign in to comment.