feat: rename `import` app to `import_app`; initial work on import profile schema

This commit is contained in:
Herculino Trotta
2025-01-17 17:40:51 -03:00
parent c171e0419a
commit fbb26b8442
22 changed files with 486 additions and 11 deletions

View File

@@ -64,7 +64,7 @@ INSTALLED_APPS = [
"apps.accounts.apps.AccountsConfig",
"apps.common.apps.CommonConfig",
"apps.net_worth.apps.NetWorthConfig",
"apps.import.apps.ImportConfig",
"apps.import_app.apps.ImportConfig",
"apps.api.apps.ApiConfig",
"cachalot",
"rest_framework",

View File

@@ -47,4 +47,5 @@ urlpatterns = [
path("", include("apps.calendar_view.urls")),
path("", include("apps.dca.urls")),
path("", include("apps.mini_tools.urls")),
path("", include("apps.import_app.urls")),
]

View File

@@ -1,3 +0,0 @@
from django.contrib import admin
# Register your models here.

View File

@@ -1,3 +0,0 @@
from django.db import models
# Create your models here.

View File

@@ -1,3 +0,0 @@
from django.shortcuts import render
# Create your views here.

View File

@@ -0,0 +1,6 @@
from django.contrib import admin

from apps.import_app import models

# Register your models here.
# Expose import runs and profiles in the Django admin with the default ModelAdmin.
admin.site.register(models.ImportRun)
admin.site.register(models.ImportProfile)

View File

@@ -3,4 +3,4 @@ from django.apps import AppConfig
class ImportConfig(AppConfig):
    """App config for the import app (package renamed from apps.import to
    apps.import_app — `import` is a reserved word and made a poor package name)."""

    default_auto_field = "django.db.models.BigAutoField"
    # Fixed: removed the stale duplicate assignment `name = "apps.import"`
    # left over from the rename; only the new dotted path remains.
    name = "apps.import_app"

View File

@@ -0,0 +1,74 @@
from django.db import models
from django.utils.translation import gettext_lazy as _
class ImportProfile(models.Model):
    """Reusable import configuration: a named YAML document describing how an
    uploaded file is parsed and mapped (validated by the version-specific
    pydantic schema at import time)."""

    class Versions(models.IntegerChoices):
        # Schema version; selects which pydantic schema parses yaml_config.
        VERSION_1 = 1, _("Version 1")

    name = models.CharField(max_length=100)
    # Raw YAML text; parsed with yaml.safe_load by the import service.
    yaml_config = models.TextField(help_text=_("YAML configuration"))
    version = models.IntegerField(
        choices=Versions,
        default=Versions.VERSION_1,
        verbose_name=_("Version"),
    )

    def __str__(self):
        return self.name

    class Meta:
        ordering = ["name"]
class ImportRun(models.Model):
    """One execution of an import: tracks status, per-row counters, a plain-text
    log, and (via M2M links) the objects touched by the run."""

    class Status(models.TextChoices):
        QUEUED = "QUEUED", _("Queued")
        PROCESSING = "PROCESSING", _("Processing")
        FAILED = "FAILED", _("Failed")
        FINISHED = "FINISHED", _("Finished")

    status = models.CharField(
        max_length=10,
        choices=Status,
        default=Status.QUEUED,
        verbose_name=_("Status"),
    )
    profile = models.ForeignKey(
        ImportProfile,
        on_delete=models.CASCADE,
    )
    # Name of the uploaded file (informational; the file itself is a temp file).
    file_name = models.CharField(
        max_length=10000,
        help_text=_("File name"),
    )
    # Objects associated with this run, kept for traceability.
    transactions = models.ManyToManyField(
        "transactions.Transaction", related_name="import_runs"
    )
    tags = models.ManyToManyField(
        "transactions.TransactionTag", related_name="import_runs"
    )
    categories = models.ManyToManyField(
        "transactions.TransactionCategory", related_name="import_runs"
    )
    entities = models.ManyToManyField(
        "transactions.TransactionEntity", related_name="import_runs"
    )
    currencies = models.ManyToManyField(
        "currencies.Currency", related_name="import_runs"
    )
    # Log lines appended incrementally by the import service.
    logs = models.TextField(blank=True)
    processed_rows = models.IntegerField(default=0)
    total_rows = models.IntegerField(default=0)
    successful_rows = models.IntegerField(default=0)
    skipped_rows = models.IntegerField(default=0)
    failed_rows = models.IntegerField(default=0)
    started_at = models.DateTimeField(null=True)
    finished_at = models.DateTimeField(null=True)

    @property
    def progress(self):
        """Percentage of rows processed so far (0 until total_rows is known)."""
        if self.total_rows == 0:
            return 0
        return (self.processed_rows / self.total_rows) * 100

View File

View File

@@ -0,0 +1,8 @@
from apps.import_app.schemas.v1 import (
ImportProfileSchema as SchemaV1,
ColumnMapping as ColumnMappingV1,
# TransformationRule as TransformationRuleV1,
ImportSettings as SettingsV1,
HashTransformationRule as HashTransformationRuleV1,
CompareDeduplicationRule as CompareDeduplicationRuleV1,
)

View File

@@ -0,0 +1,104 @@
from typing import Dict, List, Optional, Literal
from pydantic import BaseModel, Field
class CompareDeduplicationRule(BaseModel):
    """Skip an incoming row when an existing Transaction matches on every
    configured field. "strict" compares exactly; "lax" compares
    case-insensitively (``iexact`` lookup)."""

    type: Literal["compare"]
    # NOTE(review): keys appear to be Transaction field names; the mapped
    # values are not read by the service — confirm before tightening the type.
    fields: Dict = Field(
        ..., description="Match header and fields to compare for deduplication"
    )
    match_type: Literal["lax", "strict"]
class ReplaceTransformationRule(BaseModel):
    """Substitution on a field value: literal ``str.replace`` ("replace")
    or ``re.sub`` ("regex")."""

    field: str
    type: Literal["replace", "regex"] = Field(
        ..., description="Type of transformation: replace or regex"
    )
    pattern: str = Field(..., description="Pattern to match")
    replacement: str = Field(..., description="Value to replace with")
class DateFormatTransformationRule(BaseModel):
    """Re-formats a date value from ``original_format`` to ``new_format``
    (``datetime.strptime``/``strftime`` format strings)."""

    field: str
    # Fixed: the description was copy-pasted from ReplaceTransformationRule
    # ("replace or regex") and did not describe this rule.
    type: Literal["date_format"] = Field(
        ..., description="Type of transformation: date_format"
    )
    original_format: str = Field(..., description="Original date format")
    new_format: str = Field(..., description="New date format to use")
class HashTransformationRule(BaseModel):
    """Derives a value by SHA-256 hashing the listed row fields joined with
    "|" (see ImportService._transform_value)."""

    fields: List[str]
    type: Literal["hash"]
class MergeTransformationRule(BaseModel):
    """Joins several row fields into one value using ``separator``.

    NOTE(review): no "merge" branch exists yet in
    ImportService._transform_value — rows using this rule currently pass
    through unchanged; confirm intended semantics.
    """

    fields: List[str]
    type: Literal["merge"]
    separator: str = Field(default=" ", description="Separator to use when merging")
class SplitTransformationRule(BaseModel):
    """Splits a value on ``separator``; ``index`` selects one piece.

    NOTE(review): no "split" branch exists yet in
    ImportService._transform_value — rows using this rule currently pass
    through unchanged; confirm intended semantics.
    """

    fields: List[str]
    type: Literal["split"]
    separator: str = Field(default=",", description="Separator to use when splitting")
    index: int | None = Field(
        default=0, description="Index to return as value. Empty to return all."
    )
class ImportSettings(BaseModel):
    """File-level settings for an import profile: CSV parsing options and
    the error-handling policy."""

    skip_errors: bool = Field(
        default=False,
        description="If True, errors during import will be logged and skipped",
    )
    # Only CSV is supported so far.
    file_type: Literal["csv"] = "csv"
    delimiter: str = Field(default=",", description="CSV delimiter character")
    encoding: str = Field(default="utf-8", description="File encoding")
    skip_rows: int = Field(
        default=0, description="Number of rows to skip at the beginning of the file"
    )
    # What kind of objects this profile imports.
    importing: Literal[
        "transactions", "accounts", "currencies", "categories", "tags", "entities"
    ]
class ColumnMapping(BaseModel):
    """Maps one CSV column (or a value generated by transformations) onto a
    transaction field."""

    source: Optional[str] = Field(
        default=None,
        description="CSV column header. If None, the field will be generated from transformations",
    )
    target: Literal[
        "account",
        "type",
        "is_paid",
        "date",
        "reference_date",
        "amount",
        "notes",
        "category",
        "tags",
        "entities",
        "internal_note",
    ] = Field(..., description="Transaction field to map to")
    # Fallback used when the source column is missing or empty.
    default_value: Optional[str] = None
    required: bool = False
    # Applied in order; each rule receives the previous rule's output.
    transformations: Optional[
        List[
            ReplaceTransformationRule
            | DateFormatTransformationRule
            | HashTransformationRule
            | MergeTransformationRule
            | SplitTransformationRule
        ]
    ] = Field(default_factory=list)
class ImportProfileSchema(BaseModel):
    """Top-level schema for version-1 YAML import profiles."""

    settings: ImportSettings
    # Keyed by an arbitrary mapping name; values describe one target field each.
    column_mapping: Dict[str, ColumnMapping]
    deduplication: List[CompareDeduplicationRule] = Field(
        default_factory=list,
        description="Rules for deduplicating records during import",
    )

View File

View File

@@ -0,0 +1 @@
from apps.import_app.services.v1 import ImportService as ImportServiceV1

View File

@@ -0,0 +1,237 @@
import csv
import hashlib
import re
from datetime import datetime
from typing import Dict, Any, Literal
import yaml
from django.db import transaction
from django.core.files.storage import default_storage
from django.utils import timezone
from apps.import_app.models import ImportRun, ImportProfile
from apps.import_app.schemas import (
SchemaV1,
ColumnMappingV1,
SettingsV1,
HashTransformationRuleV1,
CompareDeduplicationRuleV1,
)
from apps.transactions.models import Transaction
class ImportService:
    """Executes an ImportRun: parses the profile's YAML config, reads the
    uploaded CSV, maps each row to transaction data, deduplicates, and
    records progress, counters, and log output on the ImportRun model."""

    def __init__(self, import_run: ImportRun):
        self.import_run: ImportRun = import_run
        self.profile: ImportProfile = import_run.profile
        self.config: SchemaV1 = self._load_config()
        self.settings: SettingsV1 = self.config.settings
        self.deduplication: list[CompareDeduplicationRuleV1] = self.config.deduplication
        self.mapping: Dict[str, ColumnMappingV1] = self.config.column_mapping

    def _load_config(self) -> SchemaV1:
        """Parse the profile's YAML into the version-specific pydantic schema.

        Raises:
            ValueError: if the profile declares an unsupported schema version.
        """
        yaml_data = yaml.safe_load(self.profile.yaml_config)
        if self.profile.version == ImportProfile.Versions.VERSION_1:
            return SchemaV1(**yaml_data)
        raise ValueError(f"Unsupported version: {self.profile.version}")

    def _log(self, level: str, message: str, **kwargs) -> None:
        """Append a timestamped log entry to the import run logs."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Format additional context if present
        context = ""
        if kwargs:
            context = " - " + ", ".join(f"{k}={v}" for k, v in kwargs.items())

        log_line = f"[{timestamp}] {level.upper()}: {message}{context}\n"

        # Append to existing logs
        self.import_run.logs += log_line
        self.import_run.save(update_fields=["logs"])

    def _update_status(
        self, new_status: Literal["PROCESSING", "FAILED", "FINISHED"]
    ) -> None:
        """Persist a status transition on the ImportRun."""
        # TextChoices members are addressable by name, so the if/elif chain
        # over the three literals collapses to a single lookup.
        self.import_run.status = ImportRun.Status[new_status]
        self.import_run.save(update_fields=["status"])

    @staticmethod
    def _transform_value(
        value: str, mapping: ColumnMappingV1, row: Dict[str, str] = None
    ) -> Any:
        """Run the mapping's transformation chain over *value*.

        Each rule receives the previous rule's output; ``hash`` rules read
        the whole *row* instead of the running value.
        """
        transformed = value
        for transform in mapping.transformations:
            if transform.type == "hash":
                if not isinstance(transform, HashTransformationRuleV1):
                    continue
                # Collect all values to be hashed.
                # BUGFIX: guard against row=None (the parameter's default);
                # the old code crashed with `field in row` on None.
                values_to_hash = [
                    str(row[field])
                    for field in transform.fields
                    if row and field in row
                ]
                # Create hash from concatenated values
                if values_to_hash:
                    concatenated = "|".join(values_to_hash)
                    transformed = hashlib.sha256(concatenated.encode()).hexdigest()
            elif transform.type == "replace":
                transformed = transformed.replace(
                    transform.pattern, transform.replacement
                )
            elif transform.type == "regex":
                transformed = re.sub(
                    transform.pattern, transform.replacement, transformed
                )
            elif transform.type == "date_format":
                # BUGFIX: DateFormatTransformationRule exposes original_format /
                # new_format — the old code read .pattern/.replacement, which do
                # not exist on that schema class and raised AttributeError.
                transformed = datetime.strptime(
                    transformed, transform.original_format
                ).strftime(transform.new_format)
            # NOTE(review): "merge" and "split" rule types exist in the schema
            # but have no branch here yet — values pass through unchanged.
        return transformed

    def _map_row_to_transaction(self, row: Dict[str, str]) -> Dict[str, Any]:
        """Build a target-field -> value dict for one CSV row.

        Raises:
            ValueError: when a required field has no value and no
                transformations that could generate one.
        """
        transaction_data = {}

        for field, mapping in self.mapping.items():
            # If source is None, use None as the initial value
            value = row.get(mapping.source) if mapping.source else None

            # Use default_value if value is None
            if value is None:
                value = mapping.default_value

            if mapping.required and value is None and not mapping.transformations:
                raise ValueError(f"Required field {field} is missing")

            # Apply transformations even if initial value is None
            if mapping.transformations:
                value = self._transform_value(value, mapping, row)

            if value is not None:
                transaction_data[field] = value

        return transaction_data

    def _check_duplicate_transaction(self, transaction_data: Dict[str, Any]) -> bool:
        """Return True when any deduplication rule matches an existing Transaction."""
        for rule in self.deduplication:
            if rule.type == "compare":
                query = Transaction.objects.all()
                # Build query conditions for each field in the rule.
                # (The rule's mapped values are intentionally unused here.)
                for field in rule.fields:
                    if field in transaction_data:
                        if rule.match_type == "strict":
                            query = query.filter(**{field: transaction_data[field]})
                        else:  # lax matching
                            query = query.filter(
                                **{f"{field}__iexact": transaction_data[field]}
                            )
                # If we found any matching transaction, it's a duplicate
                if query.exists():
                    return True
        return False

    def _process_csv(self, file_path):
        """Stream the CSV, importing row by row and updating counters."""
        with open(file_path, "r", encoding=self.settings.encoding) as csv_file:
            reader = csv.DictReader(csv_file, delimiter=self.settings.delimiter)
            # Count data rows, excluding configured skip_rows, so that the
            # FINISHED check (processed == total) is actually reachable.
            data_rows = sum(1 for _ in reader)
            self.import_run.total_rows = max(
                0, data_rows - self.settings.skip_rows
            )
            # BUGFIX: the count was previously never persisted.
            self.import_run.save(update_fields=["total_rows"])

            csv_file.seek(0)
            reader = csv.DictReader(csv_file, delimiter=self.settings.delimiter)

            self._log("info", f"Starting import with {self.import_run.total_rows} rows")

            # Skip specified number of rows
            for _ in range(self.settings.skip_rows):
                next(reader, None)
            if self.settings.skip_rows:
                self._log("info", f"Skipped {self.settings.skip_rows} initial rows")

            for row_number, row in enumerate(reader, start=1):
                try:
                    transaction_data = self._map_row_to_transaction(row)

                    if transaction_data:
                        if self.deduplication and self._check_duplicate_transaction(
                            transaction_data
                        ):
                            self.import_run.skipped_rows += 1
                            self._log("info", f"Skipped duplicate row {row_number}")
                        else:
                            # BUGFIX: M2M .add() needs a model instance or pk —
                            # the old code passed the raw dict. Create the
                            # Transaction first, atomically with the link.
                            # NOTE(review): assumes mapped values are directly
                            # assignable to Transaction fields (FK targets may
                            # need resolution) — confirm against the mapping.
                            with transaction.atomic():
                                new_transaction = Transaction.objects.create(
                                    **transaction_data
                                )
                                self.import_run.transactions.add(new_transaction)
                            self.import_run.successful_rows += 1
                            self._log(
                                "debug", f"Successfully processed row {row_number}"
                            )

                    # BUGFIX: duplicates used to `continue` before this line,
                    # so processed_rows could never reach total_rows.
                    self.import_run.processed_rows += 1
                    self.import_run.save(
                        update_fields=[
                            "processed_rows",
                            "successful_rows",
                            "skipped_rows",
                        ]
                    )
                except Exception as e:
                    if not self.settings.skip_errors:
                        self._log(
                            "error",
                            f"Fatal error processing row {row_number}: {str(e)}",
                        )
                        self._update_status("FAILED")
                        raise
                    else:
                        self._log(
                            "warning", f"Error processing row {row_number}: {str(e)}"
                        )
                        # BUGFIX: failed-but-skipped rows now also count as
                        # processed, keeping the FINISHED check consistent.
                        self.import_run.failed_rows += 1
                        self.import_run.processed_rows += 1
                        self.import_run.save(
                            update_fields=["failed_rows", "processed_rows"]
                        )

    def process_file(self, file_path: str):
        """Entry point: run the import, updating status and progress.

        Always deletes the temporary file and stamps finished_at, even on
        failure. Re-raises import errors wrapped in Exception.
        """
        self._update_status("PROCESSING")
        self.import_run.started_at = timezone.now()
        self.import_run.save(update_fields=["started_at"])

        self._log("info", "Starting import process")

        try:
            if self.settings.file_type == "csv":
                self._process_csv(file_path)

            if self.import_run.processed_rows == self.import_run.total_rows:
                self._update_status("FINISHED")
                self._log(
                    "info",
                    f"Import completed successfully. "
                    f"Successful: {self.import_run.successful_rows}, "
                    f"Failed: {self.import_run.failed_rows}, "
                    f"Skipped: {self.import_run.skipped_rows}",
                )
        except Exception as e:
            self._update_status("FAILED")
            self._log("error", f"Import failed: {str(e)}")
            # BUGFIX: chain the original exception instead of discarding it.
            raise Exception("Import failed") from e
        finally:
            self._log("info", "Cleaning up temporary files")
            default_storage.delete(file_path)
            self.import_run.finished_at = timezone.now()
            self.import_run.save(update_fields=["finished_at"])

View File

@@ -0,0 +1,18 @@
import logging
from procrastinate.contrib.django import app
from apps.import_app.models import ImportRun
from apps.import_app.services import ImportServiceV1
logger = logging.getLogger(__name__)
@app.task(queue="imports")
def process_import(import_run_id: int, file_path: str):
    """Procrastinate task: run the import pipeline for one ImportRun.

    Raises:
        ValueError: if no ImportRun with *import_run_id* exists.
    """
    # Keep the try body minimal: only the lookup can raise DoesNotExist.
    # The old version wrapped the whole pipeline, which read as if service
    # errors were handled here when they were not.
    try:
        import_run = ImportRun.objects.get(id=import_run_id)
    except ImportRun.DoesNotExist as exc:
        # Chain the cause so the original traceback is preserved.
        raise ValueError(f"ImportRun with id {import_run_id} not found") from exc

    import_service = ImportServiceV1(import_run)
    import_service.process_file(file_path)

View File

@@ -0,0 +1,6 @@
from django.urls import path
import apps.import_app.views as views
# Routes for the import app: a single endpoint that creates an ImportRun
# from an uploaded file.
urlpatterns = [
    path("import/", views.ImportRunCreateView.as_view(), name="import"),
]

View File

@@ -0,0 +1,26 @@
from django.views.generic import CreateView
from apps.import_app.models import ImportRun
from apps.import_app.services import ImportServiceV1
class ImportRunCreateView(CreateView):
    """Creates an ImportRun from a chosen profile and an uploaded file, then
    runs the import synchronously within the request.

    NOTE(review): "file" is not a model form field (fields=["profile"]), so
    the template must render the file input itself — confirm; a missing
    upload raises MultiValueDictKeyError here.
    """

    model = ImportRun
    fields = ["profile"]

    def form_valid(self, form):
        import tempfile

        response = super().form_valid(form)

        import_run = form.instance
        uploaded = self.request.FILES["file"]

        # SECURITY FIX: the old code wrote to the predictable path
        # /tmp/import_{id}.csv (symlink/pre-creation attack surface).
        # NamedTemporaryFile gives an unpredictable, exclusively-created file;
        # delete=False because ImportService removes it when done.
        with tempfile.NamedTemporaryFile(
            mode="wb",
            prefix=f"import_{import_run.id}_",
            suffix=".csv",
            delete=False,
        ) as destination:
            for chunk in uploaded.chunks():
                destination.write(chunk)
            temp_file_path = destination.name

        # Process the import (synchronously; the tasks module also exposes a
        # queued variant).
        import_service = ImportServiceV1(import_run)
        import_service.process_file(temp_file_path)

        return response

View File

@@ -141,6 +141,7 @@ class Transaction(models.Model):
related_name="transactions",
verbose_name=_("Recurring Transaction"),
)
internal_note = models.TextField(blank=True, verbose_name=_("Internal Note"))
class Meta:
verbose_name = _("Transaction")

View File

@@ -24,3 +24,5 @@ requests~=2.32.3
pytz~=2024.2
python-dateutil~=2.9.0.post0
simpleeval~=1.0.0
pydantic~=2.10.5
PyYAML~=6.0.2