feat: improve networth calculations

This commit is contained in:
Herculino Trotta
2024-10-14 11:06:02 -03:00
parent f785488977
commit 212ee2bf28
+199 -85
View File
@@ -1,53 +1,66 @@
from django.db.models import Sum from collections import OrderedDict, defaultdict
from decimal import Decimal from decimal import Decimal
from dateutil.relativedelta import relativedelta
from django.db.models import (
OuterRef,
Subquery,
)
from django.db.models import Sum, Min, Max, Case, When, F, Value, DecimalField
from django.db.models.functions import Coalesce
from django.db.models.functions import TruncMonth from django.db.models.functions import TruncMonth
from django.template.defaultfilters import date as date_filter
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from apps.transactions.models import Transaction
from apps.accounts.models import Account from apps.accounts.models import Account
from apps.currencies.models import Currency from apps.currencies.models import Currency
from apps.currencies.utils.convert import convert from apps.currencies.utils.convert import convert
from apps.transactions.models import Transaction
def calculate_account_net_worth(): def calculate_account_net_worth():
account_net_worth = {}
ungrouped_id = None # Special ID for ungrouped accounts ungrouped_id = None # Special ID for ungrouped accounts
# Initialize the "Ungrouped" category # Subquery to calculate balance for each account
account_net_worth[ungrouped_id] = {"name": _("Ungrouped"), "accounts": {}} balance_subquery = (
Transaction.objects.filter(account=OuterRef("pk"), is_paid=True)
.values("account")
.annotate(
balance=Sum(
Case(
When(type=Transaction.Type.INCOME, then=F("amount")),
When(type=Transaction.Type.EXPENSE, then=-F("amount")),
default=0,
output_field=DecimalField(),
)
)
)
.values("balance")
)
# Get all accounts # Main query to fetch all account data
accounts = Account.objects.all() accounts_data = Account.objects.annotate(
balance=Coalesce(Subquery(balance_subquery), Decimal("0"))
).select_related("currency", "exchange_currency", "group")
for account in accounts: account_net_worth = {ungrouped_id: {"name": _("Ungrouped"), "accounts": {}}}
currency = account.currency
income = Transaction.objects.filter(
account=account, type=Transaction.Type.INCOME, is_paid=True
).aggregate(total=Sum("amount"))["total"] or Decimal("0")
expenses = Transaction.objects.filter(
account=account, type=Transaction.Type.EXPENSE, is_paid=True
).aggregate(total=Sum("amount"))["total"] or Decimal("0")
account_balance = income - expenses
for account in accounts_data:
account_data = { account_data = {
"name": account.name, "name": account.name,
"balance": account_balance, "balance": account.balance,
"currency": { "currency": {
"code": currency.code, "code": account.currency.code,
"name": currency.name, "name": account.currency.name,
"prefix": currency.prefix, "prefix": account.currency.prefix,
"suffix": currency.suffix, "suffix": account.currency.suffix,
"decimal_places": currency.decimal_places, "decimal_places": account.currency.decimal_places,
}, },
} }
if account.exchange_currency: if account.exchange_currency:
converted_amount, prefix, suffix, decimal_places = convert( converted_amount, prefix, suffix, decimal_places = convert(
amount=account_balance, amount=account.balance,
from_currency=account.currency, from_currency=account.currency,
to_currency=account.exchange_currency, to_currency=account.exchange_currency,
) )
@@ -59,14 +72,13 @@ def calculate_account_net_worth():
"decimal_places": decimal_places, "decimal_places": decimal_places,
} }
if account.group: group_id = account.group.id if account.group else ungrouped_id
group_id = account.group.id group_name = account.group.name if account.group else _("Ungrouped")
group_name = account.group.name
if group_id not in account_net_worth: if group_id not in account_net_worth:
account_net_worth[group_id] = {"name": group_name, "accounts": {}} account_net_worth[group_id] = {"name": group_name, "accounts": {}}
account_net_worth[group_id]["accounts"][account.id] = account_data
else: account_net_worth[group_id]["accounts"][account.id] = account_data
account_net_worth[ungrouped_id]["accounts"][account.id] = account_data
# Remove the "Ungrouped" category if it's empty # Remove the "Ungrouped" category if it's empty
if not account_net_worth[ungrouped_id]["accounts"]: if not account_net_worth[ungrouped_id]["accounts"]:
@@ -75,70 +87,172 @@ def calculate_account_net_worth():
return account_net_worth return account_net_worth
def calculate_net_worth(): def calculate_currency_net_worth():
accounts = Account.objects.all() # Calculate net worth and fetch currency details in a single query
net_worth_data = (
Transaction.objects.filter(is_paid=True)
.values(
"account__currency__name",
"account__currency__code",
"account__currency__prefix",
"account__currency__suffix",
"account__currency__decimal_places",
)
.annotate(
amount=Sum(
Case(
When(type=Transaction.Type.INCOME, then=F("amount")),
When(type=Transaction.Type.EXPENSE, then=-F("amount")),
default=0,
output_field=DecimalField(),
)
)
)
)
# Create the net worth dictionary from the query results
net_worth = {} net_worth = {}
for item in net_worth_data:
for account in accounts: currency_name = item["account__currency__name"]
currency = account.currency net_worth[currency_name] = {
if currency.code not in net_worth: "amount": item["amount"] or Decimal("0"),
net_worth[currency.code] = Decimal("0") "code": item["account__currency__code"],
"name": currency_name,
income = Transaction.objects.filter( "prefix": item["account__currency__prefix"],
account=account, type=Transaction.Type.INCOME, is_paid=True "suffix": item["account__currency__suffix"],
).aggregate(total=Sum("amount"))["total"] or Decimal("0") "decimal_places": item["account__currency__decimal_places"],
}
expenses = Transaction.objects.filter(
account=account, type=Transaction.Type.EXPENSE, is_paid=True
).aggregate(total=Sum("amount"))["total"] or Decimal("0")
account_balance = income - expenses
net_worth[currency.code] += account_balance
return net_worth return net_worth
def calculate_historical_net_worth(start_date, end_date): def calculate_historical_currency_net_worth():
asset_accounts = Account.objects.all() # Get all currencies and date range in a single query
currencies = Currency.objects.all() aggregates = Transaction.objects.aggregate(
min_date=Min("reference_date"),
max_date=Max("reference_date"),
)
currencies = list(Currency.objects.values_list("name", flat=True))
# Initialize the result dictionary start_date = aggregates["min_date"].replace(day=1)
historical_net_worth = {} end_date = aggregates["max_date"].replace(day=1)
# Get all months between start_date and end_date # Calculate cumulative balances for each account, currency, and month
months = ( cumulative_balances = (
Transaction.objects.filter(account__in=asset_accounts) Transaction.objects.filter(is_paid=True)
.annotate(month=TruncMonth("reference_date")) .annotate(month=TruncMonth("reference_date"))
.values("month") .values("account__currency__name", "month")
.distinct() .annotate(
.order_by("month") balance=Sum(
Case(
When(type=Transaction.Type.INCOME, then=F("amount")),
When(type=Transaction.Type.EXPENSE, then=-F("amount")),
default=Value(0),
output_field=DecimalField(),
)
)
)
.order_by("month", "account__currency__name")
) )
for month_data in months: # Create a dictionary to store cumulative balances
month = month_data["month"] balance_dict = {}
month_str = month.strftime("%Y-%m") for b in cumulative_balances:
historical_net_worth[month_str] = { month = b["month"]
currency.code: Decimal("0.00") for currency in currencies currency = b["account__currency__name"]
} if month not in balance_dict:
balance_dict[month] = {}
balance_dict[month][currency] = b["balance"]
for account in asset_accounts: # Initialize the result dictionary
currency = account.currency historical_net_worth = OrderedDict()
income = Transaction.objects.filter( # Calculate running totals for each month
account=account, running_totals = {currency: Decimal("0.00") for currency in currencies}
type=Transaction.Type.INCOME, last_recorded_totals = running_totals.copy()
is_paid=True,
reference_date__lte=month,
).aggregate(total=Sum("amount"))["total"] or Decimal("0.00")
expenses = Transaction.objects.filter( current_month = start_date
account=account, while current_month <= end_date:
type=Transaction.Type.EXPENSE, month_str = date_filter(current_month, "b Y")
is_paid=True, totals_changed = False
reference_date__lte=month,
).aggregate(total=Sum("amount"))["total"] or Decimal("0.00")
account_balance = income - expenses for currency in currencies:
historical_net_worth[month_str][currency.code] += account_balance balance_change = balance_dict.get(current_month, {}).get(
currency, Decimal("0.00")
)
running_totals[currency] += balance_change
if balance_change != Decimal("0.00"):
totals_changed = True
if totals_changed:
historical_net_worth[month_str] = running_totals.copy()
last_recorded_totals = running_totals.copy()
current_month += relativedelta(months=1)
# Ensure the last month is always included
if historical_net_worth and list(historical_net_worth.keys())[-1] != date_filter(
end_date, "b Y"
):
historical_net_worth[date_filter(end_date, "b Y")] = last_recorded_totals
return historical_net_worth return historical_net_worth
def calculate_historical_account_balance():
# Get all accounts
accounts = Account.objects.all()
# Get the date range
date_range = Transaction.objects.aggregate(
min_date=Min("reference_date"), max_date=Max("reference_date")
)
start_date = date_range["min_date"].replace(day=1)
end_date = date_range["max_date"].replace(day=1)
# Calculate balances for each account and month
balances = (
Transaction.objects.filter(is_paid=True)
.annotate(month=TruncMonth("reference_date"))
.values("account", "month")
.annotate(balance=Sum("amount"))
.order_by("account", "month")
)
# Organize data by account and month
account_balances = defaultdict(lambda: defaultdict(Decimal))
for balance in balances:
account_balances[balance["account"]][balance["month"]] += balance["balance"]
# Prepare the result
historical_account_balance = OrderedDict()
current_date = start_date
previous_balances = {account.id: Decimal("0") for account in accounts}
while current_date <= end_date:
month_data = {}
has_changes = False
for account in accounts:
running_balance = previous_balances[account.id] + account_balances[
account.id
].get(current_date, Decimal("0"))
if running_balance != previous_balances[account.id]:
has_changes = True
month_data[account.name] = running_balance
previous_balances[account.id] = running_balance
if has_changes or not historical_account_balance:
historical_account_balance[date_filter(current_date, "b Y")] = month_data
current_date += relativedelta(months=1)
# Ensure the last month is always included
if historical_account_balance and list(historical_account_balance.keys())[
-1
] != date_filter(end_date, "b Y"):
historical_account_balance[date_filter(end_date, "b Y")] = month_data
return historical_account_balance