From 6ec5b5df1eb681d6ee2905f6566c4abda3297c77 Mon Sep 17 00:00:00 2001
From: Herculino Trotta
Date: Sun, 9 Feb 2025 00:51:26 -0300
Subject: [PATCH] feat(import:v1): add XLS and XLSX support

Closes #47
---
 app/apps/import_app/schemas/v1.py  |  16 ++-
 app/apps/import_app/services/v1.py | 179 ++++++++++++++++++++++++++++-
 requirements.txt                   |   2 +
 3 files changed, 192 insertions(+), 5 deletions(-)

diff --git a/app/apps/import_app/schemas/v1.py b/app/apps/import_app/schemas/v1.py
index e5f5b07..5b735d5 100644
--- a/app/apps/import_app/schemas/v1.py
+++ b/app/apps/import_app/schemas/v1.py
@@ -92,6 +92,20 @@ class CSVImportSettings(BaseModel):
     ]
 
 
+class ExcelImportSettings(BaseModel):
+    skip_errors: bool = Field(
+        default=False,
+        description="If True, errors during import will be logged and skipped",
+    )
+    file_type: Literal["xls", "xlsx"]
+    trigger_transaction_rules: bool = True
+    importing: Literal[
+        "transactions", "accounts", "currencies", "categories", "tags", "entities"
+    ]
+    start_row: int = Field(default=1, ge=1, description="Where your header is located")
+    sheets: list[str] | str = "*"
+
+
 class ColumnMapping(BaseModel):
     source: Optional[str] | Optional[list[str]] = Field(
         default=None,
@@ -328,7 +342,7 @@ class CurrencyExchangeMapping(ColumnMapping):
 
 
 class ImportProfileSchema(BaseModel):
-    settings: CSVImportSettings
+    settings: CSVImportSettings | ExcelImportSettings
     mapping: Dict[
         str,
         TransactionAccountMapping
diff --git a/app/apps/import_app/services/v1.py b/app/apps/import_app/services/v1.py
index e3daaf5..8cff32f 100644
--- a/app/apps/import_app/services/v1.py
+++ b/app/apps/import_app/services/v1.py
@@ -3,13 +3,16 @@ import hashlib
 import logging
 import os
 import re
-from datetime import datetime
+from datetime import datetime, date
 from decimal import Decimal, InvalidOperation
 from typing import Dict, Any, Literal, Union
 
+import openpyxl
+import xlrd
 import yaml
 from cachalot.api import cachalot_disabled
 from django.utils import timezone
+from openpyxl.utils.exceptions import InvalidFileException
 
 from apps.accounts.models import Account, AccountGroup
 from apps.currencies.models import Currency
@@ -39,7 +42,9 @@ class ImportService:
         self.import_run: ImportRun = import_run
         self.profile: ImportProfile = import_run.profile
         self.config: version_1.ImportProfileSchema = self._load_config()
-        self.settings: version_1.CSVImportSettings = self.config.settings
+        self.settings: version_1.CSVImportSettings | version_1.ExcelImportSettings = (
+            self.config.settings
+        )
         self.deduplication: list[version_1.CompareDeduplicationRule] = (
             self.config.deduplication
         )
@@ -74,6 +79,13 @@ class ImportService:
         self.import_run.logs += log_line
         self.import_run.save(update_fields=["logs"])
 
+        if level == "info":
+            logger.info(log_line)
+        elif level == "warning":
+            logger.warning(log_line)
+        elif level == "error":
+            logger.error(log_line, exc_info=True)
+
     def _update_totals(
         self,
         field: Literal["total", "processed", "successful", "skipped", "failed"],
@@ -163,6 +175,7 @@ class ImportService:
                     transformed = transformed.replace(
                         transform.pattern, transform.replacement
                     )
+
             elif transform.type == "regex":
                 if transform.exclusive:
                     transformed = re.sub(
@@ -172,10 +185,12 @@ class ImportService:
                     transformed = re.sub(
                         transform.pattern, transform.replacement, transformed
                     )
+
             elif transform.type == "date_format":
                 transformed = datetime.strptime(
                     transformed, transform.original_format
                 ).strftime(transform.new_format)
+
             elif transform.type == "merge":
                 values_to_merge = []
                 for field in transform.fields:
@@ -188,12 +203,14 @@ class ImportService:
                     ):
                         values_to_merge.append(str(mapped_data[field[2:]]))
                 transformed = transform.separator.join(values_to_merge)
+
             elif transform.type == "split":
                 parts = transformed.split(transform.separator)
                 if transform.index is not None:
                     transformed = parts[transform.index] if parts else ""
                 else:
                     transformed = parts
+
             elif transform.type in ["add", "subtract"]:
                 try:
                     source_value = Decimal(transformed)
@@ -442,7 +459,7 @@ class ImportService:
 
     def _coerce_type(
         self, value: str, mapping: version_1.ColumnMapping
-    ) -> Union[str, int, bool, Decimal, datetime, list]:
+    ) -> Union[str, int, bool, Decimal, datetime, list, None]:
         if not value:
             return None
 
@@ -477,6 +494,11 @@ class ImportService:
                 version_1.TransactionReferenceDateMapping,
             ),
         ):
