feat(dca): link transactions to DCA

This commit is contained in:
Herculino Trotta
2025-02-15 00:41:06 -03:00
parent 9e912b2736
commit 865618e054
11 changed files with 450 additions and 84 deletions

View File

@@ -12,15 +12,14 @@ class DynamicModelChoiceField(forms.ModelChoiceField):
self.to_field_name = kwargs.pop("to_field_name", "pk")
self.create_field = kwargs.pop("create_field", None)
if not self.create_field:
raise ValueError("The 'create_field' parameter is required.")
self.queryset = kwargs.pop("queryset", model.objects.all())
super().__init__(queryset=self.queryset, *args, **kwargs)
self._created_instance = None
self.widget = TomSelect(clear_button=True, create=True)
super().__init__(queryset=self.queryset, *args, **kwargs)
self._created_instance = None
def to_python(self, value):
if value in self.empty_values:
return None
@@ -53,14 +52,19 @@ class DynamicModelChoiceField(forms.ModelChoiceField):
else:
raise self.model.DoesNotExist
except self.model.DoesNotExist:
try:
with transaction.atomic():
instance, _ = self.model.objects.update_or_create(
**{self.create_field: value}
if self.create_field:
try:
with transaction.atomic():
instance, _ = self.model.objects.update_or_create(
**{self.create_field: value}
)
self._created_instance = instance
return instance
except Exception as e:
raise ValidationError(
self.error_messages["invalid_choice"], code="invalid_choice"
)
self._created_instance = instance
return instance
except Exception as e:
else:
raise ValidationError(
self.error_messages["invalid_choice"], code="invalid_choice"
)

View File

@@ -1,4 +1,5 @@
from django.forms import widgets, SelectMultiple
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
@@ -129,3 +130,28 @@ class TomSelect(widgets.Select):
class TomSelectMultiple(SelectMultiple, TomSelect):
pass
class TransactionSelect(TomSelect):
def __init__(self, income: bool = True, expense: bool = True, *args, **kwargs):
super().__init__(*args, **kwargs)
self.load_income = income
self.load_expense = expense
self.create = False
def build_attrs(self, base_attrs, extra_attrs=None):
attrs = super().build_attrs(base_attrs, extra_attrs)
if self.load_income and self.load_expense:
attrs["data-load"] = reverse("transactions_search")
elif self.load_income and not self.load_expense:
attrs["data-load"] = reverse(
"transactions_search", kwargs={"filter_type": "income"}
)
elif self.load_expense and not self.load_income:
attrs["data-load"] = reverse(
"transactions_search", kwargs={"filter_type": "expenses"}
)
return attrs

View File

@@ -1,14 +1,22 @@
from crispy_forms.bootstrap import FormActions
from crispy_bootstrap5.bootstrap5 import Switch, BS5Accordion
from crispy_forms.bootstrap import FormActions, AccordionGroup
from crispy_forms.helper import FormHelper
from crispy_forms.layout import Layout, Row, Column
from crispy_forms.layout import Layout, Row, Column, HTML
from django import forms
from django.utils.translation import gettext_lazy as _
from apps.accounts.models import Account
from apps.common.widgets.crispy.submit import NoClassSubmit
from apps.common.widgets.datepicker import AirDatePickerInput
from apps.common.widgets.decimal import ArbitraryDecimalDisplayNumberInput
from apps.common.widgets.tom_select import TomSelect
from apps.dca.models import DCAStrategy, DCAEntry
from apps.common.widgets.tom_select import TransactionSelect
from apps.transactions.models import Transaction, TransactionTag, TransactionCategory
from apps.common.fields.forms.dynamic_select import (
DynamicModelChoiceField,
DynamicModelMultipleChoiceField,
)
class DCAStrategyForm(forms.ModelForm):
@@ -53,6 +61,75 @@ class DCAStrategyForm(forms.ModelForm):
class DCAEntryForm(forms.ModelForm):
create_transaction = forms.BooleanField(
label=_("Create transaction"), initial=False, required=False
)
from_account = forms.ModelChoiceField(
queryset=Account.objects.filter(is_archived=False),
label=_("From Account"),
widget=TomSelect(clear_button=False, group_by="group"),
required=False,
)
to_account = forms.ModelChoiceField(
queryset=Account.objects.filter(is_archived=False),
label=_("To Account"),
widget=TomSelect(clear_button=False, group_by="group"),
required=False,
)
from_category = DynamicModelChoiceField(
create_field="name",
model=TransactionCategory,
required=False,
label=_("Category"),
queryset=TransactionCategory.objects.filter(active=True),
)
to_category = DynamicModelChoiceField(
create_field="name",
model=TransactionCategory,
required=False,
label=_("Category"),
queryset=TransactionCategory.objects.filter(active=True),
)
from_tags = DynamicModelMultipleChoiceField(
model=TransactionTag,
to_field_name="name",
create_field="name",
required=False,
label=_("Tags"),
queryset=TransactionTag.objects.filter(active=True),
)
to_tags = DynamicModelMultipleChoiceField(
model=TransactionTag,
to_field_name="name",
create_field="name",
required=False,
label=_("Tags"),
queryset=TransactionTag.objects.filter(active=True),
)
expense_transaction = DynamicModelChoiceField(
model=Transaction,
to_field_name="id",
label=_("Expense Transaction"),
required=False,
queryset=Transaction.objects.none(),
widget=TransactionSelect(clear_button=True, income=False, expense=True),
help_text=_("Type to search for a transaction to link to this entry"),
)
income_transaction = DynamicModelChoiceField(
model=Transaction,
to_field_name="id",
label=_("Income Transaction"),
required=False,
queryset=Transaction.objects.none(),
widget=TransactionSelect(clear_button=True, income=True, expense=False),
help_text=_("Type to search for a transaction to link to this entry"),
)
class Meta:
model = DCAEntry
fields = [
@@ -60,13 +137,19 @@ class DCAEntryForm(forms.ModelForm):
"amount_paid",
"amount_received",
"notes",
"expense_transaction",
"income_transaction",
]
widgets = {
"notes": forms.Textarea(attrs={"rows": 3}),
}
def __init__(self, *args, **kwargs):
strategy = kwargs.pop("strategy", None)
super().__init__(*args, **kwargs)
self.strategy = strategy if strategy else self.instance.strategy
self.helper = FormHelper()
self.helper.form_tag = False
self.helper.layout = Layout(
@@ -75,18 +158,66 @@ class DCAEntryForm(forms.ModelForm):
Column("amount_paid", css_class="form-group col-md-6"),
Column("amount_received", css_class="form-group col-md-6"),
),
Row(
Column("expense_transaction", css_class="form-group col-md-6"),
Column("income_transaction", css_class="form-group col-md-6"),
),
"notes",
BS5Accordion(
AccordionGroup(
_("Create transaction"),
Switch("create_transaction"),
Row(
Column(
Row(
Column(
"from_account",
css_class="form-group col-md-6 mb-0",
),
css_class="form-row",
),
Row(
Column(
"from_category",
css_class="form-group col-md-6 mb-0",
),
Column(
"from_tags", css_class="form-group col-md-6 mb-0"
),
css_class="form-row",
),
),
css_class="p-1 mx-1 my-3 border rounded-3",
),
Row(
Column(
Row(
Column(
"to_account",
css_class="form-group col-md-6 mb-0",
),
css_class="form-row",
),
Row(
Column(
"to_category", css_class="form-group col-md-6 mb-0"
),
Column("to_tags", css_class="form-group col-md-6 mb-0"),
css_class="form-row",
),
),
css_class="p-1 mx-1 my-3 border rounded-3",
),
active=False,
),
AccordionGroup(
_("Link transaction"),
"income_transaction",
"expense_transaction",
),
flush=False,
always_open=False,
css_class="mb-3",
),
)
if self.instance and self.instance.pk:
# decimal_places = self.instance.account.currency.decimal_places
# self.fields["amount"].widget = ArbitraryDecimalDisplayNumberInput(
# decimal_places=decimal_places
# )
self.helper.layout.append(
FormActions(
NoClassSubmit(
@@ -95,7 +226,6 @@ class DCAEntryForm(forms.ModelForm):
),
)
else:
# self.fields["amount"].widget = ArbitraryDecimalDisplayNumberInput()
self.helper.layout.append(
FormActions(
NoClassSubmit(
@@ -107,3 +237,118 @@ class DCAEntryForm(forms.ModelForm):
self.fields["amount_paid"].widget = ArbitraryDecimalDisplayNumberInput()
self.fields["amount_received"].widget = ArbitraryDecimalDisplayNumberInput()
self.fields["date"].widget = AirDatePickerInput(clear_button=False)
expense_transaction = None
income_transaction = None
if self.instance and self.instance.pk:
# Edit mode - get from instance
expense_transaction = self.instance.expense_transaction
income_transaction = self.instance.income_transaction
elif self.data.get("expense_transaction"):
# Form validation - get from submitted data
try:
expense_transaction = Transaction.objects.get(
id=self.data["expense_transaction"]
)
income_transaction = Transaction.objects.get(
id=self.data["income_transaction"]
)
except Transaction.DoesNotExist:
pass
# If we have a current transaction, ensure it's in the queryset
if income_transaction:
self.fields["income_transaction"].queryset = Transaction.objects.filter(
id=income_transaction.id
)
if expense_transaction:
self.fields["expense_transaction"].queryset = Transaction.objects.filter(
id=expense_transaction.id
)
def clean(self):
cleaned_data = super().clean()
if cleaned_data.get("create_transaction"):
from_account = cleaned_data.get("from_account")
to_account = cleaned_data.get("to_account")
if not from_account and not to_account:
raise forms.ValidationError(
{
"from_account": _("You must provide an account."),
"to_account": _("You must provide an account."),
}
)
elif not from_account and to_account:
raise forms.ValidationError(
{"from_account": _("You must provide an account.")}
)
elif not to_account and from_account:
raise forms.ValidationError(
{"to_account": _("You must provide an account.")}
)
if from_account == to_account:
raise forms.ValidationError(
_("From and To accounts must be different.")
)
return cleaned_data
def save(self, **kwargs):
instance = super().save(commit=False)
if self.cleaned_data.get("create_transaction"):
from_account = self.cleaned_data["from_account"]
to_account = self.cleaned_data["to_account"]
from_amount = instance.amount_paid
to_amount = instance.amount_received
date = instance.date
description = _("DCA for %(strategy_name)s") % {
"strategy_name": self.strategy.name
}
from_category = self.cleaned_data.get("from_category")
to_category = self.cleaned_data.get("to_category")
notes = self.cleaned_data.get("notes")
# Create "From" transaction
from_transaction = Transaction.objects.create(
account=from_account,
type=Transaction.Type.EXPENSE,
is_paid=True,
date=date,
amount=from_amount,
description=description,
category=from_category,
notes=notes,
)
from_transaction.tags.set(self.cleaned_data.get("from_tags", []))
# Create "To" transaction
to_transaction = Transaction.objects.create(
account=to_account,
type=Transaction.Type.INCOME,
is_paid=True,
date=date,
amount=to_amount,
description=description,
category=to_category,
notes=notes,
)
to_transaction.tags.set(self.cleaned_data.get("to_tags", []))
instance.expense_transaction = from_transaction
instance.income_transaction = to_transaction
else:
if instance.expense_transaction:
instance.expense_transaction.amount = instance.amount_paid
instance.expense_transaction.save()
if instance.income_transaction:
instance.income_transaction.amount = instance.amount_received
instance.income_transaction.save()
instance.strategy = self.strategy
instance.save()
return instance

View File

@@ -155,11 +155,9 @@ def strategy_detail(request, strategy_id):
def strategy_entry_add(request, strategy_id):
strategy = get_object_or_404(DCAStrategy, id=strategy_id)
if request.method == "POST":
form = DCAEntryForm(request.POST)
form = DCAEntryForm(request.POST, strategy=strategy)
if form.is_valid():
entry = form.save(commit=False)
entry.strategy = strategy
entry.save()
entry = form.save()
messages.success(request, _("Entry added successfully"))
return HttpResponse(
@@ -169,7 +167,7 @@ def strategy_entry_add(request, strategy_id):
},
)
else:
form = DCAEntryForm()
form = DCAEntryForm(strategy=strategy)
return render(
request,

View File

@@ -92,6 +92,8 @@ def transactions_list(request, month: int, year: int):
"account__currency",
"installment_plan",
"entities",
"dca_expense_entries",
"dca_income_entries",
)
)

View File

@@ -11,6 +11,13 @@ from apps.rules.tasks import check_for_transaction_rules
@receiver(transaction_created)
@receiver(transaction_updated)
def transaction_changed_receiver(sender: Transaction, signal, **kwargs):
for dca_entry in sender.dca_expense_entries.all():
dca_entry.amount_paid = sender.amount
dca_entry.save()
for dca_entry in sender.dca_income_entries.all():
dca_entry.amount_received = sender.amount
dca_entry.save()
check_for_transaction_rules.defer(
instance_id=sender.id,
signal=(

View File

@@ -320,10 +320,10 @@ class Transaction(models.Model):
type_display = self.get_type_display()
frmt_date = date(self.date, "SHORT_DATE_FORMAT")
account = self.account
tags = ", ".join([x.name for x in self.tags.all()]) or _("No Tags")
category = self.category or _("No Category")
tags = ", ".join([x.name for x in self.tags.all()]) or _("No tags")
category = self.category or _("No category")
amount = localize_number(drop_trailing_zeros(self.amount))
description = self.description or _("No Description")
description = self.description or _("No description")
return f"[{frmt_date}][{type_display}][{account}] {description}{category}{tags}{amount}"

View File

@@ -86,6 +86,16 @@ urlpatterns = [
views.transactions_bulk_edit,
name="transactions_bulk_edit",
),
path(
"transactions/json/search/",
views.get_recent_transactions,
name="transactions_search",
),
path(
"transactions/json/search/<str:filter_type>/",
views.get_recent_transactions,
name="transactions_search",
),
path(
"transaction/<int:transaction_id>/clone/",
views.transaction_clone,

View File

@@ -4,14 +4,14 @@ from copy import deepcopy
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.core.paginator import Paginator
from django.http import HttpResponse
from django.db.models import Q
from django.http import HttpResponse, JsonResponse
from django.shortcuts import render, get_object_or_404
from django.utils import timezone
from django.utils.translation import gettext_lazy as _, ngettext_lazy
from django.views.decorators.http import require_http_methods
from apps.common.decorators.htmx import only_htmx
from apps.common.utils.dicts import remove_falsey_entries
from apps.rules.signals import transaction_created, transaction_updated
from apps.transactions.filters import TransactionsFilter
from apps.transactions.forms import (
@@ -363,6 +363,8 @@ def transaction_all_list(request):
"account__currency",
"installment_plan",
"entities",
"dca_expense_entries",
"dca_income_entries",
).all()
transactions = default_order(transactions, order=order)
@@ -395,6 +397,9 @@ def transaction_all_summary(request):
"account__exchange_currency",
"account__currency",
"installment_plan",
"entities",
"dca_expense_entries",
"dca_income_entries",
).all()
f = TransactionsFilter(request.GET, queryset=transactions)
@@ -426,6 +431,9 @@ def transaction_all_account_summary(request):
"account__exchange_currency",
"account__currency",
"installment_plan",
"entities",
"dca_expense_entries",
"dca_income_entries",
).all()
f = TransactionsFilter(request.GET, queryset=transactions)
@@ -453,6 +461,9 @@ def transaction_all_currency_summary(request):
"account__exchange_currency",
"account__currency",
"installment_plan",
"entities",
"dca_expense_entries",
"dca_income_entries",
).all()
f = TransactionsFilter(request.GET, queryset=transactions)
@@ -484,6 +495,9 @@ def transactions_trash_can_index(request):
return render(request, "transactions/pages/trash.html")
@only_htmx
@login_required
@require_http_methods(["GET"])
def transactions_trash_can_list(request):
transactions = Transaction.deleted_objects.prefetch_related(
"account",
@@ -493,6 +507,10 @@ def transactions_trash_can_list(request):
"account__exchange_currency",
"account__currency",
"installment_plan",
"entities",
"entities",
"dca_expense_entries",
"dca_income_entries",
).all()
return render(
@@ -500,3 +518,43 @@ def transactions_trash_can_list(request):
"transactions/fragments/trash_list.html",
{"transactions": transactions},
)
@login_required
@require_http_methods(["GET"])
def get_recent_transactions(request, filter_type=None):
"""Return the 100 most recent non-deleted transactions with optional search."""
# Get search term from query params
search_term = request.GET.get("q", "").strip()
# Base queryset with selected fields
queryset = (
Transaction.objects.filter(deleted=False)
.select_related("account", "category")
.order_by("-created_at")
)
if filter_type:
if filter_type == "expenses":
queryset = queryset.filter(type=Transaction.Type.EXPENSE)
elif filter_type == "income":
queryset = queryset.filter(type=Transaction.Type.INCOME)
# Apply search if provided
if search_term:
queryset = queryset.filter(
Q(description__icontains=search_term)
| Q(notes__icontains=search_term)
| Q(internal_note__icontains=search_term)
| Q(tags__name__icontains=search_term)
| Q(category__name__icontains=search_term)
)
# Prepare data for JSON response
data = []
for t in queryset:
data.append({"text": str(t), "value": str(t.id)})
print(data)
return JsonResponse(data, safe=False)

View File

@@ -44,13 +44,16 @@
{# Description#}
<div class="mb-2 mb-lg-1 text-white tw-text-base">
{% spaceless %}
<span>{{ transaction.description }}</span>
<span class="{% if transaction.description %}me-2{% endif %}">{{ transaction.description }}</span>
{% if transaction.installment_plan and transaction.installment_id %}
<span
class="badge text-bg-secondary ms-2">{{ transaction.installment_id }}/{{ transaction.installment_plan.installment_total_number }}</span>
class="badge text-bg-secondary">{{ transaction.installment_id }}/{{ transaction.installment_plan.installment_total_number }}</span>
{% endif %}
{% if transaction.recurring_transaction %}
<span class="text-primary tw-text-xs ms-2"><i class="fa-solid fa-arrows-rotate fa-fw"></i></span>
<span class="text-primary tw-text-xs"><i class="fa-solid fa-arrows-rotate fa-fw"></i></span>
{% endif %}
{% if transaction.dca_expense_entries.all or transaction.dca_income_entries.all %}
<span class="badge text-bg-secondary">{% trans 'DCA' %}</span>
{% endif %}
{% endspaceless %}
</div>