Skip to content

Commit

Permalink
[impala] improved query timeout error logging
Browse files Browse the repository at this point in the history
we were not clearly catching the timeout error from impala query
submission. the error response was getting passed to the upper layers
that errored out when trying to get json out of it. It lead to a cryptic
error on the frontend as well. Now we are generating a clearer log
message with the timeout values and config name. The frontend also
shows a timeout error message.

Change-Id: I19fbf49fd0fccc21f8f1f2258695902f73406856
  • Loading branch information
amitsrivastava committed Jun 11, 2024
1 parent 7ee5a33 commit 342357d
Showing 1 changed file with 55 additions and 35 deletions.
90 changes: 55 additions & 35 deletions desktop/core/src/desktop/lib/thrift_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,38 @@
# limitations under the License.
#
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import map
from builtins import range
from past.builtins import basestring
from builtins import object
import base64
import queue
import logging

import re
import sys
import math
import socket
import threading
import time
import re
import queue
import base64
import socket
import struct
import sys
import logging
import threading
from builtins import map, object, range

from thrift.Thrift import TType, TApplicationException
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport, TMemoryBuffer, TTransportException
from django.conf import settings
from past.builtins import basestring
from thrift.protocol.TBinaryProtocol import TBinaryProtocol
from thrift.protocol.TMultiplexedProtocol import TMultiplexedProtocol
from thrift.Thrift import TApplicationException, TType
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TFramedTransport, TMemoryBuffer, TTransportException

from django.conf import settings
from desktop.conf import SASL_MAX_BUFFER, CHERRYPY_SERVER_THREADS, ENABLE_SMART_THRIFT_POOL, USE_THRIFT_HTTP_JWT, ENABLE_ORGANIZATIONS

from desktop.lib.apputil import WARN_LEVEL_CALL_DURATION_MS, INFO_LEVEL_CALL_DURATION_MS
from desktop.conf import CHERRYPY_SERVER_THREADS, ENABLE_ORGANIZATIONS, ENABLE_SMART_THRIFT_POOL, SASL_MAX_BUFFER, USE_THRIFT_HTTP_JWT
from desktop.lib.apputil import INFO_LEVEL_CALL_DURATION_MS, WARN_LEVEL_CALL_DURATION_MS
from desktop.lib.exceptions import StructuredException, StructuredThriftTransportException
from desktop.lib.python_util import create_synchronous_io_multiplexer
from desktop.lib.thrift_.http_client import THttpClient
from desktop.lib.thrift_.TSSLSocketWithWildcardSAN import TSSLSocketWithWildcardSAN
from desktop.lib.thrift_sasl import TSaslClientTransport
from desktop.lib.exceptions import StructuredException, StructuredThriftTransportException

if sys.version_info[0] > 2:
from past.builtins import long
from django.utils.translation import gettext as _
from past.builtins import long
else:
from django.utils.translation import ugettext as _

Expand Down Expand Up @@ -170,6 +166,7 @@ def update_coordinator_host(self, coordinator_host):
def get_coordinator_host(self):
return self.coordinator_host


class ConnectionPooler(object):
"""
Thread-safe connection pooling for thrift. (With about 3 changes,
Expand Down Expand Up @@ -289,13 +286,15 @@ def return_client(self, conf, client):

self.pooldict[_get_pool_key(conf)].put(client)


def _get_pool_key(conf):
"""
Given a ConnectionConfig, return the tuple used as the key in the dictionary
of connections by the ConnectionPooler class.
"""
return (conf.klass, conf.host, conf.port, conf.get_coordinator_host())


def construct_superclient(conf):
"""
Constructs a thrift client, lazily.
Expand Down Expand Up @@ -340,7 +339,7 @@ def connect_to_thrift(conf):
mode.set_kerberos_auth(service=conf.kerberos_principal)

elif USE_THRIFT_HTTP_JWT.get():
from desktop.auth.backend import find_user, rewrite_user # Cyclic dependency
from desktop.auth.backend import find_user, rewrite_user # Cyclic dependency
user = rewrite_user(find_user(conf.username))

