diff --git a/.isort.cfg b/.isort.cfg
index b896d6387..baed56af5 100644
--- a/.isort.cfg
+++ b/.isort.cfg
@@ -4,4 +4,4 @@ include_trailing_comma = true
 force_grid_wrap = 0
 use_parentheses = true
 line_length = 100
-known_third_party =alembic,dateutil,geoalchemy2,geopy,halo,iterfzf,pg8000,pint,prompt_toolkit,pyfiglet,pygments,pytest,setuptools,shapely,sqlalchemy,tabulate,testing,tqdm
\ No newline at end of file
+known_third_party =alembic,dateutil,docx,geoalchemy2,geopy,halo,iterfzf,pg8000,pint,prompt_toolkit,pyfiglet,pygments,pytest,setuptools,shapely,sqlalchemy,tabulate,testing,tqdm
\ No newline at end of file
diff --git a/importers/word_narrative_importer.py b/importers/word_narrative_importer.py
new file mode 100644
index 000000000..7895232db
--- /dev/null
+++ b/importers/word_narrative_importer.py
@@ -0,0 +1,386 @@
+import os
+import re
+from datetime import datetime
+from xml.etree.ElementTree import XML
+
+from docx import Document
+
+from pepys_import.core.validators import constants
+from pepys_import.file.importer import Importer
+
+WORD_NAMESPACE = "{http://schemas.openxmlformats.org/wordprocessingml/2006/main}"
+TEXT = WORD_NAMESPACE + "t"
+
+
+class WordNarrativeImporter(Importer):
+    def __init__(self):
+        super().__init__(
+            name="Word Narrative Format Importer",
+            validation_level=constants.BASIC_LEVEL,
+            short_name="Word Narrative Importer",
+            default_privacy="Public",
+            datafile_type="Word Narrative",
+        )
+
+        self.last_day = None
+        self.last_month = None
+        self.last_year = None
+
+    def can_load_this_type(self, suffix):
+        return suffix.upper() in [".DOCX", ".PDF"]
+
+    def can_load_this_filename(self, filename):
+        return True
+
+    def can_load_this_header(self, header):
+        return True
+
+    def can_load_this_file(self, file_contents):
+        return True
+
+    def _load_this_file(self, data_store, path, file_object, datafile, change_id):
+        # Store datafile, data_store and change_id in the object so we don't have
+        # to keep passing them around
+        self.datafile = datafile
+        self.data_store = data_store
+        self.change_id = change_id
+
+        _, ext = os.path.splitext(path)
+        if ext.upper() == ".DOCX":
+            header, entries, error = self.load_docx_file(path)
+        elif ext.upper() == ".PDF":
+            header, entries, error = self.load_pdf_file(path)
+        else:
+            self.errors.append({self.error_type: f"Unsupported file extension {ext}."})
+            return
+
+        if error:
+            # Stop parsing if there was an error during loading that we can't recover from
+            return
+
+        self.parse_file(header, entries)
+
+    def parse_file(self, header, entries):
+        platform_from_header = header.get("platform", None)
+        self.platform = self.get_cached_platform(
+            self.data_store, platform_name=platform_from_header, change_id=self.change_id
+        )
+        print(self.platform)
+
+        # Loop through each entry in the file
+        for entry in entries:
+            stripped_entry = entry.strip()
+            print(f"Entry {stripped_entry}")
+            if stripped_entry == "":
+                # Skip blank entries
+                continue
+
+            parts = stripped_entry.split(",")
+
+            correct_length = len(parts) > 5
+            has_length_and_four_fig_datetime = correct_length and re.fullmatch(r"\d{4}", parts[0])
+            has_length_and_six_fig_datetime = correct_length and re.fullmatch(r"\d{6}", parts[0])
+
+            is_comma_sep_with_datetime = (
+                has_length_and_four_fig_datetime or has_length_and_six_fig_datetime
+            )
+
+            if is_comma_sep_with_datetime:
+                self.process_comma_sep_entry(header, parts, has_length_and_four_fig_datetime)
+            else:
+                # The entry isn't comma separated with a datetime at the start.
+                # These entries mostly occur in PDFs not DOCXs - but we check for them
+                # everywhere.
+                # Even though it isn't comma separated, it might still have a date at the
+                # beginning and look like this:
+                #   120500 Message 1 (NB: the message could still include FCS entries etc)
+                # Or it could be a date block marker like this:
+                #   12 Dec 95
+                # Or it could be a bit of text that just needs adding on to the previous entry.
+                # So, check for these one at a time.
+                #
+                # Here we check if it starts with 4 or 6 digits, followed by whitespace
+                if re.match(r"\d{4}\s", stripped_entry) or re.match(r"\d{6}\s", stripped_entry):
+                    # If so, we process the entry
+                    self.process_non_comma_entry(header, stripped_entry)
+                else:
+                    # Try parsing the line as a date in the formats
+                    #   dd MMM yy
+                    #   dd MMM yyyy
+                    # For example, "12 DEC 1995"
+                    formats = ["%d %b %y", "%d %b %Y"]
+                    timestamp = None
+                    for date_format in formats:
+                        try:
+                            timestamp = datetime.strptime(stripped_entry, date_format)
+                        except ValueError:
+                            continue
+
+                    if timestamp is not None:
+                        # We've got a valid timestamp, so store the details ready for
+                        # use with any lines that follow it
+                        self.last_day = timestamp.day
+                        self.last_month = timestamp.month
+                        self.last_year = timestamp.year
+                        continue
+
+                    # If we've got here, then we just have some text that needs
+                    # appending to the previous entry
+                    # TODO: Append entry
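+
+    # A comma-separated entry with a datetime is assumed to look like this (some of
+    # these fields are typically hidden text in Word, so only the message is visible):
+    #   041014, 04, 07, 2020, PLATFORM, MessageType, Message text...
+    # i.e. parts[0] is a DDHHMM (or HHMM) time, parts[1:4] are the hidden day, month
+    # and year, parts[4] is the platform name, parts[5] is the message type and
+    # parts[6:] is the message text.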
+
+    def process_non_comma_entry(self, header, stripped_entry):
+        print(f"Found non comma entry: {stripped_entry}")
+        split_by_whitespace = stripped_entry.split()
+        timestamp_str = split_by_whitespace[0].strip()
+
+        try:
+            timestamp = self.parse_singlepart_datetime(timestamp_str)
+        except Exception as e:
+            self.errors.append(
+                {self.error_type: f"Error parsing timestamp {timestamp_str}, error was {str(e)}"}
+            )
+            return
+
+        message_text = stripped_entry.replace(timestamp_str, "").strip()
+
+        self.store_comment(timestamp, None, message_text)
+
+    def parse_singlepart_datetime(self, timestamp_str):
+        if self.last_day is None or self.last_month is None or self.last_year is None:
+            raise ValueError("No previous day/month/year block")
+
+        if len(timestamp_str) == 6:
+            day = int(timestamp_str[0:2])
+            hour = int(timestamp_str[2:4])
+            mins = int(timestamp_str[4:6])
+
+            if day < self.last_day:
+                # Day has gone down, so month must go up.
+                # However, if month is 12 then it must go to 1 and year must go up
+                if self.last_month == 12:
+                    month = 1
+                    year = self.last_year + 1
+                else:
+                    month = self.last_month + 1
+                    year = self.last_year
+            else:
+                month = self.last_month
+                year = self.last_year
+
+            timestamp = datetime(year, month, day, hour, mins)
+            return timestamp
+        elif len(timestamp_str) == 4:
+            hour = int(timestamp_str[0:2])
+            mins = int(timestamp_str[2:4])
+
+            timestamp = datetime(self.last_year, self.last_month, self.last_day, hour, mins)
+            return timestamp
+        else:
+            raise ValueError("Timestamp must be 4 digits (HHMM) or 6 digits (DDHHMM)")
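+
+    # Illustrative results for parse_singlepart_datetime, taken from the test cases
+    # in tests/test_word_narrative_importer.py:
+    #   with last_day=14, last_month=7, last_year=2019:
+    #     "141030" -> datetime(2019, 7, 14, 10, 30)
+    #   with last_day=30, last_month=7, last_year=2019:
+    #     "011030" -> datetime(2019, 8, 1, 10, 30)   (month rollover)
+    #   with last_day=28, last_month=12, last_year=2019:
+    #     "011030" -> datetime(2020, 1, 1, 10, 30)   (year rollover)
+    #     "1030"   -> datetime(2019, 12, 28, 10, 30) (HHMM reuses the stored day)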
+
+    def process_comma_sep_entry(self, header, parts, has_length_and_four_fig_datetime):
+        # Parse datetime
+        timestamp, error = self.parse_multipart_datetime(
+            parts, four_fig=has_length_and_four_fig_datetime
+        )
+        if error:
+            return
+
+        # Process rest of entry
+        entry_platform_name = parts[4].strip()
+
+        if entry_platform_name.upper() != header["platform"].upper():
+            header_platform_name = header["platform"]
+            self.errors.append(
+                {
+                    self.error_type: f"Platform name in entry ('{entry_platform_name}') doesn't match platform name in header ('{header_platform_name}')"
+                }
+            )
+            return
+
+        message_type = parts[5].strip()
+
+        if message_type.upper() == "FCS":
+            # It's a Fire Control Solution message
+            self.process_fcs_message(timestamp, entry_platform_name, parts[6:])
+        else:
+            # It's another type of message
+            if len(message_type) > 20:
+                # Sometimes there isn't the end comma on the message type field,
+                # which means it gets merged with the text field.
+                # If this field is very long then this is probably what happened,
+                # so we find the first location of a tab and split on that
+                index = message_type.find("\t")
+                if index != -1:
+                    text = message_type[index:].strip()
+                    message_type = message_type[:index].strip()
+                else:
+                    fulltext = ",".join(parts)
+                    self.errors.append(
+                        {
+                            self.error_type: f"Can't separate message type and text, are fields mangled or a comma missing? {fulltext}"
+                        }
+                    )
+                    return
+            else:
+                text = ",".join(parts[6:]).strip()
+
+            print(f"Timestamp: {timestamp}")
+            print(f"message_type: {message_type}")
+            print(f"text: {text}")
+
+            # TODO: Work out here if we've got a state entry in the comment,
+            # and if so then parse it and store it
+
+            # Store message data here
+            self.store_comment(timestamp, message_type, text)
+
+    def process_fcs_message(self, timestamp, platform_name, fcs_parts):
+        pass
+
+    def store_comment(self, timestamp, message_type, text):
+        if message_type is None:
+            comment_type = self.data_store.add_to_comment_types("General Comment", self.change_id)
+        else:
+            comment_type = self.data_store.add_to_comment_types(message_type, self.change_id)
+
+        self.last_comment = self.datafile.create_comment(
+            data_store=self.data_store,
+            platform=self.platform,
+            timestamp=timestamp,
+            comment=text,
+            comment_type=comment_type,
+            parser_name=self.short_name,
+        )
+
+    def parse_multipart_datetime(self, parts, four_fig):
+        day_visible = None
+
+        # Get the parts separated by commas, as they're always there
+        day_hidden = int(parts[1])
+        month = int(parts[2])
+        year = int(parts[3])
+
+        if four_fig:
+            # It's a four figure time with just HHMM
+            hour = int(parts[0][0:2])
+            mins = int(parts[0][2:4])
+        else:
+            # It's a six figure time with DDHHMM
+            day_visible = int(parts[0][0:2])  # day in the visible text
+            hour = int(parts[0][2:4])
+            mins = int(parts[0][4:6])
+
+            # Deal with entries that might need to be pulled back from the previous day.
+            # If something that happened at 2345 only gets entered at 0010 then
+            # the hidden text will have the next day in it, when it should be
+            # the previous day
+            if hour == 23:
+                if day_hidden == day_visible + 1:
+                    day_hidden = day_visible
+
+            if day_hidden != day_visible:
+                full_text = ",".join(parts)
+                self.errors.append(
+                    {
+                        self.error_type: f"Day in text doesn't match day in hidden text - possible copy/paste error: '{full_text}'."
+                    }
+                )
+                return None, True
+
+        day = day_visible or day_hidden
+
+        day_decreased = (self.last_day is not None) and (day < self.last_day)
+        month_increased = (self.last_month is not None) and (month > self.last_month)
+        year_increased = (self.last_year is not None) and (year > self.last_year)
+
+        # Deal with entries where the day has decreased (ie. gone to the beginning
+        # of the next month) but neither the month nor the year has increased.
+        # This suggests that there has been a copy-paste error, mangling the data
+        if day_decreased and not (month_increased or year_increased):
+            self.errors.append(
+                {self.error_type: f"Day decreased but month/year didn't increase: {parts[0]}."}
+            )
+            return None, True
+        else:
+            # Everything makes sense, so we can update the last_X variables
+            self.last_day = day
+            self.last_month = month
+            self.last_year = year
+
+        if year < 100:
+            # It's a two digit year
+            if year > 80:
+                # If it is from the 80s onwards then it's in the 1900s
+                year = 1900 + year
+            else:
+                year = 2000 + year
+
+        if year < 1900 or year > 2100:
+            self.errors.append({self.error_type: f"Year too big or too small: {year}."})
+            return None, True
+
+        try:
+            timestamp = datetime(year, month, day, hour, mins)
+        except ValueError:
+            full_text = ",".join(parts)
+            self.errors.append({self.error_type: f"Could not parse timestamp {full_text}."})
+            return None, True
+
+        return timestamp, False
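+
+    # Illustrative results for parse_multipart_datetime, taken from the test cases
+    # in tests/test_word_narrative_importer.py:
+    #   ["041014", "04", "07", "2020"], four_fig=False
+    #     -> (datetime(2020, 7, 4, 10, 14), False)
+    #   ["042314", "05", "07", "2020"], four_fig=False
+    #     -> (datetime(2020, 7, 4, 23, 14), False)  # hidden day 5 pulled back to day 4
+    #   ["041014", "08", "07", "2020"], four_fig=False
+    #     -> (None, True)  # visible/hidden day mismatch is reported as an error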
+
+    def load_docx_file(self, path):
+        try:
+            doc = Document(path)
+        except Exception as e:
+            self.errors.append(
+                {self.error_type: f'Invalid docx file at {path}\nError from parsing was "{str(e)}"'}
+            )
+            return None, None, True
+
+        try:
+            # Get text from the header.
+            # Headers are attached to a document section, so we need to extract the section first
+            sec = doc.sections[0]
+            header_text = ""
+            for para in sec.header.paragraphs:
+                header_text += "\n" + para.text
+
+            splitted = re.split("[\n\t]+", header_text.strip())
+            header = {}
+            header["privacy"] = splitted[0].strip()
+            header["platform"] = splitted[1].strip()
+            header["exercise"] = splitted[4].strip()
+            header["fulltext"] = header_text.strip()
+        except Exception:
+            # Couldn't extract header, so presumably doesn't have a header.
+            # That's ok - we just create an empty dict
+            header = {}
+
+        try:
+            # Get each paragraph entry, after accepting any tracked changes
+            entries = []
+            for p in doc.paragraphs:
+                entries.append(self.get_accepted_text(p))
+        except Exception as e:
+            self.errors.append(
+                {self.error_type: f'Cannot extract paragraphs\nError from parsing was "{str(e)}"'}
+            )
+            return None, None, True
+
+        return header, entries, False
+
+    def get_accepted_text(self, p):
+        """Return text of a paragraph after accepting all changes.
+
+        This gets the XML content of the paragraph and checks for deletions or insertions. If there
+        aren't any, then it just returns the text. If there are some, then it parses the XML and
+        joins the individual text entries."""
+        # Taken from https://stackoverflow.com/questions/38247251/how-to-extract-text-inserted-with-track-changes-in-python-docx
+        xml = p._p.xml
+        if "w:del" in xml or "w:ins" in xml:
+            tree = XML(xml)
+            runs = (node.text for node in tree.iter(TEXT) if node.text)
+            return "".join(runs)
+        else:
+            return p.text
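+
+
+# Note on get_accepted_text: in WordprocessingML (assumed, simplified shape shown
+# here for illustration), tracked insertions wrap runs in <w:ins>, while tracked
+# deletions move their text into <w:delText> elements inside <w:del>, e.g.
+#   <w:p>
+#     <w:ins><w:r><w:t>inserted text</w:t></w:r></w:ins>
+#     <w:del><w:r><w:delText>deleted text</w:delText></w:r></w:del>
+#   </w:p>
+# Iterating over the <w:t> nodes therefore keeps insertions and drops deletions,
+# which is equivalent to accepting all tracked changes.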
diff --git a/pepys_import/file/file_processor.py b/pepys_import/file/file_processor.py
index 3db62dcb1..f49ff320e 100644
--- a/pepys_import/file/file_processor.py
+++ b/pepys_import/file/file_processor.py
@@ -216,17 +216,17 @@ def process_file(self, file_object, current_path, data_store, processed_ctr, imp
         # Get the file contents, for the final check
         try:
             file_contents = self.get_file_contents(full_path)
+
+            # lastly the contents
+            tmp_importers = good_importers.copy()
+            for importer in tmp_importers:
+                if not importer.can_load_this_file(file_contents):
+                    good_importers.remove(importer)
         except Exception:
             # Can't get the file contents - eg. because it's not a proper
             # unicode text file (This can occur for binary files in the same folders)
-            # So skip the file
-            return processed_ctr
-
-        # lastly the contents
-        tmp_importers = good_importers.copy()
-        for importer in tmp_importers:
-            if not importer.can_load_this_file(file_contents):
-                good_importers.remove(importer)
+            # So continue to try and process it without checking the file contents
+            pass
 
         # if good importers list is empty, return processed_ctr,
         # which means the file is not processed
diff --git a/pepys_import/file/highlighter/highlighter.py b/pepys_import/file/highlighter/highlighter.py
index e3eea2ff9..42997df46 100644
--- a/pepys_import/file/highlighter/highlighter.py
+++ b/pepys_import/file/highlighter/highlighter.py
@@ -89,8 +89,22 @@ def not_limited_lines(self):
         """
         Return a list of Line objects for each line in the file
         """
-        with open(self.filename, "r") as file:
-            file_contents = file.read()
+        try:
+            with open(self.filename, "r") as file:
+                file_contents = file.read()
+        except UnicodeDecodeError:
+            # If we get a unicode error then it means that the file we're trying to read
+            # is a binary file, and we can't do highlighting on it, so we return an empty
+            # list of lines.
+            # Note: This will mean that any importer that tries to process this file
+            # using the HighlightedFile.lines() method will get nothing, and therefore
+            # the loop over lines will never execute.
+            # This could potentially cause some files to be skipped incorrectly,
+            # if they are text files but have unicode errors in them.
+            print(
+                f"Warning: trying to process highlighting for a binary file {self.filename} - skipping"
+            )
+            return []
 
         lines_list = file_contents.splitlines()
         lines = self.create_lines(file_contents, lines_list)
diff --git a/requirements.txt b/requirements.txt
index 6ee176355..8d5565050 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,4 +14,5 @@ pg8000>=1.14.1
 setuptools>=40.8.0
 Pygments>=2.6.1
 geopy>=1.22
-halo>=0.0.31
\ No newline at end of file
+halo>=0.0.31
+python-docx>=0.8.10
\ No newline at end of file
diff --git a/tests/sample_data/track_files/other_data/ASW Data Format2.doc b/tests/sample_data/track_files/other_data/ASW Data Format2.doc
deleted file mode 100644
index 4c1b1bfc7..000000000
Binary files a/tests/sample_data/track_files/other_data/ASW Data Format2.doc and /dev/null differ
diff --git a/tests/sample_data/track_files/word/FCS_extra_narrativetypes 2.doc b/tests/sample_data/track_files/word/FCS_extra_narrativetypes 2.doc
new file mode 100644
index 000000000..86b67841c
Binary files /dev/null and b/tests/sample_data/track_files/word/FCS_extra_narrativetypes 2.doc differ
diff --git a/tests/sample_data/track_files/word/Narrative Example.docx b/tests/sample_data/track_files/word/Narrative Example.docx
new file mode 100644
index 000000000..5aa8b66b6
Binary files /dev/null and b/tests/sample_data/track_files/word/Narrative Example.docx differ
diff --git a/tests/sample_data/track_files/word/NarrativeExample_NoHiddenText.docx b/tests/sample_data/track_files/word/NarrativeExample_NoHiddenText.docx
new file mode 100644
index 000000000..5e68c3917
Binary files /dev/null and b/tests/sample_data/track_files/word/NarrativeExample_NoHiddenText.docx differ
diff --git a/tests/sample_data/track_files/word/test_narrative.docx b/tests/sample_data/track_files/word/test_narrative.docx
new file mode 100644
index 000000000..14e7a0a3b
Binary files /dev/null and b/tests/sample_data/track_files/word/test_narrative.docx differ
diff --git a/tests/test_import_cli.py b/tests/test_import_cli.py
index 2896f4aa4..c6fa6e66e 100644
--- a/tests/test_import_cli.py
+++ b/tests/test_import_cli.py
@@ -18,6 +18,7 @@
 FILE_PATH = os.path.dirname(__file__)
 DATA_PATH = os.path.join(FILE_PATH, "sample_data/track_files/other_data")
+EMPTY_FOLDER = os.path.join(FILE_PATH, "sample_data/track_files/empty_folder")
 REP_WITH_ERRORS_PATH = os.path.join(
     FILE_PATH, "sample_data/track_files/rep_data/uk_track_failing_enh_validation.rep"
 )
 
@@ -192,18 +193,6 @@ def test_import_with_wrong_type_db_field(self, patched_print):
         assert "ERROR: SQL error when communicating with database" in output
 
 
-@patch("pepys_import.cli.DefaultResolver")
-def test_process_resolver_specification_default(patched_default_resolver):
-    process(resolver="default")
-    patched_default_resolver.assert_called_once()
-
-
-@patch("pepys_import.cli.CommandLineResolver")
-def test_process_resolver_specification_cli(patched_cl_resolver):
-    process(resolver="command-line")
-    patched_cl_resolver.assert_called_once()
-
-
 @patch("pepys_import.cli.custom_print_formatted_text", side_effect=side_effect)
 @patch("pepys_import.cli.CommandLineResolver")
 @patch("pepys_import.cli.DefaultResolver")
diff --git a/tests/test_load_word_narrative.py b/tests/test_load_word_narrative.py
new file mode 100644
index 000000000..a2d4a51a7
--- /dev/null
+++ b/tests/test_load_word_narrative.py
@@ -0,0 +1,137 @@
+import os
+import unittest
+from datetime import datetime
+from unittest.mock import patch
+
+from importers.word_narrative_importer import WordNarrativeImporter
+from pepys_import.core.store.data_store import DataStore
+from pepys_import.file.file_processor import FileProcessor
+
+FILE_PATH = os.path.dirname(__file__)
+FULL_NARRATIVE_PATH = os.path.join(FILE_PATH, "sample_data/track_files/word/Narrative Example.docx")
+NO_HIDDEN_TEXT_PATH = os.path.join(
+    FILE_PATH, "sample_data/track_files/word/NarrativeExample_NoHiddenText.docx"
+)
+
+
+class TestLoadWordNarrative(unittest.TestCase):
+    def setUp(self):
+        self.store = DataStore("", "", "", 0, ":memory:", db_type="sqlite")
+        self.store.initialise()
+
+    def tearDown(self):
+        pass
+
+    @patch("pepys_import.core.store.common_db.prompt", return_value="2")
+    def test_load_word_data_full_narrative(self, patched_prompt):
+        processor = FileProcessor(archive=False)
+        processor.register_importer(WordNarrativeImporter())
+
+        # check states empty
+        with self.store.session_scope():
+            # there must be no states at the beginning
+            states = self.store.session.query(self.store.db_classes.State).all()
+            self.assertEqual(len(states), 0)
+
+            # there must be no platforms at the beginning
+            platforms = self.store.session.query(self.store.db_classes.Platform).all()
+            self.assertEqual(len(platforms), 0)
+
+            # there must be no datafiles at the beginning
+            datafiles = self.store.session.query(self.store.db_classes.Datafile).all()
+            self.assertEqual(len(datafiles), 0)
+
+        # parse the file
+        processor.process(FULL_NARRATIVE_PATH, self.store, False)
+
+        # # check data got created
+        # with self.store.session_scope():
+        #     # there must be no states after the import
+        #     states = self.store.session.query(self.store.db_classes.State).all()
+        #     self.assertEqual(len(states), 0)
+
+        #     # there must be 1 platform after the import
+        #     platforms = self.store.session.query(self.store.db_classes.Platform).all()
+        #     self.assertEqual(len(platforms), 1)
+
+        #     # there must be one datafile afterwards
+        #     datafiles = self.store.session.query(self.store.db_classes.Datafile).all()
+        #     self.assertEqual(len(datafiles), 1)
+
+        #     # there must be 25 comments afterwards
+        #     comments = self.store.session.query(self.store.db_classes.Comment).all()
+        #     self.assertEqual(len(comments), 25)
+
+        #     # There should be 15 Comment entries with the text 'Message 1'
+        #     comments_with_message_1 = (
+        #         self.store.session.query(self.store.db_classes.Comment)
+        #         .filter(self.store.db_classes.Comment.content == "Message 1")
+        #         .all()
+        #     )
+
+        #     assert len(comments_with_message_1) == 15
+
+        #     # The first one should have a timestamp of 1995-12-12 05:00
+        #     assert comments_with_message_1[0].time == datetime(1995, 12, 12, 5, 0)
+
+        #     # The last one should have a timestamp of 1995-12-13 05:17
+        #     assert comments_with_message_1[-1].time == datetime(1995, 12, 13, 5, 17)
+
+    @patch("pepys_import.core.store.common_db.prompt", return_value="2")
+    def test_load_word_data_no_hidden_text(self, patched_prompt):
+        processor = FileProcessor(archive=False)
+        processor.register_importer(WordNarrativeImporter())
+
+        # check states empty
+        with self.store.session_scope():
+            # there must be no states at the beginning
+            states = self.store.session.query(self.store.db_classes.State).all()
+            self.assertEqual(len(states), 0)
+
+            # there must be no platforms at the beginning
+            platforms = self.store.session.query(self.store.db_classes.Platform).all()
+            self.assertEqual(len(platforms), 0)
+
+            # there must be no datafiles at the beginning
+            datafiles = self.store.session.query(self.store.db_classes.Datafile).all()
+            self.assertEqual(len(datafiles), 0)
+
+        # parse the file
+        processor.process(NO_HIDDEN_TEXT_PATH, self.store, False)
+
+        # check data got created
+        with self.store.session_scope():
+            # there must be no states after the import
+            states = self.store.session.query(self.store.db_classes.State).all()
+            self.assertEqual(len(states), 0)
+
+            # there must be 1 platform after the import
+            platforms = self.store.session.query(self.store.db_classes.Platform).all()
+            self.assertEqual(len(platforms), 1)
+
+            # there must be one datafile afterwards
+            datafiles = self.store.session.query(self.store.db_classes.Datafile).all()
+            self.assertEqual(len(datafiles), 1)
+
+            # there must be 25 comments afterwards
+            comments = self.store.session.query(self.store.db_classes.Comment).all()
+            self.assertEqual(len(comments), 25)
+
+            # There should be 15 Comment entries with the text 'Message 1'
+            comments_with_message_1 = (
+                self.store.session.query(self.store.db_classes.Comment)
+                .filter(self.store.db_classes.Comment.content == "Message 1")
+                .all()
+            )
+
+            assert len(comments_with_message_1) == 15
+
+            # The first one should have a timestamp of 1995-12-12 05:00
+            assert comments_with_message_1[0].time == datetime(1995, 12, 12, 5, 0)
+
+            # The last one should have a timestamp of 1995-12-13 05:17
+            assert comments_with_message_1[-1].time == datetime(1995, 12, 13, 5, 17)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_word_narrative_importer.py b/tests/test_word_narrative_importer.py
new file mode 100644
index 000000000..c3b2b5f68
--- /dev/null
+++ b/tests/test_word_narrative_importer.py
@@ -0,0 +1,177 @@
+from datetime import datetime
+
+import pytest
+
+from importers.word_narrative_importer import WordNarrativeImporter
+
+
+@pytest.mark.parametrize(
+    "input, last_day, last_month, last_year, timestamp",
+    [
+        pytest.param(
+            "141030",
+            14,
+            7,
+            2019,
+            datetime(2019, 7, 14, 10, 30),
+            id="valid timestamp with days matching",
+        ),
+        pytest.param(
+            "151030",
+            14,
+            7,
+            2019,
+            datetime(2019, 7, 15, 10, 30),
+            id="valid timestamp with day one more",
+        ),
+        pytest.param(
+            "011030", 30, 7, 2019, datetime(2019, 8, 1, 10, 30), id="end of month rollover"
+        ),
+        pytest.param(
+            "011030", 28, 12, 2019, datetime(2020, 1, 1, 10, 30), id="end of year rollover"
+        ),
+        pytest.param("1030", 28, 12, 2019, datetime(2019, 12, 28, 10, 30), id="four digit"),
+    ],
+)
+def test_singlepart_datetime_parsing_valid(input, last_day, last_month, last_year, timestamp):
+    imp = WordNarrativeImporter()
+    imp.errors = []
+
+    imp.last_day = last_day
+    imp.last_month = last_month
+    imp.last_year = last_year
+
+    output_timestamp = imp.parse_singlepart_datetime(input)
+
+    assert output_timestamp == timestamp
more", + ), + pytest.param( + "011030", 30, 7, 2019, datetime(2019, 8, 1, 10, 30), id="end of month rollover" + ), + pytest.param( + "011030", 28, 12, 2019, datetime(2020, 1, 1, 10, 30), id="end of year rollover" + ), + pytest.param("1030", 28, 12, 2019, datetime(2019, 12, 28, 10, 30), id="four digit"), + ], +) +def test_singlepart_datetime_parsing_valid(input, last_day, last_month, last_year, timestamp): + imp = WordNarrativeImporter() + imp.errors = [] + + imp.last_day = last_day + imp.last_month = last_month + imp.last_year = last_year + + output_timestamp = imp.parse_singlepart_datetime(input) + + assert output_timestamp == timestamp + + +@pytest.mark.parametrize( + "input, last_day, last_month, last_year", + [ + pytest.param("141030", None, 7, 2019, id="missing last_day"), + pytest.param("151030", 14, None, 2019, id="missing last_month"), + pytest.param("011030", 30, 7, None, id="missing last_year"), + pytest.param("991030", 28, 12, 2019, id="invalid day"), + pytest.param("019930", 28, 12, 2019, id="invalid hour"), + pytest.param("011099", 28, 12, 2019, id="invalid min"), + pytest.param("9930", 28, 12, 2019, id="four digit invalid hour"), + pytest.param("1099", 28, 12, 2019, id="four digit invalid min"), + pytest.param("", 28, 12, 2019, id="empty"), + pytest.param("123456789", 28, 12, 2019, id="too long"), + ], +) +def test_singlepart_datetime_parsing_invalid(input, last_day, last_month, last_year): + imp = WordNarrativeImporter() + imp.errors = [] + + imp.last_day = last_day + imp.last_month = last_month + imp.last_year = last_year + + with pytest.raises(ValueError): + _ = imp.parse_singlepart_datetime(input) + + +@pytest.mark.parametrize( + "input,timestamp", + [ + pytest.param( + ["041014", "04", "07", "2020"], datetime(2020, 7, 4, 10, 14), id="valid timestamp" + ), + pytest.param(["041014", "4", "7", "2020"], datetime(2020, 7, 4, 10, 14), id="single chars"), + pytest.param( + ["041014", "4", "7", "20"], datetime(2020, 7, 4, 10, 14), id="two digit year 20" + ), + pytest.param( + ["041014", "4", "7", "85"], datetime(1985, 7, 4, 10, 14), id="two digit year 85" + ), + pytest.param( + ["042314", "05", "07", "2020"], + datetime(2020, 7, 4, 23, 14), + id="near midnight mismatch", + ), + ], +) +def test_multipart_datetime_parsing_valid_sixfig(input, timestamp): + imp = WordNarrativeImporter() + imp.errors = [] + + output_timestamp, error = imp.parse_multipart_datetime(input, four_fig=False) + + assert not error + assert output_timestamp == timestamp + + +@pytest.mark.parametrize( + "input,timestamp", + [ + pytest.param( + ["1014", "04", "07", "2020"], datetime(2020, 7, 4, 10, 14), id="valid timestamp" + ), + pytest.param(["1014", "4", "7", "2020"], datetime(2020, 7, 4, 10, 14), id="single chars"), + pytest.param( + ["1014", "4", "7", "20"], datetime(2020, 7, 4, 10, 14), id="two digit year 20" + ), + pytest.param( + ["1014", "4", "7", "85"], datetime(1985, 7, 4, 10, 14), id="two digit year 85" + ), + ], +) +def test_multipart_datetime_parsing_valid_fourfig(input, timestamp): + imp = WordNarrativeImporter() + imp.errors = [] + + output_timestamp, error = imp.parse_multipart_datetime(input, four_fig=True) + + assert not error + assert output_timestamp == timestamp + + +@pytest.mark.parametrize( + "input,timestamp", + [ + pytest.param( + ["041014", "08", "07", "2020"], datetime(2020, 7, 4, 10, 14), id="mismatch day" + ), + pytest.param(["991014", "99", "7", "2020"], datetime(2020, 7, 4, 10, 14), id="invalid day"), + pytest.param(["041014", "4", "99", "20"], datetime(2020, 7, 4, 10, 14), 
id="invalid month"), + pytest.param( + ["041014", "4", "7", "-1234"], datetime(1985, 7, 4, 10, 14), id="invalid year" + ), + pytest.param( + ["049914", "04", "07", "2020"], datetime(2020, 7, 4, 23, 14), id="invalid hour" + ), + pytest.param( + ["041099", "04", "07", "2020"], datetime(2020, 7, 4, 23, 14), id="invalid minute" + ), + ], +) +def test_multipart_datetime_parsing_invalid_sixfig(input, timestamp): + imp = WordNarrativeImporter() + imp.errors = [] + + output_timestamp, error = imp.parse_multipart_datetime(input, four_fig=False) + + assert error + + +@pytest.mark.parametrize( + "input,timestamp", + [ + pytest.param(["1014", "99", "7", "2020"], datetime(2020, 7, 4, 10, 14), id="invalid day"), + pytest.param(["1014", "4", "99", "20"], datetime(2020, 7, 4, 10, 14), id="invalid month"), + pytest.param(["1014", "4", "7", "-1234"], datetime(1985, 7, 4, 10, 14), id="invalid year"), + pytest.param(["9914", "04", "07", "2020"], datetime(2020, 7, 4, 23, 14), id="invalid hour"), + pytest.param( + ["1099", "04", "07", "2020"], datetime(2020, 7, 4, 23, 14), id="invalid minute" + ), + ], +) +def test_multipart_datetime_parsing_invalid_fourfig(input, timestamp): + imp = WordNarrativeImporter() + imp.errors = [] + + output_timestamp, error = imp.parse_multipart_datetime(input, four_fig=True) + + assert error