Skip to content

Commit

Permalink
[Importer] Fix importer for Parquet, ORC, and Avro formats (#3736)
Browse files Browse the repository at this point in the history
* [Importer] Fixing importer for Parquet, ORC, and Avro formats

* fix lint issues
  • Loading branch information
agl29 authored May 16, 2024
1 parent bb57a00 commit 32d205d
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 53 deletions.
66 changes: 29 additions & 37 deletions desktop/libs/indexer/src/indexer/indexers/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from future import standard_library
standard_library.install_aliases()
from builtins import object
import csv
import logging
import sys
import urllib.request, urllib.error
import uuid

import logging
import urllib.error
import urllib.request
from builtins import object
from collections import OrderedDict
from urllib.parse import unquote as urllib_unquote, urlparse

from django.urls import reverse
from django.utils.translation import gettext as _

from azure.abfs.__init__ import abfspath
from desktop.lib import django_mako
from desktop.lib.exceptions_renderable import PopupException
from desktop.settings import BASE_DIR
from hadoop.fs.hadoopfs import Hdfs
from notebook.connectors.base import get_interpreter
from notebook.models import make_notebook
from useradmin.models import User

from desktop.lib import django_mako
from desktop.lib.exceptions_renderable import PopupException
from desktop.settings import BASE_DIR

if sys.version_info[0] > 2:
from urllib.parse import urlparse, unquote as urllib_unquote
from django.utils.translation import gettext as _
else:
from django.utils.translation import ugettext as _
from urllib import unquote as urllib_unquote
from urlparse import urlparse


LOG = logging.getLogger()


Expand All @@ -60,6 +49,7 @@
LOG.warning("Impala app is not enabled")
impala_conf = None


class SQLIndexer(object):

def __init__(self, user, fs):
Expand Down Expand Up @@ -139,16 +129,16 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
"escapeChar" = "\\\\"
''' % source['format']

use_temp_table = table_format in ('parquet', 'orc', 'kudu') or is_transactional or isIceberg
if use_temp_table: # We'll be using a temp table to load data
use_temp_table = table_format in ('parquet', 'orc', 'kudu', 'avro') or is_transactional or isIceberg
if use_temp_table: # We'll be using a temp table to load data
if load_data:
table_name, final_table_name = 'hue__tmp_%s' % table_name, table_name

sql += '\n\nDROP TABLE IF EXISTS `%(database)s`.`%(table_name)s`;\n' % {
'database': database,
'table_name': table_name
}
else: # Manual
else: # Manual
row_format = ''
file_format = table_format
skip_header = False
Expand All @@ -159,8 +149,8 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
collection_delimiter = None
map_delimiter = None

if external or (load_data and table_format in ('parquet', 'orc', 'kudu')): # We'll use location to load data
if not self.fs.isdir(external_path): # File selected
if external or (load_data and table_format in ('parquet', 'orc', 'kudu', 'avro')): # We'll use location to load data
if not self.fs.isdir(external_path): # File selected
external_path, external_file_name = Hdfs.split(external_path)

if len(self.fs.listdir(external_path)) > 1:
Expand All @@ -171,7 +161,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
self.fs.copy(source_path, external_path)
else:
self.fs.rename(source_path, external_path)
elif load_data: # We'll use load data command
elif load_data: # We'll use load data command
parent_path = self.fs.parent_path(source_path)
stats = self.fs.stats(parent_path)
split = urlparse(source_path)
Expand All @@ -180,17 +170,16 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
# If the CSV file's parent directory is in an encryption zone (encBit), create the
# scratch dir under that same directory; otherwise create it under the home directory.
base_dir = parent_path if stats.encBit else self.fs.get_home_dir()
user_scratch_dir = base_dir + '/.scratchdir/%s' % str(uuid.uuid4()) # Make sure it's unique.
user_scratch_dir = base_dir + '/.scratchdir/%s' % str(uuid.uuid4()) # Make sure it's unique.
self.fs.do_as_user(self.user, self.fs.mkdir, user_scratch_dir, 0o0777)
self.fs.do_as_user(self.user, self.fs.rename, source['path'], user_scratch_dir)
if editor_type == 'impala' and impala_conf and impala_conf.USER_SCRATCH_DIR_PERMISSION.get():
self.fs.do_as_user(self.user, self.fs.chmod, user_scratch_dir, 0o0777, True)
source_path = user_scratch_dir + '/' + source['path'].split('/')[-1]

if external_path.lower().startswith("abfs"): #this is to check if its using an ABFS path
if external_path.lower().startswith("abfs"): # this is to check if its using an ABFS path
external_path = abfspath(external_path)


tbl_properties = OrderedDict()
if skip_header:
tbl_properties['skip.header.line.count'] = '1'
Expand All @@ -209,7 +198,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
'serde_name': serde_name,
'serde_properties': serde_properties,
'file_format': file_format,
'external': external or load_data and table_format in ('parquet', 'orc', 'kudu'),
'external': external or load_data and table_format in ('parquet', 'orc', 'kudu', 'avro'),
'path': external_path,
'primary_keys': primary_keys if table_format == 'kudu' and not load_data else [],
'tbl_properties': tbl_properties
Expand Down Expand Up @@ -269,7 +258,7 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
extra_create_properties += "\nTBLPROPERTIES('transactional'='true', 'transactional_properties'='%s')" % \
default_transactional_type

sql += '''\n\nCREATE TABLE `%(database)s`.`%(final_table_name)s`%(comment)s
sql += '''\n\nCREATE %(table_type)sTABLE `%(database)s`.`%(final_table_name)s`%(comment)s
%(extra_create_properties)s
AS SELECT %(columns_list)s
FROM `%(database)s`.`%(table_name)s`;''' % {
Expand All @@ -278,7 +267,8 @@ def create_table_from_a_file(self, source, destination, start_time=-1, file_enco
'table_name': table_name,
'extra_create_properties': extra_create_properties,
'columns_list': ', '.join(columns_list),
'comment': ' COMMENT "%s"' % comment if comment else ''
'comment': ' COMMENT "%s"' % comment if comment else '',
'table_type': 'EXTERNAL ' if external and not is_transactional else ''
}
sql += '\n\nDROP TABLE IF EXISTS `%(database)s`.`%(table_name)s`;\n' % {
'database': database,
Expand Down Expand Up @@ -377,17 +367,17 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
row = self.nomalize_booleans(row, columns)
_csv_rows.append(tuple(row))

if _csv_rows: #sql for data insertion
if _csv_rows: # sql for data insertion
csv_rows = str(_csv_rows)[1:-1]

if dialect in ('hive', 'mysql'):
sql += '''\nINSERT INTO %(database)s.%(table_name)s VALUES %(csv_rows)s;\n'''% {
sql += '''\nINSERT INTO %(database)s.%(table_name)s VALUES %(csv_rows)s;\n''' % {
'database': database,
'table_name': table_name,
'csv_rows': csv_rows
}
elif dialect == 'impala':
sql += '''\nINSERT INTO %(database)s.%(table_name)s_tmp VALUES %(csv_rows)s;\n'''% {
sql += '''\nINSERT INTO %(database)s.%(table_name)s_tmp VALUES %(csv_rows)s;\n''' % {
'database': database,
'table_name': table_name,
'csv_rows': csv_rows,
Expand All @@ -396,12 +386,12 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
if dialect == 'impala':
# casting from string to boolean is not allowed in impala so string -> int -> bool
sql_ = ',\n'.join([
' CAST ( `%(name)s` AS %(type)s ) `%(name)s`' % col if col['type'] != 'boolean' \
' CAST ( `%(name)s` AS %(type)s ) `%(name)s`' % col if col['type'] != 'boolean'
else ' CAST ( CAST ( `%(name)s` AS TINYINT ) AS boolean ) `%(name)s`' % col for col in columns
])

sql += '''\nCREATE TABLE IF NOT EXISTS %(database)s.%(table_name)s
AS SELECT\n%(sql_)s\nFROM %(database)s.%(table_name)s_tmp;\n\nDROP TABLE IF EXISTS %(database)s.%(table_name)s_tmp;'''% {
AS SELECT\n%(sql_)s\nFROM %(database)s.%(table_name)s_tmp;\n\nDROP TABLE IF EXISTS %(database)s.%(table_name)s_tmp;''' % {
'database': database,
'table_name': table_name,
'sql_': sql_
Expand All @@ -421,6 +411,7 @@ def create_table_from_local_file(self, source, destination, start_time=-1):
is_task=True
)


def _create_database(request, source, destination, start_time):
database = destination['name']
comment = destination['description']
Expand Down Expand Up @@ -465,6 +456,7 @@ def _create_table(request, source, destination, start_time=-1, file_encoding=Non
else:
return notebook.execute(request, batch=False)


def _create_table_from_local(request, source, destination, start_time=-1):
notebook = SQLIndexer(user=request.user, fs=request.fs).create_table_from_local_file(source, destination, start_time)

Expand Down
Loading

0 comments on commit 32d205d

Please sign in to comment.