feat(import:v1): add XLS and XLSX support

Closes #47
This commit is contained in:
Herculino Trotta
2025-02-09 00:51:26 -03:00
parent 37b5a43c1f
commit 6ec5b5df1e
3 changed files with 190 additions and 5 deletions

View File

@@ -92,6 +92,20 @@ class CSVImportSettings(BaseModel):
]
class ExcelImportSettings(BaseModel):
skip_errors: bool = Field(
default=False,
description="If True, errors during import will be logged and skipped",
)
file_type: Literal["xls", "xlsx"]
trigger_transaction_rules: bool = True
importing: Literal[
"transactions", "accounts", "currencies", "categories", "tags", "entities"
]
start_row: int = Field(default=1, description="Where your header is located")
sheets: list[str] | str = "*"
class ColumnMapping(BaseModel):
source: Optional[str] | Optional[list[str]] = Field(
default=None,
@@ -328,7 +342,7 @@ class CurrencyExchangeMapping(ColumnMapping):
class ImportProfileSchema(BaseModel):
settings: CSVImportSettings
settings: CSVImportSettings | ExcelImportSettings
mapping: Dict[
str,
TransactionAccountMapping

View File

@@ -3,13 +3,16 @@ import hashlib
import logging
import os
import re
from datetime import datetime
from datetime import datetime, date
from decimal import Decimal, InvalidOperation
from typing import Dict, Any, Literal, Union
import openpyxl
import xlrd
import yaml
from cachalot.api import cachalot_disabled
from django.utils import timezone
from openpyxl.utils.exceptions import InvalidFileException
from apps.accounts.models import Account, AccountGroup
from apps.currencies.models import Currency
@@ -39,7 +42,9 @@ class ImportService:
self.import_run: ImportRun = import_run
self.profile: ImportProfile = import_run.profile
self.config: version_1.ImportProfileSchema = self._load_config()
self.settings: version_1.CSVImportSettings = self.config.settings
self.settings: version_1.CSVImportSettings | version_1.ExcelImportSettings = (
self.config.settings
)
self.deduplication: list[version_1.CompareDeduplicationRule] = (
self.config.deduplication
)
@@ -74,6 +79,13 @@ class ImportService:
self.import_run.logs += log_line
self.import_run.save(update_fields=["logs"])
if level == "info":
logger.info(log_line)
elif level == "warning":
logger.warning(log_line)
elif level == "error":
logger.error(log_line, exc_info=True)
def _update_totals(
self,
field: Literal["total", "processed", "successful", "skipped", "failed"],
@@ -163,6 +175,7 @@ class ImportService:
transformed = transformed.replace(
transform.pattern, transform.replacement
)
elif transform.type == "regex":
if transform.exclusive:
transformed = re.sub(
@@ -172,10 +185,12 @@ class ImportService:
transformed = re.sub(
transform.pattern, transform.replacement, transformed
)
elif transform.type == "date_format":
transformed = datetime.strptime(
transformed, transform.original_format
).strftime(transform.new_format)
elif transform.type == "merge":
values_to_merge = []
for field in transform.fields:
@@ -188,12 +203,14 @@ class ImportService:
):
values_to_merge.append(str(mapped_data[field[2:]]))
transformed = transform.separator.join(values_to_merge)
elif transform.type == "split":
parts = transformed.split(transform.separator)
if transform.index is not None:
transformed = parts[transform.index] if parts else ""
else:
transformed = parts
elif transform.type in ["add", "subtract"]:
try:
source_value = Decimal(transformed)
@@ -442,7 +459,7 @@ class ImportService:
def _coerce_type(
self, value: str, mapping: version_1.ColumnMapping
) -> Union[str, int, bool, Decimal, datetime, list]:
) -> Union[str, int, bool, Decimal, datetime, list, None]:
if not value:
return None
@@ -477,6 +494,11 @@ class ImportService:
version_1.TransactionReferenceDateMapping,
),
):
if isinstance(value, datetime):
return value.date()
elif isinstance(value, date):
return value
formats = (
mapping.format
if isinstance(mapping.format, list)
@@ -646,6 +668,151 @@ class ImportService:
for row_number, row in enumerate(reader, start=1):
self._process_row(row, row_number)
def _process_excel(self, file_path):
try:
if self.settings.file_type == "xlsx":
workbook = openpyxl.load_workbook(
file_path, read_only=True, data_only=True
)
sheets_to_process = (
workbook.sheetnames
if self.settings.sheets == "*"
else (
self.settings.sheets
if isinstance(self.settings.sheets, list)
else [self.settings.sheets]
)
)
# Calculate total rows
total_rows = sum(
max(0, workbook[sheet_name].max_row - self.settings.start_row)
for sheet_name in sheets_to_process
if sheet_name in workbook.sheetnames
)
self._update_totals("total", value=total_rows)
# Process sheets
for sheet_name in sheets_to_process:
if sheet_name not in workbook.sheetnames:
self._log(
"warning",
f"Sheet '{sheet_name}' not found in the Excel file. Skipping.",
)
continue
sheet = workbook[sheet_name]
self._log("info", f"Processing sheet: {sheet_name}")
headers = [
str(cell.value or "") for cell in sheet[self.settings.start_row]
]
for row_number, row in enumerate(
sheet.iter_rows(
min_row=self.settings.start_row + 1, values_only=True
),
start=1,
):
try:
row_data = {
key: str(value) if value is not None else None
for key, value in zip(headers, row)
}
self._process_row(row_data, row_number)
except Exception as e:
if self.settings.skip_errors:
self._log(
"warning",
f"Error processing row {row_number} in sheet '{sheet_name}': {str(e)}",
)
self._increment_totals("failed", value=1)
else:
raise
workbook.close()
else: # xls
workbook = xlrd.open_workbook(file_path)
sheets_to_process = (
workbook.sheet_names()
if self.settings.sheets == "*"
else (
self.settings.sheets
if isinstance(self.settings.sheets, list)
else [self.settings.sheets]
)
)
# Calculate total rows
total_rows = sum(
max(
0,
workbook.sheet_by_name(sheet_name).nrows
- self.settings.start_row,
)
for sheet_name in sheets_to_process
if sheet_name in workbook.sheet_names()
)
self._update_totals("total", value=total_rows)
# Process sheets
for sheet_name in sheets_to_process:
if sheet_name not in workbook.sheet_names():
self._log(
"warning",
f"Sheet '{sheet_name}' not found in the Excel file. Skipping.",
)
continue
sheet = workbook.sheet_by_name(sheet_name)
self._log("info", f"Processing sheet: {sheet_name}")
headers = [
str(sheet.cell_value(self.settings.start_row - 1, col) or "")
for col in range(sheet.ncols)
]
for row_number in range(self.settings.start_row, sheet.nrows):
try:
row_data = {}
for col, key in enumerate(headers):
cell_type = sheet.cell_type(row_number, col)
cell_value = sheet.cell_value(row_number, col)
if cell_type == xlrd.XL_CELL_DATE:
# Convert Excel date to Python datetime
try:
python_date = datetime(
*xlrd.xldate_as_tuple(
cell_value, workbook.datemode
)
)
row_data[key] = python_date
except Exception:
# If date conversion fails, use the original value
row_data[key] = (
str(cell_value)
if cell_value is not None
else None
)
elif cell_value is None:
row_data[key] = None
else:
row_data[key] = str(cell_value)
self._process_row(
row_data, row_number - self.settings.start_row + 1
)
except Exception as e:
if self.settings.skip_errors:
self._log(
"warning",
f"Error processing row {row_number} in sheet '{sheet_name}': {str(e)}",
)
self._increment_totals("failed", value=1)
else:
raise
except (InvalidFileException, xlrd.XLRDError) as e:
raise ValueError(
f"Invalid {self.settings.file_type.upper()} file format: {str(e)}"
)
def _validate_file_path(self, file_path: str) -> str:
"""
Validates that the file path is within the allowed temporary directory.
@@ -668,8 +835,10 @@ class ImportService:
self._log("info", "Starting import process")
try:
if self.settings.file_type == "csv":
if isinstance(self.settings, version_1.CSVImportSettings):
self._process_csv(file_path)
elif isinstance(self.settings, version_1.ExcelImportSettings):
self._process_excel(file_path)
self._update_status("FINISHED")
self._log(

View File

@@ -27,3 +27,5 @@ simpleeval~=1.0.0
pydantic~=2.10.5
PyYAML~=6.0.2
mistune~=3.1.1
openpyxl~=3.1
xlrd~=2.0