mirror of
https://github.com/eitchtee/WYGIWYH.git
synced 2026-04-19 15:21:31 +02:00
@@ -106,6 +106,17 @@ class ExcelImportSettings(BaseModel):
|
||||
sheets: list[str] | str = "*"
|
||||
|
||||
|
||||
class QIFImportSettings(BaseModel):
|
||||
skip_errors: bool = Field(
|
||||
default=False,
|
||||
description="If True, errors during import will be logged and skipped",
|
||||
)
|
||||
file_type: Literal["qif"] = "qif"
|
||||
importing: Literal["transactions"] = "transactions"
|
||||
encoding: str = Field(default="utf-8", description="File encoding")
|
||||
date_format: str = Field(..., description="Date format (e.g. %d/%m/%Y)")
|
||||
|
||||
|
||||
class ColumnMapping(BaseModel):
|
||||
source: Optional[str] | Optional[list[str]] = Field(
|
||||
default=None,
|
||||
@@ -342,7 +353,7 @@ class CurrencyExchangeMapping(ColumnMapping):
|
||||
|
||||
|
||||
class ImportProfileSchema(BaseModel):
|
||||
settings: CSVImportSettings | ExcelImportSettings
|
||||
settings: CSVImportSettings | ExcelImportSettings | QIFImportSettings
|
||||
mapping: Dict[
|
||||
str,
|
||||
TransactionAccountMapping
|
||||
|
||||
@@ -3,6 +3,8 @@ import hashlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
from django.db import transaction
|
||||
from datetime import datetime, date
|
||||
from decimal import Decimal, InvalidOperation
|
||||
from typing import Dict, Any, Literal, Union
|
||||
@@ -845,6 +847,219 @@ class ImportService:
|
||||
f"Invalid {self.settings.file_type.upper()} file format: {str(e)}"
|
||||
)
|
||||
|
||||
def _parse_and_import_qif(self, content_lines: list[str], filename: str) -> None:
|
||||
# Infer account from filename (remove extension)
|
||||
account_name = os.path.splitext(os.path.basename(filename))[0]
|
||||
|
||||
current_transaction = {}
|
||||
raw_lines_buffer = []
|
||||
|
||||
account = Account.objects.filter(name=account_name).first()
|
||||
if not account:
|
||||
raise ValueError(f"Account '{account_name}' not found.")
|
||||
|
||||
row_number = 0
|
||||
for line in content_lines:
|
||||
row_number += 1
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
raw_lines_buffer.append(line)
|
||||
|
||||
if line == "^":
|
||||
if current_transaction:
|
||||
# Deduplication using hash of raw lines
|
||||
raw_content = "".join(raw_lines_buffer)
|
||||
internal_id = hashlib.sha256(
|
||||
raw_content.encode("utf-8")
|
||||
).hexdigest()
|
||||
|
||||
# Reset buffer for next transaction
|
||||
raw_lines_buffer = []
|
||||
|
||||
try:
|
||||
with transaction.atomic():
|
||||
if Transaction.objects.filter(
|
||||
internal_id=internal_id
|
||||
).exists():
|
||||
self._increment_totals("skipped", 1)
|
||||
self._log(
|
||||
"info",
|
||||
f"Skipped duplicate transaction from {filename}",
|
||||
)
|
||||
current_transaction = {}
|
||||
continue
|
||||
|
||||
# Handle Account
|
||||
if account:
|
||||
current_transaction["account"] = account
|
||||
else:
|
||||
acc = Account.objects.filter(name=account_name).first()
|
||||
if acc:
|
||||
current_transaction["account"] = acc
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Account '{account_name}' not found."
|
||||
)
|
||||
|
||||
current_transaction["internal_id"] = internal_id
|
||||
|
||||
# Handle Description/Memo mapping
|
||||
if "memo" in current_transaction:
|
||||
current_transaction["description"] = (
|
||||
current_transaction.pop("memo")
|
||||
)
|
||||
|
||||
# Handle Payee mapping
|
||||
entities = []
|
||||
if "payee" in current_transaction:
|
||||
payee_name = current_transaction.pop("payee")
|
||||
# "Treat the payee (P) as the entity. Use existing or create"
|
||||
entity, _ = TransactionEntity.objects.get_or_create(
|
||||
name=payee_name
|
||||
)
|
||||
entities.append(entity)
|
||||
|
||||
# Handle Label/Category
|
||||
category = None
|
||||
tags = []
|
||||
if "label" in current_transaction:
|
||||
label = current_transaction.pop("label")
|
||||
if label.startswith("[") and label.endswith("]"):
|
||||
# Transfer: set label as description, ignore category/tags
|
||||
clean_label = label[1:-1]
|
||||
current_transaction["description"] = clean_label
|
||||
else:
|
||||
parts = label.split(":")
|
||||
if parts:
|
||||
cat_name = parts[0].strip()
|
||||
if cat_name:
|
||||
category, _ = (
|
||||
TransactionCategory.objects.get_or_create(
|
||||
name=cat_name
|
||||
)
|
||||
)
|
||||
|
||||
if len(parts) > 1:
|
||||
for tag_name in parts[1:]:
|
||||
tag_name = tag_name.strip()
|
||||
if tag_name:
|
||||
tag, _ = (
|
||||
TransactionTag.objects.get_or_create(
|
||||
name=tag_name
|
||||
)
|
||||
)
|
||||
tags.append(tag)
|
||||
|
||||
current_transaction["category"] = category
|
||||
|
||||
# Create transaction
|
||||
new_trans = Transaction.objects.create(
|
||||
**current_transaction
|
||||
)
|
||||
if entities:
|
||||
new_trans.entities.set(entities)
|
||||
if tags:
|
||||
new_trans.tags.set(tags)
|
||||
|
||||
self.import_run.transactions.add(new_trans)
|
||||
self._increment_totals("successful", 1)
|
||||
|
||||
except Exception as e:
|
||||
if not self.settings.skip_errors:
|
||||
raise e
|
||||
self._log(
|
||||
"warning",
|
||||
f"Error processing transaction in {filename}: {str(e)}",
|
||||
)
|
||||
self._increment_totals("failed", 1)
|
||||
|
||||
# Reset for next transaction
|
||||
current_transaction = {}
|
||||
else:
|
||||
# Empty transaction record (orphaned ^)
|
||||
raw_lines_buffer = []
|
||||
pass
|
||||
self._increment_totals("processed", 1)
|
||||
continue
|
||||
|
||||
if line.startswith("!"):
|
||||
continue
|
||||
|
||||
code = line[0]
|
||||
value = line[1:]
|
||||
|
||||
if code == "D":
|
||||
try:
|
||||
current_transaction["date"] = datetime.strptime(
|
||||
value, self.settings.date_format
|
||||
).date()
|
||||
except ValueError:
|
||||
self._log(
|
||||
"warning",
|
||||
f"Could not parse date '{value}' using format '{self.settings.date_format}' in {filename}",
|
||||
)
|
||||
if not self.settings.skip_errors:
|
||||
raise ValueError(f"Invalid date format '{value}'")
|
||||
|
||||
elif code == "T":
|
||||
try:
|
||||
cleaned_value = value.replace(",", "")
|
||||
amount = Decimal(cleaned_value)
|
||||
if amount < 0:
|
||||
current_transaction["type"] = Transaction.Type.EXPENSE
|
||||
current_transaction["amount"] = abs(amount)
|
||||
else:
|
||||
current_transaction["type"] = Transaction.Type.INCOME
|
||||
current_transaction["amount"] = amount
|
||||
except InvalidOperation:
|
||||
self._log(
|
||||
"warning", f"Could not parse amount '{value}' in {filename}"
|
||||
)
|
||||
if not self.settings.skip_errors:
|
||||
raise ValueError(f"Invalid amount format '{value}'")
|
||||
|
||||
elif code == "P":
|
||||
current_transaction["payee"] = value
|
||||
elif code == "M":
|
||||
current_transaction["memo"] = value
|
||||
elif code == "L":
|
||||
current_transaction["label"] = value
|
||||
elif code == "N":
|
||||
pass
|
||||
|
||||
def _process_qif(self, file_path):
|
||||
def process_logic():
|
||||
if zipfile.is_zipfile(file_path):
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zf:
|
||||
for filename in zf.namelist():
|
||||
if filename.lower().endswith(
|
||||
".qif"
|
||||
) and not filename.startswith("__MACOSX"):
|
||||
self._log(
|
||||
"info", f"Processing QIF from ZIP: {filename}"
|
||||
)
|
||||
with zf.open(filename) as f:
|
||||
content = f.read().decode(self.settings.encoding)
|
||||
self._parse_and_import_qif(
|
||||
content.splitlines(), filename
|
||||
)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error processing ZIP file: {str(e)}")
|
||||
else:
|
||||
with open(file_path, "r", encoding=self.settings.encoding) as f:
|
||||
self._parse_and_import_qif(
|
||||
f.readlines(), os.path.basename(file_path)
|
||||
)
|
||||
|
||||
if not self.settings.skip_errors:
|
||||
with transaction.atomic():
|
||||
process_logic()
|
||||
else:
|
||||
process_logic()
|
||||
|
||||
def _validate_file_path(self, file_path: str) -> str:
|
||||
"""
|
||||
Validates that the file path is within the allowed temporary directory.
|
||||
@@ -871,6 +1086,8 @@ class ImportService:
|
||||
self._process_csv(file_path)
|
||||
elif isinstance(self.settings, version_1.ExcelImportSettings):
|
||||
self._process_excel(file_path)
|
||||
elif isinstance(self.settings, version_1.QIFImportSettings):
|
||||
self._process_qif(file_path)
|
||||
|
||||
self._update_status("FINISHED")
|
||||
self._log(
|
||||
|
||||
259
app/apps/import_app/tests/test_qif_import.py
Normal file
259
app/apps/import_app/tests/test_qif_import.py
Normal file
@@ -0,0 +1,259 @@
|
||||
from decimal import Decimal
|
||||
import os
|
||||
import shutil
|
||||
from django.test import TestCase
|
||||
from django.contrib.auth import get_user_model
|
||||
from apps.accounts.models import Account, AccountGroup
|
||||
from apps.currencies.models import Currency
|
||||
from apps.common.middleware.thread_local import write_current_user, delete_current_user
|
||||
from apps.import_app.models import ImportProfile, ImportRun
|
||||
from apps.import_app.services.v1 import ImportService
|
||||
from apps.transactions.models import (
|
||||
Transaction,
|
||||
)
|
||||
|
||||
|
||||
class QIFImportTests(TestCase):
|
||||
def setUp(self):
|
||||
# Patch TEMP_DIR for testing
|
||||
self.original_temp_dir = ImportService.TEMP_DIR
|
||||
self.test_dir = os.path.abspath("temp_test_import")
|
||||
ImportService.TEMP_DIR = self.test_dir
|
||||
os.makedirs(self.test_dir, exist_ok=True)
|
||||
|
||||
# Create user and set context
|
||||
User = get_user_model()
|
||||
self.user = User.objects.create_user(
|
||||
email="test@example.com", password="password"
|
||||
)
|
||||
write_current_user(self.user)
|
||||
|
||||
self.currency = Currency.objects.create(
|
||||
code="BRL", name="Real", decimal_places=2, prefix="R$ "
|
||||
)
|
||||
self.group = AccountGroup.objects.create(name="Test Group", owner=self.user)
|
||||
self.account = Account.objects.create(
|
||||
name="bradesco-checking",
|
||||
group=self.group,
|
||||
currency=self.currency,
|
||||
owner=self.user,
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
delete_current_user()
|
||||
ImportService.TEMP_DIR = self.original_temp_dir
|
||||
if os.path.exists(self.test_dir):
|
||||
shutil.rmtree(self.test_dir)
|
||||
|
||||
def test_import_single_qif_valid_mapping(self):
|
||||
content = """!Type:Bank
|
||||
D04/01/2015
|
||||
T8069.46
|
||||
PMy Payee -> Entity
|
||||
MNote -> Desc
|
||||
LOld Cat:New Tag
|
||||
^
|
||||
D05/01/2015
|
||||
T-100.00
|
||||
PSupermarket
|
||||
MWeekly shopping
|
||||
L[Transfer]
|
||||
^
|
||||
"""
|
||||
filename = "bradesco-checking.qif"
|
||||
file_path = os.path.join(self.test_dir, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
yaml_config = """
|
||||
settings:
|
||||
file_type: qif
|
||||
importing: transactions
|
||||
date_format: "%d/%m/%Y"
|
||||
mapping: {}
|
||||
"""
|
||||
profile = ImportProfile.objects.create(
|
||||
name="QIF Profile",
|
||||
yaml_config=yaml_config,
|
||||
version=ImportProfile.Versions.VERSION_1,
|
||||
)
|
||||
run = ImportRun.objects.create(profile=profile, file_name=filename)
|
||||
service = ImportService(run)
|
||||
|
||||
service.process_file(file_path)
|
||||
|
||||
self.assertEqual(Transaction.objects.count(), 2)
|
||||
|
||||
# Transaction 1: Income, Category+Tag
|
||||
t1 = Transaction.objects.get(description="Note -> Desc")
|
||||
self.assertEqual(t1.amount, Decimal("8069.46"))
|
||||
self.assertEqual(t1.type, Transaction.Type.INCOME)
|
||||
self.assertEqual(t1.category.name, "Old Cat")
|
||||
self.assertTrue(t1.tags.filter(name="New Tag").exists())
|
||||
self.assertTrue(t1.entities.filter(name="My Payee -> Entity").exists())
|
||||
self.assertEqual(t1.account, self.account)
|
||||
|
||||
# Transaction 2: Expense, Transfer ([Transfer] -> Description)
|
||||
t2 = Transaction.objects.get(description="Transfer")
|
||||
self.assertEqual(t2.amount, Decimal("100.00"))
|
||||
self.assertEqual(t2.type, Transaction.Type.EXPENSE)
|
||||
self.assertIsNone(t2.category)
|
||||
self.assertFalse(t2.tags.exists())
|
||||
self.assertTrue(t2.entities.filter(name="Supermarket").exists())
|
||||
self.assertEqual(t2.description, "Transfer")
|
||||
|
||||
def test_import_deduplication_hash(self):
|
||||
# Same content twice. Should result in only 1 transaction due to hash deduplication.
|
||||
content = """!Type:Bank
|
||||
D04/01/2015
|
||||
T100.00
|
||||
POK
|
||||
^
|
||||
"""
|
||||
filename = "bradesco-checking.qif"
|
||||
file_path = os.path.join(self.test_dir, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
yaml_config = """
|
||||
settings:
|
||||
file_type: qif
|
||||
importing: transactions
|
||||
date_format: "%d/%m/%Y"
|
||||
mapping: {}
|
||||
"""
|
||||
profile = ImportProfile.objects.create(
|
||||
name="QIF Profile",
|
||||
yaml_config=yaml_config,
|
||||
version=ImportProfile.Versions.VERSION_1,
|
||||
)
|
||||
run = ImportRun.objects.create(profile=profile, file_name=filename)
|
||||
service = ImportService(run)
|
||||
|
||||
# First run
|
||||
service.process_file(file_path)
|
||||
self.assertEqual(Transaction.objects.count(), 1)
|
||||
|
||||
# Service deletes file after processing, so recreate it for second run
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
# Second run - Duplicate content
|
||||
service.process_file(file_path)
|
||||
self.assertEqual(Transaction.objects.count(), 1)
|
||||
|
||||
def test_import_strict_error_rollback(self):
|
||||
# atomic check.
|
||||
# Transaction 1 valid, Transaction 2 invalid date.
|
||||
content = """!Type:Bank
|
||||
D04/01/2015
|
||||
T100.00
|
||||
POK
|
||||
^
|
||||
DINVALID
|
||||
T100.00
|
||||
PBad
|
||||
^
|
||||
"""
|
||||
filename = "bradesco-checking.qif"
|
||||
file_path = os.path.join(self.test_dir, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
yaml_config = """
|
||||
settings:
|
||||
file_type: qif
|
||||
importing: transactions
|
||||
date_format: "%d/%m/%Y"
|
||||
skip_errors: false
|
||||
mapping: {}
|
||||
"""
|
||||
profile = ImportProfile.objects.create(
|
||||
name="QIF Profile",
|
||||
yaml_config=yaml_config,
|
||||
version=ImportProfile.Versions.VERSION_1,
|
||||
)
|
||||
run = ImportRun.objects.create(profile=profile, file_name=filename)
|
||||
service = ImportService(run)
|
||||
|
||||
with self.assertRaises(Exception) as cm:
|
||||
service.process_file(file_path)
|
||||
self.assertEqual(str(cm.exception), "Import failed")
|
||||
|
||||
# Should be 0 transactions because of atomic rollback
|
||||
self.assertEqual(Transaction.objects.count(), 0)
|
||||
|
||||
def test_import_missing_account(self):
|
||||
# File with account name that doesn't exist
|
||||
content = """!Type:Bank
|
||||
D04/01/2015
|
||||
T100.00
|
||||
POK
|
||||
^
|
||||
"""
|
||||
filename = "missing-account.qif"
|
||||
file_path = os.path.join(self.test_dir, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
yaml_config = """
|
||||
settings:
|
||||
file_type: qif
|
||||
importing: transactions
|
||||
date_format: "%d/%m/%Y"
|
||||
mapping: {}
|
||||
"""
|
||||
profile = ImportProfile.objects.create(
|
||||
name="QIF Profile",
|
||||
yaml_config=yaml_config,
|
||||
version=ImportProfile.Versions.VERSION_1,
|
||||
)
|
||||
run = ImportRun.objects.create(profile=profile, file_name=filename)
|
||||
service = ImportService(run)
|
||||
|
||||
# Should fail because account doesn't exist
|
||||
with self.assertRaises(Exception) as cm:
|
||||
service.process_file(file_path)
|
||||
self.assertEqual(str(cm.exception), "Import failed")
|
||||
|
||||
def test_import_skip_errors(self):
|
||||
# skip_errors: true.
|
||||
# Transaction 1 valid, Transaction 2 invalid date.
|
||||
content = """!Type:Bank
|
||||
D04/01/2015
|
||||
T100.00
|
||||
POK
|
||||
^
|
||||
DINVALID
|
||||
T100.00
|
||||
PBad
|
||||
^
|
||||
"""
|
||||
filename = "bradesco-checking.qif"
|
||||
file_path = os.path.join(self.test_dir, filename)
|
||||
with open(file_path, "w", encoding="utf-8") as f:
|
||||
f.write(content)
|
||||
|
||||
yaml_config = """
|
||||
settings:
|
||||
file_type: qif
|
||||
importing: transactions
|
||||
date_format: "%d/%m/%Y"
|
||||
skip_errors: true
|
||||
mapping: {}
|
||||
"""
|
||||
profile = ImportProfile.objects.create(
|
||||
name="QIF Profile",
|
||||
yaml_config=yaml_config,
|
||||
version=ImportProfile.Versions.VERSION_1,
|
||||
)
|
||||
run = ImportRun.objects.create(profile=profile, file_name=filename)
|
||||
service = ImportService(run)
|
||||
|
||||
service.process_file(file_path)
|
||||
|
||||
# Should be 1 transaction (valid one)
|
||||
self.assertEqual(Transaction.objects.count(), 1)
|
||||
self.assertEqual(
|
||||
Transaction.objects.first().description, ""
|
||||
) # empty desc if no memo
|
||||
9
app/import_presets/qif_standard/config.yml
Normal file
9
app/import_presets/qif_standard/config.yml
Normal file
@@ -0,0 +1,9 @@
|
||||
settings:
|
||||
file_type: qif
|
||||
importing: transactions
|
||||
date_format: "%d/%m/%Y"
|
||||
skip_errors: true
|
||||
|
||||
mapping: {}
|
||||
|
||||
deduplicate: []
|
||||
7
app/import_presets/qif_standard/manifest.json
Normal file
7
app/import_presets/qif_standard/manifest.json
Normal file
@@ -0,0 +1,7 @@
|
||||
{
|
||||
"author": "eitchtee",
|
||||
"description": "Standard QIF Import. Mapping is automatic.",
|
||||
"schema_version": 1,
|
||||
"name": "Standard QIF",
|
||||
"message": "Account is inferred from filename (e.g., 'Checking.qif' -> Account 'Checking').\nYou might need to change the date format to match the date format on your file."
|
||||
}
|
||||
Reference in New Issue
Block a user