if user is None:
Expand All @@ -364,7 +363,7 @@ def sasl_factory():
saslc.setAttr("service", str(conf.kerberos_principal))
if conf.mechanism == 'PLAIN':
saslc.setAttr("username", str(conf.username))
saslc.setAttr("password", str(conf.password)) # Defaults to 'hue' for a non-empty string unless using LDAP
saslc.setAttr("password", str(conf.password)) # Defaults to 'hue' for a non-empty string unless using LDAP
else:
saslc.setAttr("maxbufsize", SASL_MAX_BUFFER.get())
saslc.init()
Expand All @@ -389,6 +388,7 @@ def get_client(klass, host, port, service_name, **kwargs):
conf = ConnectionConfig(klass, host, port, service_name, **kwargs)
return PooledClient(conf)


def _grab_transport_from_wrapper(outer_transport):
if isinstance(outer_transport, TBufferedTransport):
return outer_transport._TBufferedTransport__trans
Expand Down Expand Up @@ -470,7 +470,7 @@ def wrapper(*args, **kwargs):
raise
finally:
self._return_client(superclient)
wrapper.attr = attr_name # Save the name of the attribute as it is replaced by 'wrapper'
wrapper.attr = attr_name # Save the name of the attribute as it is replaced by 'wrapper'

return wrapper

Expand Down Expand Up @@ -522,7 +522,7 @@ def wrapper(*args, **kwargs):
ret = res(*args, **kwargs)

if ENABLE_SMART_THRIFT_POOL.get() and 'OpenSession' == attr and 'http_addr' in repr(ret):
coordinator_host = re.search('http_addr\':\ \'(.*:[0-9]{2,})\', \'', repr(ret))
coordinator_host = re.search('http_addr\':\\ \'(.*:[0-9]{2,})\', \'', repr(ret))
self.coordinator_host = coordinator_host.group(1)

log_msg = _unpack_guid_secret_in_handle(repr(ret))
Expand All @@ -544,7 +544,7 @@ def wrapper(*args, **kwargs):
except (socket.error, socket.timeout, TTransportException) as e:
self.transport.close()

