Skip to content

Commit

Permalink
dev: fix ResourceWarning during tests, broken test, string formatting
Browse files Browse the repository at this point in the history
- xls2json_backends.py: both XLS and XLSX open and read the respective
  files, do processing, then close the file. But if an error occurs
  during processing then the file is left open. A ResourceWarning
  is raised when python cleans up the file handle. The ResourceWarning
  is visible during tests because test libraries remove the normal
  suppression of warnings during test runs. So normally a user probably
  wouldn't see it. The fix is to make sure the workbook object and file
  are cleaned up and closed with a contextmanager + try-except block.
- test_bugs.py: fixed test that had been broken for a while. It was not
  being executed by nose test runner for some reason. After converting
  tests to unittest, it ran and failed. After doing some git archaeology
  it seems that the 3rd warning that was expected was one about Google
  Sheets not supporting certain file names, but while that warning was
  later removed, the test was not updated. The 2nd is commented on.
- fixed strange string concatentation on the same line e.g. "a" "b".
  • Loading branch information
lindsay-stevens committed Jan 16, 2024
1 parent f491ea8 commit 4c5ee0a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 69 deletions.
12 changes: 4 additions & 8 deletions pyxform/validators/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def _get_latest(update_info):

@staticmethod
def _get_release_message(json_data):
template = "- Tag name = {tag_name}\n" "- Tag URL = {tag_url}\n\n"
template = "- Tag name = {tag_name}\n- Tag URL = {tag_url}\n\n"
return template.format(
tag_name=json_data["tag_name"], tag_url=json_data["html_url"]
)
Expand Down Expand Up @@ -219,9 +219,7 @@ def _find_download_url(update_info, json_data, file_name):

if len(files) == 0:
raise PyXFormError(
"No files attached to release '{r}'.\n\n{h}" "".format(
r=rel_name, h=update_info.manual_msg
)
f"No files attached to release '{rel_name}'.\n\n{update_info.manual_msg}"
)