+            if isinstance(value, datetime):
+                return value.date()
+            elif isinstance(value, date):
+                return value
+
             formats = (
                 mapping.format
                 if isinstance(mapping.format, list)
@@ -646,6 +668,153 @@ class ImportService:
             for row_number, row in enumerate(reader, start=1):
                 self._process_row(row, row_number)
 
+    def _process_excel(self, file_path):
+        """
+        Import an XLS/XLSX workbook.
+
+        The row at ``settings.start_row`` (1-based) is the header; each row
+        below it is passed to ``_process_row``. Date cells stay date objects
+        so ``_coerce_type`` can use them directly; other values become str.
+        """
+        try:
+            if self.settings.file_type == "xlsx":
+                workbook = openpyxl.load_workbook(
+                    file_path, read_only=True, data_only=True
+                )
+                try:
+                    self._process_xlsx_workbook(workbook)
+                finally:
+                    # Read-only workbooks keep the file open until closed
+                    workbook.close()
+            else:  # xls
+                self._process_xls_workbook(xlrd.open_workbook(file_path))
+        except (InvalidFileException, xlrd.XLRDError) as e:
+            raise ValueError(
+                f"Invalid {self.settings.file_type.upper()} file format: {str(e)}"
+            ) from e
+
+    def _excel_sheet_names(self, available):
+        """Return the sheet names to import; "*" selects every sheet."""
+        sheets = self.settings.sheets
+        if sheets == "*":
+            return list(available)
+        return sheets if isinstance(sheets, list) else [sheets]
+
+    @staticmethod
+    def _excel_cell_value(value):
+        """Keep empty and date cells as-is; convert anything else to str."""
+        if value is None or isinstance(value, date):
+            return value
+        return str(value)
+
+    @staticmethod
+    def _xls_cell_value(cell_type, cell_value, datemode):
+        """Convert an xlrd cell to a datetime (date cells) or a string."""
+        if cell_type == xlrd.XL_CELL_DATE:
+            try:
+                return datetime(*xlrd.xldate_as_tuple(cell_value, datemode))
+            except ValueError:
+                # Unconvertible serial (e.g. time-only cell): keep the raw value
+                pass
+        return str(cell_value) if cell_value is not None else None
+
+    def _process_excel_row(self, row_data, row_number, sheet_name):
+        """Process one data row, honouring the ``skip_errors`` setting."""
+        try:
+            self._process_row(row_data, row_number)
+        except Exception as e:
+            if not self.settings.skip_errors:
+                raise
+            self._log(
+                "warning",
+                f"Error processing row {row_number} in sheet '{sheet_name}': {str(e)}",
+            )
+            self._increment_totals("failed", value=1)
+
+    def _process_xlsx_workbook(self, workbook):
+        """Import the configured sheets of an openpyxl workbook."""
+        start_row = self.settings.start_row
+        sheet_names = self._excel_sheet_names(workbook.sheetnames)
+
+        # max_row is None when a read-only sheet carries no dimension info
+        total_rows = sum(
+            max(0, (workbook[sheet_name].max_row or 0) - start_row)
+            for sheet_name in sheet_names
+            if sheet_name in workbook.sheetnames
+        )
+        self._update_totals("total", value=total_rows)
+
+        for sheet_name in sheet_names:
+            if sheet_name not in workbook.sheetnames:
+                self._log(
+                    "warning",
+                    f"Sheet '{sheet_name}' not found in the Excel file. Skipping.",
+                )
+                continue
+
+            sheet = workbook[sheet_name]
+            self._log("info", f"Processing sheet: {sheet_name}")
+            header_row = next(
+                sheet.iter_rows(
+                    min_row=start_row, max_row=start_row, values_only=True
+                ),
+                (),
+            )
+            headers = [str(value or "") for value in header_row]
+
+            data_rows = sheet.iter_rows(min_row=start_row + 1, values_only=True)
+            for row_number, row in enumerate(data_rows, start=1):
+                row_data = {
+                    key: self._excel_cell_value(value)
+                    for key, value in zip(headers, row)
+                }
+                self._process_excel_row(row_data, row_number, sheet_name)
+
+    def _process_xls_workbook(self, workbook):
+        """Import the configured sheets of an xlrd workbook."""
+        start_row = self.settings.start_row
+        available = workbook.sheet_names()
+        sheet_names = self._excel_sheet_names(available)
+
+        total_rows = sum(
+            max(0, workbook.sheet_by_name(sheet_name).nrows - start_row)
+            for sheet_name in sheet_names
+            if sheet_name in available
+        )
+        self._update_totals("total", value=total_rows)
+
+        for sheet_name in sheet_names:
+            if sheet_name not in available:
+                self._log(
+                    "warning",
+                    f"Sheet '{sheet_name}' not found in the Excel file. Skipping.",
+                )
+                continue
+
+            sheet = workbook.sheet_by_name(sheet_name)
+            self._log("info", f"Processing sheet: {sheet_name}")
+            if sheet.nrows < start_row:
+                # No header row to read, so there is nothing to import
+                continue
+
+            headers = [
+                str(sheet.cell_value(start_row - 1, col) or "")
+                for col in range(sheet.ncols)
+            ]
+            for row_index in range(start_row, sheet.nrows):
+                row_data = {
+                    key: self._xls_cell_value(
+                        sheet.cell_type(row_index, col),
+                        sheet.cell_value(row_index, col),
+                        workbook.datemode,
+                    )
+                    for col, key in enumerate(headers)
+                }
+                # Number rows from the first data row, like the XLSX path
+                self._process_excel_row(
+                    row_data, row_index - start_row + 1, sheet_name
+                )
+
     def _validate_file_path(self, file_path: str) -> str:
         """
         Validates that the file path is within the allowed temporary directory.
@@ -668,8 +837,10 @@ class ImportService:
         self._log("info", "Starting import process")
 
         try:
-            if self.settings.file_type == "csv":
+            if isinstance(self.settings, version_1.CSVImportSettings):
                 self._process_csv(file_path)
+            elif isinstance(self.settings, version_1.ExcelImportSettings):
+                self._process_excel(file_path)
 
             self._update_status("FINISHED")
             self._log(
diff --git a/requirements.txt b/requirements.txt
index 58c34db..b7e8ae1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -27,3 +27,5 @@ simpleeval~=1.0.0
 pydantic~=2.10.5
 PyYAML~=6.0.2
 mistune~=3.1.1
+openpyxl~=3.1
+xlrd~=2.0