diff --git a/code/default/launcher/web_control.py b/code/default/launcher/web_control.py index 634cd366d1..9ef71d96ff 100644 --- a/code/default/launcher/web_control.py +++ b/code/default/launcher/web_control.py @@ -896,47 +896,6 @@ def req_debug_handler(self): self.send_response("text/plain", dat) def req_log_files(self): - # collect debug info and save to folders - debug_infos = { - "system_info": "http://localhost:8085/debug", - "xtunnel_info": "http://127.0.0.1:8085/module/x_tunnel/control/debug", - "xtunnel_status": "http://127.0.0.1:8085/module/x_tunnel/control/status", - "cloudflare_info": "http://127.0.0.1:8085/module/x_tunnel/control/cloudflare_front/debug", - "tls_info": "http://127.0.0.1:8085/module/x_tunnel/control/tls_relay_front/debug", - "seley_info": "http://127.0.0.1:8085/module/x_tunnel/control/seley_front/debug", - "cloudflare_log": "http://localhost:8085/module/x_tunnel/control/cloudflare_front/log?cmd=get_new&last_no=1", - "tls_log": "http://localhost:8085/module/x_tunnel/control/tls_relay_front/log?cmd=get_new&last_no=1", - "seley_log": "http://localhost:8085/module/x_tunnel/control/seley_front/log?cmd=get_new&last_no=1", - "xtunnel_log": "http://localhost:8085/module/x_tunnel/control/log?cmd=get_new&last_no=1", - "smartroute_log": "http://localhost:8085/module/smart_router/control/log?cmd=get_new&last_no=1", - "launcher_log": "http://localhost:8085/log?cmd=get_new&last_no=1" - } - - download_path = os.path.join(env_info.data_path, "downloads") - if not os.path.isdir(download_path): - os.mkdir(download_path) - - for name, url in debug_infos.items(): - # xlog.debug("fetch %s %s", name, url) - try: - res = simple_http_client.request("GET", url, timeout=1) - if name.endswith("log"): - dat = json.loads(res.text) - no_line = list(dat.items()) - no_line = [[int(line[0]), line[1]] for line in no_line] - no_line = sorted(no_line, key=operator.itemgetter(0)) - lines = [line[1] for line in no_line] - data = "".join(lines) - data = utils.to_bytes(data) - else: - data = res.text - - fn = os.path.join(download_path, name + ".txt") - with open(fn, "wb") as fd: - fd.write(data) - except Exception as e: - xlog.exception("fetch info %s fail:%r", url, e) - # pack data folder and response x_tunnel_local = os.path.abspath(os.path.join(default_path, 'x_tunnel', 'local')) sys.path.append(x_tunnel_local) diff --git a/code/default/lib/noarch/front_base/config.py b/code/default/lib/noarch/front_base/config.py index f0b8bd346b..df30dab693 100644 --- a/code/default/lib/noarch/front_base/config.py +++ b/code/default/lib/noarch/front_base/config.py @@ -22,6 +22,9 @@ def set_default(self): self.set_var("dispather_max_idle_workers", 30) self.set_var("dispather_worker_max_continue_fail", 8) self.set_var("dispather_connect_all_workers_on_startup", 0) + self.set_var("dispather_ping_check_speed_interval", 300) + self.set_var("dispather_ping_upload_size", 1024) + self.set_var("dispather_ping_download_size", 10240) self.set_var("max_task_num", 100) @@ -30,6 +33,7 @@ def set_default(self): self.set_var("http1_ping_interval", 300) self.set_var("http1_idle_time", 360) self.set_var("http1_max_process_tasks", 99999999) + self.set_var("http1_trace_size", 20) # http 2 worker self.set_var("http2_max_concurrent", 60) @@ -92,6 +96,9 @@ def set_default(self): self.set_var("long_fail_connect_interval", 180) self.set_var("short_fail_connect_interval", 10) self.set_var("shuffle_ip_on_first_load", 0) + self.set_var("ip_speed_history_size", 10) + self.set_var("ip_initial_speed", 1000000) + self.set_var("ip_speed_save_interval", 60) # ip source self.set_var("use_ipv6", "auto") #force_ipv4/force_ipv6 diff --git a/code/default/lib/noarch/front_base/connect_manager.py b/code/default/lib/noarch/front_base/connect_manager.py index 61fb7dbe12..559ae11bd6 100644 --- a/code/default/lib/noarch/front_base/connect_manager.py +++ b/code/default/lib/noarch/front_base/connect_manager.py @@ -243,14 +243,18 @@ def _create_more_connection_worker(self): self.logger.warning("Connect creating process blocked, max connect thread increase to %d", self.config.https_max_connect_thread) - while self.thread_num < self.config.https_max_connect_thread and self._need_more_ip(): - + for i in range(self.thread_num, self.config.https_max_connect_thread): self.thread_num_lock.acquire() self.thread_num += 1 self.thread_num_lock.release() + p = threading.Thread(target=self._connect_thread, name="%s_conn_manager__connect_th" % self.logger.name) p.start() - time.sleep(self.config.connect_create_interval) + if self.config.connect_create_interval > 0.1: + time.sleep(self.config.connect_create_interval) + + if not self._need_more_ip(): + break with self.thread_num_lock: self.connecting_more_thread = None diff --git a/code/default/lib/noarch/front_base/http1.py b/code/default/lib/noarch/front_base/http1.py index e907f3c966..412abc0c17 100644 --- a/code/default/lib/noarch/front_base/http1.py +++ b/code/default/lib/noarch/front_base/http1.py @@ -40,6 +40,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb def record_active(self, active=""): self.trace_time.append([time.time(), active]) + if len(self.trace_time) > self.config.http1_trace_size: + self.trace_time.pop(0) # self.logger.debug("%s stat:%s", self.ip, active) def get_trace(self): @@ -141,7 +143,8 @@ def request_task(self, task): task.headers[b'Host'] = self.get_host(task.host) - task.headers[b"Content-Length"] = len(task.body) + request_len = len(task.body) + task.headers[b"Content-Length"] = request_len request_data = b'%s %s HTTP/1.1\r\n' % (task.method, task.path) request_data += pack_headers(task.headers) request_data += b'\r\n' @@ -184,7 +187,11 @@ def request_task(self, task): response.worker = self task.content_length = response.content_length task.responsed = True - task.queue.put(response) + if task.queue: + task.queue.put(response) + else: + if self.config.http2_show_debug: + self.logger.debug("got pong for %s status:%d", self.ip_str, response.status) try: read_target = int(response.content_length) @@ -219,9 +226,19 @@ def request_task(self, task): task.finish() self.ssl_sock.received_size += data_len - time_cost = (time.time() - start_time) - if time_cost != 0: - speed = data_len / time_cost + + time_now = time.time() + time_cost = (time_now - start_time) + xcost = float(response.headers.get(b"X-Cost", 0)) + if isinstance(xcost, list): + xcost = float(xcost[0]) + + total_len = (request_len + data_len) + road_time = time_cost - xcost + if xcost and total_len > 10000 and road_time: + speed = total_len / road_time + self.update_speed(speed) + task.set_state("h1_finish[SP:%d]" % speed) self.transfered_size += len(request_data) + data_len diff --git a/code/default/lib/noarch/front_base/http2_connection.py b/code/default/lib/noarch/front_base/http2_connection.py index c73aff210a..05ba8bd380 100644 --- a/code/default/lib/noarch/front_base/http2_connection.py +++ b/code/default/lib/noarch/front_base/http2_connection.py @@ -462,8 +462,8 @@ def receive_frame(self, frame): # code registry otherwise use the frame's additional data. error_string = frame._extra_info() time_cost = time.time() - self.last_recv_time - if frame.additional_data != b"session_timed_out": - self.logger.warn("goaway:%s, t:%d", error_string, time_cost) + # if frame.additional_data != b"session_timed_out": + # self.logger.warn("goaway:%s, t:%d", error_string, time_cost) self.close("GoAway:%s inactive time:%d" % (error_string, time_cost)) diff --git a/code/default/lib/noarch/front_base/http2_stream.py b/code/default/lib/noarch/front_base/http2_stream.py index d98154c258..e8245e9949 100644 --- a/code/default/lib/noarch/front_base/http2_stream.py +++ b/code/default/lib/noarch/front_base/http2_stream.py @@ -316,11 +316,13 @@ def receive_frame(self, frame): time_now = time.time() whole_cost = time_now - self.start_time + rtt = whole_cost - xcost receive_cost = time_now - self.get_head_time bytes_received = self.connection._sock.bytes_received - self.start_connection_point - if receive_cost > 0 and bytes_received > 10000 and not self.task.finished and receive_cost > 0.001 \ - and xcost >= 0: - rtt = whole_cost - xcost + if b"ping" in self.task.path and self.config.http2_show_debug: + self.logger.debug("got pong for %s", self.connection.ip_str) + + if rtt > 0 and bytes_received >= 10000 and not self.task.finished and xcost >= 0.0: t_road = rtt if t_road <= self.connection.handshake: # adjust handshake @@ -367,10 +369,13 @@ def send_response(self): response.ssl_sock = self.connection.ssl_sock response.worker = self.connection response.task = self.task - if self.config.http2_show_debug: - self.logger.debug("self.task.queue.put(response)") - self.task.queue.put(response) + if self.task.queue: + self.task.queue.put(response) + else: + if self.config.http2_show_debug: + self.logger.debug("got pong for %s status:%d", self.connection.ip_str, status) + if status in self.config.http2_status_to_close: self.connection.close("status %d" % status) diff --git a/code/default/lib/noarch/front_base/http_common.py b/code/default/lib/noarch/front_base/http_common.py index dd162fd68d..debebaa7dc 100644 --- a/code/default/lib/noarch/front_base/http_common.py +++ b/code/default/lib/noarch/front_base/http_common.py @@ -177,7 +177,8 @@ def response_fail(self, reason=""): res = simple_http_client.BaseResponse(body=err_text) res.task = self res.worker = self.worker - self.queue.put(res) + if self.queue: + self.queue.put(res) self.finish() def finish(self): @@ -198,7 +199,6 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb self.ssl_sock = ssl_sock self.handshake = ssl_sock.handshake_time * 0.001 self.rtt = ssl_sock.handshake_time * 0.001 - self.speed = 15000000 self.streams = [] self.ip_str = ssl_sock.ip_str self.close_cb = close_cb @@ -213,7 +213,6 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb self.continue_fail_tasks = 0 self.rtt_history = [self.rtt,] self.adjust_history = [] - self.speed_history = [self.speed, self.speed, self.speed] self.last_recv_time = self.ssl_sock.create_time self.last_send_time = self.ssl_sock.create_time self.life_end_time = self.ssl_sock.create_time + \ @@ -228,12 +227,11 @@ def __str__(self): o += " continue_fail_tasks: %s\r\n" % (self.continue_fail_tasks) o += " handshake: %f \r\n" % self.handshake o += " rtt_history: %s\r\n" % (self.rtt_history) - o += " speed_history: %s\r\n" % (self.speed_history) o += " adjust_history: %s\r\n" % (self.adjust_history) if self.version != "1.1": o += "streams: %d\r\n" % len(self.streams) o += " rtt: %f\r\n" % (self.rtt) - o += " speed: %f\r\n" % (self.speed) + o += " speed: %f\r\n" % (self.ip_manager.get_speed(self.ip_str)) o += " score: %f\r\n" % (self.get_score()) return o @@ -250,20 +248,9 @@ def update_rtt(self, rtt, predict_rtt=None): self.adjust_history.pop(0) def update_speed(self, speed): - self.speed_history.append(speed) - if len(self.speed_history) > 10: - self.speed_history.pop(0) - self.speed = sum(self.speed_history) / len(self.speed_history) + self.ip_manager.update_speed(self.ip_str, speed) def update_debug_data(self, rtt, sent, received, speed): - # if sent + received > 10000: - # self.speed_history.append(speed) - # if len(self.speed_history) > 10: - # self.speed_history.pop(0) - # self.speed = sum(self.speed_history) / len(self.speed_history) - # else: - # self.rtt = rtt - self.log_debug_data(rtt, sent, received) return @@ -296,23 +283,23 @@ def get_score(self): if self.processed_tasks == 0 and len(self.streams) == 0: score /= 3 + speed = self.ip_manager.get_speed(self.ip_str) if self.version == "1.1": - score += self.max_payload / self.speed - return score - - response_body_len = self.max_payload - for _, stream in self.streams.items(): - if stream.response_body_len == 0: - response_body_len += self.max_payload - else: - response_body_len += stream.response_body_len - stream.task.body_len - score += response_body_len / self.speed + score += self.max_payload / speed + else: + response_body_len = self.max_payload + for _, stream in self.streams.items(): + if stream.response_body_len == 0: + response_body_len += self.max_payload + else: + response_body_len += stream.response_body_len - stream.task.body_len + score += response_body_len / speed - score += len(self.streams) * 0.06 + score += len(self.streams) * 0.06 if self.config.show_state_debug: self.logger.debug("get_score %s, speed:%f rtt:%d stream_num:%d score:%f", self.ip_str, - self.speed * 0.000001, self.rtt * 1000, len(self.streams), score) + speed * 0.000001, self.rtt * 1000, len(self.streams), score) return score diff --git a/code/default/lib/noarch/front_base/http_dispatcher.py b/code/default/lib/noarch/front_base/http_dispatcher.py index 372df63380..34991d36a7 100644 --- a/code/default/lib/noarch/front_base/http_dispatcher.py +++ b/code/default/lib/noarch/front_base/http_dispatcher.py @@ -17,7 +17,7 @@ sorted by rtt and pipeline task on load. """ from six.moves import queue - +import random import operator import threading import time @@ -65,6 +65,8 @@ def __init__(self, logger, config, ip_manager, connection_manager, self.request_queue = Queue() self.workers = [] self.working_tasks = {} + self.account = "" + self.last_host = None self.h1_num = 0 self.h2_num = 0 self.last_request_time = time.time() @@ -95,6 +97,7 @@ def __init__(self, logger, config, ip_manager, connection_manager, "sent": 0, "received": 0 } + self.ping_speed_ip_str_last_active = {} # ip_str => last_active self.trigger_create_worker_cv = SimpleCondition() self.wait_a_worker_cv = SimpleCondition() @@ -138,9 +141,34 @@ def on_ssl_created_cb(self, ssl_sock, remove_slowest_worker=True): self.workers.append(worker) - if remove_slowest_worker: + if time.time() - self.ping_speed_ip_str_last_active.get(worker.ip_str, 0) > self.config.dispather_ping_check_speed_interval: + self.ping_speed(worker) + self.ping_speed_ip_str_last_active[worker.ip_str] = time.time() + + elif remove_slowest_worker: self._remove_slowest_worker() + def ping_speed(self, worker): + if not self.last_host: + return + + method = b"POST" + path = b"/ping?content_length=%d" % self.config.dispather_ping_download_size + body = utils.to_bytes(utils.generate_random_lowercase(self.config.dispather_ping_upload_size)) + headers = { + b"Padding": utils.to_str(utils.generate_random_lowercase(random.randint(64, 256))), + b"Xx-Account": self.account, + b"X-Host": self.last_host, + b"X-Path": path + } + + task = http_common.Task(self.logger, self.config, method, self.last_host, path, + headers, body, None, "/ping", 5) + task.set_state("start_ping_request") + # self.logger.debug("send ping for %s", worker.ip_str) + + worker.request(task) + def _on_worker_idle_cb(self): self.wait_a_worker_cv.notify() @@ -261,7 +289,7 @@ def get_worker(self, nowait=False): def _remove_slowest_worker(self): # close slowest worker, - # give change for better worker + # give chance for better worker while True: slowest_score = 9999 slowest_worker = None @@ -307,6 +335,8 @@ def request(self, method, host, path, headers, body, url=b"", timeout=60): with self.task_count_lock: self.task_count += 1 + self.last_host = host + try: if not url: url = b"%s %s%s" % (method, host, path) @@ -411,6 +441,7 @@ def dispatcher(self): get_worker_time = time.time() get_cost = get_worker_time - get_time + self.ping_speed_ip_str_last_active[worker.ip_str] = get_worker_time task.set_state("get_worker(%d):%s" % (get_cost, worker.ip_str)) task.worker = worker task.predict_rtt = worker.get_score() @@ -524,7 +555,7 @@ def to_string(self): w_r = sorted(list(worker_rate.items()), key=operator.itemgetter(1)) - out_str = 'thread num:%d\r\n' % threading.activeCount() + out_str = 'thread num:%d\r\n' % threading.active_count() for w, r in w_r: out_str += "%s score:%d rtt:%d running:%d accept:%d live:%d inactive:%d processed:%d" % \ (w.ip_str, w.get_score(), w.rtt, w.keep_running, w.accept_task, @@ -536,10 +567,6 @@ def to_string(self): elif w.version == "1.1": out_str += " Trace:%s" % w.get_trace() - out_str += "\r\n Speed:" - for speed in w.speed_history: - out_str += "%d," % speed - out_str += "\r\n" out_str += "\r\n working_tasks:\r\n" diff --git a/code/default/lib/noarch/front_base/ip_manager.py b/code/default/lib/noarch/front_base/ip_manager.py index b67d23b061..16a50c039c 100644 --- a/code/default/lib/noarch/front_base/ip_manager.py +++ b/code/default/lib/noarch/front_base/ip_manager.py @@ -1,5 +1,6 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- +import json from six.moves import queue import operator import os @@ -12,7 +13,7 @@ class IpManagerBase(): - def __init__(self, config, ip_source, logger): + def __init__(self, config, ip_source, logger, speed_fn=None): self.scan_thread_lock = threading.Lock() self.ip_lock = threading.Lock() @@ -20,6 +21,48 @@ def __init__(self, config, ip_source, logger): self.ip_source = ip_source self.logger = logger self.ips = [] + self.speed_fn = speed_fn + self.speed_info = self.load_speed_info() + self.speed_info_last_save_time = time.time() + + def load_speed_info(self): + if not self.speed_fn or not os.path.isfile(self.speed_fn): + return {} + + try: + with open(self.speed_fn, "r") as fd: + info = json.load(fd) + return info + except Exception as e: + self.logger.exception("load speed info %s failed:%r", self.speed_fn, e) + return {} + + def save_speed_info(self): + if not self.speed_fn: + return + + try: + with open(self.speed_fn, "w") as fd: + json.dump(self.speed_info, fd, indent=2) + except Exception as e: + self.logger.exception("save speed info %s fail:%r", self.speed_fn, e) + + def update_speed(self, ip_str, speed): + ip_str = utils.to_str(ip_str) + ip_info = self.speed_info.setdefault(ip_str, {}) + ip_info.setdefault("history", []).append(speed) + if len(ip_info["history"]) > self.config.ip_speed_history_size: + ip_info["history"].pop(0) + ip_info["speed"] = sum(ip_info["history"]) / len(ip_info["history"]) + # self.logger.debug("update speed %s %d", ip_str, ip_info["speed"]) + + if time.time() - self.speed_info_last_save_time > self.config.ip_speed_save_interval: + self.save_speed_info() + self.speed_info_last_save_time = time.time() + + def get_speed(self, ip_str): + ip_str = utils.to_str(ip_str) + return self.speed_info.get(ip_str, {}).get("speed", self.config.ip_initial_speed) def load_config(self): pass @@ -62,7 +105,7 @@ def recheck_ip(self, ip_str): # good case is 60ms # bad case is 1300ms and more. -class IpManager(): +class IpManager(IpManagerBase): # Functions: # 1. Scan ip in back ground # 2. sort ip by RTT and fail times @@ -76,9 +119,7 @@ class IpManager(): def __init__(self, logger, config, ip_source, host_manager, check_local_network, check_ip, default_ip_list_fn, ip_list_fn, scan_ip_log=None): - self.logger = logger - self.config = config - self.ip_source = ip_source + super().__init__(config, ip_source, logger) self.host_manager = host_manager self.check_local_network = check_local_network self.check_ip = check_ip diff --git a/code/default/lib/noarch/utils.py b/code/default/lib/noarch/utils.py index efa7ba29b5..e04e91a19f 100644 --- a/code/default/lib/noarch/utils.py +++ b/code/default/lib/noarch/utils.py @@ -96,6 +96,14 @@ def get_ip_port(ip_str, port=443): return ip, int(port) +def get_ip_str(ip, port=443): + ip = to_str(ip) + if ":" in ip: + ip = "[" + ip + "]" + ip_str = ip + ":" + str(port) + return ip_str + + domain_allowed = re.compile("(?!-)[A-Z\d-]{1,63}(? +