if isinstance(e, socket.timeout) or 'read operation timed out' in str(e): # Can come from ssl.SSLError
if isinstance(e, socket.timeout) or 'read operation timed out' in str(e): # Can come from ssl.SSLError
logging.warn("Not retrying thrift call %s due to socket timeout" % attr)
raise
else:
Expand All @@ -557,6 +557,14 @@ def wrapper(*args, **kwargs):
raise
except Exception as e:
logging.exception("Thrift saw exception (this may be expected).")
if "Read timed out." in str(e):
raise StructuredException(
'TIMEOUT',
'Current timeout of %s was hit. Please consider increasing the `server_conn_timeout` config. Request failed with "%s"' %
(self.timeout_seconds, str(e)),
data=None,
error_code=502
)
if "'client_protocol' is unset" in str(e):
raise StructuredException(
'OPEN_SESSION',
Expand All @@ -580,18 +588,19 @@ def set_timeout(self, timeout_seconds):
else:
_grab_transport_from_wrapper(self.transport).setTimeout(None)


def _unpack_guid_secret_in_handle(str_args):
if 'operationHandle' in str_args or 'sessionHandle' in str_args:
if sys.version_info[0] > 2:
guid = re.search('guid=(b".*"), secret', str_args) or re.search('guid=(b\'.*\'), secret', str_args)
secret = re.search('secret=(b".+?")\)', str_args) or re.search('secret=(b\'.+?\')\)', str_args)
secret = re.search(r'secret=(b".+?")\)', str_args) or re.search('secret=(b\'.+?\')\\)', str_args)
else:
secret = re.search('secret=(".*"), guid', str_args) or re.search('secret=(\'.*\'), guid', str_args)
guid = re.search('guid=(".*")\)\)', str_args) or re.search('guid=(\'.*\')\)\)', str_args)
guid = re.search(r'guid=(".*")\)\)', str_args) or re.search('guid=(\'.*\')\\)\\)', str_args)

if secret and guid:
try:
encoded_secret = eval(secret.group(1)) # Does not take null bytes, but don't know how to fix.
encoded_secret = eval(secret.group(1)) # Does not take null bytes, but don't know how to fix.
encoded_guid = eval(guid.group(1))

str_args = str_args.replace(secret.group(1), unpack_guid(encoded_secret))
Expand All @@ -601,13 +610,16 @@ def _unpack_guid_secret_in_handle(str_args):

return str_args


def unpack_guid(guid):
return "%016x:%016x" % struct.unpack(b"QQ", guid)


def unpack_guid_base64(guid):
decoded_guid = base64.b64decode(guid) if sys.version_info[0] > 2 else base64.decodestring(guid)
return "%016x:%016x" % struct.unpack(b"QQ", decoded_guid)


def simpler_string(thrift_obj):
"""
Strips out nulls and empty arrays from the string representation.
Expand All @@ -629,6 +641,7 @@ def simpler_string(thrift_obj):

return '%s(%s)' % (thrift_obj.__class__.__name__, ', '.join(L))


def from_bytes(klass, data):
"""Returns thrift object from a string, using standard binary representation."""
obj = klass()
Expand All @@ -637,13 +650,15 @@ def from_bytes(klass, data):
obj.read(p)
return obj


def to_bytes(obj):
"""Creates the standard binary representation of a thrift object."""
b = TMemoryBuffer()
p = TBinaryProtocol(b)
obj.write(p)
return b.getvalue()


def thrift2json(tft):
"""
Convert a thrift structure to a JSON compatible dictionary
Expand Down Expand Up @@ -689,6 +704,7 @@ def thrift2json(tft):
json[k] = thrift2json(v)
return json


def _jsonable2thrift_helper(jsonable, type_enum, spec_args, default, recursion_depth=0):
"""
Recursive implementation method of jsonable2thrift.
Expand Down Expand Up @@ -722,8 +738,8 @@ def check_bits(jsonable, bits):
"""
check_type(jsonable, (int, long))
# For example, for 8 bits, this yields the range -128 to 127 (inclusive).
min_val = -1 << (bits-1)
max_val = (1 << (bits-1)) - 1
min_val = -1 << (bits - 1)
max_val = (1 << (bits - 1)) - 1
assert min_val <= jsonable, "Value %d <= %d minimum value" % (jsonable, min_val)
assert max_val >= jsonable, "Value %d >= %d maxium value" % (jsonable, max_val)

Expand Down Expand Up @@ -804,6 +820,7 @@ def check_type(jsonable, expected):
else:
raise Exception("Unrecognized type: %s. Value was %s." % (repr(type_enum), repr(jsonable)))


def jsonable2thrift(jsonable, thrift_class):
"""
Converts a JSON-able x that represents a thrift struct
Expand All @@ -821,6 +838,7 @@ def jsonable2thrift(jsonable, thrift_class):
recursion_depth=0
)


def enum_as_sequence(enum):
"""
Returns an array whose entries are the names of the
Expand All @@ -836,6 +854,7 @@ def enum_as_sequence(enum):
"""
return [x for x in dir(enum) if not x.startswith("__") and x not in ["_VALUES_TO_NAMES", "_NAMES_TO_VALUES", "next"]]


def fixup_enums(obj, name_class_map, suffix="AsString"):
"""
Relying on Todd's THRIFT-546 patch, this function adds a string
Expand All @@ -851,6 +870,7 @@ def fixup_enums(obj, name_class_map, suffix="AsString"):
setattr(obj, n + suffix, c._VALUES_TO_NAMES[getattr(obj, n)])
return obj


def is_thrift_struct(o):
return hasattr(o.__class__, "thrift_spec")

Expand All @@ -862,7 +882,7 @@ def log_if_slow_call(duration, message):
elif duration >= math.floor(INFO_LEVEL_CALL_DURATION_MS / 1000):
LOG.info('SLOW: %.2f - %s' % (duration, message))
else:
#Leave this as logging.debug and not logger.
#Otherwise we never get these logging messages even with debug enabled.
#Review this in the future to find out why.
# Leave this as logging.debug and not logger.
# Otherwise we never get these logging messages even with debug enabled.
# Review this in the future to find out why.
logging.debug(message)

0 comments on commit 342357d

Please sign in to comment.