file_urls = [x["browser_download_url"] for x in files if x["name"] == file_name]
Expand Down Expand Up @@ -269,7 +267,7 @@ def _get_bin_paths(update_info, file_path):
main_bin = "*validate"
else:
raise PyXFormError(
"Did not find a supported main binary for file: {p}.\n\n{h}" "".format(
"Did not find a supported main binary for file: {p}.\n\n{h}".format(
p=file_path, h=update_info.manual_msg
)
)
Expand Down Expand Up @@ -309,9 +307,7 @@ def _unzip_find_jobs(open_zip_file, bin_paths, out_path):
zip_jobs[file_out_path] = zip_item
if len(bin_paths) != len(zip_jobs.keys()):
raise PyXFormError(
"Expected {e} zip job files, found: {c}" "".format(
e=len(bin_paths), c=len(zip_jobs.keys())
)
f"Expected {len(bin_paths)} zip job files, found: {len(zip_jobs.keys())}"
)
return zip_jobs

Expand Down
4 changes: 2 additions & 2 deletions pyxform/xls2json.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,15 +568,15 @@ def workbook_to_json(
if "name" not in option:
info = "[list_name : " + list_name + "]"
raise PyXFormError(
"On the choices sheet there is " "a option with no name. " + info
"On the choices sheet there is a option with no name. " + info
)
if "label" not in option:
info = "[list_name : " + list_name + "]"
warnings.append(
"On the choices sheet there is a option with no label. " + info
)
# chrislrobert's fix for a cryptic error message:
# see: https://code.google.com/p/opendatakit/issues/detail?id=832&start=200
# see: https://code.google.com/p/opendatakit/issues/detail?id=833&start=200
option_keys = list(option.keys())
for headername in option_keys:
# Using warnings and removing the bad columns
Expand Down
148 changes: 90 additions & 58 deletions pyxform/xls2json_backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@
"""
import csv
import datetime
import os
import re
from collections import OrderedDict
from contextlib import closing
from functools import reduce
from io import StringIO
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union
from zipfile import BadZipFile

import openpyxl
import xlrd
from openpyxl.cell import Cell as pyxlCell
from openpyxl.reader.excel import ExcelReader
from openpyxl.workbook import Workbook as pyxlWorkbook
from openpyxl.worksheet.worksheet import Worksheet as pyxlWorksheet
from xlrd.book import Book as xlrdBook
from xlrd.sheet import Cell as xlrdCell
from xlrd.sheet import Sheet as xlrdSheet
from xlrd.xldate import XLDateAmbiguous

from pyxform import constants
Expand Down Expand Up @@ -120,61 +126,76 @@ def xls_to_dict(path_or_file):
equal to the cell value for that row and column.
All the keys and leaf elements are unicode text.
"""
try:
if isinstance(path_or_file, str):
workbook = xlrd.open_workbook(filename=path_or_file)
else:
workbook = xlrd.open_workbook(file_contents=path_or_file.read())
except xlrd.XLRDError as read_err:
raise PyXFormError("Error reading .xls file: %s" % read_err) from read_err

def xls_clean_cell(cell: xlrdCell, row_n: int, col_key: str) -> Optional[str]:
def xls_clean_cell(
wb: xlrdBook, wb_sheet: xlrdSheet, cell: xlrdCell, row_n: int, col_key: str
) -> Optional[str]:
value = cell.value
if isinstance(value, str):
value = value.strip()
if not is_empty(value):
try:
return xls_value_to_unicode(value, cell.ctype, workbook.datemode)
return xls_value_to_unicode(value, cell.ctype, wb.datemode)
except XLDateAmbiguous as date_err:
raise PyXFormError(
XL_DATE_AMBIGOUS_MSG % (wb_sheet.name, col_key, row_n)
) from date_err

return None

def xls_to_dict_normal_sheet(sheet):
def xls_to_dict_normal_sheet(wb: xlrdBook, wb_sheet: xlrdSheet):
# XLS format: max cols 256, max rows 65536
first_row = (c.value for c in next(sheet.get_rows(), []))
first_row = (c.value for c in next(wb_sheet.get_rows(), []))
headers = get_excel_column_headers(first_row=first_row)
row_iter = (
tuple(sheet.cell(r, c) for c in range(len(headers)))
for r in range(1, sheet.nrows)
tuple(wb_sheet.cell(r, c) for c in range(len(headers)))
for r in range(1, wb_sheet.nrows)
)
rows = get_excel_rows(headers=headers, rows=row_iter, cell_func=xls_clean_cell)

# Inject wb/sheet as closure since functools.partial isn't typing friendly.
def clean_func(cell: xlrdCell, row_n: int, col_key: str) -> Optional[str]:
return xls_clean_cell(
wb=wb, wb_sheet=wb_sheet, cell=cell, row_n=row_n, col_key=col_key
)

rows = get_excel_rows(headers=headers, rows=row_iter, cell_func=clean_func)
column_header_list = [key for key in headers if key is not None]
return rows, _list_to_dict_list(column_header_list)

result_book = OrderedDict()
for wb_sheet in workbook.sheets():
# Note that the sheet exists but do no further processing here.
result_book[wb_sheet.name] = []
# Do not process sheets that have nothing to do with XLSForm.
if wb_sheet.name not in constants.SUPPORTED_SHEET_NAMES:
if len(workbook.sheets()) == 1:
(
result_book[constants.SURVEY],
result_book["%s_header" % constants.SURVEY],
) = xls_to_dict_normal_sheet(wb_sheet)
def process_workbook(wb: xlrdBook):
result_book = OrderedDict()
for wb_sheet in wb.sheets():
# Note that the sheet exists but do no further processing here.
result_book[wb_sheet.name] = []
# Do not process sheets that have nothing to do with XLSForm.
if wb_sheet.name not in constants.SUPPORTED_SHEET_NAMES:
if len(wb.sheets()) == 1:
(
result_book[constants.SURVEY],
result_book["%s_header" % constants.SURVEY],
) = xls_to_dict_normal_sheet(wb=wb, wb_sheet=wb_sheet)
else:
continue
else:
continue
else:
(
result_book[wb_sheet.name],
result_book["%s_header" % wb_sheet.name],
) = xls_to_dict_normal_sheet(wb_sheet)
(
result_book[wb_sheet.name],
result_book["%s_header" % wb_sheet.name],
) = xls_to_dict_normal_sheet(wb=wb, wb_sheet=wb_sheet)
return result_book

workbook.release_resources()
return result_book
try:
if isinstance(path_or_file, (str, bytes, os.PathLike)):
file = open(path_or_file, mode="rb")
else:
file = path_or_file
with closing(file) as wb_file:
workbook = xlrd.open_workbook(file_contents=wb_file.read())
try:
return process_workbook(wb=workbook)
finally:
workbook.release_resources()
except xlrd.XLRDError as read_err:
raise PyXFormError("Error reading .xls file: %s" % read_err) from read_err


def xls_value_to_unicode(value, value_type, datemode) -> str:
Expand Down Expand Up @@ -216,10 +237,6 @@ def xlsx_to_dict(path_or_file):
equal to the cell value for that row and column.
All the keys and leaf elements are strings.
"""
try:
workbook = openpyxl.open(filename=path_or_file, read_only=True, data_only=True)
except (OSError, BadZipFile, KeyError) as read_err:
raise PyXFormError("Error reading .xlsx file: %s" % read_err) from read_err

def xlsx_clean_cell(cell: pyxlCell, row_n: int, col_key: str) -> Optional[str]:
value = cell.value
Expand All @@ -230,7 +247,7 @@ def xlsx_clean_cell(cell: pyxlCell, row_n: int, col_key: str) -> Optional[str]:

return None

def xlsx_to_dict_normal_sheet(sheet):
def xlsx_to_dict_normal_sheet(sheet: pyxlWorksheet):
# XLSX format: max cols 16384, max rows 1048576
first_row = (c.value for c in next(sheet.rows, []))
headers = get_excel_column_headers(first_row=first_row)
Expand All @@ -239,28 +256,43 @@ def xlsx_to_dict_normal_sheet(sheet):
column_header_list = [key for key in headers if key is not None]
return rows, _list_to_dict_list(column_header_list)

result_book = OrderedDict()
for sheetname in workbook.sheetnames:
wb_sheet = workbook[sheetname]
# Note that the sheet exists but do no further processing here.
result_book[sheetname] = []
# Do not process sheets that have nothing to do with XLSForm.
if sheetname not in constants.SUPPORTED_SHEET_NAMES:
if len(workbook.sheetnames) == 1:
def process_workbook(wb: pyxlWorkbook):
result_book = OrderedDict()
for sheetname in wb.sheetnames:
wb_sheet = wb[sheetname]
# Note that the sheet exists but do no further processing here.
result_book[sheetname] = []
# Do not process sheets that have nothing to do with XLSForm.
if sheetname not in constants.SUPPORTED_SHEET_NAMES:
if len(wb.sheetnames) == 1:
(
result_book[constants.SURVEY],
result_book[f"{constants.SURVEY}_header"],
) = xlsx_to_dict_normal_sheet(wb_sheet)
else:
continue
else:
(
result_book[constants.SURVEY],
result_book[f"{constants.SURVEY}_header"],
result_book[sheetname],
result_book[f"{sheetname}_header"],
) = xlsx_to_dict_normal_sheet(wb_sheet)
else:
continue
else:
(
result_book[sheetname],
result_book[f"{sheetname}_header"],
) = xlsx_to_dict_normal_sheet(wb_sheet)
return result_book

workbook.close()
return result_book
try:
if isinstance(path_or_file, (str, bytes, os.PathLike)):
file = open(path_or_file, mode="rb")
else:
file = path_or_file
with closing(file) as wb_file:
reader = ExcelReader(wb_file, read_only=True, data_only=True)
reader.read()
try:
return process_workbook(wb=reader.wb)
finally:
reader.wb.close()
reader.archive.close()
except (OSError, BadZipFile, KeyError) as read_err:
raise PyXFormError("Error reading .xlsx file: %s" % read_err) from read_err


def xlsx_value_to_str(value) -> str:
Expand Down
5 changes: 4 additions & 1 deletion tests/xform_test_case/test_bugs.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,10 @@ def test_conversion(self):
default_name="spaces_in_choices_header",
warnings=warnings,
)
self.assertEqual(len(warnings), 3, "Found " + str(len(warnings)) + " warnings")
# The "column with no header" warning is probably not reachable since XLS/X
# pre-processing ignores any columns without a header.
observed = [w for w in warnings if "Headers cannot include spaces" in w]
self.assertEqual(1, len(observed), warnings)

def test_values_with_spaces_are_cleaned(self):
"""
Expand Down

0 comments on commit 4c5ee0a

Please sign in to comment.