Skip to content

Commit

Permalink
5.4.0 refactor codes to improve performance and fix bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael-X-Net committed Jul 13, 2023
1 parent 5d3a993 commit 5b46995
Show file tree
Hide file tree
Showing 14 changed files with 198 additions and 117 deletions.
1 change: 1 addition & 0 deletions code/default/lib/noarch/front_base/boringssl_wrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, context, sock, ip_str=None, sni=None, on_close=None):

def wrap(self):
ip, port = utils.get_ip_port(self.ip_str)
self.ip = ip
if isinstance(ip, str):
ip = utils.to_bytes(ip)

Expand Down
5 changes: 3 additions & 2 deletions code/default/lib/noarch/front_base/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def set_default(self):
self.set_var("dispather_score_factor", 1)
self.set_var("dispather_max_idle_workers", 30)
self.set_var("dispather_worker_max_continue_fail", 8)
self.set_var("dispather_connect_all_workers_on_startup", 0)

self.set_var("max_task_num", 100)

Expand All @@ -32,7 +33,7 @@ def set_default(self):

# http 2 worker
self.set_var("http2_max_concurrent", 60)
self.set_var("http2_target_concurrent", 60)
self.set_var("http2_target_concurrent", 6)
self.set_var("http2_max_timeout_tasks", 5)
self.set_var("http2_max_process_tasks", 900) # Nginx will GoAway after 1000 tasks.
self.set_var("http2_timeout_active", 15)
Expand All @@ -47,7 +48,7 @@ def set_default(self):
# connect manager
self.set_var("https_max_connect_thread", 1)
self.set_var("max_connect_thread", 1)
self.set_var("connect_create_interval", 1)
self.set_var("connect_create_interval", 0.1)
self.set_var("ssl_first_use_timeout", 10)
self.set_var("connection_pool_min", 1)
self.set_var("https_keep_alive", 15) # time to pass created link to worker
Expand Down
69 changes: 31 additions & 38 deletions code/default/lib/noarch/front_base/connect_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,11 @@ def __init__(self, logger, config, connect_creator, ip_manager, check_local_netw

self.thread_num_lock = threading.Lock()
self.timeout = 4
self.max_timeout = 60
self.thread_num = 0
self.running = True
self.connect_counter = 0

self.get_num_lock = threading.Lock()
self.https_get_num = 0
self._waiting_num_lock = threading.Lock()
self._connection_waiting_num = 0
self.no_ip_lock = threading.Lock()

# after new created ssl_sock timeout(50 seconds)
Expand All @@ -185,8 +183,6 @@ def __init__(self, logger, config, connect_creator, ip_manager, check_local_netw
else:
self.keep_conn_th = None

self.create_more_connection()

def stop(self):
self.running = False

Expand Down Expand Up @@ -219,71 +215,68 @@ def keep_connection_daemon(self):
time.sleep(5)
continue

self.connect_process()
self._connect_process()

def _need_more_ip(self):
if self.https_get_num:
if self._connection_waiting_num:
return True
else:
return False

def create_more_connection(self):
def _create_more_connection(self):
if not self.connecting_more_thread:
with self.thread_num_lock:
self.connecting_more_thread = threading.Thread(target=self.create_more_connection_worker)
self.connecting_more_thread = threading.Thread(target=self._create_more_connection_worker)
self.connecting_more_thread.start()

def create_more_connection_worker(self):
while self.thread_num < self.config.https_max_connect_thread and \
self._need_more_ip():
def _create_more_connection_worker(self):
while self.thread_num < self.config.https_max_connect_thread and self._need_more_ip():

self.thread_num_lock.acquire()
self.thread_num += 1
self.thread_num_lock.release()
p = threading.Thread(target=self.connect_thread)
p = threading.Thread(target=self._connect_thread)
p.start()
time.sleep(0.5)
time.sleep(self.config.connect_create_interval)

with self.thread_num_lock:
self.connecting_more_thread = None

def connect_thread(self, sleep_time=0):
def _connect_thread(self, sleep_time=0):
time.sleep(sleep_time)
try:
while self.running and self._need_more_ip():
if self.new_conn_pool.qsize() > self.config.https_connection_pool_max:
break

self.connect_process()
self._connect_process()
finally:
self.thread_num_lock.acquire()
self.thread_num -= 1
self.thread_num_lock.release()

def connect_process(self):
def _connect_process(self):
try:
ip_str, sni, host = self.ip_manager.get_ip_sni_host()
if not ip_str:
with self.no_ip_lock:
# self.logger.warning("not enough ip")
time.sleep(10)
return
return None

# self.logger.debug("create ssl conn %s", ip_str)
ssl_sock = self._create_ssl_connection(ip_str, sni, host)
if not ssl_sock:
time.sleep(1)
return
return None

self.new_conn_pool.put((ssl_sock.handshake_time, ssl_sock))
self.connect_counter += 1

if self.config.connect_create_interval > 0:
if self.connect_counter >= 2:
sleep = random.randint(self.config.connect_create_interval, self.config.connect_create_interval*2)
time.sleep(sleep)
else:
time.sleep(1)
sleep = random.uniform(self.config.connect_create_interval, self.config.connect_create_interval*2)
time.sleep(sleep)

return ssl_sock
except Exception as e:
self.logger.exception("connect_process except:%r", e)

Expand All @@ -303,8 +296,8 @@ def _create_ssl_connection(self, ip_str, sni, host):
self.logger.debug("connect %s network fail, %r", ip_str, e)
time.sleep(1)
else:
self.logger.debug("connect %s network fail:%r", ip_str, e)
self.ip_manager.report_connect_fail(ip_str, sni, str(e))
self.logger.debug("connect %s fail:%r", ip_str, e)
self.ip_manager.report_connect_fail(ip_str, sni, str(e))
except NoRescourceException as e:
self.logger.warning("create ssl for %s except:%r", ip_str, e)
self.ip_manager.report_connect_fail(ip_str, sni, str(e))
Expand All @@ -318,12 +311,11 @@ def _create_ssl_connection(self, ip_str, sni, host):
self.logger.exception("connect %s fail:%r", ip_str, e)
time.sleep(1)

def get_ssl_connection(self):
with self.get_num_lock:
self.https_get_num += 1
def get_ssl_connection(self, timeout=30):
with self._waiting_num_lock:
self._connection_waiting_num += 1

start_time = time.time()
self.create_more_connection()
end_time = time.time() + timeout
try:
while self.running:
ret = self.new_conn_pool.get(block=True, timeout=1)
Expand All @@ -336,11 +328,12 @@ def get_ssl_connection(self):
# self.logger.debug("new_conn_pool.get:%s handshake:%d timeout.", ssl_sock.ip, handshake_time)
self.ip_manager.report_connect_closed(ssl_sock.ip_str, ssl_sock.sni, "get_timeout")
ssl_sock.close()
continue
else:
if time.time() - start_time > self.max_timeout:
self.logger.debug("create ssl timeout fail.")
if time.time() > end_time:
self.logger.debug("get_ssl_connection timeout")
return None

self._create_more_connection()
finally:
with self.get_num_lock:
self.https_get_num -= 1
with self._waiting_num_lock:
self._connection_waiting_num -= 1
2 changes: 0 additions & 2 deletions code/default/lib/noarch/front_base/http1.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,6 @@ def close(self, reason=""):
# Notify loop to exit
# This function may be call by out side http2
# When gae_proxy found the appid or ip is wrong
self.accept_task = False
self.keep_running = False
self.task_queue.put(None)

if self.task is not None:
Expand Down
10 changes: 6 additions & 4 deletions code/default/lib/noarch/front_base/http2_connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

from six.moves import queue
import threading
import socket
Expand Down Expand Up @@ -220,8 +222,6 @@ def get_rtt_rate(self):
return self.rtt + len(self.streams) * 3000

def close(self, reason="conn close"):
self.keep_running = False
self.accept_task = False
# Notify loop to exit
# This function may be call by out side http2
# When gae_proxy found the appid or ip is wrong
Expand Down Expand Up @@ -286,9 +286,9 @@ def increase_remote_window_size(self, inc_size):
self.idle_cb()

def _send_cb(self, frame):
# can called by stream
# can be called by stream
# put to send_blocked if connection window not allow,
if frame.type == DataFrame.type:
if frame.type in [HeadersFrame.type, DataFrame.type]:
if len(frame.data) > self.remote_window_size:
self.blocked_send_frames.append(frame)
self.accept_task = False
Expand Down Expand Up @@ -512,9 +512,11 @@ def _update_settings(self, frame):
stream.max_frame_size += new_size

def get_trace(self):
now = time.time()
out_list = []
out_list.append(" continue_timeout:%d" % self.continue_timeout)
out_list.append(" processed:%d" % self.processed_tasks)
out_list.append(" inactive:%d, %d" % (now - self.last_send_time, now - self.last_recv_time))
out_list.append(" h2.stream_num:%d" % len(self.streams))
out_list.append(" sni:%s, host:%s" % (self.ssl_sock.sni, self.ssl_sock.host))
return ",".join(out_list)
Expand Down
6 changes: 6 additions & 0 deletions code/default/lib/noarch/front_base/http2_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,18 @@ def send_response(self):

def close(self, reason="close"):
if not self.task.responsed:
# self.task.set_state("stream close: %s, call retry" % reason)
self.connection.retry_task_cb(self.task, reason)
else:
# self.task.set_state("stream close: %s, finished" % reason)
self.task.finish()
# empty block means fail or closed.

self._close_remote()
# self.task.set_state("stream close: %s, closed remote" % reason)

self._close_cb(self.stream_id, reason)
# self.task.set_state("stream close: %s, called close_cb" % reason)

@property
def _local_closed(self):
Expand Down
25 changes: 17 additions & 8 deletions code/default/lib/noarch/front_base/http_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,7 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb
self.ip_manager = ip_manager
self.config = config
self.ssl_sock = ssl_sock
self.init_rtt = ssl_sock.handshake_time / 3
self.rtt = self.init_rtt
self.rtt = ssl_sock.handshake_time
self.speed = 200
self.ip_str = ssl_sock.ip_str
self.close_cb = close_cb
Expand Down Expand Up @@ -218,14 +217,19 @@ def __str__(self):

def update_debug_data(self, rtt, sent, received, speed):
self.rtt = rtt
self.speed_history.append(speed)
if len(self.speed_history) > 10:
self.speed_history.pop(0)
self.speed = sum(self.speed_history) / len(self.speed_history)
if sent + received > 10000:
self.speed_history.append(speed)
if len(self.speed_history) > 10:
self.speed_history.pop(0)
self.speed = sum(self.speed_history) / len(self.speed_history)

self.log_debug_data(rtt, sent, received)

def close(self, reason):
if not self.keep_running:
self.logger.warn("worker already closed %s", self.ip_str)
return

self.accept_task = False
self.keep_running = False
self.ssl_sock.close()
Expand All @@ -238,9 +242,14 @@ def close(self, reason):
self.close_cb(self)

def get_score(self):
score = (50 - (self.speed/6.0)) + (self.rtt/30.0)
# The smaller, the better
score = (50 - (self.speed/6.0)) + (self.rtt/20.0)
if self.version != "1.1":
score += len(self.streams) * 5
score += len(self.streams) * 3

if self.config.show_state_debug:
self.logger.debug("get_score %s, speed:%d rtt:%d stream_num:%d score:%d", self.ip_str,
self.speed, self.rtt, len(self.streams), score)

return score

Expand Down
Loading

0 comments on commit 5b46995

Please sign in to comment.