From 212ee2bf28c8b413c79bc0db1ebab12924c293b5 Mon Sep 17 00:00:00 2001 From: Herculino Trotta Date: Mon, 14 Oct 2024 11:06:02 -0300 Subject: [PATCH] feat: improve networth calculations --- .../net_worth/utils/calculate_net_worth.py | 284 ++++++++++++------ 1 file changed, 199 insertions(+), 85 deletions(-) diff --git a/app/apps/net_worth/utils/calculate_net_worth.py b/app/apps/net_worth/utils/calculate_net_worth.py index 1a9643d..fdc1db0 100644 --- a/app/apps/net_worth/utils/calculate_net_worth.py +++ b/app/apps/net_worth/utils/calculate_net_worth.py @@ -1,53 +1,66 @@ -from django.db.models import Sum +from collections import OrderedDict, defaultdict from decimal import Decimal +from dateutil.relativedelta import relativedelta +from django.db.models import ( + OuterRef, + Subquery, +) +from django.db.models import Sum, Min, Max, Case, When, F, Value, DecimalField +from django.db.models.functions import Coalesce from django.db.models.functions import TruncMonth +from django.template.defaultfilters import date as date_filter from django.utils.translation import gettext_lazy as _ -from apps.transactions.models import Transaction from apps.accounts.models import Account from apps.currencies.models import Currency from apps.currencies.utils.convert import convert +from apps.transactions.models import Transaction def calculate_account_net_worth(): - account_net_worth = {} ungrouped_id = None # Special ID for ungrouped accounts - # Initialize the "Ungrouped" category - account_net_worth[ungrouped_id] = {"name": _("Ungrouped"), "accounts": {}} + # Subquery to calculate balance for each account + balance_subquery = ( + Transaction.objects.filter(account=OuterRef("pk"), is_paid=True) + .values("account") + .annotate( + balance=Sum( + Case( + When(type=Transaction.Type.INCOME, then=F("amount")), + When(type=Transaction.Type.EXPENSE, then=-F("amount")), + default=0, + output_field=DecimalField(), + ) + ) + ) + .values("balance") + ) - # Get all accounts - accounts = Account.objects.all() + # Main query to fetch all account data + accounts_data = Account.objects.annotate( + balance=Coalesce(Subquery(balance_subquery), Decimal("0")) + ).select_related("currency", "exchange_currency", "group") - for account in accounts: - currency = account.currency - - income = Transaction.objects.filter( - account=account, type=Transaction.Type.INCOME, is_paid=True - ).aggregate(total=Sum("amount"))["total"] or Decimal("0") - - expenses = Transaction.objects.filter( - account=account, type=Transaction.Type.EXPENSE, is_paid=True - ).aggregate(total=Sum("amount"))["total"] or Decimal("0") - - account_balance = income - expenses + account_net_worth = {ungrouped_id: {"name": _("Ungrouped"), "accounts": {}}} + for account in accounts_data: account_data = { "name": account.name, - "balance": account_balance, + "balance": account.balance, "currency": { - "code": currency.code, - "name": currency.name, - "prefix": currency.prefix, - "suffix": currency.suffix, - "decimal_places": currency.decimal_places, + "code": account.currency.code, + "name": account.currency.name, + "prefix": account.currency.prefix, + "suffix": account.currency.suffix, + "decimal_places": account.currency.decimal_places, }, } if account.exchange_currency: converted_amount, prefix, suffix, decimal_places = convert( - amount=account_balance, + amount=account.balance, from_currency=account.currency, to_currency=account.exchange_currency, ) @@ -59,14 +72,13 @@ def calculate_account_net_worth(): "decimal_places": decimal_places, } - if account.group: - group_id = account.group.id - group_name = account.group.name - if group_id not in account_net_worth: - account_net_worth[group_id] = {"name": group_name, "accounts": {}} - account_net_worth[group_id]["accounts"][account.id] = account_data - else: - account_net_worth[ungrouped_id]["accounts"][account.id] = account_data + group_id = account.group.id if account.group else ungrouped_id + group_name = account.group.name if account.group else _("Ungrouped") + + if group_id not in account_net_worth: + account_net_worth[group_id] = {"name": group_name, "accounts": {}} + + account_net_worth[group_id]["accounts"][account.id] = account_data # Remove the "Ungrouped" category if it's empty if not account_net_worth[ungrouped_id]["accounts"]: @@ -75,70 +87,172 @@ def calculate_account_net_worth(): return account_net_worth -def calculate_net_worth(): - accounts = Account.objects.all() +def calculate_currency_net_worth(): + # Calculate net worth and fetch currency details in a single query + net_worth_data = ( + Transaction.objects.filter(is_paid=True) + .values( + "account__currency__name", + "account__currency__code", + "account__currency__prefix", + "account__currency__suffix", + "account__currency__decimal_places", + ) + .annotate( + amount=Sum( + Case( + When(type=Transaction.Type.INCOME, then=F("amount")), + When(type=Transaction.Type.EXPENSE, then=-F("amount")), + default=0, + output_field=DecimalField(), + ) + ) + ) + ) + + # Create the net worth dictionary from the query results net_worth = {} - - for account in accounts: - currency = account.currency - if currency.code not in net_worth: - net_worth[currency.code] = Decimal("0") - - income = Transaction.objects.filter( - account=account, type=Transaction.Type.INCOME, is_paid=True - ).aggregate(total=Sum("amount"))["total"] or Decimal("0") - - expenses = Transaction.objects.filter( - account=account, type=Transaction.Type.EXPENSE, is_paid=True - ).aggregate(total=Sum("amount"))["total"] or Decimal("0") - - account_balance = income - expenses - net_worth[currency.code] += account_balance + for item in net_worth_data: + currency_name = item["account__currency__name"] + net_worth[currency_name] = { + "amount": item["amount"] or Decimal("0"), + "code": item["account__currency__code"], + "name": currency_name, + "prefix": item["account__currency__prefix"], + "suffix": item["account__currency__suffix"], + "decimal_places": item["account__currency__decimal_places"], + } return net_worth -def calculate_historical_net_worth(start_date, end_date): - asset_accounts = Account.objects.all() - currencies = Currency.objects.all() +def calculate_historical_currency_net_worth(): + # Get all currencies and date range in a single query + aggregates = Transaction.objects.aggregate( + min_date=Min("reference_date"), + max_date=Max("reference_date"), + ) + currencies = list(Currency.objects.values_list("name", flat=True)) - # Initialize the result dictionary - historical_net_worth = {} + start_date = aggregates["min_date"].replace(day=1) + end_date = aggregates["max_date"].replace(day=1) - # Get all months between start_date and end_date - months = ( - Transaction.objects.filter(account__in=asset_accounts) + # Calculate cumulative balances for each account, currency, and month + cumulative_balances = ( + Transaction.objects.filter(is_paid=True) .annotate(month=TruncMonth("reference_date")) - .values("month") - .distinct() - .order_by("month") + .values("account__currency__name", "month") + .annotate( + balance=Sum( + Case( + When(type=Transaction.Type.INCOME, then=F("amount")), + When(type=Transaction.Type.EXPENSE, then=-F("amount")), + default=Value(0), + output_field=DecimalField(), + ) + ) + ) + .order_by("month", "account__currency__name") ) - for month_data in months: - month = month_data["month"] - month_str = month.strftime("%Y-%m") - historical_net_worth[month_str] = { - currency.code: Decimal("0.00") for currency in currencies - } + # Create a dictionary to store cumulative balances + balance_dict = {} + for b in cumulative_balances: + month = b["month"] + currency = b["account__currency__name"] + if month not in balance_dict: + balance_dict[month] = {} + balance_dict[month][currency] = b["balance"] - for account in asset_accounts: - currency = account.currency + # Initialize the result dictionary + historical_net_worth = OrderedDict() - income = Transaction.objects.filter( - account=account, - type=Transaction.Type.INCOME, - is_paid=True, - reference_date__lte=month, - ).aggregate(total=Sum("amount"))["total"] or Decimal("0.00") + # Calculate running totals for each month + running_totals = {currency: Decimal("0.00") for currency in currencies} + last_recorded_totals = running_totals.copy() - expenses = Transaction.objects.filter( - account=account, - type=Transaction.Type.EXPENSE, - is_paid=True, - reference_date__lte=month, - ).aggregate(total=Sum("amount"))["total"] or Decimal("0.00") + current_month = start_date + while current_month <= end_date: + month_str = date_filter(current_month, "b Y") + totals_changed = False - account_balance = income - expenses - historical_net_worth[month_str][currency.code] += account_balance + for currency in currencies: + balance_change = balance_dict.get(current_month, {}).get( + currency, Decimal("0.00") + ) + running_totals[currency] += balance_change + if balance_change != Decimal("0.00"): + totals_changed = True + + if totals_changed: + historical_net_worth[month_str] = running_totals.copy() + last_recorded_totals = running_totals.copy() + + current_month += relativedelta(months=1) + + # Ensure the last month is always included + if historical_net_worth and list(historical_net_worth.keys())[-1] != date_filter( + end_date, "b Y" + ): + historical_net_worth[date_filter(end_date, "b Y")] = last_recorded_totals return historical_net_worth + + +def calculate_historical_account_balance(): + # Get all accounts + accounts = Account.objects.all() + + # Get the date range + date_range = Transaction.objects.aggregate( + min_date=Min("reference_date"), max_date=Max("reference_date") + ) + start_date = date_range["min_date"].replace(day=1) + end_date = date_range["max_date"].replace(day=1) + + # Calculate balances for each account and month + balances = ( + Transaction.objects.filter(is_paid=True) + .annotate(month=TruncMonth("reference_date")) + .values("account", "month") + .annotate(balance=Sum("amount")) + .order_by("account", "month") + ) + + # Organize data by account and month + account_balances = defaultdict(lambda: defaultdict(Decimal)) + for balance in balances: + account_balances[balance["account"]][balance["month"]] += balance["balance"] + + # Prepare the result + historical_account_balance = OrderedDict() + current_date = start_date + previous_balances = {account.id: Decimal("0") for account in accounts} + + while current_date <= end_date: + month_data = {} + has_changes = False + + for account in accounts: + running_balance = previous_balances[account.id] + account_balances[ + account.id + ].get(current_date, Decimal("0")) + + if running_balance != previous_balances[account.id]: + has_changes = True + + month_data[account.name] = running_balance + previous_balances[account.id] = running_balance + + if has_changes or not historical_account_balance: + historical_account_balance[date_filter(current_date, "b Y")] = month_data + + current_date += relativedelta(months=1) + + # Ensure the last month is always included + if historical_account_balance and list(historical_account_balance.keys())[ + -1 + ] != date_filter(end_date, "b Y"): + historical_account_balance[date_filter(end_date, "b Y")] = month_data + + return historical_account_balance