diff --git a/desktop/core/src/desktop/lib/thrift_util.py b/desktop/core/src/desktop/lib/thrift_util.py index 90d45361da..54708e90bc 100644 --- a/desktop/core/src/desktop/lib/thrift_util.py +++ b/desktop/core/src/desktop/lib/thrift_util.py @@ -16,42 +16,38 @@ # limitations under the License. # from __future__ import division -from future import standard_library -standard_library.install_aliases() -from builtins import map -from builtins import range -from past.builtins import basestring -from builtins import object -import base64 -import queue -import logging + +import re +import sys import math -import socket -import threading import time -import re +import queue +import base64 +import socket import struct -import sys +import logging +import threading +from builtins import map, object, range -from thrift.Thrift import TType, TApplicationException -from thrift.transport.TSocket import TSocket -from thrift.transport.TTransport import TBufferedTransport, TFramedTransport, TMemoryBuffer, TTransportException +from django.conf import settings +from past.builtins import basestring from thrift.protocol.TBinaryProtocol import TBinaryProtocol from thrift.protocol.TMultiplexedProtocol import TMultiplexedProtocol +from thrift.Thrift import TApplicationException, TType +from thrift.transport.TSocket import TSocket +from thrift.transport.TTransport import TBufferedTransport, TFramedTransport, TMemoryBuffer, TTransportException -from django.conf import settings -from desktop.conf import SASL_MAX_BUFFER, CHERRYPY_SERVER_THREADS, ENABLE_SMART_THRIFT_POOL, USE_THRIFT_HTTP_JWT, ENABLE_ORGANIZATIONS - -from desktop.lib.apputil import WARN_LEVEL_CALL_DURATION_MS, INFO_LEVEL_CALL_DURATION_MS +from desktop.conf import CHERRYPY_SERVER_THREADS, ENABLE_ORGANIZATIONS, ENABLE_SMART_THRIFT_POOL, SASL_MAX_BUFFER, USE_THRIFT_HTTP_JWT +from desktop.lib.apputil import INFO_LEVEL_CALL_DURATION_MS, WARN_LEVEL_CALL_DURATION_MS +from desktop.lib.exceptions import StructuredException, StructuredThriftTransportException from desktop.lib.python_util import create_synchronous_io_multiplexer from desktop.lib.thrift_.http_client import THttpClient from desktop.lib.thrift_.TSSLSocketWithWildcardSAN import TSSLSocketWithWildcardSAN from desktop.lib.thrift_sasl import TSaslClientTransport -from desktop.lib.exceptions import StructuredException, StructuredThriftTransportException if sys.version_info[0] > 2: - from past.builtins import long from django.utils.translation import gettext as _ + from past.builtins import long else: from django.utils.translation import ugettext as _ @@ -170,6 +166,7 @@ def update_coordinator_host(self, coordinator_host): def get_coordinator_host(self): return self.coordinator_host + class ConnectionPooler(object): """ Thread-safe connection pooling for thrift. (With about 3 changes, @@ -289,6 +286,7 @@ def return_client(self, conf, client): self.pooldict[_get_pool_key(conf)].put(client) + def _get_pool_key(conf): """ Given a ConnectionConfig, return the tuple used as the key in the dictionary @@ -296,6 +294,7 @@ def _get_pool_key(conf): """ return (conf.klass, conf.host, conf.port, conf.get_coordinator_host()) + def construct_superclient(conf): """ Constructs a thrift client, lazily. @@ -340,7 +339,7 @@ def connect_to_thrift(conf): mode.set_kerberos_auth(service=conf.kerberos_principal) elif USE_THRIFT_HTTP_JWT.get(): - from desktop.auth.backend import find_user, rewrite_user # Cyclic dependency + from desktop.auth.backend import find_user, rewrite_user # Cyclic dependency user = rewrite_user(find_user(conf.username)) if user is None: @@ -364,7 +363,7 @@ def sasl_factory(): saslc.setAttr("service", str(conf.kerberos_principal)) if conf.mechanism == 'PLAIN': saslc.setAttr("username", str(conf.username)) - saslc.setAttr("password", str(conf.password)) # Defaults to 'hue' for a non-empty string unless using LDAP + saslc.setAttr("password", str(conf.password)) # Defaults to 'hue' for a non-empty string unless using LDAP else: saslc.setAttr("maxbufsize", SASL_MAX_BUFFER.get()) saslc.init() @@ -389,6 +388,7 @@ def get_client(klass, host, port, service_name, **kwargs): conf = ConnectionConfig(klass, host, port, service_name, **kwargs) return PooledClient(conf) + def _grab_transport_from_wrapper(outer_transport): if isinstance(outer_transport, TBufferedTransport): return outer_transport._TBufferedTransport__trans @@ -470,7 +470,7 @@ def wrapper(*args, **kwargs): raise finally: self._return_client(superclient) - wrapper.attr = attr_name # Save the name of the attribute as it is replaced by 'wrapper' + wrapper.attr = attr_name # Save the name of the attribute as it is replaced by 'wrapper' return wrapper @@ -522,7 +522,7 @@ def wrapper(*args, **kwargs): ret = res(*args, **kwargs) if ENABLE_SMART_THRIFT_POOL.get() and 'OpenSession' == attr and 'http_addr' in repr(ret): - coordinator_host = re.search('http_addr\':\ \'(.*:[0-9]{2,})\', \'', repr(ret)) + coordinator_host = re.search('http_addr\':\\ \'(.*:[0-9]{2,})\', \'', repr(ret)) self.coordinator_host = coordinator_host.group(1) log_msg = _unpack_guid_secret_in_handle(repr(ret)) @@ -544,7 +544,7 @@ def wrapper(*args, **kwargs): except (socket.error, socket.timeout, TTransportException) as e: self.transport.close() - if isinstance(e, socket.timeout) or 'read operation timed out' in str(e): # Can come from ssl.SSLError + if isinstance(e, socket.timeout) or 'read operation timed out' in str(e): # Can come from ssl.SSLError logging.warn("Not retrying thrift call %s due to socket timeout" % attr) raise else: @@ -557,6 +557,14 @@ def wrapper(*args, **kwargs): raise except Exception as e: logging.exception("Thrift saw exception (this may be expected).") + if "Read timed out." in str(e): + raise StructuredException( + 'TIMEOUT', + 'Current timeout of %s was hit. Please consider increasing the `server_conn_timeout` config. Request failed with "%s"' % + (self.timeout_seconds, str(e)), + data=None, + error_code=502 + ) if "'client_protocol' is unset" in str(e): raise StructuredException( 'OPEN_SESSION', @@ -580,18 +588,19 @@ def set_timeout(self, timeout_seconds): else: _grab_transport_from_wrapper(self.transport).setTimeout(None) + def _unpack_guid_secret_in_handle(str_args): if 'operationHandle' in str_args or 'sessionHandle' in str_args: if sys.version_info[0] > 2: guid = re.search('guid=(b".*"), secret', str_args) or re.search('guid=(b\'.*\'), secret', str_args) - secret = re.search('secret=(b".+?")\)', str_args) or re.search('secret=(b\'.+?\')\)', str_args) + secret = re.search(r'secret=(b".+?")\)', str_args) or re.search('secret=(b\'.+?\')\\)', str_args) else: secret = re.search('secret=(".*"), guid', str_args) or re.search('secret=(\'.*\'), guid', str_args) - guid = re.search('guid=(".*")\)\)', str_args) or re.search('guid=(\'.*\')\)\)', str_args) + guid = re.search(r'guid=(".*")\)\)', str_args) or re.search('guid=(\'.*\')\\)\\)', str_args) if secret and guid: try: - encoded_secret = eval(secret.group(1)) # Does not take null bytes, but don't know how to fix. + encoded_secret = eval(secret.group(1)) # Does not take null bytes, but don't know how to fix. encoded_guid = eval(guid.group(1)) str_args = str_args.replace(secret.group(1), unpack_guid(encoded_secret)) @@ -601,13 +610,16 @@ def _unpack_guid_secret_in_handle(str_args): return str_args + def unpack_guid(guid): return "%016x:%016x" % struct.unpack(b"QQ", guid) + def unpack_guid_base64(guid): decoded_guid = base64.b64decode(guid) if sys.version_info[0] > 2 else base64.decodestring(guid) return "%016x:%016x" % struct.unpack(b"QQ", decoded_guid) + def simpler_string(thrift_obj): """ Strips out nulls and empty arrays from the string representation. @@ -629,6 +641,7 @@ def simpler_string(thrift_obj): return '%s(%s)' % (thrift_obj.__class__.__name__, ', '.join(L)) + def from_bytes(klass, data): """Returns thrift object from a string, using standard binary representation.""" obj = klass() @@ -637,6 +650,7 @@ def from_bytes(klass, data): obj.read(p) return obj + def to_bytes(obj): """Creates the standard binary representation of a thrift object.""" b = TMemoryBuffer() @@ -644,6 +658,7 @@ def to_bytes(obj): obj.write(p) return b.getvalue() + def thrift2json(tft): """ Convert a thrift structure to a JSON compatible dictionary @@ -689,6 +704,7 @@ def thrift2json(tft): json[k] = thrift2json(v) return json + def _jsonable2thrift_helper(jsonable, type_enum, spec_args, default, recursion_depth=0): """ Recursive implementation method of jsonable2thrift. @@ -722,8 +738,8 @@ def check_bits(jsonable, bits): """ check_type(jsonable, (int, long)) # For example, for 8 bits, this yields the range -128 to 127 (inclusive). - min_val = -1 << (bits-1) - max_val = (1 << (bits-1)) - 1 + min_val = -1 << (bits - 1) + max_val = (1 << (bits - 1)) - 1 assert min_val <= jsonable, "Value %d <= %d minimum value" % (jsonable, min_val) assert max_val >= jsonable, "Value %d >= %d maxium value" % (jsonable, max_val) @@ -804,6 +820,7 @@ def check_type(jsonable, expected): else: raise Exception("Unrecognized type: %s. Value was %s." % (repr(type_enum), repr(jsonable))) + def jsonable2thrift(jsonable, thrift_class): """ Converts a JSON-able x that represents a thrift struct @@ -821,6 +838,7 @@ def jsonable2thrift(jsonable, thrift_class): recursion_depth=0 ) + def enum_as_sequence(enum): """ Returns an array whose entries are the names of the @@ -836,6 +854,7 @@ def enum_as_sequence(enum): """ return [x for x in dir(enum) if not x.startswith("__") and x not in ["_VALUES_TO_NAMES", "_NAMES_TO_VALUES", "next"]] + def fixup_enums(obj, name_class_map, suffix="AsString"): """ Relying on Todd's THRIFT-546 patch, this function adds a string @@ -851,6 +870,7 @@ def fixup_enums(obj, name_class_map, suffix="AsString"): setattr(obj, n + suffix, c._VALUES_TO_NAMES[getattr(obj, n)]) return obj + def is_thrift_struct(o): return hasattr(o.__class__, "thrift_spec") @@ -862,7 +882,7 @@ def log_if_slow_call(duration, message): elif duration >= math.floor(INFO_LEVEL_CALL_DURATION_MS / 1000): LOG.info('SLOW: %.2f - %s' % (duration, message)) else: - #Leave this as logging.debug and not logger. - #Otherwise we never get these logging messages even with debug enabled. - #Review this in the future to find out why. + # Leave this as logging.debug and not logger. + # Otherwise we never get these logging messages even with debug enabled. + # Review this in the future to find out why. logging.debug(message)