diff --git a/.github/workflows/python-app-linux.yml b/.github/workflows/python-app-linux.yml index 457cdb5..0c89eae 100644 --- a/.github/workflows/python-app-linux.yml +++ b/.github/workflows/python-app-linux.yml @@ -40,6 +40,9 @@ jobs: - name: Lint with ruff run: | ruff check . + - name: Check formatting with ruff + run: | + ruff format --check . - name: Test with pytest run: | pytest diff --git a/.github/workflows/python-app-macos-windows.yml b/.github/workflows/python-app-macos-windows.yml index 085393d..9d0d720 100644 --- a/.github/workflows/python-app-macos-windows.yml +++ b/.github/workflows/python-app-macos-windows.yml @@ -32,6 +32,9 @@ jobs: - name: Lint with ruff run: | ruff check . + - name: Check formatting with ruff + run: | + ruff format --check . - name: Test with pytest - exclude MySQL integration tests run: | pytest --ignore "test/test_multitag_mapping.py" --ignore "test/test_percent_symbol.py" --ignore "test/test_tags_length.py" --ignore "test/test_step_05.py" diff --git a/01-Load-Automated-Archive-into-Mysql.py b/01-Load-Automated-Archive-into-Mysql.py index 76f8783..a33fab0 100755 --- a/01-Load-Automated-Archive-into-Mysql.py +++ b/01-Load-Automated-Archive-into-Mysql.py @@ -3,12 +3,15 @@ from automated_archive import aa if __name__ == "__main__": - args_obj = Args() - args = args_obj.args_for_01() - log = args_obj.logger_with_filename() - sql = Sql(args, log) - - # eg: python 01-Load-Automated-Archive-into-Mysql.py -dh localhost -du root -dt dsa -dd temp_python -a AA -f /path/to/ARCHIVE_DB.pl -o . - log.info('Loading Automated Archive file "{0}" into database "{1}"'.format(args.db_input_file, args.temp_db_database)) - aa.clean_and_load_data(args, log) + args_obj = Args() + args = args_obj.args_for_01() + log = args_obj.logger_with_filename() + sql = Sql(args, log) + # eg: python 01-Load-Automated-Archive-into-Mysql.py -dh localhost -du root -dt dsa -dd temp_python -a AA -f /path/to/ARCHIVE_DB.pl -o . 
+ log.info( + 'Loading Automated Archive file "{0}" into database "{1}"'.format( + args.db_input_file, args.temp_db_database + ) + ) + aa.clean_and_load_data(args, log) diff --git a/02a-Load-Chapters-to-Working-Table.py b/02a-Load-Chapters-to-Working-Table.py index 62beb47..a2bac7a 100755 --- a/02a-Load-Chapters-to-Working-Table.py +++ b/02a-Load-Chapters-to-Working-Table.py @@ -8,20 +8,19 @@ # Given an existing final chapter table, this will use the URL field and chapter location to load the chapter contents def __current_table(table_name, db): - query = "SELECT * FROM `{0}`.`{1}`".format(args.output_database, table_name) - dict_cursor = db.cursor(cursors.DictCursor) - dict_cursor.execute(query) - return dict_cursor.fetchall() + query = "SELECT * FROM `{0}`.`{1}`".format(args.output_database, table_name) + dict_cursor = db.cursor(cursors.DictCursor) + dict_cursor.execute(query) + return dict_cursor.fetchall() if __name__ == "__main__": - # TODO the eFiction process now loads chapters into MySQL before further processing in ODAP, all others should too - args_obj = Args() - args = args_obj.args_for_07() - log = args_obj.logger_with_filename() - sql = Sql(args, log) - chaps = Chapters(args, sql, log) + # TODO the eFiction process now loads chapters into MySQL before further processing in ODAP, all others should too + args_obj = Args() + args = args_obj.args_for_07() + log = args_obj.logger_with_filename() + sql = Sql(args, log) + chaps = Chapters(args, sql, log) - - log.info("Loading chapters from {0}...".format(args.chapters_path)) - chaps.populate_chapters() + log.info("Loading chapters from {0}...".format(args.chapters_path)) + chaps.populate_chapters() diff --git a/02b-Extract-Tags-From-Stories.py b/02b-Extract-Tags-From-Stories.py index c486fdc..dce273e 100755 --- a/02b-Extract-Tags-From-Stories.py +++ b/02b-Extract-Tags-From-Stories.py @@ -6,43 +6,65 @@ if __name__ == "__main__": - """ + """ Only for non-eFiction archives. This script creates a table called tags in the temporary database and denormalises all the tags for each story. This table is the basis for the Tag Wrangling sheet and is used to map the tags back to the story when the final tables are created. 
""" - args_obj = Args() - args = args_obj.args_for_02() - log = args_obj.logger_with_filename() - sql = Sql(args, log) - tags = Tags(args, sql, log) - log.info('Processing tags from stories and bookmarks table in {0}'.format(args.temp_db_database)) - tags.create_tags_table() - - tag_col_list = {} - stories_id_name = "" - stories_table_name = "" - - # AUTOMATED ARCHIVE - if args.archive_type == 'AA': - - story_table_name = input('Story table name (default: "stories"): ') - if story_table_name is None or story_table_name == '': - story_table_name = 'stories' - - bookmark_table_name = input('Bookmark table name (default: "story_links"): ') - if bookmark_table_name is None or bookmark_table_name == '': - bookmark_table_name = 'story_links' - - tag_columns = input('Column names containing tags \n (delimited by commas - default: "rating, tags, warnings, characters, fandoms, relationships"): ') - if tag_columns is None or tag_columns == '': - tag_columns = "rating, tags, warnings, characters, fandoms, relationships" - # fancy footwork to ensure compatibility with eFiction - tag_col_list = re.split(r", ?", tag_columns) - tag_columns_dict = dict(zip(tag_col_list, tag_col_list)) - fields_with_fandom = args.fields_with_fandom.split(", ") if args.fields_with_fandom is not None else [] - tags.populate_tag_table(args.temp_db_database, "id", story_table_name, tag_columns_dict, fields_with_fandom) - tags.populate_tag_table(args.temp_db_database, "id", bookmark_table_name, tag_columns_dict, fields_with_fandom, False) - - log.info("Done extracting tags.") + args_obj = Args() + args = args_obj.args_for_02() + log = args_obj.logger_with_filename() + sql = Sql(args, log) + tags = Tags(args, sql, log) + log.info( + "Processing tags from stories and bookmarks table in {0}".format( + args.temp_db_database + ) + ) + tags.create_tags_table() + + tag_col_list = {} + stories_id_name = "" + stories_table_name = "" + + # AUTOMATED ARCHIVE + if args.archive_type == "AA": + story_table_name = input('Story table name (default: "stories"): ') + if story_table_name is None or story_table_name == "": + story_table_name = "stories" + + bookmark_table_name = input('Bookmark table name (default: "story_links"): ') + if bookmark_table_name is None or bookmark_table_name == "": + bookmark_table_name = "story_links" + + tag_columns = input( + 'Column names containing tags \n (delimited by commas - default: "rating, tags, warnings, characters, fandoms, relationships"): ' + ) + if tag_columns is None or tag_columns == "": + tag_columns = "rating, tags, warnings, characters, fandoms, relationships" + # fancy footwork to ensure compatibility with eFiction + tag_col_list = re.split(r", ?", tag_columns) + tag_columns_dict = dict(zip(tag_col_list, tag_col_list)) + fields_with_fandom = ( + args.fields_with_fandom.split(", ") + if args.fields_with_fandom is not None + else [] + ) + tags.populate_tag_table( + args.temp_db_database, + "id", + story_table_name, + tag_columns_dict, + fields_with_fandom, + ) + tags.populate_tag_table( + args.temp_db_database, + "id", + bookmark_table_name, + tag_columns_dict, + fields_with_fandom, + False, + ) + + log.info("Done extracting tags.") diff --git a/03-Export-Tags-Authors-Stories.py b/03-Export-Tags-Authors-Stories.py index 9c0171b..4604b78 100755 --- a/03-Export-Tags-Authors-Stories.py +++ b/03-Export-Tags-Authors-Stories.py @@ -7,87 +7,166 @@ def write_csv(data, filename, columns): - with open(filename, "w", encoding="utf_8_sig", newline="") as fp: - myFile = csv.writer(fp) - 
myFile.writerow(columns) - if data: - for row in data: - r = [] - for s in row: - r.append('' if s is None else html.unescape(str(s))) - myFile.writerows([r]) - log.info(f"...Data written to {filename}") - else: - log.error(f"...No data to write to {filename}") - fp.close() + with open(filename, "w", encoding="utf_8_sig", newline="") as fp: + myFile = csv.writer(fp) + myFile.writerow(columns) + if data: + for row in data: + r = [] + for s in row: + r.append("" if s is None else html.unescape(str(s))) + myFile.writerows([r]) + log.info(f"...Data written to {filename}") + else: + log.error(f"...No data to write to {filename}") + fp.close() if __name__ == "__main__": - """ + """ This step exports the Tag Wrangling and Authors with stories CSV files which you then have to import into Google Spreadsheet and share with the rest of the Open Doors committee. """ - args_obj = Args() - args = args_obj.args_for_03() - log = args_obj.logger_with_filename() - sql = Sql(args, log) - tags = Tags(args, sql, log) + args_obj = Args() + args = args_obj.args_for_03() + log = args_obj.logger_with_filename() + sql = Sql(args, log) + tags = Tags(args, sql, log) + + # Tags + log.info(f"Exporting tags from {args.temp_db_database} to {args.output_folder}") + cols = tags.tag_export_map + results = tags.distinct_tags(args.temp_db_database) + write_csv( + results, + "{0}/{1} - tags.csv".format(args.output_folder, args.archive_name), + [ + cols["id"], + cols["original_tag"], + cols["original_table"], + cols["original_parent"], + cols["ao3_tag_fandom"], + cols["ao3_tag"], + cols["ao3_tag_type"], + cols["ao3_tag_category"], + cols["original_description"], + "TW Notes", + ], + ) - # Tags - log.info(f'Exporting tags from {args.temp_db_database} to {args.output_folder}') - cols = tags.tag_export_map - results = tags.distinct_tags(args.temp_db_database) - write_csv(results, '{0}/{1} - tags.csv'.format(args.output_folder, args.archive_name), - [cols['id'], cols['original_tag'], cols['original_table'], cols['original_parent'], - cols['ao3_tag_fandom'], cols['ao3_tag'], cols['ao3_tag_type'], cols['ao3_tag_category'], - cols['original_description'], "TW Notes"]) + # Stories with authors + log.debug( + f"Exporting authors with stories from {args.temp_db_database} to {args.output_folder}" + ) - # Stories with authors - log.debug(f'Exporting authors with stories from {args.temp_db_database} to {args.output_folder}') + author_table = f"{args.temp_db_database}.authors" + stories_table = f"{args.temp_db_database}.stories" + item_authors_table = f"{args.temp_db_database}.item_authors" + author_name = "name" + story_id = "id" + story_author_col = "author_id" + story_coauthor_col = "coauthor_id" + author_id = "id" + ia_author_col = "author_id" + ia_item_col = "item_id" - author_table = f'{args.temp_db_database}.authors' - stories_table = f'{args.temp_db_database}.stories' - item_authors_table = f'{args.temp_db_database}.item_authors' - author_name = 'name' - story_id = 'id' - story_author_col = 'author_id' - story_coauthor_col = 'coauthor_id' - author_id = 'id' - ia_author_col = 'author_id' - ia_item_col = 'item_id' - - results = sql.execute_and_fetchall(args.temp_db_database, """ + results = sql.execute_and_fetchall( + args.temp_db_database, + """ SELECT s.{0} as "Story ID", s.title as "Title", s.summary as "Summary", a.{1} as "Creator", a.email as "Creator Email", "" as "New Email address", "" as "AO3 Account? 
(& does email match?)", "" as "Searched/Found", "" as "Work on AO3?", "" as "Import status", "" as "importer/inviter", "" as "import/invite date", "" as "AO3 link", "" as "Notes (if any)" FROM {2} ia join {3} a on ia.{4} = a.{5} join {6} s on ia.{7} = s.{8} where ia.item_type = "story"; - """.format(story_id, author_name, item_authors_table, author_table, ia_author_col, author_id, stories_table, ia_item_col, story_id)) - write_csv(results, '{0}/{1} - authors with stories.csv'.format(args.output_folder, args.archive_name), - ["Story ID", "Title", "Summary", "Creator", "Creator Email", "New Email address", - "AO3 Account? (& does email match?)", "Searched/Found", "Work on AO3?", "Import status", - "importer/inviter", "import/invite date", "AO3 link", "Notes (if any)"]) + """.format( + story_id, + author_name, + item_authors_table, + author_table, + ia_author_col, + author_id, + stories_table, + ia_item_col, + story_id, + ), + ) + write_csv( + results, + "{0}/{1} - authors with stories.csv".format( + args.output_folder, args.archive_name + ), + [ + "Story ID", + "Title", + "Summary", + "Creator", + "Creator Email", + "New Email address", + "AO3 Account? (& does email match?)", + "Searched/Found", + "Work on AO3?", + "Import status", + "importer/inviter", + "import/invite date", + "AO3 link", + "Notes (if any)", + ], + ) - # Bookmarks with authors - log.debug(f'Exporting authors with bookmarks from {args.temp_db_database} to {args.output_folder}') - author_table = '{0}.authors'.format(args.temp_db_database) - bookmarks_table = '{0}.story_links'.format(args.temp_db_database) - item_authors_table = '{0}.item_authors'.format(args.temp_db_database) - author_name = 'name' - bookmark_id = 'id' - bookmark_author_col = 'author_id' - bookmark_coauthor_col = 'coauthor_id' - author_id = 'id' - ia_author_col = 'author_id' - ia_item_col = 'item_id' + # Bookmarks with authors + log.debug( + f"Exporting authors with bookmarks from {args.temp_db_database} to {args.output_folder}" + ) + author_table = "{0}.authors".format(args.temp_db_database) + bookmarks_table = "{0}.story_links".format(args.temp_db_database) + item_authors_table = "{0}.item_authors".format(args.temp_db_database) + author_name = "name" + bookmark_id = "id" + bookmark_author_col = "author_id" + bookmark_coauthor_col = "coauthor_id" + author_id = "id" + ia_author_col = "author_id" + ia_item_col = "item_id" - results = sql.execute_and_fetchall(args.temp_db_database, """ + results = sql.execute_and_fetchall( + args.temp_db_database, + """ SELECT s.{0} as "Bookmark ID", s.title as "Title", s.summary as "Summary", a.{1} as "Creator", a.email as "Creator Email", s.url as "URL", "" as "New Email address", "" as "AO3 Account? (& does email match?)", "" as "Searched/Found", "" as "Work on AO3?", "" as "Import status", "" as "importer/inviter", "" as "import/invite date", "" as "AO3 link", "" as "Notes (if any)" FROM {2} ia join {3} a on ia.{4} = a.{5} join {6} s on ia.{7} = s.{8} where ia.item_type = "story_link"; - """.format(bookmark_id, author_name, item_authors_table, author_table, ia_author_col, author_id, bookmarks_table, ia_item_col, bookmark_id)) - write_csv(results, '{0}/{1} - authors with bookmarks.csv'.format(args.output_folder, args.archive_name), - ["Bookmark ID", "Title", "Summary", "Creator", "Creator Email", "URL", "New Email address", - "AO3 Account? 
(& does email match?)", "Searched/Found", "Work on AO3?", "Import status", - "importer/inviter", "import/invite date", "AO3 link", "Notes (if any)"]) + """.format( + bookmark_id, + author_name, + item_authors_table, + author_table, + ia_author_col, + author_id, + bookmarks_table, + ia_item_col, + bookmark_id, + ), + ) + write_csv( + results, + "{0}/{1} - authors with bookmarks.csv".format( + args.output_folder, args.archive_name + ), + [ + "Bookmark ID", + "Title", + "Summary", + "Creator", + "Creator Email", + "URL", + "New Email address", + "AO3 Account? (& does email match?)", + "Searched/Found", + "Work on AO3?", + "Import status", + "importer/inviter", + "import/invite date", + "AO3 link", + "Notes (if any)", + ], + ) diff --git a/04-Rename-Tags.py b/04-Rename-Tags.py index 166215c..2316ca5 100755 --- a/04-Rename-Tags.py +++ b/04-Rename-Tags.py @@ -8,21 +8,21 @@ if __name__ == "__main__": - """ + """ When Tag Wrangling have finished mapping the tags in Google Drive, export the spreadsheet as a CSV file. This script then copies the AO3 tags from that file into the tags table in the temporary database. """ - args_obj = Args() - args = args_obj.args_for_04() - log = args_obj.logger_with_filename() - sql = Sql(args, log) - tags = Tags(args, sql, log) + args_obj = Args() + args = args_obj.args_for_04() + log = args_obj.logger_with_filename() + sql = Sql(args, log) + tags = Tags(args, sql, log) - with open(args.tag_input_file, 'r', encoding='utf-8-sig') as csvfile: - tw_tags = list(csv.DictReader(csvfile)) - tag_headers = tags.tag_export_map - total = len(tw_tags) + with open(args.tag_input_file, "r", encoding="utf-8-sig") as csvfile: + tw_tags = list(csv.DictReader(csvfile)) + tag_headers = tags.tag_export_map + total = len(tw_tags) - for cur, row in enumerate(tw_tags): - tags.update_tag_row(row) - print_progress(cur, total, "tags") + for cur, row in enumerate(tw_tags): + tags.update_tag_row(row) + print_progress(cur, total, "tags") diff --git a/05-Create-Open-Doors-Tables.py b/05-Create-Open-Doors-Tables.py index 809a0ec..eea69ba 100755 --- a/05-Create-Open-Doors-Tables.py +++ b/05-Create-Open-Doors-Tables.py @@ -12,12 +12,15 @@ def _clean_email(author): :param author: row from the authors table :return: """ - email = author['email'] - if email is None or email == '': - email = u'{0}{1}Archive@ao3.org'.format(author['name'], args.archive_name) \ - .replace(' ', '').replace("'", "") - if email.startswith('mailto:'): - email = author['email'].replace('mailto:', '') + email = author["email"] + if email is None or email == "": + email = ( + "{0}{1}Archive@ao3.org".format(author["name"], args.archive_name) + .replace(" ", "") + .replace("'", "") + ) + if email.startswith("mailto:"): + email = author["email"].replace("mailto:", "") return email @@ -28,101 +31,158 @@ def main(args, log): log.info("Creating final destination tables in {0}".format(args.output_database)) table_names = { - 'authors': 'authors', - 'stories': 'stories', - 'chapters': 'chapters', - 'story_links': 'story_links' + "authors": "authors", + "stories": "stories", + "chapters": "chapters", + "story_links": "story_links", } - filter = 'WHERE id NOT IN ' + filter = "WHERE id NOT IN " - sql.run_script_from_file('shared_python/create-open-doors-tables.sql', - database=args.output_database) + sql.run_script_from_file( + "shared_python/create-open-doors-tables.sql", database=args.output_database + ) # Filter out DNI stories - story_ids_to_remove must be comma-separated list of DNI ids - story_exclusion_filter = '' + 
story_exclusion_filter = "" if os.path.exists(args.story_ids_to_remove): with open(args.story_ids_to_remove, "rt") as f: - log.info("Removing {0} Do Not Import stories...".format(sum(line.count(",") for line in f) + 1)) + log.info( + "Removing {0} Do Not Import stories...".format( + sum(line.count(",") for line in f) + 1 + ) + ) f.seek(0) for line in f: - story_exclusion_filter = filter + '(' + line + ')' + story_exclusion_filter = filter + "(" + line + ")" # Filter out DNI stories - bookmark_ids_to_remove must be comma-separated list of DNI ids - bookmark_exclusion_filter = '' + bookmark_exclusion_filter = "" if args.bookmark_ids_to_remove and os.path.exists(args.bookmark_ids_to_remove): with open(args.bookmark_ids_to_remove, "rt") as f: - log.info("Removing {0} Do Not Import bookmarks...".format(sum(line.count(",") for line in f) + 1)) + log.info( + "Removing {0} Do Not Import bookmarks...".format( + sum(line.count(",") for line in f) + 1 + ) + ) f.seek(0) for line in f: - bookmark_exclusion_filter = filter + '(' + line + ')' + bookmark_exclusion_filter = filter + "(" + line + ")" # Load filtered tables into variables - stories_without_tags = final.original_table(table_names['stories'], story_exclusion_filter) - log.info("Stories without tags after removing DNI: {0}".format(len(stories_without_tags))) - bookmarks_without_tags = final.original_table(table_names['story_links'], bookmark_exclusion_filter) + stories_without_tags = final.original_table( + table_names["stories"], story_exclusion_filter + ) + log.info( + "Stories without tags after removing DNI: {0}".format(len(stories_without_tags)) + ) + bookmarks_without_tags = final.original_table( + table_names["story_links"], bookmark_exclusion_filter + ) if bookmarks_without_tags: - log.info("Bookmarks without tags after removing DNI: {0}".format(len(bookmarks_without_tags))) + log.info( + "Bookmarks without tags after removing DNI: {0}".format( + len(bookmarks_without_tags) + ) + ) else: log.info("No bookmarks to remove") # STORIES - log.info("Copying stories to final table {0}.stories...".format(args.output_database)) + log.info( + "Copying stories to final table {0}.stories...".format(args.output_database) + ) final_stories = [] for story in stories_without_tags: - story_authors = final.original_table('item_authors', f"WHERE item_id={story['id']} and item_type='story'") + story_authors = final.original_table( + "item_authors", f"WHERE item_id={story['id']} and item_type='story'" + ) # Add additional story processing here if len(story_authors) > 0: - final_stories.append(final.story_to_final_without_tags(story, story_authors)) - else: - log.warning(f"Story with id {story['id']} has no authors, and will not be imported") - final.insert_into_final('stories', final_stories) + final_stories.append( + final.story_to_final_without_tags(story, story_authors) + ) + else: + log.warning( + f"Story with id {story['id']} has no authors, and will not be imported" + ) + final.insert_into_final("stories", final_stories) # BOOKMARKS if bookmarks_without_tags is not None: - log.info("Copying bookmarks to final table {0}.story_links...".format(args.output_database)) + log.info( + "Copying bookmarks to final table {0}.story_links...".format( + args.output_database + ) + ) final_bookmarks = [] for bookmark in bookmarks_without_tags: # Add additional bookmark processing here - bookmark_authors = final.original_table('item_authors', - f"WHERE item_id={bookmark['id']} and item_type='story_link'") - 
final_bookmarks.append(final.story_to_final_without_tags(bookmark, bookmark_authors, False)) - if final_bookmarks: final.insert_into_final('story_links', final_bookmarks) # noqa: E701 + bookmark_authors = final.original_table( + "item_authors", + f"WHERE item_id={bookmark['id']} and item_type='story_link'", + ) + final_bookmarks.append( + final.story_to_final_without_tags(bookmark, bookmark_authors, False) + ) + if final_bookmarks: + final.insert_into_final("story_links", final_bookmarks) # noqa: E701 # AUTHORS - log.info("Copying authors to final table {0}.authors, cleaning emails and removing authors with no works...".format( - args.output_database)) + log.info( + "Copying authors to final table {0}.authors, cleaning emails and removing authors with no works...".format( + args.output_database + ) + ) final_authors = [] - authors = final.original_table(table_names['authors']) + authors = final.original_table(table_names["authors"]) for final_author in authors: - if any(story['author_id'] == final_author['id'] or story['coauthor_id'] == final_author['id'] for story in - final_stories) \ - or any(bookmark['author_id'] == final_author['id'] for bookmark in final_bookmarks): - final_author['email'] = _clean_email(final_author) + if any( + story["author_id"] == final_author["id"] + or story["coauthor_id"] == final_author["id"] + for story in final_stories + ) or any( + bookmark["author_id"] == final_author["id"] for bookmark in final_bookmarks + ): + final_author["email"] = _clean_email(final_author) final_authors.append(final_author) - final.insert_into_final('authors', final_authors) + final.insert_into_final("authors", final_authors) # CHAPTERS - chapters = final.original_table(table_names['chapters'], '') + chapters = final.original_table(table_names["chapters"], "") if chapters: dest_chapter_table = f"{args.output_database}.{table_names['chapters']}" - log.info("Copying chapters table {0} from source chapters table...".format(dest_chapter_table)) + log.info( + "Copying chapters table {0} from source chapters table...".format( + dest_chapter_table + ) + ) sql.execute("drop table if exists {0}".format(dest_chapter_table)) - truncate_and_insert = "create table {0} (unique(id), key(story_id)) select * from {1}.{2}".format( - dest_chapter_table, - args.temp_db_database, - table_names['chapters']) + truncate_and_insert = ( + "create table {0} (unique(id), key(story_id)) select * from {1}.{2}".format( + dest_chapter_table, args.temp_db_database, table_names["chapters"] + ) + ) sql.execute(truncate_and_insert) - add_auto_increment = "alter table {0} modify id int not null auto_increment".format(dest_chapter_table) + add_auto_increment = ( + "alter table {0} modify id int not null auto_increment".format( + dest_chapter_table + ) + ) sql.execute(add_auto_increment) else: - log.info("Creating chapters table {0}.chapters from source stories table...".format(args.output_database)) + log.info( + "Creating chapters table {0}.chapters from source stories table...".format( + args.output_database + ) + ) final_chapters = final.dummy_chapters(final_stories) - final.insert_into_final('chapters', final_chapters) + final.insert_into_final("chapters", final_chapters) + if __name__ == "__main__": args_obj = Args() args = args_obj.args_for_05() log = args_obj.logger_with_filename() - main(args, log) \ No newline at end of file + main(args, log) diff --git a/06-Update-Tags-In-Story-Table.py b/06-Update-Tags-In-Story-Table.py index e6122c8..5ac5e8c 100755 --- a/06-Update-Tags-In-Story-Table.py +++ 
b/06-Update-Tags-In-Story-Table.py @@ -7,15 +7,15 @@ if __name__ == "__main__": - """ + """ Denormalize tags out of the working tags table into comma-separated lists in the stories or story_link tables """ - args_obj = Args() - args = args_obj.args_for_06() - log = args_obj.logger_with_filename() - sql = Sql(args, log) - tags = Tags(args, sql, log) - final = FinalTables(args, sql, log) - populate_tags = PopulateTags(args, sql, log, tags, final) + args_obj = Args() + args = args_obj.args_for_06() + log = args_obj.logger_with_filename() + sql = Sql(args, log) + tags = Tags(args, sql, log) + final = FinalTables(args, sql, log) + populate_tags = PopulateTags(args, sql, log, tags, final) - populate_tags.populate_tags() \ No newline at end of file + populate_tags.populate_tags() diff --git a/automated_archive/aa.py index 2dee737..83c082c 100755 --- a/automated_archive/aa.py +++ b/automated_archive/aa.py @@ -12,180 +12,237 @@ def _escape_quote(text): - return text.replace("'", "\\'") + return text.replace("'", "\\'") def _clean_file(filepath, log): - """ - Convert the Perl hash into a Python dictionary - :param filepath: Path to ARCHIVE_DB.pl - :return: Python dictionary keyed by original story id - """ - h = HTMLParser() - archive_db = codecs.open(filepath, 'r', encoding='utf-8').read() - - # Manually escape single quote entity and reformat file as a Python dictionary - step1 = h.unescape(archive_db.replace("'", "\\'")) - - # Indent the file with a single tab instead of whatever is currently used - step15 = re.sub(r"^\s+", "\t", step1) - - step2 = (step15.replace('%FILES = (\n\n', '{\n"') - .replace('\n)', '\n}') - .replace('},\n', '},\n"') - .replace('\t\n', '') - .replace('\t', '\t"') - .replace(' =>', '":') - .replace(';\n', ',\n') - .replace(',\n"\n},\n1,', '}') - ) - # Replace line breaks within fields (followed by a character that isn't a space, tab, digit, } or ") - step3 = re.sub(r"\n(?=[^ \t\d\}\"])", " ", step2) - - # Edit these to fix dodgy data specific to this archive - final_replace = step3.replace("0,/2,/25", "01/30/00").replace('\t"PrintTime": \'P\',\n', "") - final_regex = re.sub(r"00,02,\d(.*?)',", "02/26/00',", final_replace) - - archive_db_python = eval(final_regex) - - # List fields in AA db file - keys = [dict.keys() for dict in archive_db_python.values()] - unique_keys = set([val for sublist in keys for val in sublist]) - log.info("Fields in ARCHIVE_DB.pl: {0}".format(", ".join(str(e) for e in unique_keys))) - - return archive_db_python + """ + Convert the Perl hash into a Python dictionary + :param filepath: Path to ARCHIVE_DB.pl + :return: Python dictionary keyed by original story id + """ + h = HTMLParser() + archive_db = codecs.open(filepath, "r", encoding="utf-8").read() + + # Manually escape single quote entity and reformat file as a Python dictionary + step1 = h.unescape(archive_db.replace("'", "\\'")) + + # Indent the file with a single tab instead of whatever is currently used + step15 = re.sub(r"^\s+", "\t", step1) + + step2 = ( + step15.replace("%FILES = (\n\n", '{\n"') + .replace("\n)", "\n}") + .replace("},\n", '},\n"') + .replace("\t\n", "") + .replace("\t", '\t"') + .replace(" =>", '":') + .replace(";\n", ",\n") + .replace(',\n"\n},\n1,', "}") + ) + # Replace line breaks within fields (followed by a character that isn't a space, tab, digit, } or ") + step3 = re.sub(r"\n(?=[^ \t\d\}\"])", " ", step2) + + # Edit these to fix dodgy data specific to this archive + final_replace = step3.replace("0,/2,/25", "01/30/00").replace( + "\t\"PrintTime\": 'P',\n", "" + ) + final_regex = re.sub(r"00,02,\d(.*?)',", "02/26/00',", final_replace) + + archive_db_python = eval(final_regex) + + # List fields in AA db file + keys = [dict.keys() for dict in archive_db_python.values()] + unique_keys = set([val for sublist in keys for val in sublist]) + log.info( + "Fields in ARCHIVE_DB.pl: {0}".format(", ".join(str(e) for e in unique_keys)) + ) + + return archive_db_python def _is_external(record): - """ - AA is pretty flexible - define the bookmark criteria here, whatever it is - :param record: - :return: whether this record is an external link - """ - # Spooky 2003 - # return record.get('Offsite', 'none') != 'none' - # or record.get('FileType', 'none') == 'none' \ - # Spooky 2004 - 
# return record.get('Offsite', 'none') == 'offsite' - # Spooky 2005 - return record.get('LocationURL', '').startswith('http') + """ + AA is pretty flexible - define the bookmark criteria here, whatever it is + :param record: + :return: whether this record is an external link + """ + # Spooky 2003 + # return record.get('Offsite', 'none') != 'none' + # or record.get('FileType', 'none') == 'none' \ + # Spooky 2004 + # return record.get('Offsite', 'none') == 'offsite' + # Spooky 2005 + return record.get("LocationURL", "").startswith("http") def _extract_tags(args, record): - tags = "" - if args.tag_fields is not None: - for tag_field in args.tag_fields.split(', '): - tags += record.get(tag_field, '').replace("'", "\\'").replace('"', '\\"') + ', ' - return tags.strip(', ') + tags = "" + if args.tag_fields is not None: + for tag_field in args.tag_fields.split(", "): + tags += ( + record.get(tag_field, "").replace("'", "\\'").replace('"', '\\"') + ", " + ) + return tags.strip(", ") def _extract_characters(args, record): - tags = "" - if args.character_fields is not None: - for character_field in args.character_fields.split(', '): - tags += record.get(character_field, '').replace("'", "\\'").replace('"', '\\"') + ', ' - return tags.strip(', ') + tags = "" + if args.character_fields is not None: + for character_field in args.character_fields.split(", "): + tags += ( + record.get(character_field, "").replace("'", "\\'").replace('"', '\\"') + + ", " + ) + return tags.strip(", ") def _extract_relationships(args, record): - tags = "" - if args.relationship_fields is not None: - for relationship_field in args.relationship_fields.split(', '): - tags += record.get(relationship_field, '').replace("'", "\\'").replace('"', '\\"') + ', ' - return tags.strip(', ') + tags = "" + if args.relationship_fields is not None: + for relationship_field in args.relationship_fields.split(", "): + tags += ( + record.get(relationship_field, "") + .replace("'", "\\'") + .replace('"', '\\"') + + ", " + ) + return tags.strip(", ") def _extract_fandoms(args, record): - tags = "" - if args.fandom_fields is not None: - for fandom_field in args.fandom_fields.split(', '): - tags += record.get(fandom_field, '').replace("'", "\\'").replace('"', '\\"') + ', ' - return tags.strip(', ') + tags = "" + if args.fandom_fields is not None: + for fandom_field in args.fandom_fields.split(", "): + tags += ( + record.get(fandom_field, "").replace("'", "\\'").replace('"', '\\"') + + ", " + ) + return tags.strip(", ") def _create_mysql(args, FILES, log): - db = connect(args.db_host, args.db_user, args.db_password, "") - cursor = db.cursor() - DATABASE_NAME = args.temp_db_database - - # Use the database and empty all the tables - cursor.execute(u"drop database if exists {0};".format(DATABASE_NAME)) - cursor.execute(u"create database {0};".format(DATABASE_NAME)) - cursor.execute(u"use {0}".format(DATABASE_NAME)) - - sql = Sql(args) - sql.run_script_from_file('shared_python/create-open-doors-tables.sql', DATABASE_NAME) - db.commit() - - authors = [(FILES[i].get('Author', '').strip(), FILES[i].get('Email', FILES[i].get('EmailAuthor', '')).lower().strip()) for i in FILES] - auth = u"INSERT INTO authors (name, email) VALUES(%s, %s);" - cursor.executemany(auth, set(authors)) - db.commit() - - # Authors - auth = u"SELECT * FROM authors;" - cursor.execute(auth) - db_authors = cursor.fetchall() - - # Stories and bookmarks - stories = [(i, - FILES[i].get('Title', '').replace("'", "\\'"), - FILES[i].get('Summary', '').replace("'", "\\'"), - 
_extract_tags(args, FILES[i]), - _extract_characters(args, FILES[i]), - datetime.datetime.strptime( - FILES[i].get('PrintTime', - FILES[i].get('DatePrint', - FILES[i].get('Date', - str(datetime.datetime.now().strftime('%m/%d/%y'))))), - '%m/%d/%y').strftime('%Y-%m-%d'), - FILES[i].get('Location', '').replace("'", "\\'"), - FILES[i].get('LocationURL', FILES[i].get('StoryURL', '')).replace("'", "\\'"), - FILES[i].get('Notes', '').replace("'", "\\'"), - _extract_relationships(args, FILES[i]), - FILES[i].get('Rating', ''), - FILES[i].get('Warnings', '').replace("'", "\\'"), - FILES[i].get('Author', '').strip(), - FILES[i].get('Email', FILES[i].get('EmailAuthor', '')).lower().strip(), - FILES[i].get('FileType', args.chapters_file_extensions) if not _is_external(FILES[i]) else 'bookmark', - _extract_fandoms(args, FILES[i]), - ) - for i in FILES] - - cur = 0 - total = len(FILES) - for (original_id, title, summary, tags, characters, date, location, url, notes, pairings, rating, warnings, author, - email, filetype, fandoms) in set(stories): - - cur = Common.print_progress(cur, total) - try: - # For AA archives with external links: - if filetype != 'bookmark': - if location == '': - filename = url - else: - filename = location + '.' + filetype - table_name = 'stories' - else: - filename = url - table_name = 'bookmarks' - - # Clean up fandoms and add default fandom if it exists - final_fandoms = fandoms.replace("'", r"\'") - if args.default_fandom is not None: - if final_fandoms == '' or final_fandoms == args.default_fandom: - final_fandoms = args.default_fandom - else: - final_fandoms = args.default_fandom + ', ' + final_fandoms - - result = [element for element in db_authors if element[1] == author and element[2] == email] - authorid = result[0][0] - - stor = u""" + db = connect(args.db_host, args.db_user, args.db_password, "") + cursor = db.cursor() + DATABASE_NAME = args.temp_db_database + + # Use the database and empty all the tables + cursor.execute("drop database if exists {0};".format(DATABASE_NAME)) + cursor.execute("create database {0};".format(DATABASE_NAME)) + cursor.execute("use {0}".format(DATABASE_NAME)) + + sql = Sql(args) + sql.run_script_from_file( + "shared_python/create-open-doors-tables.sql", DATABASE_NAME + ) + db.commit() + + authors = [ + ( + FILES[i].get("Author", "").strip(), + FILES[i].get("Email", FILES[i].get("EmailAuthor", "")).lower().strip(), + ) + for i in FILES + ] + auth = "INSERT INTO authors (name, email) VALUES(%s, %s);" + cursor.executemany(auth, set(authors)) + db.commit() + + # Authors + auth = "SELECT * FROM authors;" + cursor.execute(auth) + db_authors = cursor.fetchall() + + # Stories and bookmarks + stories = [ + ( + i, + FILES[i].get("Title", "").replace("'", "\\'"), + FILES[i].get("Summary", "").replace("'", "\\'"), + _extract_tags(args, FILES[i]), + _extract_characters(args, FILES[i]), + datetime.datetime.strptime( + FILES[i].get( + "PrintTime", + FILES[i].get( + "DatePrint", + FILES[i].get( + "Date", str(datetime.datetime.now().strftime("%m/%d/%y")) + ), + ), + ), + "%m/%d/%y", + ).strftime("%Y-%m-%d"), + FILES[i].get("Location", "").replace("'", "\\'"), + FILES[i] + .get("LocationURL", FILES[i].get("StoryURL", "")) + .replace("'", "\\'"), + FILES[i].get("Notes", "").replace("'", "\\'"), + _extract_relationships(args, FILES[i]), + FILES[i].get("Rating", ""), + FILES[i].get("Warnings", "").replace("'", "\\'"), + FILES[i].get("Author", "").strip(), + FILES[i].get("Email", FILES[i].get("EmailAuthor", "")).lower().strip(), + FILES[i].get("FileType", 
args.chapters_file_extensions) + if not _is_external(FILES[i]) + else "bookmark", + _extract_fandoms(args, FILES[i]), + ) + for i in FILES + ] + + cur = 0 + total = len(FILES) + for ( + original_id, + title, + summary, + tags, + characters, + date, + location, + url, + notes, + pairings, + rating, + warnings, + author, + email, + filetype, + fandoms, + ) in set(stories): + cur = Common.print_progress(cur, total) + try: + # For AA archives with external links: + if filetype != "bookmark": + if location == "": + filename = url + else: + filename = location + "." + filetype + table_name = "stories" + else: + filename = url + table_name = "bookmarks" + + # Clean up fandoms and add default fandom if it exists + final_fandoms = fandoms.replace("'", r"\'") + if args.default_fandom is not None: + if final_fandoms == "" or final_fandoms == args.default_fandom: + final_fandoms = args.default_fandom + else: + final_fandoms = args.default_fandom + ", " + final_fandoms + + result = [ + element + for element in db_authors + if element[1] == author and element[2] == email + ] + authorid = result[0][0] + + stor = """ INSERT INTO {0} (id, fandoms, title, summary, tags, characters, date, url, notes, relationships, rating, warnings, author_id) - VALUES({1}, '{2}', '{3}', '{4}', '{5}', '{6}', '{7}', '{8}', '{9}', '{10}', '{11}', '{12}', '{13}');\n""" \ - .format(table_name, + VALUES({1}, '{2}', '{3}', '{4}', '{5}', '{6}', '{7}', '{8}', '{9}', '{10}', '{11}', '{12}', '{13}');\n""".format( + table_name, original_id, final_fandoms.replace(r"\\", "\\"), title.replace(r"\\", "\\"), @@ -198,35 +255,39 @@ def _create_mysql(args, FILES, log): pairings, rating, warnings, - authorid) - cursor.execute(stor) - except: - log.error("table name: {0}\noriginal id: {1}\nfinal fandoms: '{2}'\ntitle: '{3}'\nsummary: '{4}'\ntags: '{5}'" \ - "\ncharacters: '{6}'\ndate: '{7}'\nfilename: '{8}'\nnotes: '{9}'\npairings: '{10}'\nrating: '{11}'" \ - "\nwarnings: '{12}'\nauthor id: '{13}'"\ - .format(table_name, - original_id, - final_fandoms, - title, - summary, - tags, - characters, - date, - filename, - notes, - pairings, - rating, - warnings, - authorid)) - raise - db.commit() + authorid, + ) + cursor.execute(stor) + except: + log.error( + "table name: {0}\noriginal id: {1}\nfinal fandoms: '{2}'\ntitle: '{3}'\nsummary: '{4}'\ntags: '{5}'" + "\ncharacters: '{6}'\ndate: '{7}'\nfilename: '{8}'\nnotes: '{9}'\npairings: '{10}'\nrating: '{11}'" + "\nwarnings: '{12}'\nauthor id: '{13}'".format( + table_name, + original_id, + final_fandoms, + title, + summary, + tags, + characters, + date, + filename, + notes, + pairings, + rating, + warnings, + authorid, + ) + ) + raise + db.commit() def clean_and_load_data(args, log): - data = _clean_file(args.db_input_file, log) - _create_mysql(args, data, log) + data = _clean_file(args.db_input_file, log) + _create_mysql(args, data, log) if __name__ == "__main__": - args = Args().process_args() - data = _clean_file(args.filepath) + args = Args().process_args() + data = _clean_file(args.filepath) diff --git a/shared_python/Args.py b/shared_python/Args.py index f9e8078..9f76fe0 100755 --- a/shared_python/Args.py +++ b/shared_python/Args.py @@ -4,150 +4,255 @@ from shared_python.Logging import logger -class Args(object): - - def __init__(self): - self.args = self._process_args() - self.log = logger(self.args.archive_name) - - def logger_with_filename(self): - return self.log - - @staticmethod - def _load_args_from_file(filepath): - """ - Read the file passed as parameter as a properties file. 
- """ - with open(filepath, "rt") as f: - return yaml.safe_load(f) - def _process_args(self): - - argdict = { - 'db_host': 'MySQL host name and port', - 'db_user': 'MySQL user', - 'db_password': 'MySQL password', - 'temp_db_database': 'MySQL temporary database name to use for processing (will be destroyed if it exists)', - } - parser = argparse.ArgumentParser(description='Process an archive database') - for name, helptext in argdict.items(): - parser.add_argument('-d' + name.split('_')[1][0], '--' + name, type=str, help=helptext) - - # Pass in a file with all the properties - parser.add_argument('-p', '--properties_file', type=str, help='Load properties from specified file (ignores all other arguments)') - - # General archive-specific settings - parser.add_argument('-a', '--archive_type', type=str, choices=['AA'], help='Type of archive: AA') - parser.add_argument('-df', '--default_fandom', type=str, help='Default fandom to use') - parser.add_argument('-n', '--archive_name', type=str, help='Name of the original archive (used in the temporary site)') - - # Database settings - parser.add_argument('-i', '--db_input_file', type=str, help='Path to input file (ARCHIVE_DB.pl for AA)') - parser.add_argument('-o', '--output_folder', type=str, help='Path for output files') - parser.add_argument('-od', '--output_database', type=str, help='Name of the database the final tables should be created in (default "od_sgf")') - - # Tag settings - parser.add_argument('-ft', '--tag_fields', type=str, help='List of tag field(s) in original db (comma-delimited)') - parser.add_argument('-fc', '--character_fields', type=str, help='List of character field(s) in original db (comma-delimited)') - parser.add_argument('-fr', '--relationship_fields', type=str, help='List of relationship field(s) in original db (comma-delimited)') - parser.add_argument('-ff', '--fandom_fields', type=str, help='List of fandom field(s) in original db (comma-delimited)') - parser.add_argument('-wf', '--fields_with_fandom', type=str, help='List of output tag fields where the fandom should be listed too (comma-delimited)') - - # Wrangling and search processing - parser.add_argument('-t', '--tag_input_file', type=str, help='Path to tag renaming input CSV') - parser.add_argument('-si', '--story_ids_to_remove', type=str, help='Location of the text file containing the story ids to remove') - parser.add_argument('-bi', '--bookmark_ids_to_remove', type=str, help='Location of the text file containing the bookmark ids to remove') - - # Chapters - parser.add_argument('-cp', '--chapters_path', type=str, help='Location of the text files containing the stories') - parser.add_argument('-cf', '--chapters_file_extensions', type=str, help='File extension(s) of the text files containing the stories (eg: "txt, html")') - - - args = parser.parse_args() - if args.properties_file is not None and os.path.isfile(args.properties_file): - props = self._load_args_from_file(args.properties_file) - for k, v in props.items(): - if v == '': - setattr(args, k, None) - else: - setattr(args, k, v) - - for arg_name in argdict.keys(): - if getattr(args, arg_name) is None: - setattr(args, arg_name, input(argdict[arg_name] + ': ')) - - args.archive_name = input('Name of the original archive (used in export file names): ') if args.archive_name is None else args.archive_name - - while args.archive_type is None or args.archive_type not in ['AA']: - args.archive_type = input('Type of archive (AA): ') - - return args - - - def _print_args(self, args): - self.log.info('----------- Open 
Door Archive Import Parameters --------------') - for arg in vars(args): - self.log.info('{0} = {1}'.format(arg, getattr(args, arg))) - self.log.info('--------------------------------------------------------------') - - - def args_for_01(self): - while self.args.db_input_file is None or not os.path.isfile(self.args.db_input_file): - self.args.db_input_file = input('Path to the input file (ARCHIVE_DB.pl for AA): ') - self._print_args(self.args) - return self.args - - - def args_for_02(self): - self._print_args(self.args) - return self.args - - - def args_for_03(self): - if not os.path.exists(self.args.output_folder): - os.makedirs(self.args.output_folder) - while self.args.output_folder is None or not os.path.isdir(self.args.output_folder): - self.args.output_folder = input('Path for output files: ') - if not os.path.exists(self.args.output_folder): - os.makedirs(self.args.output_folder) - self._print_args(self.args) - return self.args - - - def args_for_04(self): - while self.args.tag_input_file is None or not os.path.isfile(self.args.tag_input_file): - self.args.tag_input_file = input('Path to tag renaming csv file: ') - self._print_args(self.args) - return self.args - - - def args_for_05(self): - if self.args.output_database is None: - self.args.output_database = input('Name of the database the final tables should be created in (default "od_sgf"):') - self.args.output_database = "od_sgf" if self.args.output_database == "" else self.args.output_database - if self.args.story_ids_to_remove is None: - self.args.story_ids_to_remove = input('Location of the text file containing the story ids to remove:') - self._print_args(self.args) - return self.args - - - def args_for_06(self): - if self.args.output_database is None: - self.args.output_database = input('Name of the database the final tables should be created in (default "od_sgf"):') - self.args.output_database = "od_sgf" if self.args.output_database == "" else self.args.output_database - if self.args.default_fandom is None: - self.args.default_fandom = input('Default fandom:') - self.args.default_fandom = '' if self.args.default_fandom is None else self.args.default_fandom - self._print_args(self.args) - return self.args - - def args_for_07(self): - if self.args.output_database is None: - self.args.output_database = input('Name of the database the final tables should be created in (default "od_sgf"):') - self.args.output_database = "od_sgf" if self.args.output_database == "" else self.args.output_database - if self.args.chapters_path is None: - self.args.chapters_path = input('Location of the text files containing the stories:') - if self.args.chapters_path is not None and self.args.chapters_file_extensions is None: - self.args.chapters_file_extensions = input('File extension(s) of the text files containing the stories (eg: "txt, html"):') - self._print_args(self.args) - return self.args +class Args(object): + def __init__(self): + self.args = self._process_args() + self.log = logger(self.args.archive_name) + + def logger_with_filename(self): + return self.log + + @staticmethod + def _load_args_from_file(filepath): + """ + Read the file passed as parameter as a properties file. 
+ """ + with open(filepath, "rt") as f: + return yaml.safe_load(f) + + def _process_args(self): + argdict = { + "db_host": "MySQL host name and port", + "db_user": "MySQL user", + "db_password": "MySQL password", + "temp_db_database": "MySQL temporary database name to use for processing (will be destroyed if it exists)", + } + parser = argparse.ArgumentParser(description="Process an archive database") + for name, helptext in argdict.items(): + parser.add_argument( + "-d" + name.split("_")[1][0], "--" + name, type=str, help=helptext + ) + + # Pass in a file with all the properties + parser.add_argument( + "-p", + "--properties_file", + type=str, + help="Load properties from specified file (ignores all other arguments)", + ) + + # General archive-specific settings + parser.add_argument( + "-a", "--archive_type", type=str, choices=["AA"], help="Type of archive: AA" + ) + parser.add_argument( + "-df", "--default_fandom", type=str, help="Default fandom to use" + ) + parser.add_argument( + "-n", + "--archive_name", + type=str, + help="Name of the original archive (used in the temporary site)", + ) + + # Database settings + parser.add_argument( + "-i", + "--db_input_file", + type=str, + help="Path to input file (ARCHIVE_DB.pl for AA)", + ) + parser.add_argument( + "-o", "--output_folder", type=str, help="Path for output files" + ) + parser.add_argument( + "-od", + "--output_database", + type=str, + help='Name of the database the final tables should be created in (default "od_sgf")', + ) + + # Tag settings + parser.add_argument( + "-ft", + "--tag_fields", + type=str, + help="List of tag field(s) in original db (comma-delimited)", + ) + parser.add_argument( + "-fc", + "--character_fields", + type=str, + help="List of character field(s) in original db (comma-delimited)", + ) + parser.add_argument( + "-fr", + "--relationship_fields", + type=str, + help="List of relationship field(s) in original db (comma-delimited)", + ) + parser.add_argument( + "-ff", + "--fandom_fields", + type=str, + help="List of fandom field(s) in original db (comma-delimited)", + ) + parser.add_argument( + "-wf", + "--fields_with_fandom", + type=str, + help="List of output tag fields where the fandom should be listed too (comma-delimited)", + ) + + # Wrangling and search processing + parser.add_argument( + "-t", "--tag_input_file", type=str, help="Path to tag renaming input CSV" + ) + parser.add_argument( + "-si", + "--story_ids_to_remove", + type=str, + help="Location of the text file containing the story ids to remove", + ) + parser.add_argument( + "-bi", + "--bookmark_ids_to_remove", + type=str, + help="Location of the text file containing the bookmark ids to remove", + ) + + # Chapters + parser.add_argument( + "-cp", + "--chapters_path", + type=str, + help="Location of the text files containing the stories", + ) + parser.add_argument( + "-cf", + "--chapters_file_extensions", + type=str, + help='File extension(s) of the text files containing the stories (eg: "txt, html")', + ) + + args = parser.parse_args() + if args.properties_file is not None and os.path.isfile(args.properties_file): + props = self._load_args_from_file(args.properties_file) + for k, v in props.items(): + if v == "": + setattr(args, k, None) + else: + setattr(args, k, v) + + for arg_name in argdict.keys(): + if getattr(args, arg_name) is None: + setattr(args, arg_name, input(argdict[arg_name] + ": ")) + + args.archive_name = ( + input("Name of the original archive (used in export file names): ") + if args.archive_name is None + else args.archive_name + ) + 
+ while args.archive_type is None or args.archive_type not in ["AA"]: + args.archive_type = input("Type of archive (AA): ") + + return args + + def _print_args(self, args): + self.log.info("----------- Open Door Archive Import Parameters --------------") + for arg in vars(args): + self.log.info("{0} = {1}".format(arg, getattr(args, arg))) + self.log.info("--------------------------------------------------------------") + + def args_for_01(self): + while self.args.db_input_file is None or not os.path.isfile( + self.args.db_input_file + ): + self.args.db_input_file = input( + "Path to the input file (ARCHIVE_DB.pl for AA): " + ) + self._print_args(self.args) + return self.args + + def args_for_02(self): + self._print_args(self.args) + return self.args + + def args_for_03(self): + if not os.path.exists(self.args.output_folder): + os.makedirs(self.args.output_folder) + while self.args.output_folder is None or not os.path.isdir( + self.args.output_folder + ): + self.args.output_folder = input("Path for output files: ") + if not os.path.exists(self.args.output_folder): + os.makedirs(self.args.output_folder) + self._print_args(self.args) + return self.args + + def args_for_04(self): + while self.args.tag_input_file is None or not os.path.isfile( + self.args.tag_input_file + ): + self.args.tag_input_file = input("Path to tag renaming csv file: ") + self._print_args(self.args) + return self.args + + def args_for_05(self): + if self.args.output_database is None: + self.args.output_database = input( + 'Name of the database the final tables should be created in (default "od_sgf"):' + ) + self.args.output_database = ( + "od_sgf" + if self.args.output_database == "" + else self.args.output_database + ) + if self.args.story_ids_to_remove is None: + self.args.story_ids_to_remove = input( + "Location of the text file containing the story ids to remove:" + ) + self._print_args(self.args) + return self.args + + def args_for_06(self): + if self.args.output_database is None: + self.args.output_database = input( + 'Name of the database the final tables should be created in (default "od_sgf"):' + ) + self.args.output_database = ( + "od_sgf" if self.args.output_database == "" else self.args.output_database + ) + if self.args.default_fandom is None: + self.args.default_fandom = input("Default fandom:") + self.args.default_fandom = ( + "" if self.args.default_fandom is None else self.args.default_fandom + ) + self._print_args(self.args) + return self.args + + def args_for_07(self): + if self.args.output_database is None: + self.args.output_database = input( + 'Name of the database the final tables should be created in (default "od_sgf"):' + ) + self.args.output_database = ( + "od_sgf" + if self.args.output_database == "" + else self.args.output_database + ) + if self.args.chapters_path is None: + self.args.chapters_path = input( + "Location of the text files containing the stories:" + ) + if ( + self.args.chapters_path is not None + and self.args.chapters_file_extensions is None + ): + self.args.chapters_file_extensions = input( + 'File extension(s) of the text files containing the stories (eg: "txt, html"):' + ) + self._print_args(self.args) + return self.args diff --git a/shared_python/Chapters.py b/shared_python/Chapters.py index 99856eb..7bd44f6 100755 --- a/shared_python/Chapters.py +++ b/shared_python/Chapters.py @@ -8,117 +8,153 @@ from shared_python import Common + # TODO this code is no longer needed for eFiction and will need to be reviewed for other archive types class Chapters(object): - - def 
__init__(self, args, sql, log): - self.args = args - self.sql = sql - self.log = log - - def _ends_with(self, filename, extensions): - return any(filename.endswith(ext) for ext in extensions) - - def _gather_and_dedupe(self, chapters_path, extensions, has_ids=False): - self.log.info("\nFinding chapters and identifying duplicates") - extensions = re.split(r", ?", extensions) - story_folder = os.walk(chapters_path) - file_paths = {} - duplicate_chapters = {} - has_duplicates = False - messages = [] - sql_messages = [] - cur = 0 - - for root, _, filenames in story_folder: - total = len(filenames) - Common.print_progress(cur, total) - - for filename in filenames: - if has_ids and self._ends_with(filename, extensions): - file_path = os.path.join(root, filename) - cid = os.path.splitext(filename)[0] - if cid not in file_paths.keys(): - file_paths[cid] = file_path - else: - duplicate_folder = os.path.split(os.path.split(file_path)[0])[1] - messages.append(file_path + " is a duplicate of " + file_paths[cid]) - sql_messages.append("SELECT * FROM chapters WHERE id = {0}".format(cid)) - duplicate_chapters[cid] = [ - {'folder_name': os.path.split(os.path.split(file_paths[cid])[0])[1], 'filename': filename, - 'path': file_paths[cid]}, - {'folder_name': duplicate_folder, 'filename': filename, 'path': file_path} - ] - has_duplicates = True + def __init__(self, args, sql, log): + self.args = args + self.sql = sql + self.log = log + + def _ends_with(self, filename, extensions): + return any(filename.endswith(ext) for ext in extensions) + + def _gather_and_dedupe(self, chapters_path, extensions, has_ids=False): + self.log.info("\nFinding chapters and identifying duplicates") + extensions = re.split(r", ?", extensions) + story_folder = os.walk(chapters_path) + file_paths = {} + duplicate_chapters = {} + has_duplicates = False + messages = [] + sql_messages = [] + cur = 0 + + for root, _, filenames in story_folder: + total = len(filenames) + Common.print_progress(cur, total) + + for filename in filenames: + if has_ids and self._ends_with(filename, extensions): + file_path = os.path.join(root, filename) + cid = os.path.splitext(filename)[0] + if cid not in file_paths.keys(): + file_paths[cid] = file_path + else: + duplicate_folder = os.path.split(os.path.split(file_path)[0])[1] + messages.append( + file_path + " is a duplicate of " + file_paths[cid] + ) + sql_messages.append( + "SELECT * FROM chapters WHERE id = {0}".format(cid) + ) + duplicate_chapters[cid] = [ + { + "folder_name": os.path.split( + os.path.split(file_paths[cid])[0] + )[1], + "filename": filename, + "path": file_paths[cid], + }, + { + "folder_name": duplicate_folder, + "filename": filename, + "path": file_path, + }, + ] + has_duplicates = True + else: + file_path = os.path.join(root, filename) + name = os.path.splitext(filename)[0] + file_paths[name] = file_path + + if has_duplicates: + self.log.warn("\n".join(messages + sql_messages)) + self.log.warn(duplicate_chapters) + folder_name_type = raw_input( + "Resolving duplicates: pick the type of the folder name under {0} " + "\n1 = author id\n2 = author name\n3 = skip duplicates check\n".format( + chapters_path + ) + ) + if folder_name_type == "1": + for cid, duplicate in duplicate_chapters.items(): + # look up the author id and add that one to the file_names list + sql_author_id = self.sql.execute_and_fetchall( + "SELECT author_id FROM chapters WHERE id = {0}".format(cid) + ) + if len(sql_author_id) > 0: + author_id = sql_author_id[0][0] + file_paths[cid] = [ + dc["path"] + for dc in 
duplicate_chapters[cid] + if dc["folder_name"] == str(author_id) + ][0] + elif folder_name_type == "2": + self.log.warn("Not implemented") + + return file_paths + + # TODO this is no longer needed to load eFiction chapters - see if it's still useful for other archive types + def populate_chapters(self, folder=None, extensions=None): + if folder is None: + folder = self.args.chapters_path + if extensions is None: + extensions = self.args.chapters_file_extensions + + self.log.info("Processing chapters...") + + filenames_are_ids = raw_input("\nChapter file names are chapter ids? Y/N\n") + has_ids = True if str.lower(filenames_are_ids) == "y" else False + file_paths = self._gather_and_dedupe(folder, extensions, has_ids) + + char_encoding = raw_input( + "\n\nImporting chapters: pick character encoding (check for curly quotes):\n" + "1 = Windows 1252\nenter = UTF-8\n" + ) + + if char_encoding == "1": + char_encoding = "cp1252" + else: + char_encoding = "utf8" + + cur = 0 + total = len(file_paths) + + if has_ids: + for cid, chapter_path in file_paths.items(): + with codecs.open(chapter_path, "r", encoding=char_encoding) as c: + try: + cur = Common.print_progress(cur, total) + file_contents = c.read() + query = "UPDATE {0}.chapters SET text=%s WHERE id=%s".format( + self.args.output_database + ) + self.sql.execute(query, (file_contents, int(cid))) + except Exception as e: + self.log.error( + "Error = chapter id: {0} - chapter: {1}\n{2}".format( + cid, chapter_path, str(e) + ) + ) + finally: + pass else: - file_path = os.path.join(root, filename) - name = os.path.splitext(filename)[0] - file_paths[name] = file_path - - if has_duplicates: - self.log.warn('\n'.join(messages + sql_messages)) - self.log.warn(duplicate_chapters) - folder_name_type = raw_input("Resolving duplicates: pick the type of the folder name under {0} " - "\n1 = author id\n2 = author name\n3 = skip duplicates check\n" - .format(chapters_path)) - if folder_name_type == '1': - for cid, duplicate in duplicate_chapters.items(): - # look up the author id and add that one to the file_names list - sql_author_id = self.sql.execute_and_fetchall("SELECT author_id FROM chapters WHERE id = {0}".format(cid)) - if len(sql_author_id) > 0: - author_id = sql_author_id[0][0] - file_paths[cid] = [dc['path'] for dc in duplicate_chapters[cid] if dc['folder_name'] == str(author_id)][0] - elif folder_name_type == '2': - self.log.warn("Not implemented") - - return file_paths - - # TODO this is no longer needed to load eFiction chapters - see if it's still useful for other archive types - def populate_chapters(self, folder = None, extensions = None): - if folder is None: - folder = self.args.chapters_path - if extensions is None: - extensions = self.args.chapters_file_extensions - - self.log.info("Processing chapters...") - - filenames_are_ids = raw_input("\nChapter file names are chapter ids? 
Y/N\n") - has_ids = True if str.lower(filenames_are_ids) == 'y' else False - file_paths = self._gather_and_dedupe(folder, extensions, has_ids) - - char_encoding = raw_input("\n\nImporting chapters: pick character encoding (check for curly quotes):\n" - "1 = Windows 1252\nenter = UTF-8\n") - - if char_encoding == '1': - char_encoding = 'cp1252' - else: - char_encoding = 'utf8' - - cur = 0 - total = len(file_paths) - - if has_ids: - for cid, chapter_path in file_paths.items(): - with codecs.open(chapter_path, 'r', encoding=char_encoding) as c: - try: - cur = Common.print_progress(cur, total) - file_contents = c.read() - query = "UPDATE {0}.chapters SET text=%s WHERE id=%s".format(self.args.output_database) - self.sql.execute(query, (file_contents, int(cid))) - except Exception as e: - self.log.error("Error = chapter id: {0} - chapter: {1}\n{2}".format(cid, chapter_path, str(e))) - finally: - pass - else: - for _, chapter_path in file_paths.items(): - path = chapter_path.replace(self.args.chapters_path, '')[1:] - with codecs.open(chapter_path, 'r', encoding=char_encoding) as c: - try: - cur = Common.print_progress(cur, total) - file_contents = c.read() - query = "UPDATE {0}.chapters SET text=%s WHERE url=%s and text=''".format(self.args.output_database) - self.sql.execute(query, (file_contents, path)) - except Exception as e: - self.log.error("Error = chapter id: {0} - chapter: {1}\n{2}".format(path, chapter_path, str(e))) - finally: - pass + for _, chapter_path in file_paths.items(): + path = chapter_path.replace(self.args.chapters_path, "")[1:] + with codecs.open(chapter_path, "r", encoding=char_encoding) as c: + try: + cur = Common.print_progress(cur, total) + file_contents = c.read() + query = "UPDATE {0}.chapters SET text=%s WHERE url=%s and text=''".format( + self.args.output_database + ) + self.sql.execute(query, (file_contents, path)) + except Exception as e: + self.log.error( + "Error = chapter id: {0} - chapter: {1}\n{2}".format( + path, chapter_path, str(e) + ) + ) + finally: + pass diff --git a/shared_python/Common.py b/shared_python/Common.py index 2f1bf53..1214162 100755 --- a/shared_python/Common.py +++ b/shared_python/Common.py @@ -3,11 +3,13 @@ from importlib import reload reload(sys) -#sys.setdefaultencoding('utf8') #setdefaultencoding is disabled in Python 3. UTF-8 is also default coding. - -def print_progress(cur, total, prog_type = "stories"): - cur += 1 - import sys - sys.stdout.write('\r{0}/{1} {2}'.format(cur, total, prog_type)) - sys.stdout.flush() - return cur +# sys.setdefaultencoding('utf8') #setdefaultencoding is disabled in Python 3. UTF-8 is also default coding. 
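A minimal usage sketch for the print_progress helper reformatted just below; the file list, label, and import style are illustrative assumptions, not part of this diff:

    # Hypothetical caller: print_progress returns the incremented counter,
    # so each loop iteration must reassign `cur` to keep the count advancing.
    from shared_python import Common

    files = ["ch1.html", "ch2.html", "ch3.html"]  # placeholder inputs
    cur = 0
    for f in files:
        cur = Common.print_progress(cur, len(files), prog_type="chapters")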
+ + +def print_progress(cur, total, prog_type="stories"): + cur += 1 + import sys + + sys.stdout.write("\r{0}/{1} {2}".format(cur, total, prog_type)) + sys.stdout.flush() + return cur diff --git a/shared_python/FinalTables.py b/shared_python/FinalTables.py index 15ab4db..3d9b0e6 100755 --- a/shared_python/FinalTables.py +++ b/shared_python/FinalTables.py @@ -5,7 +5,6 @@ class FinalTables(object): - def __init__(self, args, sql: Sql, log: Logger): self.args = args self.sql = sql @@ -13,18 +12,26 @@ def __init__(self, args, sql: Sql, log: Logger): self.final_database = args.output_database self.log = log - def original_table(self, table_name, filter='', database_name=None): + def original_table(self, table_name, filter="", database_name=None): if table_name is None: return None if database_name is None: original_database = self.original_database else: original_database = database_name - query = "SELECT * FROM `{0}`.`{1}` {2}".format(original_database, table_name, filter) + query = "SELECT * FROM `{0}`.`{1}` {2}".format( + original_database, table_name, filter + ) return self.sql.execute_dict(query) def _escape_unescape(self, item): - return html.unescape(item).replace('\\', '\\\\').replace('"', '\\"').replace("'", "\\'").replace("%", "%%") + return ( + html.unescape(item) + .replace("\\", "\\\\") + .replace('"', '\\"') + .replace("'", "\\'") + .replace("%", "%%") + ) def _value(self, row): value = [] @@ -34,8 +41,8 @@ def _value(self, row): elif type(item) is datetime.datetime: value.append('"' + str(item) + '"') elif item is None: - value.append('null') - elif item == '': + value.append("null") + elif item == "": value.append('""') else: value.append(str(item)) @@ -46,54 +53,73 @@ def insert_into_final(self, output_table_name, rows, target_database=None): final_database = target_database else: final_database = self.final_database - self.sql.execute("TRUNCATE `{0}`.`{1}`".format(final_database, output_table_name)) + self.sql.execute( + "TRUNCATE `{0}`.`{1}`".format(final_database, output_table_name) + ) columns = rows[0].keys() values = [] for row in rows: col = self._value(row.values()) - values.append('(' + ', '.join(col) + ')') + values.append("(" + ", ".join(col) + ")") - self.sql.execute(f""" + self.sql.execute( + f""" INSERT INTO `{final_database}`.`{output_table_name}` ({', '.join(columns)}) VALUES {', '.join(values)} - """) + """ + ) def populate_story_tags(self, story_id, output_table_name, story_tags): cols_with_tags = [] - for (col, tags) in story_tags.items(): - cols_with_tags.append(u"{0}='{1}'".format(col, tags.replace("'", "\\'").strip())) + for col, tags in story_tags.items(): + cols_with_tags.append( + "{0}='{1}'".format(col, tags.replace("'", "\\'").strip()) + ) if cols_with_tags: - self.sql.execute(""" + self.sql.execute( + """ UPDATE `{0}`.`{1}` SET {2} WHERE id={3} - """.format(self.final_database, output_table_name, ", ".join(cols_with_tags), story_id)) + """.format( + self.final_database, + output_table_name, + ", ".join(cols_with_tags), + story_id, + ) + ) def story_to_final_without_tags(self, story, story_authors, is_story=True): - type = 'story' if is_story else 'story_link' + type = "story" if is_story else "story_link" authors_count = len(story_authors) - notes = story['notes'] + notes = story["notes"] if authors_count > 2: # AO3 works can't currently be imported with more than two authors - self.log.warning(f"{type} {story['id']} has {authors_count} authors - listing all authors in notes...") - story_authors_ids = [str(x['author_id']) for x in story_authors] - 
author_names = "Creators: {} and {}".format(", ".join(story_authors_ids[:-1]), story_authors_ids[-1]) + self.log.warning( + f"{type} {story['id']} has {authors_count} authors - listing all authors in notes..." + ) + story_authors_ids = [str(x["author_id"]) for x in story_authors] + author_names = "Creators: {} and {}".format( + ", ".join(story_authors_ids[:-1]), story_authors_ids[-1] + ) notes = "{author_names}
<br/><br/>
{notes}" if notes else author_names final_story = { - 'id': story['id'], - 'title': story['title'], - 'summary ': story['summary'], - 'notes': notes, - 'author_id': story_authors[0]["author_id"], - 'date': story['date'], - 'updated': story['updated'], - 'url': story['url'], - 'ao3_url': story['ao3_url'], - 'imported': 0, - 'do_not_import': 0, + "id": story["id"], + "title": story["title"], + "summary ": story["summary"], + "notes": notes, + "author_id": story_authors[0]["author_id"], + "date": story["date"], + "updated": story["updated"], + "url": story["url"], + "ao3_url": story["ao3_url"], + "imported": 0, + "do_not_import": 0, } if is_story: # AO3 bookmarks can't currently be imported with multiple authors, so only populate the coauthor for works - final_story['coauthor_id'] = story_authors[1]["author_id"] if authors_count > 1 else None + final_story["coauthor_id"] = ( + story_authors[1]["author_id"] if authors_count > 1 else None + ) return final_story def dummy_chapters(self, stories): @@ -102,13 +128,13 @@ def dummy_chapters(self, stories): def _dummy_chapter(self, story): chapter = {k.lower(): v for k, v in story.items()} final_chapter = { - 'id': chapter['id'], - 'position': chapter.get('position', 1), - 'title': chapter['title'], - 'text': chapter.get('text', ''), - 'date': chapter['date'], - 'story_id': chapter['id'], - 'notes': chapter['notes'], - 'url': chapter['url'] + "id": chapter["id"], + "position": chapter.get("position", 1), + "title": chapter["title"], + "text": chapter.get("text", ""), + "date": chapter["date"], + "story_id": chapter["id"], + "notes": chapter["notes"], + "url": chapter["url"], } return final_chapter diff --git a/shared_python/Logging.py b/shared_python/Logging.py index f43d79d..6bad80e 100644 --- a/shared_python/Logging.py +++ b/shared_python/Logging.py @@ -2,18 +2,19 @@ import sys from colorlog import ColoredFormatter + def logger(filename): - log = logging.getLogger() - log.setLevel(logging.INFO) + log = logging.getLogger() + log.setLevel(logging.INFO) - color_formatter = ColoredFormatter('%(log_color)s%(message)s%(reset)s') - stream = logging.StreamHandler(sys.stdout) - stream.setLevel(logging.INFO) - stream.setFormatter(color_formatter) - log.addHandler(stream) + color_formatter = ColoredFormatter("%(log_color)s%(message)s%(reset)s") + stream = logging.StreamHandler(sys.stdout) + stream.setLevel(logging.INFO) + stream.setFormatter(color_formatter) + log.addHandler(stream) - formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s') - fh = logging.FileHandler("{0}.log".format(filename)) - fh.setFormatter(formatter) - log.addHandler(fh) - return log + formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") + fh = logging.FileHandler("{0}.log".format(filename)) + fh.setFormatter(formatter) + log.addHandler(fh) + return log diff --git a/shared_python/PopulateTags.py b/shared_python/PopulateTags.py index 8cf1157..f69c012 100644 --- a/shared_python/PopulateTags.py +++ b/shared_python/PopulateTags.py @@ -2,64 +2,73 @@ CNTW = "Choose Not To Use Archive Warnings" -class PopulateTags(object): - def __init__(self, args, db, log, tags, final): - self.args = args - self.db = db - self.database = args.temp_db_database - self.log = log - self.tags = tags - self.final = final - @staticmethod - def valid_tags(key, tag_type_list): - return [d[key].strip() for d in tag_type_list - if key in d - and d[key] is not None - and d[key] != ''] +class PopulateTags(object): + def __init__(self, args, db, log, tags, final): + self.args = args + 
self.db = db + self.database = args.temp_db_database + self.log = log + self.tags = tags + self.final = final - def tags_for_story(self, story_id, tags_by_type): - story_tags = {'categories': '', 'fandoms': ''} - categories = [] - fandoms = [] - for (tag_type, tag_type_tags) in tags_by_type.items(): - if tag_type is None or tag_type == '': - self.log.warn("\nStory {2} has a None tag type\n {0} -> {1}".format(tag_type, tag_type_tags, story_id)) - else: - tag_list = [d['ao3_tag'] for d in tag_type_tags if 'ao3_tag' in d and d['ao3_tag'] is not None] - categories += self.valid_tags('ao3_tag_category', tag_type_tags) - if tag_type == 'fandoms': - fandoms += tag_list - if tag_type == 'warnings' and CNTW not in tag_list: - tag_list.append(CNTW) - story_tags[tag_type] = ', '.join(set(tag_list)) - if not fandoms: - fandoms = [self.args.default_fandom] - story_tags['categories'] = ', '.join(set(categories)) - story_tags['fandoms'] = ', '.join(set(fandoms)) - if 'warnings' not in story_tags.keys(): - story_tags['warnings'] = CNTW - return story_tags + @staticmethod + def valid_tags(key, tag_type_list): + return [ + d[key].strip() + for d in tag_type_list + if key in d and d[key] is not None and d[key] != "" + ] - def write_tags_for_story(self, tags_by_story_id, item_type='story'): - output_table = 'stories' if item_type == 'story' else 'story_links' - for (story_id, story_tags) in tags_by_story_id.items(): + def tags_for_story(self, story_id, tags_by_type): + story_tags = {"categories": "", "fandoms": ""} + categories = [] + fandoms = [] + for tag_type, tag_type_tags in tags_by_type.items(): + if tag_type is None or tag_type == "": + self.log.warn( + "\nStory {2} has a None tag type\n {0} -> {1}".format( + tag_type, tag_type_tags, story_id + ) + ) + else: + tag_list = [ + d["ao3_tag"] + for d in tag_type_tags + if "ao3_tag" in d and d["ao3_tag"] is not None + ] + categories += self.valid_tags("ao3_tag_category", tag_type_tags) + if tag_type == "fandoms": + fandoms += tag_list + if tag_type == "warnings" and CNTW not in tag_list: + tag_list.append(CNTW) + story_tags[tag_type] = ", ".join(set(tag_list)) + if not fandoms: + fandoms = [self.args.default_fandom] + story_tags["categories"] = ", ".join(set(categories)) + story_tags["fandoms"] = ", ".join(set(fandoms)) + if "warnings" not in story_tags.keys(): + story_tags["warnings"] = CNTW + return story_tags - # group tags by type into comma-separated lists - # generate and run SQL to populate story table - tags_by_type = defaultdict(list) - for tag in story_tags: - tags_by_type[tag['ao3_tag_type']].append(tag) + def write_tags_for_story(self, tags_by_story_id, item_type="story"): + output_table = "stories" if item_type == "story" else "story_links" + for story_id, story_tags in tags_by_story_id.items(): + # group tags by type into comma-separated lists + # generate and run SQL to populate story table + tags_by_type = defaultdict(list) + for tag in story_tags: + tags_by_type[tag["ao3_tag_type"]].append(tag) - story_tags = self.tags_for_story(story_id, tags_by_type) + story_tags = self.tags_for_story(story_id, tags_by_type) - self.final.populate_story_tags(story_id, output_table, story_tags) + self.final.populate_story_tags(story_id, output_table, story_tags) - def populate_tags(self): - self.log.info("Getting all tags per story or story_link...") - # Story - tags_by_story_id = self.tags.tags_by_story_id("story") - self.write_tags_for_story(tags_by_story_id, "story") - # Story Link - tags_by_story_id = self.tags.tags_by_story_id("story_link") - 
self.write_tags_for_story(tags_by_story_id, "story_link") + def populate_tags(self): + self.log.info("Getting all tags per story or story_link...") + # Story + tags_by_story_id = self.tags.tags_by_story_id("story") + self.write_tags_for_story(tags_by_story_id, "story") + # Story Link + tags_by_story_id = self.tags.tags_by_story_id("story_link") + self.write_tags_for_story(tags_by_story_id, "story_link") diff --git a/shared_python/Sql.py b/shared_python/Sql.py index cc341ce..1fef285 100755 --- a/shared_python/Sql.py +++ b/shared_python/Sql.py @@ -4,86 +4,97 @@ # ignore unhelpful MySQL warnings from pymysql import connect, cursors, OperationalError -warnings.filterwarnings('ignore', category=Warning) +warnings.filterwarnings("ignore", category=Warning) -class Sql(object): - - def __init__(self, args, log): - self.tag_count = 0 - conn = connect(args.db_host, args.db_user, args.db_password) - cursor = conn.cursor() - cursor.execute('CREATE DATABASE IF NOT EXISTS `{0}`'.format(args.temp_db_database)) - self.log = log - - self.conn = connect(args.db_host, args.db_user, args.db_password, args.temp_db_database, charset='utf8', - use_unicode=True, autocommit=True) - self.cursor = self.conn.cursor() - self.database = args.temp_db_database - - - def execute(self, script, parameters = (), database = None): - cursor = self.conn.cursor() - cursor.execute(f"USE {database or self.database}") - cursor.execute(script, parameters) - self.conn.commit() - - - def execute_dict(self, script, parameters = ()): - dict_cursor = self.conn.cursor(cursors.DictCursor) - dict_cursor.execute(script, parameters) - return dict_cursor.fetchall() - - def execute_and_fetchall(self, database: str, statement: str): - """ - Execute a SQL statement and then fetch its results. - :param database: The database to run the statement against. - :param statement: The SQL statement to execute. - :return: The fetched result of the SQL statement as a dict. 
- """ - cursor = self.conn.cursor() - cursor.execute(f"USE {database}") - cursor.execute(statement) - self.conn.commit() - return cursor.fetchall() - def run_script_from_file(self, filename, database, initial_load = False): - # Open and read the file as a single buffer - fd = open(filename, 'r') - sqlFile = fd.read() - fd.close() - - # replace placeholders and return all SQL commands (split on ';') - sqlCommands = sqlFile.replace('$DATABASE$', database).split(';\n') - - # Start a transaction - self.cursor.execute("START TRANSACTION") - self.cursor.execute("CREATE DATABASE IF NOT EXISTS {0}".format(database)) - self.cursor.execute("USE {0}".format(database)) - - # Execute every command from the input file - for command in sqlCommands: - # This will skip and report errors - # For example, if the tables do not yet exist, this will skip over - # the DROP TABLE commands - try: - # Strip out commented out lines - end_command = re.sub(r'(--|#|\/\*).*?\n', '', command) - lc_command = end_command.lower().strip().replace("\n", "") - if initial_load and (lc_command.startswith("create database ") or lc_command.startswith("use ")): - self.log.info("Skipping command - {0}".format(lc_command)) - elif lc_command is None or lc_command == '': - self.log.info(lc_command) - else: - self.cursor.execute(lc_command) - except OperationalError as e: - self.log.info("Command skipped: {0} [{1}]".format(command, e)) - - self.conn.commit() - - - def col_exists(self, col, table, database): - self.cursor.execute(""" +class Sql(object): + def __init__(self, args, log): + self.tag_count = 0 + conn = connect(args.db_host, args.db_user, args.db_password) + cursor = conn.cursor() + cursor.execute( + "CREATE DATABASE IF NOT EXISTS `{0}`".format(args.temp_db_database) + ) + self.log = log + + self.conn = connect( + args.db_host, + args.db_user, + args.db_password, + args.temp_db_database, + charset="utf8", + use_unicode=True, + autocommit=True, + ) + self.cursor = self.conn.cursor() + self.database = args.temp_db_database + + def execute(self, script, parameters=(), database=None): + cursor = self.conn.cursor() + cursor.execute(f"USE {database or self.database}") + cursor.execute(script, parameters) + self.conn.commit() + + def execute_dict(self, script, parameters=()): + dict_cursor = self.conn.cursor(cursors.DictCursor) + dict_cursor.execute(script, parameters) + return dict_cursor.fetchall() + + def execute_and_fetchall(self, database: str, statement: str): + """ + Execute a SQL statement and then fetch its results. + :param database: The database to run the statement against. + :param statement: The SQL statement to execute. + :return: The fetched result of the SQL statement as a dict. 
+ """ + cursor = self.conn.cursor() + cursor.execute(f"USE {database}") + cursor.execute(statement) + self.conn.commit() + return cursor.fetchall() + + def run_script_from_file(self, filename, database, initial_load=False): + # Open and read the file as a single buffer + fd = open(filename, "r") + sqlFile = fd.read() + fd.close() + + # replace placeholders and return all SQL commands (split on ';') + sqlCommands = sqlFile.replace("$DATABASE$", database).split(";\n") + + # Start a transaction + self.cursor.execute("START TRANSACTION") + self.cursor.execute("CREATE DATABASE IF NOT EXISTS {0}".format(database)) + self.cursor.execute("USE {0}".format(database)) + + # Execute every command from the input file + for command in sqlCommands: + # This will skip and report errors + # For example, if the tables do not yet exist, this will skip over + # the DROP TABLE commands + try: + # Strip out commented out lines + end_command = re.sub(r"(--|#|\/\*).*?\n", "", command) + lc_command = end_command.lower().strip().replace("\n", "") + if initial_load and ( + lc_command.startswith("create database ") + or lc_command.startswith("use ") + ): + self.log.info("Skipping command - {0}".format(lc_command)) + elif lc_command is None or lc_command == "": + self.log.info(lc_command) + else: + self.cursor.execute(lc_command) + except OperationalError as e: + self.log.info("Command skipped: {0} [{1}]".format(command, e)) + + self.conn.commit() + + def col_exists(self, col, table, database): + self.cursor.execute( + """ SELECT * FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = '{0}' AND TABLE_NAME = '{1}' AND COLUMN_NAME = '{2}' - """.format(database, table, col)) - result = self.cursor.fetchone() - return result is not None + """.format(database, table, col) + ) + result = self.cursor.fetchone() + return result is not None diff --git a/shared_python/TagValidator.py b/shared_python/TagValidator.py index 241972a..12b2ac2 100644 --- a/shared_python/TagValidator.py +++ b/shared_python/TagValidator.py @@ -1,4 +1,3 @@ - class TagValidator(object): def __init__(self, log): self.log = log @@ -12,14 +11,14 @@ def __init__(self, log): "fandoms": 4, "characters": 5, "relationships": 6, - "tags": 7 + "tags": 7, } dict_rating = { "Not Rated": 1, "General Audiences": 2, "Teen And Up Audiences": 3, "Mature": 4, - "Explicit": 5 + "Explicit": 5, } dict_warning = { "Choose Not To Use Archive Warnings": 1, @@ -27,16 +26,9 @@ def __init__(self, log): "Major Character Death": 3, "No Archive Warnings Apply": 4, "Rape/Non-Con": 5, - "Underage": 6 - } - dict_category = { - "Gen": 1, - "F/M": 2, - "M/M": 3, - "F/F": 4, - "Multi": 5, - "Other": 6 + "Underage": 6, } + dict_category = {"Gen": 1, "F/M": 2, "M/M": 3, "F/F": 4, "Multi": 5, "Other": 6} ## Functions for accessing dictionaries def identify_tag_type(self, tag_type): @@ -53,37 +45,50 @@ def identify_category(self, tag): ## Print statements def print_tag_correction(self, before, after): - self.log.info('Correction successful. "' + before + '" is now "' + after+ '"') + self.log.info('Correction successful. "' + before + '" is now "' + after + '"') def print_tag_warning(self, tag, tag_type, isType): if isType: - self.log.warning('Warning: "' + tag + '" is not a valid TAG TYPE.' - + ' Attempting self correction...') + self.log.warning( + 'Warning: "' + + tag + + '" is not a valid TAG TYPE.' + + " Attempting self correction..." + ) else: - self.log.warning('Warning: "' + tag + '" is not a valid ' - + tag_type.upper() + ' tag. 
Attempting self correction...') + self.log.warning( + 'Warning: "' + + tag + + '" is not a valid ' + + tag_type.upper() + + " tag. Attempting self correction..." + ) def print_fail_self(self): - self.log.error('All attempts at self correction have failed.' - + ' Manual correction required.') + self.log.error( + "All attempts at self correction have failed." + + " Manual correction required." + ) def print_fail(self, tag_name): if tag_name: - self.log.error('Manual Input Failed. "' + tag_name + '" has failed re-check.') + self.log.error( + 'Manual Input Failed. "' + tag_name + '" has failed re-check.' + ) else: - self.log.error('Manual Input Failed. This field cannot be empty.') + self.log.error("Manual Input Failed. This field cannot be empty.") def prompt_correction(self, tag_name, tag_type, isType): - print ('\rThe following values are accepted:') + print("\rThe following values are accepted:") if isType: - print('\r' + self.list_dicts(None, tag_type)) + print("\r" + self.list_dicts(None, tag_type)) else: - print('\r' + self.list_dicts(tag_name, tag_type)) + print("\r" + self.list_dicts(tag_name, tag_type)) prompt = '\rPlease enter the correct name for "' + tag_name + '"' if isType: prompt += ' (Press ENTER to default to "tags"): ' else: - prompt += ': ' + prompt += ": " tag_correction = input(prompt) if isType and not tag_correction: tag_correction = "tags" @@ -95,20 +100,20 @@ def correct_tag_type(self, tag_type): # Attempt self correction by making whole word lowercase tag_correction = tag_type.lower() if self.classify_tag(None, tag_correction) > 0: - self.print_tag_correction(tag_type, tag_correction) - return tag_correction + self.print_tag_correction(tag_type, tag_correction) + return tag_correction # Attempt self correction by adding last character if self.classify_tag(None, tag_correction + "s") > 0: - self.print_tag_correction(tag_type, tag_correction + "s") - return tag_correction + "s" + self.print_tag_correction(tag_type, tag_correction + "s") + return tag_correction + "s" # Attempt self correction by removing last character if self.classify_tag(None, tag_correction[:-1]) > 0: - self.print_tag_correction(tag_type, tag_correction[:-1]) - return tag_correction[:-1] + self.print_tag_correction(tag_type, tag_correction[:-1]) + return tag_correction[:-1] - #All attempts failed + # All attempts failed return None def correct_tag(self, tag, tag_type): @@ -145,39 +150,39 @@ def classify_tag(self, tag, tag_type): def list_dicts(self, tag, tag_type): if not tag: return str(list(self.dict_tag_type.keys())) - elif tag_type == 'rating': + elif tag_type == "rating": return str(list(self.dict_rating.keys())) - elif tag_type == 'categories': + elif tag_type == "categories": return str(list(self.dict_category.keys())) - elif tag_type == 'warnings': + elif tag_type == "warnings": return str(list(self.dict_warning.keys())) ## Validate methods to use. def validate_and_fix_tag_type(self, tag_type): if self.classify_tag(None, tag_type) < 1: - print() - self.print_tag_warning(tag_type, None, True) - - # Attempt self correction - tag_correction = self.correct_tag_type(tag_type) - if tag_correction: - return tag_correction - - # Prompt manual correction. 
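As a quick illustration of the self-correction cascade in correct_tag_type above (lowercase the whole word, then try adding or dropping a trailing "s" before falling back to a manual prompt), here is a hypothetical session; the logger name is an assumption:

    from shared_python.Logging import logger
    from shared_python.TagValidator import TagValidator

    validator = TagValidator(logger("example"))
    # "Ratings" -> lowercased to "ratings", trailing "s" dropped -> "rating"
    validator.validate_and_fix_tag_type("Ratings")
    # "Tag" -> lowercased to "tag", trailing "s" added -> "tags"
    validator.validate_and_fix_tag_type("Tag")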
- self.print_fail_self() - tag_correction = self.prompt_correction(tag_type, None, self.IS_TAGTYPE) - while self.classify_tag(None, tag_correction) < 1: - self.print_tag_warning(tag_correction, None, True) - # Attempt correction of failed manual correction - tag_selfcorrect = self.correct_tag_type(tag_correction) - if tag_selfcorrect: - self.print_tag_correction(tag_type, tag_selfcorrect) - return tag_selfcorrect - # Else prompt manual correction again - self.print_fail(tag_type) - tag_correction = self.prompt_correction(tag_type, None, self.IS_TAGTYPE) - self.print_tag_correction(tag_type, tag_correction) - return tag_correction + print() + self.print_tag_warning(tag_type, None, True) + + # Attempt self correction + tag_correction = self.correct_tag_type(tag_type) + if tag_correction: + return tag_correction + + # Prompt manual correction. + self.print_fail_self() + tag_correction = self.prompt_correction(tag_type, None, self.IS_TAGTYPE) + while self.classify_tag(None, tag_correction) < 1: + self.print_tag_warning(tag_correction, None, True) + # Attempt correction of failed manual correction + tag_selfcorrect = self.correct_tag_type(tag_correction) + if tag_selfcorrect: + self.print_tag_correction(tag_type, tag_selfcorrect) + return tag_selfcorrect + # Else prompt manual correction again + self.print_fail(tag_type) + tag_correction = self.prompt_correction(tag_type, None, self.IS_TAGTYPE) + self.print_tag_correction(tag_type, tag_correction) + return tag_correction return tag_type def validate_and_fix_tag(self, tag, tag_type): diff --git a/shared_python/Tags.py b/shared_python/Tags.py index 3d33d51..8878578 100755 --- a/shared_python/Tags.py +++ b/shared_python/Tags.py @@ -8,38 +8,38 @@ from shared_python.Sql import Sql from shared_python.TagValidator import TagValidator -class Tags(object): - def __init__(self, args, sql: Sql, log: Logger): - self.tag_count = 0 - self.sql = sql - self.database = args.temp_db_database - self.html_parser = HTMLParser() - self.log = log - - self.tag_export_map = { - 'id': 'Original Tag ID', - 'original_tag': 'Original Tag', - 'original_parent': 'Original Parent Tag', - 'original_table': 'Original Tag Type', - 'original_description': 'Original Tag Description', - 'ao3_tag': 'Recommended AO3 Tag', - 'ao3_tag_category': 'Recommended AO3 Category (for relationships)', - 'ao3_tag_type': 'Recommended AO3 Type', - 'ao3_tag_fandom': 'Related Fandom' - } - - - def create_tags_table(self, database = None): - """ - Used only in step 02 for non-eFiction archives - """ - try: - database = self.database if database is None else database - self.sql.execute("DROP TABLE IF EXISTS {0}.`tags`".format(database)) - except OperationalError as e: - self.log.info("Command skipped: {}".format(e)) - self.sql.execute(""" +class Tags(object): + def __init__(self, args, sql: Sql, log: Logger): + self.tag_count = 0 + self.sql = sql + self.database = args.temp_db_database + self.html_parser = HTMLParser() + self.log = log + + self.tag_export_map = { + "id": "Original Tag ID", + "original_tag": "Original Tag", + "original_parent": "Original Parent Tag", + "original_table": "Original Tag Type", + "original_description": "Original Tag Description", + "ao3_tag": "Recommended AO3 Tag", + "ao3_tag_category": "Recommended AO3 Category (for relationships)", + "ao3_tag_type": "Recommended AO3 Type", + "ao3_tag_fandom": "Related Fandom", + } + + def create_tags_table(self, database=None): + """ + Used only in step 02 for non-eFiction archives + """ + try: + database = self.database if database is 
None else database + self.sql.execute("DROP TABLE IF EXISTS {0}.`tags`".format(database)) + except OperationalError as e: + self.log.info("Command skipped: {}".format(e)) + self.sql.execute( + """ CREATE TABLE IF NOT EXISTS {0}.`tags` ( `id` int(11) AUTO_INCREMENT, `original_tagid` int(11) DEFAULT NULL, @@ -53,50 +53,76 @@ def create_tags_table(self, database = None): `ao3_tag_fandom` VARCHAR(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - """.format(database)) - - - def populate_tag_table(self, database_name, story_id_col_name, table_name, tag_col_lookup, tags_with_fandoms, truncate = True): - """ - Used only in step 02 for non-eFiction archives - """ - self.sql.execute('USE {0}'.format(database_name)) - if truncate: - self.sql.execute('TRUNCATE {0}.`tags`'.format(database_name)) - - tag_columns = tag_col_lookup.keys() # [d['col'] for d in tag_col_lookup if 'col' in d] - - # Get all values from all tag columns in the stories table and load as denormalised values in `tags` table - data = self.sql.execute_dict('SELECT {0}, {1} FROM {2}'.format(story_id_col_name, ', '.join(tag_columns), table_name)) - - for story_tags_row in data : - values = [] - for col in tag_columns: - needs_fandom = col in tags_with_fandoms - if story_tags_row[col] is not None: - for val in re.split(r", ?", story_tags_row[col]): - if val != '': - if type(tag_col_lookup[col]) is str: # noqa: E721 # Probably AA or a custom archive - cleaned_tag = val.encode('utf-8').replace("'", "\'").strip() - - values.append('({0}, "{1}", "{2}", "{3}")' - .format(story_tags_row[story_id_col_name], - re.sub(r'(? 0: - self.sql.execute(""" + """.format(database) + ) + + def populate_tag_table( + self, + database_name, + story_id_col_name, + table_name, + tag_col_lookup, + tags_with_fandoms, + truncate=True, + ): + """ + Used only in step 02 for non-eFiction archives + """ + self.sql.execute("USE {0}".format(database_name)) + if truncate: + self.sql.execute("TRUNCATE {0}.`tags`".format(database_name)) + + tag_columns = ( + tag_col_lookup.keys() + ) # [d['col'] for d in tag_col_lookup if 'col' in d] + + # Get all values from all tag columns in the stories table and load as denormalised values in `tags` table + data = self.sql.execute_dict( + "SELECT {0}, {1} FROM {2}".format( + story_id_col_name, ", ".join(tag_columns), table_name + ) + ) + + for story_tags_row in data: + values = [] + for col in tag_columns: + needs_fandom = col in tags_with_fandoms + if story_tags_row[col] is not None: + for val in re.split(r", ?", story_tags_row[col]): + if val != "": + if ( + type(tag_col_lookup[col]) is str # noqa: E721 + ): # Probably AA or a custom archive + cleaned_tag = ( + val.encode("utf-8").replace("'", "'").strip() + ) + + values.append( + '({0}, "{1}", "{2}", "{3}")'.format( + story_tags_row[story_id_col_name], + re.sub(r'(? 0: + self.sql.execute( + """ INSERT INTO tags (storyid, original_tag, original_table, ao3_tag_fandom) VALUES {0} - """.format(', '.join(values))) - - - def distinct_tags(self, database): - """ - Used in step 03. Maps table columns to the names used in the Tag Wrangling sheet. - :return: distinct rows from the tags table with renamed columns - """ - return self.sql.execute_and_fetchall(database, """ + """.format(", ".join(values)) + ) + + def distinct_tags(self, database): + """ + Used in step 03. Maps table columns to the names used in the Tag Wrangling sheet. 
+ :return: distinct rows from the tags table with renamed columns + """ + return self.sql.execute_and_fetchall( + database, + """ SELECT DISTINCT id as "Original Tag ID", original_tag as "Original Tag Name", @@ -108,90 +134,103 @@ def distinct_tags(self, database): ao3_tag_category as "Recommended AO3 Category", original_description as "Original Description", '' as "TW Notes" FROM tags - """) - - - def update_tag_row(self, row: dict): - """ - Used in step 04. - :param row: a row from the Tag Wrangling spreadsheet as a dict - :return: number of newly inserted rows to item_tags - """ - tag_headers = self.tag_export_map - tag = str(row[tag_headers['original_tag']]).replace("'", r"\'") - tag_id = row[tag_headers['id']] - validate_tag = TagValidator(self.log) - - if tag_id == '' or tag_id is None or not tag_id.isnumeric(): - tagid_filter = f"original_tag = '{tag}'" - else: - tagid_filter = f"id={tag_id}" - - fandom = row[tag_headers['ao3_tag_fandom']].replace("'", r"\'") - ao3_tags = row[tag_headers['ao3_tag']].replace("'", r"\'").split(",") - ao3_tag_types = row[tag_headers['ao3_tag_type']].split(",") - number_types = len(ao3_tag_types) - - num_insert = 0 - # If tags length > types length -> there are remapped tags - # Iterate over all the provided AO3 tags: - # - First tag -> update the existing row - # - Other tags -> create new row in tags table - for idx, ao3_tag in enumerate(ao3_tags): - ao3_tag = ao3_tag.strip() - if number_types >= idx + 1: - ao3_tag_type = ao3_tag_types[idx].strip() - else: - ao3_tag_type = ao3_tag_types[0].strip() - - ao3_tag_type = validate_tag.validate_and_fix_tag_type(ao3_tag_type) - ao3_tag = validate_tag.validate_and_fix_tag(ao3_tag, ao3_tag_type) - - self.sql.execute(f"USE {self.database}") - - if idx > 0: - self.sql.execute(f""" + """, + ) + + def update_tag_row(self, row: dict): + """ + Used in step 04. 
+ :param row: a row from the Tag Wrangling spreadsheet as a dict + :return: number of newly inserted rows to item_tags + """ + tag_headers = self.tag_export_map + tag = str(row[tag_headers["original_tag"]]).replace("'", r"\'") + tag_id = row[tag_headers["id"]] + validate_tag = TagValidator(self.log) + + if tag_id == "" or tag_id is None or not tag_id.isnumeric(): + tagid_filter = f"original_tag = '{tag}'" + else: + tagid_filter = f"id={tag_id}" + + fandom = row[tag_headers["ao3_tag_fandom"]].replace("'", r"\'") + ao3_tags = row[tag_headers["ao3_tag"]].replace("'", r"\'").split(",") + ao3_tag_types = row[tag_headers["ao3_tag_type"]].split(",") + number_types = len(ao3_tag_types) + + num_insert = 0 + # If tags length > types length -> there are remapped tags + # Iterate over all the provided AO3 tags: + # - First tag -> update the existing row + # - Other tags -> create new row in tags table + for idx, ao3_tag in enumerate(ao3_tags): + ao3_tag = ao3_tag.strip() + if number_types >= idx + 1: + ao3_tag_type = ao3_tag_types[idx].strip() + else: + ao3_tag_type = ao3_tag_types[0].strip() + + ao3_tag_type = validate_tag.validate_and_fix_tag_type(ao3_tag_type) + ao3_tag = validate_tag.validate_and_fix_tag(ao3_tag, ao3_tag_type) + + self.sql.execute(f"USE {self.database}") + + if idx > 0: + self.sql.execute( + f""" INSERT INTO tags (ao3_tag, ao3_tag_type, ao3_tag_category, ao3_tag_fandom, original_tag, original_tagid) VALUES ('{ao3_tag}', '{ao3_tag_type}', '{row[tag_headers['ao3_tag_category']]}', '{fandom}', '{tag}', '{tag_id}') - """) - # get last auto increment tag id - sql_dict = self.sql.execute_dict("""select LAST_INSERT_ID();""") - new_tag_id = sql_dict[0]['LAST_INSERT_ID()'] - # get all associated items from item_tags - items = self.sql.execute_dict(f"""SELECT item_id, item_type - FROM item_tags WHERE tag_id = {row['Original Tag ID']}""") - - # insert into item_tags table - for item in items: - item_id, item_type = item['item_id'], item['item_type'] - self.sql.execute(f"""INSERT INTO item_tags (item_id, item_type, tag_id) VALUES ('{item_id}', '{item_type}', '{new_tag_id}')""") - num_insert += 1 - else: - self.sql.execute(f""" + """ + ) + # get last auto increment tag id + sql_dict = self.sql.execute_dict("""select LAST_INSERT_ID();""") + new_tag_id = sql_dict[0]["LAST_INSERT_ID()"] + # get all associated items from item_tags + items = self.sql.execute_dict( + f"""SELECT item_id, item_type + FROM item_tags WHERE tag_id = {row['Original Tag ID']}""" + ) + + # insert into item_tags table + for item in items: + item_id, item_type = item["item_id"], item["item_type"] + self.sql.execute( + f"""INSERT INTO item_tags (item_id, item_type, tag_id) VALUES ('{item_id}', '{item_type}', '{new_tag_id}')""" + ) + num_insert += 1 + else: + self.sql.execute( + f""" UPDATE tags SET ao3_tag='{str(ao3_tag)}', ao3_tag_type='{ao3_tag_type}', ao3_tag_category='{row[tag_headers['ao3_tag_category']]}', ao3_tag_fandom='{fandom}' WHERE {tagid_filter} - """) - return num_insert - - def tags_by_story_id(self, item_type: str = 'story'): - story_ids = \ - self.sql.execute_and_fetchall(self.database, - f""" + """ + ) + return num_insert + + def tags_by_story_id(self, item_type: str = "story"): + story_ids = self.sql.execute_and_fetchall( + self.database, + f""" SELECT item_id, item_type, GROUP_CONCAT(tag_id) as tag_ids - FROM item_tags WHERE item_type='{item_type}' GROUP BY item_id, item_type ;""") - cur = 0 - total = len(story_ids) - - tags_by_story_id = {} - for story_id in story_ids: - cur += 1 - 
sys.stdout.write(f'\rCollecting tags for {item_type}: {cur}/{total} (including Do Not Import)') - sys.stdout.flush() - tags = self.sql.execute_dict(f"SELECT * FROM tags WHERE id in ({story_id[2]}) AND ao3_tag != ''") - tags_by_story_id[story_id[0]] = tags - return tags_by_story_id + FROM item_tags WHERE item_type='{item_type}' GROUP BY item_id, item_type ;""", + ) + cur = 0 + total = len(story_ids) + + tags_by_story_id = {} + for story_id in story_ids: + cur += 1 + sys.stdout.write( + f"\rCollecting tags for {item_type}: {cur}/{total} (including Do Not Import)" + ) + sys.stdout.flush() + tags = self.sql.execute_dict( + f"SELECT * FROM tags WHERE id in ({story_id[2]}) AND ao3_tag != ''" + ) + tags_by_story_id[story_id[0]] = tags + return tags_by_story_id diff --git a/test/test_multiple_authors.py b/test/test_multiple_authors.py index b27e4ac..f2337b5 100644 --- a/test/test_multiple_authors.py +++ b/test/test_multiple_authors.py @@ -5,36 +5,59 @@ import argparse import datetime + def testArgs(): - args = argparse.Namespace() - setattr(args, "temp_db_database", "test_final_open_doors") - setattr(args, "output_database", "test_final_open_doors") - return args + args = argparse.Namespace() + setattr(args, "temp_db_database", "test_final_open_doors") + setattr(args, "output_database", "test_final_open_doors") + return args + class TestMultipleAuthors(TestCase): - args = testArgs() - log = logger("test") - sql = None - final_tables = FinalTables(args, sql, log) - - def test_multiple_authors(self): - - story = {'id': 1, 'title': 'A Story', - 'summary': "summary", - 'notes': '', - 'date': datetime.datetime(2022, 2, 27, 15, 48, 28), - 'updated': datetime.datetime(2022, 4, 15, 22, 4, 47), - 'categories': None, - 'tags': '', 'warnings': '', - 'fandoms': '', - 'characters': '', - 'relationships': '', 'language_code': '', - 'url': None, 'imported': 0, 'do_not_import': 0, 'ao3_url': None, 'import_notes': ''} - - story_authors = [{'id': 4114, 'author_id': 1, 'item_id': 1, 'item_type': 'story'}, {'id': 4115, 'author_id': 2, 'item_id': 1, 'item_type': 'story'}, {'id': 4116, 'author_id': 3, 'item_id': 1, 'item_type': 'story'}, {'id': 4117, 'author_id': 4, 'item_id': 1, 'item_type': 'story'}, {'id': 5, 'author_id': 5, 'item_id': 1, 'item_type': 'story'}, {'id': 4119, 'author_id': 6, 'item_id': 1, 'item_type': 'story'}, {'id': 4120, 'author_id': 7, 'item_id': 1, 'item_type': 'story'}, {'id': 4121, 'author_id': 8, 'item_id': 1, 'item_type': 'story'}, {'id': 4122, 'author_id': 9, 'item_id': 1, 'item_type': 'story'}] - - final_story = self.final_tables.story_to_final_without_tags(story, story_authors) - self.assertEqual(final_story['notes'], 'Creators: 1, 2, 3, 4, 5, 6, 7, 8 and 9') - -if __name__ == '__main__': - unittest.main() + args = testArgs() + log = logger("test") + sql = None + final_tables = FinalTables(args, sql, log) + + def test_multiple_authors(self): + story = { + "id": 1, + "title": "A Story", + "summary": "summary", + "notes": "", + "date": datetime.datetime(2022, 2, 27, 15, 48, 28), + "updated": datetime.datetime(2022, 4, 15, 22, 4, 47), + "categories": None, + "tags": "", + "warnings": "", + "fandoms": "", + "characters": "", + "relationships": "", + "language_code": "", + "url": None, + "imported": 0, + "do_not_import": 0, + "ao3_url": None, + "import_notes": "", + } + + story_authors = [ + {"id": 4114, "author_id": 1, "item_id": 1, "item_type": "story"}, + {"id": 4115, "author_id": 2, "item_id": 1, "item_type": "story"}, + {"id": 4116, "author_id": 3, "item_id": 1, "item_type": "story"}, + 
{"id": 4117, "author_id": 4, "item_id": 1, "item_type": "story"}, + {"id": 5, "author_id": 5, "item_id": 1, "item_type": "story"}, + {"id": 4119, "author_id": 6, "item_id": 1, "item_type": "story"}, + {"id": 4120, "author_id": 7, "item_id": 1, "item_type": "story"}, + {"id": 4121, "author_id": 8, "item_id": 1, "item_type": "story"}, + {"id": 4122, "author_id": 9, "item_id": 1, "item_type": "story"}, + ] + + final_story = self.final_tables.story_to_final_without_tags( + story, story_authors + ) + self.assertEqual(final_story["notes"], "Creators: 1, 2, 3, 4, 5, 6, 7, 8 and 9") + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_multitag_mapping.py b/test/test_multitag_mapping.py index 32e4234..51fa29d 100644 --- a/test/test_multitag_mapping.py +++ b/test/test_multitag_mapping.py @@ -5,57 +5,59 @@ from shared_python.Tags import Tags import argparse + def testArgs(): - parser = argparse.ArgumentParser(description='Test an archive database') - args = parser.parse_args() - setattr(args, "archive_type", "AA") - setattr(args, "db_host", "localhost") - setattr(args, "db_user", "root") - setattr(args, "db_password", "test") - setattr(args, "temp_db_database", "test_working_open_doors") - setattr(args, "output_database", "unit_test_output") - setattr(args, "default_fandom", "Fandom C (TV)") - setattr(args, "sql_path", "./test/test_data/test.sql") - return args + parser = argparse.ArgumentParser(description="Test an archive database") + args = parser.parse_args() + setattr(args, "archive_type", "AA") + setattr(args, "db_host", "localhost") + setattr(args, "db_user", "root") + setattr(args, "db_password", "test") + setattr(args, "temp_db_database", "test_working_open_doors") + setattr(args, "output_database", "unit_test_output") + setattr(args, "default_fandom", "Fandom C (TV)") + setattr(args, "sql_path", "./test/test_data/test.sql") + return args + class TestMultiTagMapping(TestCase): - args = testArgs() - log = logger("test") - sql = Sql(args, log) - sql.run_script_from_file(args.sql_path, args.temp_db_database, initial_load=False) - tags = Tags(args, sql, log) - - def test_multi_tag_mapping(self): - - row = { - 'Original Tag ID': '10', - 'Original Tag': 'original-tag-1', - 'Original Tag Type': 'classes', - 'Original Parent Tag': 'Genres', - 'Related Fandom': 'Fandom-1', - 'Recommended AO3 Tag': 'AO3-tag-1, AO3-tag-2, AO3-tag-3', - 'Recommended AO3 Type': 'characters, tags, tags', - 'Recommended AO3 Category (for relationships)': 'M/M, F/M', - 'Original Tag Description': '', - 'TW Notes': '' - } - num_insert = self.tags.update_tag_row(row) - self.assertEqual(num_insert, 4) - - row = { - 'Original Tag ID': '11', - 'Original Tag': 'original-tag-2', - 'Original Tag Type': 'classes', - 'Original Parent Tag': 'Genres', - 'Related Fandom': 'Fandom-2', - 'Recommended AO3 Tag': 'AO3-tag-2, AO3-tag-4', - 'Recommended AO3 Type': 'tags, fandoms', - 'Recommended AO3 Category (for relationships)': 'M/M, F/M', - 'Original Tag Description': '', - 'TW Notes': '' - } - num_insert = self.tags.update_tag_row(row) - self.assertEqual(num_insert, 3) - -if __name__ == '__main__': - unittest.main() \ No newline at end of file + args = testArgs() + log = logger("test") + sql = Sql(args, log) + sql.run_script_from_file(args.sql_path, args.temp_db_database, initial_load=False) + tags = Tags(args, sql, log) + + def test_multi_tag_mapping(self): + row = { + "Original Tag ID": "10", + "Original Tag": "original-tag-1", + "Original Tag Type": "classes", + "Original Parent Tag": "Genres", + "Related Fandom": 
"Fandom-1", + "Recommended AO3 Tag": "AO3-tag-1, AO3-tag-2, AO3-tag-3", + "Recommended AO3 Type": "characters, tags, tags", + "Recommended AO3 Category (for relationships)": "M/M, F/M", + "Original Tag Description": "", + "TW Notes": "", + } + num_insert = self.tags.update_tag_row(row) + self.assertEqual(num_insert, 4) + + row = { + "Original Tag ID": "11", + "Original Tag": "original-tag-2", + "Original Tag Type": "classes", + "Original Parent Tag": "Genres", + "Related Fandom": "Fandom-2", + "Recommended AO3 Tag": "AO3-tag-2, AO3-tag-4", + "Recommended AO3 Type": "tags, fandoms", + "Recommended AO3 Category (for relationships)": "M/M, F/M", + "Original Tag Description": "", + "TW Notes": "", + } + num_insert = self.tags.update_tag_row(row) + self.assertEqual(num_insert, 3) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_percent_symbol.py b/test/test_percent_symbol.py index cda9be7..ad3d01e 100644 --- a/test/test_percent_symbol.py +++ b/test/test_percent_symbol.py @@ -6,48 +6,55 @@ import argparse import datetime + def testArgs(): - parser = argparse.ArgumentParser(description='Test an archive database') - args = parser.parse_args() - setattr(args, "archive_type", "AA") - setattr(args, "db_host", "localhost") - setattr(args, "db_user", "root") - setattr(args, "db_password", "test") - setattr(args, "temp_db_database", "test_final_open_doors") - setattr(args, "output_database", "test_final_open_doors") - setattr(args, "default_fandom", "Fandom C (TV)") - setattr(args, "sql_path", "./test/test_data/test_final_tables.sql") - return args + parser = argparse.ArgumentParser(description="Test an archive database") + args = parser.parse_args() + setattr(args, "archive_type", "AA") + setattr(args, "db_host", "localhost") + setattr(args, "db_user", "root") + setattr(args, "db_password", "test") + setattr(args, "temp_db_database", "test_final_open_doors") + setattr(args, "output_database", "test_final_open_doors") + setattr(args, "default_fandom", "Fandom C (TV)") + setattr(args, "sql_path", "./test/test_data/test_final_tables.sql") + return args + class TestPercentSymbol(TestCase): - args = testArgs() - log = logger("test") - sql = Sql(args, log) - sql.run_script_from_file(args.sql_path, args.temp_db_database, initial_load=False) - final_tables = FinalTables(args, sql, log) - - def test_percent_symbol(self): - - test_item = [ - { 'id': 1, - 'title': 'story title', - 'summary ': '
<p>This is a story summary with percent % symobol</p>
', - 'notes': '', - 'author_id': 2, - 'date': datetime.datetime(2022, 9, 4, 22, 38, 47), - 'updated': datetime.datetime(2022, 9, 4, 22, 38, 47), - 'url': None, - 'ao3_url': None, - 'imported': 0, - 'do_not_import': 0, - 'coauthor_id': None - } - ] - self.final_tables.insert_into_final("stories", test_item) - extract_summary = self.sql.execute_and_fetchall(self.args.temp_db_database, - """SELECT summary FROM stories""") - - self.assertEqual(extract_summary[0][0], '
<p>This is a story summary with percent % symobol</p>
') - -if __name__ == '__main__': - unittest.main() \ No newline at end of file + args = testArgs() + log = logger("test") + sql = Sql(args, log) + sql.run_script_from_file(args.sql_path, args.temp_db_database, initial_load=False) + final_tables = FinalTables(args, sql, log) + + def test_percent_symbol(self): + test_item = [ + { + "id": 1, + "title": "story title", + "summary ": "
<p>This is a story summary with percent % symobol</p>
", + "notes": "", + "author_id": 2, + "date": datetime.datetime(2022, 9, 4, 22, 38, 47), + "updated": datetime.datetime(2022, 9, 4, 22, 38, 47), + "url": None, + "ao3_url": None, + "imported": 0, + "do_not_import": 0, + "coauthor_id": None, + } + ] + self.final_tables.insert_into_final("stories", test_item) + extract_summary = self.sql.execute_and_fetchall( + self.args.temp_db_database, """SELECT summary FROM stories""" + ) + + self.assertEqual( + extract_summary[0][0], + "
<p>This is a story summary with percent % symobol</p>
", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_populate_tags.py b/test/test_populate_tags.py index de4dcae..abcf0f5 100644 --- a/test/test_populate_tags.py +++ b/test/test_populate_tags.py @@ -10,60 +10,74 @@ def testArgs(): - parser = argparse.ArgumentParser(description='Test an archive database') - args = parser.parse_args([]) - setattr(args, "archive_type", "AA") - setattr(args, "db_host", "localhost") - setattr(args, "db_user", "root") - setattr(args, "db_password", "") - setattr(args, "temp_db_database", "unit_test") - setattr(args, "output_database", "unit_test_output") - setattr(args, "default_fandom", "Fandom C (TV)") - return args + parser = argparse.ArgumentParser(description="Test an archive database") + args = parser.parse_args([]) + setattr(args, "archive_type", "AA") + setattr(args, "db_host", "localhost") + setattr(args, "db_user", "root") + setattr(args, "db_password", "") + setattr(args, "temp_db_database", "unit_test") + setattr(args, "output_database", "unit_test_output") + setattr(args, "default_fandom", "Fandom C (TV)") + return args + class TestPopulate_tags(TestCase): - args = testArgs() - log = logger("test") - sql = MagicMock() - tags = Tags(args, sql, log) - final = MagicMock() - populate_tags = PopulateTags(args, sql, log, tags, final) + args = testArgs() + log = logger("test") + sql = MagicMock() + tags = Tags(args, sql, log) + final = MagicMock() + populate_tags = PopulateTags(args, sql, log, tags, final) + + basic_tags = { + "fandoms": [ + {"original_tag": "Fandom A", "ao3_tag": "Fandom A (TV)"}, + {"original_tag": "Fandom B", "ao3_tag": "Fandom B (TV)"}, + ], + "tags": [{"original_tag": "a tag", "ao3_tag": "A Tag"}], + "rating": [{"original_tag": "PG", "ao3_tag": "General Audiences"}], + "warnings": [ + {"original_tag": "Violence", "ao3_tag": "Graphic Depictions Of Violence"} + ], + } - basic_tags = { - 'fandoms': [ - {'original_tag': 'Fandom A', 'ao3_tag': 'Fandom A (TV)'}, - {'original_tag': 'Fandom B', 'ao3_tag': 'Fandom B (TV)'} - ], - 'tags': [ - {'original_tag': 'a tag', 'ao3_tag': 'A Tag'} - ], - 'rating': [{'original_tag': 'PG', 'ao3_tag': 'General Audiences'}], - 'warnings': [{'original_tag': 'Violence', 'ao3_tag': 'Graphic Depictions Of Violence'}] - } + def test_default_fandom_ignored_if_fandoms_present(self): + story_tags = self.populate_tags.tags_for_story(1, self.basic_tags) + self.assertCountEqual( + ["Fandom A (TV)", "Fandom B (TV)"], + story_tags["fandoms"].split(", "), + "Fandoms should be a comma-separated string of specified AO3 tags", + ) - def test_default_fandom_ignored_if_fandoms_present(self): - story_tags = self.populate_tags.tags_for_story(1, self.basic_tags) - self.assertCountEqual(['Fandom A (TV)', 'Fandom B (TV)'], story_tags['fandoms'].split(', '), 'Fandoms should be a comma-separated string of specified AO3 tags') + def test_default_fandom_used_if_no_fandoms_present(self): + tags_without_fandom = self.basic_tags.copy() + tags_without_fandom.pop("fandoms") + story_tags = self.populate_tags.tags_for_story(1, tags_without_fandom) + self.assertEqual( + "Fandom C (TV)", + story_tags["fandoms"], + "Fandoms should be a comma-separated string of specified AO3 tags", + ) - def test_default_fandom_used_if_no_fandoms_present(self): - tags_without_fandom = self.basic_tags.copy() - tags_without_fandom.pop('fandoms') - story_tags = self.populate_tags.tags_for_story(1, tags_without_fandom) - self.assertEqual('Fandom C (TV)', story_tags['fandoms'], 'Fandoms should be a comma-separated string of specified AO3 
tags') + def test_cntw_added_if_warnings_present(self): + story_tags = self.populate_tags.tags_for_story(1, self.basic_tags) + self.assertCountEqual( + ["Graphic Depictions Of Violence", "Choose Not To Use Archive Warnings"], + story_tags["warnings"].split(", "), + "Warnings should be a comma-separated string of AO3 warnings that includes CNTW", + ) - def test_cntw_added_if_warnings_present(self): - story_tags = self.populate_tags.tags_for_story(1, self.basic_tags) - self.assertCountEqual( - ['Graphic Depictions Of Violence', 'Choose Not To Use Archive Warnings'], - story_tags['warnings'].split(', '), - 'Warnings should be a comma-separated string of AO3 warnings that includes CNTW' - ) + def test_cntw_added_if_no_warnings_present(self): + tags_without_warnings = self.basic_tags.copy() + tags_without_warnings.pop("warnings") + story_tags = self.populate_tags.tags_for_story(1, tags_without_warnings) + self.assertEqual( + "Choose Not To Use Archive Warnings", + story_tags["warnings"], + "Warnings should be a comma-separated string of AO3 warnings that includes CNTW", + ) - def test_cntw_added_if_no_warnings_present(self): - tags_without_warnings = self.basic_tags.copy() - tags_without_warnings.pop('warnings') - story_tags = self.populate_tags.tags_for_story(1, tags_without_warnings) - self.assertEqual('Choose Not To Use Archive Warnings', story_tags['warnings'], 'Warnings should be a comma-separated string of AO3 warnings that includes CNTW') -if __name__ == '__main__': - unittest.main() +if __name__ == "__main__": + unittest.main() diff --git a/test/test_step_05.py b/test/test_step_05.py index c4de8e3..bd7a2a9 100644 --- a/test/test_step_05.py +++ b/test/test_step_05.py @@ -15,8 +15,9 @@ step_05 = importlib.import_module("05-Create-Open-Doors-Tables") + def testArgs(): - parser = argparse.ArgumentParser(description='Test an archive database') + parser = argparse.ArgumentParser(description="Test an archive database") args = parser.parse_args() setattr(args, "archive_type", "AA") setattr(args, "db_host", "localhost") @@ -30,6 +31,7 @@ def testArgs(): setattr(args, "bookmark_ids_to_remove", "") return args + class TestStepFive(TestCase): args = testArgs() log = logger("test") @@ -54,5 +56,6 @@ def tearDown(self): drop_final = "drop database {}".format(self.args.output_database) self.sql.execute(drop_final, database=self.args.output_database) -if __name__ == '__main__': - unittest.main() \ No newline at end of file + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_tag_validator.py b/test/test_tag_validator.py index e723ff4..45d4e4e 100644 --- a/test/test_tag_validator.py +++ b/test/test_tag_validator.py @@ -5,6 +5,7 @@ from shared_python.Logging import logger from shared_python.TagValidator import TagValidator + class testTag_Validator(TestCase): log = logger("test") tag_validator = TagValidator(log) @@ -78,7 +79,9 @@ def test_identify_rating_wrong_word(self): # Warning Dictionary Tests def test_identify_warning_choose_no_archive_warnings(self): tag = self.tag_validator.identify_warning("Choose Not To Use Archive Warnings") - self.assertEqual(tag, 1, "warning Choose Not To Use Archive Warnings should be 1") + self.assertEqual( + tag, 1, "warning Choose Not To Use Archive Warnings should be 1" + ) def test_identify_warning_graphic_violence(self): tag = self.tag_validator.identify_warning("Graphic Depictions Of Violence") @@ -164,7 +167,9 @@ def test_validate_and_fix_tag_type_characters(self): def test_validate_and_fix_tag_type_relationships(self): tag = 
self.tag_validator.validate_and_fix_tag_type("relationships")
-        self.assertEqual(tag, "relationships", "relationships should pass with no fixes")
+        self.assertEqual(
+            tag, "relationships", "relationships should pass with no fixes"
+        )

     def test_validate_and_fix_tag_type_tags(self):
         tag = self.tag_validator.validate_and_fix_tag_type("tags")
@@ -177,11 +182,15 @@ def test_validate_and_fix_tag_type_correct_missing_s(self):
     def test_validate_and_fix_tag_type_correct_extra_s(self):
         tag = self.tag_validator.validate_and_fix_tag_type("ratings")
-        self.assertEqual(tag, "rating", "ratings should pass with successful self-correction")
+        self.assertEqual(
+            tag, "rating", "ratings should pass with successful self-correction"
+        )

     def test_validate_and_fix_tag_type_correct_wrong_case(self):
         tag = self.tag_validator.validate_and_fix_tag_type("Rating")
-        self.assertEqual(tag, "rating", "Rating should pass with successful self-correction")
+        self.assertEqual(
+            tag, "rating", "Rating should pass with successful self-correction"
+        )

     def test_validate_and_fix_tag_type_correct_wrong_case_and_missing_s(self):
         tag = self.tag_validator.validate_and_fix_tag_type("Tag")
@@ -189,23 +198,39 @@ def test_validate_and_fix_tag_type_correct_wrong_case_and_missing_s(self):
     def test_validate_and_fix_tag_type_correct_wrong_case_and_extra_s(self):
         tag = self.tag_validator.validate_and_fix_tag_type("Ratings")
-        self.assertEqual(tag, "rating", "Ratings should pass with successful self-correction")
+        self.assertEqual(
+            tag, "rating", "Ratings should pass with successful self-correction"
+        )

     # Validate and Fix Tag Type Tests: Pass with Prompts
-    @patch('builtins.input', return_value='tags')
+    @patch("builtins.input", return_value="tags")
     def test_validate_and_fix_tag_type_one_prompt_no_selfcorrect(self, mock_input):
         tag = self.tag_validator.validate_and_fix_tag_type("foobar")
-        self.assertEqual(tag, "tags", "foobar should prompt for manual correction without self-correction")
+        self.assertEqual(
+            tag,
+            "tags",
+            "foobar should prompt for manual correction without self-correction",
+        )

-    @patch('builtins.input', return_value='tag')
+    @patch("builtins.input", return_value="tag")
     def test_validate_and_fix_tag_type_one_prompt_selfcorrect(self, mock_input):
         tag = self.tag_validator.validate_and_fix_tag_type("foobar")
-        self.assertEqual(tag, "tags", "foobar should prompt for manual correction followed by self-correction")
-
-    @patch('builtins.input', side_effect=['foo', 'facts', 'tag'])
-    def test_validate_and_fix_tag_type_one_prompt_selfcorrect_side_effects(self, mock_input):
+        self.assertEqual(
+            tag,
+            "tags",
+            "foobar should prompt for manual correction followed by self-correction",
+        )
+
+    @patch("builtins.input", side_effect=["foo", "facts", "tag"])
+    def test_validate_and_fix_tag_type_one_prompt_selfcorrect_side_effects(
+        self, mock_input
+    ):
         tag = self.tag_validator.validate_and_fix_tag_type("foobar")
-        self.assertEqual(tag, "tags", "foobar should prompt for manual correction thrice with self-correction")
+        self.assertEqual(
+            tag,
+            "tags",
+            "foobar should prompt for manual correction thrice with self-correction",
+        )

     # Validate and Fix Tag Tests: Pass with No Self Correction or Prompts
     def test_validate_and_fix_tag_rating(self):
@@ -225,16 +250,23 @@ def test_validate_and_fix_tag_other(self):
         self.assertEqual(tag, "Kirk/Spock", "Kirk/Spock should pass with no fixes")

     # Validate and Fix Tag Tests: Pass with Prompts
-    @patch('builtins.input', return_value='Not Rated')
+    @patch("builtins.input", return_value="Not Rated")
     def test_validate_and_fix_tag_type_one_prompt(self, mock_input):
         tag = self.tag_validator.validate_and_fix_tag("not ated", "rating")
-        self.assertEqual(tag, "Not Rated", "not ated should prompt for manual correction to Not Rated")
+        self.assertEqual(
+            tag,
+            "Not Rated",
+            "not ated should prompt for manual correction to Not Rated",
+        )

-    @patch('builtins.input', side_effect=['mlm', 'M/M'])
+    @patch("builtins.input", side_effect=["mlm", "M/M"])
     def test_validate_and_fix_tag_two_prompts(self, mock_input):
         tag = self.tag_validator.validate_and_fix_tag("male x male", "categories")
-        self.assertEqual(tag, "M/M", "male x male should prompt twice for manual correction to M/M")
+        self.assertEqual(
+            tag, "M/M", "male x male should prompt twice for manual correction to M/M"
+        )
+

-if __name__ == '__main__':
-    print('here')
-    unittest.main()
+if __name__ == "__main__":
+    print("here")
+    unittest.main()
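
Both patching styles in these tests are standard unittest.mock idioms: return_value answers every input() call with the same string, while side_effect hands out one answer per call, so a three-item list models a wrangler who only gets the tag type right on the third prompt. A self-contained sketch of the retry pattern being exercised, for reference; prompt_until_valid and VALID_TYPES are hypothetical stand-ins, not the repo's actual validator:

    import unittest
    from unittest.mock import patch

    # Hypothetical stand-ins; the real logic lives in the repo's tag validator.
    VALID_TYPES = {"categories", "fandoms", "rating", "relationships", "tags", "warnings"}

    def prompt_until_valid(tag_type: str) -> str:
        # Normalise case, then keep prompting until the answer is a known
        # type or becomes one after adding a missing trailing "s".
        candidate = tag_type.lower()
        while candidate not in VALID_TYPES and candidate + "s" not in VALID_TYPES:
            candidate = input("Unknown tag type '{}': ".format(candidate)).lower()
        return candidate if candidate in VALID_TYPES else candidate + "s"

    class TestPromptUntilValid(unittest.TestCase):
        # side_effect feeds one answer per input() call: two wrong guesses,
        # then "tag", which self-corrects to "tags".
        @patch("builtins.input", side_effect=["foo", "facts", "tag"])
        def test_retries_until_valid(self, mock_input):
            self.assertEqual(prompt_until_valid("foobar"), "tags")
            self.assertEqual(mock_input.call_count, 3)

Asserting on mock_input.call_count pins down exactly how many prompts were shown, which the tests above imply through the length of their side_effect lists.
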
diff --git a/test/test_tags_length.py b/test/test_tags_length.py
index 881ddb1..a92c7e5 100644
--- a/test/test_tags_length.py
+++ b/test/test_tags_length.py
@@ -6,62 +6,68 @@
 import argparse
 import datetime

+
 def testArgs():
-  parser = argparse.ArgumentParser(description='Test an archive database')
-  args = parser.parse_args()
-  setattr(args, "archive_type", "AA")
-  setattr(args, "db_host", "localhost")
-  setattr(args, "db_user", "root")
-  setattr(args, "db_password", "test")
-  setattr(args, "temp_db_database", "test_final_open_doors")
-  setattr(args, "output_database", "test_final_open_doors")
-  setattr(args, "default_fandom", "Fandom C (TV)")
-  setattr(args, "sql_path", "./shared_python/create-open-doors-tables.sql")
-  return args
+    parser = argparse.ArgumentParser(description="Test an archive database")
+    args = parser.parse_args()
+    setattr(args, "archive_type", "AA")
+    setattr(args, "db_host", "localhost")
+    setattr(args, "db_user", "root")
+    setattr(args, "db_password", "test")
+    setattr(args, "temp_db_database", "test_final_open_doors")
+    setattr(args, "output_database", "test_final_open_doors")
+    setattr(args, "default_fandom", "Fandom C (TV)")
+    setattr(args, "sql_path", "./shared_python/create-open-doors-tables.sql")
+    return args
+

 class TestTagsLength(TestCase):
-  args = testArgs()
-  log = logger("test")
-  sql = Sql(args, log)
-  sql.execute("DROP DATABASE IF EXISTS test_final_open_doors;")
-  sql.run_script_from_file(args.sql_path, args.temp_db_database, initial_load=False)
-  final_tables = FinalTables(args, sql, log)
+    args = testArgs()
+    log = logger("test")
+    sql = Sql(args, log)
+    sql.execute("DROP DATABASE IF EXISTS test_final_open_doors;")
+    sql.run_script_from_file(args.sql_path, args.temp_db_database, initial_load=False)
+    final_tables = FinalTables(args, sql, log)

-  def test_tags_length(self):
-
-    test_item = [
-      { 'id': 1,
-        'title': 'story title',
-        'summary ': '\n\nstory summary\n\n',
-        'notes': '',
-        'author_id': 2,
-        'date': datetime.datetime(2022, 9, 4, 22, 38, 47),
-        'updated': datetime.datetime(2022, 9, 4, 22, 38, 47),
-        'url': None,
-        'ao3_url': None,
-        'imported': 0,
-        'do_not_import': 0,
-        'coauthor_id': None
-      }
-    ]
-    long_tags = """Previous code fails to process tags longer than 255 chars.
+    def test_tags_length(self):
+        test_item = [
+            {
+                "id": 1,
+                "title": "story title",
+                "summary ": "\n\nstory summary\n\n",
+                "notes": "",
+                "author_id": 2,
+                "date": datetime.datetime(2022, 9, 4, 22, 38, 47),
+                "updated": datetime.datetime(2022, 9, 4, 22, 38, 47),
+                "url": None,
+                "ao3_url": None,
+                "imported": 0,
+                "do_not_import": 0,
+                "coauthor_id": None,
+            }
+        ]
+        long_tags = """Previous code fails to process tags longer than 255 chars.
     This is a long test tags with length greater than 255 chars.
     This is a long test tags with length greater than 255 chars.
     This is a long test tags with length greater than 255 chars.
     This is a long test tags with length greater than 255 chars."""
-    story_tags = {'categories': 'M/M',
-                  'fandoms': 'This is a fandom',
-                  'rating': 'Explicit',
-                  'tags': long_tags,
-                  'relationships': 'AAA/BBB'}
-    story_id = 1
-    output_table_name = "stories"
-    self.final_tables.insert_into_final(output_table_name, test_item)
-    self.final_tables.populate_story_tags(story_id, output_table_name, story_tags)
-
-    extract_summary = self.sql.execute_and_fetchall(self.args.temp_db_database,
-                                                    """SELECT tags FROM stories""")
-    self.assertEqual(extract_summary[0][0], long_tags)
+        story_tags = {
+            "categories": "M/M",
+            "fandoms": "This is a fandom",
+            "rating": "Explicit",
+            "tags": long_tags,
+            "relationships": "AAA/BBB",
+        }
+        story_id = 1
+        output_table_name = "stories"
+        self.final_tables.insert_into_final(output_table_name, test_item)
+        self.final_tables.populate_story_tags(story_id, output_table_name, story_tags)
+
+        extract_summary = self.sql.execute_and_fetchall(
+            self.args.temp_db_database, """SELECT tags FROM stories"""
+        )
+        self.assertEqual(extract_summary[0][0], long_tags)
+

-if __name__ == '__main__':
-  unittest.main()
\ No newline at end of file
+if __name__ == "__main__":
+    unittest.main()
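
The long_tags literal doubles as documentation: tags are denormalised into a single field, and the regression being pinned down is that earlier code lost tag lists once they passed 255 characters, the usual VARCHAR(255) ceiling. The verification is a plain round trip: insert the story, populate its tags, select them back, compare. A dependency-free sketch of the same round-trip shape, using an in-memory sqlite3 table as a stand-in for the repo's MySQL schema (table and column names here are illustrative only):

    import sqlite3

    # A tag string comfortably past the old 255-character ceiling.
    long_tags = "This is a long test tag. " * 12  # 300 chars

    conn = sqlite3.connect(":memory:")
    # TEXT has no length cap; a VARCHAR(255) column is where the old loss occurred.
    conn.execute("CREATE TABLE stories (id INTEGER PRIMARY KEY, tags TEXT)")
    conn.execute("INSERT INTO stories (id, tags) VALUES (?, ?)", (1, long_tags))

    stored = conn.execute("SELECT tags FROM stories WHERE id = 1").fetchone()[0]
    assert stored == long_tags, "tags should round-trip without truncation"
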
diff --git a/xx-Remove-DNI-from-Open-Doors-Tables.py b/xx-Remove-DNI-from-Open-Doors-Tables.py
index bd77023..60eccae 100755
--- a/xx-Remove-DNI-from-Open-Doors-Tables.py
+++ b/xx-Remove-DNI-from-Open-Doors-Tables.py
@@ -5,21 +5,23 @@ from shared_python.Sql import Sql

 if __name__ == "__main__":
-  args_obj = Args()
-  args = args_obj.args_for_05()
-  log = args_obj.logger_with_filename()
-  sql = Sql(args, log)
+    args_obj = Args()
+    args = args_obj.args_for_05()
+    log = args_obj.logger_with_filename()
+    sql = Sql(args, log)

-  filter = 'WHERE `id` in '
+    filter = "WHERE `id` in "

-  story_exclusion_filter = ''
-  # Filter out DNI stories - story_ids_to_remove must be comma-separated list of DNI ids
-  if os.path.exists(args.story_ids_to_remove):
-    with open(args.story_ids_to_remove, "rt") as f:
-      for line in f:
-        story_exclusion_filter = filter + '(' + line + ')'
+    story_exclusion_filter = ""
+    # Filter out DNI stories - story_ids_to_remove must be comma-separated list of DNI ids
+    if os.path.exists(args.story_ids_to_remove):
+        with open(args.story_ids_to_remove, "rt") as f:
+            for line in f:
+                story_exclusion_filter = filter + "(" + line + ")"

-  command = "DELETE FROM `{}`.`stories` {}".format(args.output_database, story_exclusion_filter)
-  print(command)
-  result = sql.execute(command)
-  print(result)
+    command = "DELETE FROM `{}`.`stories` {}".format(
+        args.output_database, story_exclusion_filter
+    )
+    print(command)
+    result = sql.execute(command)
+    print(result)
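
Two details of this script are easy to miss: the contents of the ids file are concatenated straight into the DELETE statement, and if the file is missing the filter stays empty, leaving a bare DELETE against the whole stories table, which is presumably why the command is printed before it runs. A sketch of the same delete built with bound parameters and an explicit guard for the empty case; delete_dni_stories and its DB-API cursor argument are hypothetical, not the repo's Sql wrapper:

    import os

    def delete_dni_stories(cursor, database: str, ids_file: str) -> None:
        ids = []
        if os.path.exists(ids_file):
            with open(ids_file, "rt") as f:
                # The file holds a comma-separated list of DNI story ids.
                ids = [int(x) for x in f.read().split(",") if x.strip()]
        if not ids:
            return  # an empty filter would delete every story; bail out instead
        placeholders = ", ".join(["%s"] * len(ids))
        query = "DELETE FROM `{}`.`stories` WHERE `id` in ({})".format(
            database, placeholders
        )
        cursor.execute(query, ids)
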
diff --git a/xx-Remove-emails-from-Open-Doors-Tables.py b/xx-Remove-emails-from-Open-Doors-Tables.py
index 05141f9..9c0ca29 100755
--- a/xx-Remove-emails-from-Open-Doors-Tables.py
+++ b/xx-Remove-emails-from-Open-Doors-Tables.py
@@ -10,27 +10,37 @@
     r"([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\"([]!#-[^-~ \t]|(\\[\t -~]))+\")@([-!#-'*+/-9=?A-Z^-~]+(\.[-!#-'*+/-9=?A-Z^-~]+)*|\[[\t -Z^-~]*])"
 )

+
 def print_context(match, amount: int):
     start, end = match.span()
-    pre_context = "\t" + match.string[max(start - amount, 0) : start].replace('\n', '\n\t')
-    value = match.string[start : end]
-    post_context = match.string[end : end + amount].replace('\n', '\n\t')
-    print_formatted_text(FormattedText([
-        ('', pre_context),
-        ('#ff0000 bold', value),
-        ('', post_context),
-    ]))
+    pre_context = "\t" + match.string[max(start - amount, 0) : start].replace(
+        "\n", "\n\t"
+    )
+    value = match.string[start:end]
+    post_context = match.string[end : end + amount].replace("\n", "\n\t")
+    print_formatted_text(
+        FormattedText(
+            [
+                ("", pre_context),
+                ("#ff0000 bold", value),
+                ("", post_context),
+            ]
+        )
+    )
+

 def does_contain_letters(text: str) -> bool:
-    return any(x in text for x in 'qwertyuiopasdfghjklzxcvbnm')
+    return any(x in text for x in "qwertyuiopasdfghjklzxcvbnm")
+

 def is_mailto(match) -> bool:
     start, _ = match.span()
-    mailto = 'mailto:'
+    mailto = "mailto:"
     if len(mailto) > start:
         return False
     return mailto == match.string[start - len(mailto) : start]

+
 def ask_user_for_action(match) -> str:
     start, end = match.span()
     raw_email = match.string[start:end]
@@ -51,9 +61,7 @@ def ask_user_for_action(match) -> str:
             new_email = input("Enter new email: ")
             if "@" in new_email:
                 addresses[raw_email] = new_email
-        elif any(x in response for x in "wb") and any(
-            x in response for x in "ad"
-        ):
+        elif any(x in response for x in "wb") and any(x in response for x in "ad"):
             should_block = "b" in response
             if "d" in response:
                 domains[domain] = not should_block
@@ -66,6 +74,7 @@ def ask_user_for_action(match) -> str:
 domains = {}
 addresses = {}

+
 def return_from_list(match) -> str:
     start, end = match.span()
     raw_email = match.string[start:end]
@@ -84,9 +93,11 @@ def return_from_list(match) -> str:
             return BAN_TEXT
     raise Exception("Failed to resolve")

+
 def escape_for_sql(raw: str) -> str:
     return raw.replace('"', '\\"').replace("\n", "\\n").replace("\t", "\\t")

+
 if __name__ == "__main__":
     args_obj = Args()
     args = args_obj.args_for_05()
@@ -99,9 +110,9 @@ def escape_for_sql(raw: str) -> str:
         )
     ]
     chapter_count = int(
-        sql.execute_and_fetchall(
-            args.output_database, "SELECT COUNT(*) FROM chapters"
-        )[0][0]
+        sql.execute_and_fetchall(args.output_database, "SELECT COUNT(*) FROM chapters")[
+            0
+        ][0]
     )
     for index, (id, title, text, notes) in enumerate(
         sql.execute_and_fetchall(
@@ -128,7 +139,7 @@ def replace_func(email):
         if is_mailto(email):
             # Mailto links are presumed to be real
             addresses[raw_email] = False
-        try: 
+        try:
             return return_from_list(email)
         except:  # noqa: E722
             return ask_user_for_action(email)
@@ -144,4 +155,6 @@ def replace_func(email):
     WHERE id = %s;
     """.strip()

-    sql.execute(update_query, (cleared_text, cleared_notes, id), args.output_database)
+    sql.execute(
+        update_query, (cleared_text, cleared_notes, id), args.output_database
+    )
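
Taken together, the pieces above form a scan-and-resolve loop: EMAIL_REGEX locates candidate addresses in chapter text and notes, replace_func decides what each match becomes, and the domains and addresses dicts cache decisions so the operator is only asked once per address or domain. The core mechanism is re.sub with a callable replacement, shown here in isolation as a minimal sketch (the simplified pattern and placeholder text are stand-ins for the script's actual regex and ban text):

    import re

    # Deliberately loose pattern; the script's EMAIL_REGEX is far stricter.
    SIMPLE_EMAIL = re.compile(r"[\w.+-]+@[\w-]+(\.[\w-]+)+")

    decisions = {}  # cached per-match choices, like the domains/addresses dicts above

    def replace_match(match):
        raw = match.group(0)
        # A real resolver would consult the caches and fall back to prompting,
        # as return_from_list and ask_user_for_action do; here every hit is redacted.
        return decisions.setdefault(raw, "[email removed]")

    text = "Feedback to author@example.com or beta@example.org, please!"
    print(SIMPLE_EMAIL.sub(replace_match, text))
    # Feedback to [email removed] or [email removed], please!
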