mirror of
https://github.com/eitchtee/WYGIWYH.git
synced 2026-04-23 00:58:40 +02:00
feat: add QIF import
This commit is contained in:
@@ -3,6 +3,8 @@ import hashlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
from django.db import transaction
|
||||
from datetime import datetime, date
|
||||
from decimal import Decimal, InvalidOperation
|
||||
from typing import Dict, Any, Literal, Union
|
||||
@@ -845,6 +847,219 @@ class ImportService:
|
||||
f"Invalid {self.settings.file_type.upper()} file format: {str(e)}"
|
||||
)
|
||||
|
||||
def _parse_and_import_qif(self, content_lines: list[str], filename: str) -> None:
|
||||
# Infer account from filename (remove extension)
|
||||
account_name = os.path.splitext(os.path.basename(filename))[0]
|
||||
|
||||
current_transaction = {}
|
||||
raw_lines_buffer = []
|
||||
|
||||
account = Account.objects.filter(name=account_name).first()
|
||||
if not account:
|
||||
raise ValueError(f"Account '{account_name}' not found.")
|
||||
|
||||
row_number = 0
|
||||
for line in content_lines:
|
||||
row_number += 1
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
raw_lines_buffer.append(line)
|
||||
|
||||
if line == "^":
|
||||
if current_transaction:
|
||||
# Deduplication using hash of raw lines
|
||||
raw_content = "".join(raw_lines_buffer)
|
||||
internal_id = hashlib.sha256(
|
||||
raw_content.encode("utf-8")
|
||||
).hexdigest()
|
||||
|
||||
# Reset buffer for next transaction
|
||||
raw_lines_buffer = []
|
||||
|
||||
try:
|
||||
with transaction.atomic():
|
||||
if Transaction.objects.filter(
|
||||
internal_id=internal_id
|
||||
).exists():
|
||||
self._increment_totals("skipped", 1)
|
||||
self._log(
|
||||
"info",
|
||||
f"Skipped duplicate transaction from {filename}",
|
||||
)
|
||||
current_transaction = {}
|
||||
continue
|
||||
|
||||
# Handle Account
|
||||
if account:
|
||||
current_transaction["account"] = account
|
||||
else:
|
||||
acc = Account.objects.filter(name=account_name).first()
|
||||
if acc:
|
||||
current_transaction["account"] = acc
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Account '{account_name}' not found."
|
||||
)
|
||||
|
||||
current_transaction["internal_id"] = internal_id
|
||||
|
||||
# Handle Description/Memo mapping
|
||||
if "memo" in current_transaction:
|
||||
current_transaction["description"] = (
|
||||
current_transaction.pop("memo")
|
||||
)
|
||||
|
||||
# Handle Payee mapping
|
||||
entities = []
|
||||
if "payee" in current_transaction:
|
||||
payee_name = current_transaction.pop("payee")
|
||||
# "Treat the payee (P) as the entity. Use existing or create"
|
||||
entity, _ = TransactionEntity.objects.get_or_create(
|
||||
name=payee_name
|
||||
)
|
||||
entities.append(entity)
|
||||
|
||||
# Handle Label/Category
|
||||
category = None
|
||||
tags = []
|
||||
if "label" in current_transaction:
|
||||
label = current_transaction.pop("label")
|
||||
if label.startswith("[") and label.endswith("]"):
|
||||
# Transfer: set label as description, ignore category/tags
|
||||
clean_label = label[1:-1]
|
||||
current_transaction["description"] = clean_label
|
||||
else:
|
||||
parts = label.split(":")
|
||||
if parts:
|
||||
cat_name = parts[0].strip()
|
||||
if cat_name:
|
||||
category, _ = (
|
||||
TransactionCategory.objects.get_or_create(
|
||||
name=cat_name
|
||||
)
|
||||
)
|
||||
|
||||
if len(parts) > 1:
|
||||
for tag_name in parts[1:]:
|
||||
tag_name = tag_name.strip()
|
||||
if tag_name:
|
||||
tag, _ = (
|
||||
TransactionTag.objects.get_or_create(
|
||||
name=tag_name
|
||||
)
|
||||
)
|
||||
tags.append(tag)
|
||||
|
||||
current_transaction["category"] = category
|
||||
|
||||
# Create transaction
|
||||
new_trans = Transaction.objects.create(
|
||||
**current_transaction
|
||||
)
|
||||
if entities:
|
||||
new_trans.entities.set(entities)
|
||||
if tags:
|
||||
new_trans.tags.set(tags)
|
||||
|
||||
self.import_run.transactions.add(new_trans)
|
||||
self._increment_totals("successful", 1)
|
||||
|
||||
except Exception as e:
|
||||
if not self.settings.skip_errors:
|
||||
raise e
|
||||
self._log(
|
||||
"warning",
|
||||
f"Error processing transaction in {filename}: {str(e)}",
|
||||
)
|
||||
self._increment_totals("failed", 1)
|
||||
|
||||
# Reset for next transaction
|
||||
current_transaction = {}
|
||||
else:
|
||||
# Empty transaction record (orphaned ^)
|
||||
raw_lines_buffer = []
|
||||
pass
|
||||
self._increment_totals("processed", 1)
|
||||
continue
|
||||
|
||||
if line.startswith("!"):
|
||||
continue
|
||||
|
||||
code = line[0]
|
||||
value = line[1:]
|
||||
|
||||
if code == "D":
|
||||
try:
|
||||
current_transaction["date"] = datetime.strptime(
|
||||
value, self.settings.date_format
|
||||
).date()
|
||||
except ValueError:
|
||||
self._log(
|
||||
"warning",
|
||||
f"Could not parse date '{value}' using format '{self.settings.date_format}' in {filename}",
|
||||
)
|
||||
if not self.settings.skip_errors:
|
||||
raise ValueError(f"Invalid date format '{value}'")
|
||||
|
||||
elif code == "T":
|
||||
try:
|
||||
cleaned_value = value.replace(",", "")
|
||||
amount = Decimal(cleaned_value)
|
||||
if amount < 0:
|
||||
current_transaction["type"] = Transaction.Type.EXPENSE
|
||||
current_transaction["amount"] = abs(amount)
|
||||
else:
|
||||
current_transaction["type"] = Transaction.Type.INCOME
|
||||
current_transaction["amount"] = amount
|
||||
except InvalidOperation:
|
||||
self._log(
|
||||
"warning", f"Could not parse amount '{value}' in {filename}"
|
||||
)
|
||||
if not self.settings.skip_errors:
|
||||
raise ValueError(f"Invalid amount format '{value}'")
|
||||
|
||||
elif code == "P":
|
||||
current_transaction["payee"] = value
|
||||
elif code == "M":
|
||||
current_transaction["memo"] = value
|
||||
elif code == "L":
|
||||
current_transaction["label"] = value
|
||||
elif code == "N":
|
||||
pass
|
||||
|
||||
def _process_qif(self, file_path):
|
||||
def process_logic():
|
||||
if zipfile.is_zipfile(file_path):
|
||||
try:
|
||||
with zipfile.ZipFile(file_path, "r") as zf:
|
||||
for filename in zf.namelist():
|
||||
if filename.lower().endswith(
|
||||
".qif"
|
||||
) and not filename.startswith("__MACOSX"):
|
||||
self._log(
|
||||
"info", f"Processing QIF from ZIP: {filename}"
|
||||
)
|
||||
with zf.open(filename) as f:
|
||||
content = f.read().decode(self.settings.encoding)
|
||||
self._parse_and_import_qif(
|
||||
content.splitlines(), filename
|
||||
)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error processing ZIP file: {str(e)}")
|
||||
else:
|
||||
with open(file_path, "r", encoding=self.settings.encoding) as f:
|
||||
self._parse_and_import_qif(
|
||||
f.readlines(), os.path.basename(file_path)
|
||||
)
|
||||
|
||||
if not self.settings.skip_errors:
|
||||
with transaction.atomic():
|
||||
process_logic()
|
||||
else:
|
||||
process_logic()
|
||||
|
||||
def _validate_file_path(self, file_path: str) -> str:
|
||||
"""
|
||||
Validates that the file path is within the allowed temporary directory.
|
||||
@@ -871,6 +1086,8 @@ class ImportService:
|
||||
self._process_csv(file_path)
|
||||
elif isinstance(self.settings, version_1.ExcelImportSettings):
|
||||
self._process_excel(file_path)
|
||||
elif isinstance(self.settings, version_1.QIFImportSettings):
|
||||
self._process_qif(file_path)
|
||||
|
||||
self._update_status("FINISHED")
|
||||
self._log(
|
||||
|
||||
Reference in New Issue
Block a user