Merge pull request #130

feat: internal code for automatic exchange rate fetching
This commit is contained in:
Herculino Trotta
2025-02-03 00:26:19 -03:00
committed by GitHub
9 changed files with 525 additions and 1 deletions

View File

@@ -1,6 +1,6 @@
from django.contrib import admin
from apps.currencies.models import Currency, ExchangeRate
from apps.currencies.models import Currency, ExchangeRate, ExchangeRateService
@admin.register(Currency)
@@ -11,4 +11,18 @@ class CurrencyAdmin(admin.ModelAdmin):
return super().formfield_for_dbfield(db_field, request, **kwargs)
@admin.register(ExchangeRateService)
class ExchangeRateServiceAdmin(admin.ModelAdmin):
list_display = [
"name",
"service_type",
"is_active",
"fetch_interval_hours",
"last_fetch",
]
list_filter = ["is_active", "service_type"]
search_fields = ["name"]
filter_horizontal = ["target_currencies"]
admin.site.register(ExchangeRate)

View File

@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import List, Tuple, Optional
from django.db.models import QuerySet
from apps.currencies.models import Currency
class ExchangeRateProvider(ABC):
rates_inverted = False
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key
@abstractmethod
def get_rates(
self, target_currencies: QuerySet, exchange_currencies: set
) -> List[Tuple[Currency, Currency, Decimal]]:
"""Fetch exchange rates for multiple currency pairs"""
raise NotImplementedError("Subclasses must implement get_rates method")
@classmethod
def requires_api_key(cls) -> bool:
"""Return True if the service requires an API key"""
return True
@staticmethod
def invert_rate(rate: Decimal) -> Decimal:
"""Invert the given rate."""
return Decimal("1") / rate

View File

@@ -0,0 +1,162 @@
import logging
from datetime import timedelta
from django.db.models import QuerySet
from django.utils import timezone
from apps.currencies.exchange_rates.providers import (
SynthFinanceProvider,
CoinGeckoProvider,
)
from apps.currencies.models import ExchangeRateService, ExchangeRate, Currency
logger = logging.getLogger(__name__)
# Map service types to provider classes
PROVIDER_MAPPING = {
"synth_finance": SynthFinanceProvider,
"coingecko": CoinGeckoProvider,
}
class ExchangeRateFetcher:
@staticmethod
def fetch_due_rates(force: bool = False) -> None:
"""
Fetch rates for all services that are due for update.
Args:
force (bool): If True, fetches all active services regardless of their last fetch time.
If False, only fetches services that are due according to their interval.
"""
services = ExchangeRateService.objects.filter(is_active=True)
current_time = timezone.now()
for service in services:
try:
if force:
logger.info(f"Force fetching rates for {service.name}")
ExchangeRateFetcher._fetch_service_rates(service)
continue
# Regular schedule-based fetching
if service.last_fetch is None:
logger.info(f"First fetch for service: {service.name}")
ExchangeRateFetcher._fetch_service_rates(service)
continue
# Calculate when the next fetch should occur
next_fetch_due = service.last_fetch + timedelta(
hours=service.fetch_interval_hours
)
# Check if it's time for the next fetch
if current_time >= next_fetch_due:
logger.info(
f"Fetching rates for {service.name}. "
f"Last fetch: {service.last_fetch}, "
f"Interval: {service.fetch_interval_hours}h"
)
ExchangeRateFetcher._fetch_service_rates(service)
else:
logger.debug(
f"Skipping {service.name}. "
f"Next fetch due at: {next_fetch_due}"
)
except Exception as e:
logger.error(f"Error checking fetch schedule for {service.name}: {e}")
@staticmethod
def _get_unique_currency_pairs(
service: ExchangeRateService,
) -> tuple[QuerySet, set]:
"""
Get unique currency pairs from both target_currencies and target_accounts
Returns a tuple of (target_currencies QuerySet, exchange_currencies set)
"""
# Get currencies from target_currencies
target_currencies = set(service.target_currencies.all())
# Add currencies from target_accounts
for account in service.target_accounts.all():
if account.currency and account.exchange_currency:
target_currencies.add(account.currency)
# Convert back to QuerySet for compatibility with existing code
target_currencies_qs = Currency.objects.filter(
id__in=[curr.id for curr in target_currencies]
)
# Get unique exchange currencies
exchange_currencies = set()
# From target_currencies
for currency in target_currencies:
if currency.exchange_currency:
exchange_currencies.add(currency.exchange_currency)
# From target_accounts
for account in service.target_accounts.all():
if account.exchange_currency:
exchange_currencies.add(account.exchange_currency)
return target_currencies_qs, exchange_currencies
@staticmethod
def _fetch_service_rates(service: ExchangeRateService) -> None:
"""Fetch rates for a specific service"""
try:
provider = service.get_provider()
# Check if API key is required but missing
if provider.requires_api_key() and not service.api_key:
logger.error(f"API key required but not provided for {service.name}")
return
# Get unique currency pairs from both sources
target_currencies, exchange_currencies = (
ExchangeRateFetcher._get_unique_currency_pairs(service)
)
# Skip if no currencies to process
if not target_currencies or not exchange_currencies:
logger.info(f"No currency pairs to process for service {service.name}")
return
rates = provider.get_rates(target_currencies, exchange_currencies)
# Track processed currency pairs to avoid duplicates
processed_pairs = set()
for from_currency, to_currency, rate in rates:
# Create a unique identifier for this currency pair
pair_key = (from_currency.id, to_currency.id)
if pair_key in processed_pairs:
continue
if provider.rates_inverted:
# If rates are inverted, we need to swap currencies
ExchangeRate.objects.create(
from_currency=to_currency,
to_currency=from_currency,
rate=rate,
date=timezone.now(),
)
processed_pairs.add((to_currency.id, from_currency.id))
else:
# If rates are not inverted, we can use them as is
ExchangeRate.objects.create(
from_currency=from_currency,
to_currency=to_currency,
rate=rate,
date=timezone.now(),
)
processed_pairs.add((from_currency.id, to_currency.id))
service.last_fetch = timezone.now()
service.save()
except Exception as e:
logger.error(f"Error fetching rates for {service.name}: {e}")

View File

@@ -0,0 +1,140 @@
import logging
import time
import requests
from decimal import Decimal
from typing import Tuple, List
from django.db.models import QuerySet
from apps.currencies.models import Currency
from apps.currencies.exchange_rates.base import ExchangeRateProvider
logger = logging.getLogger(__name__)
class SynthFinanceProvider(ExchangeRateProvider):
"""Implementation for Synth Finance API (synthfinance.com)"""
BASE_URL = "https://api.synthfinance.com/rates/live"
rates_inverted = False # SynthFinance returns non-inverted rates
def __init__(self, api_key: str = None):
super().__init__(api_key)
self.session = requests.Session()
self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})
def get_rates(
self, target_currencies: QuerySet, exchange_currencies: set
) -> List[Tuple[Currency, Currency, Decimal]]:
results = []
currency_groups = {}
for currency in target_currencies:
if currency.exchange_currency in exchange_currencies:
group = currency_groups.setdefault(currency.exchange_currency.code, [])
group.append(currency)
for base_currency, currencies in currency_groups.items():
try:
to_currencies = ",".join(
currency.code
for currency in currencies
if currency.code != base_currency
)
response = self.session.get(
f"{self.BASE_URL}",
params={"from": base_currency, "to": to_currencies},
)
response.raise_for_status()
data = response.json()
rates = data["data"]["rates"]
for currency in currencies:
if currency.code == base_currency:
rate = Decimal("1")
else:
rate = Decimal(str(rates[currency.code]))
# Return the rate as is, without inversion
results.append((currency.exchange_currency, currency, rate))
credits_used = data["meta"]["credits_used"]
credits_remaining = data["meta"]["credits_remaining"]
logger.info(
f"Synth Finance API call: {credits_used} credits used, {credits_remaining} remaining"
)
except requests.RequestException as e:
logger.error(
f"Error fetching rates from Synth Finance API for base {base_currency}: {e}"
)
except KeyError as e:
logger.error(
f"Unexpected response structure from Synth Finance API for base {base_currency}: {e}"
)
except Exception as e:
logger.error(
f"Unexpected error processing Synth Finance data for base {base_currency}: {e}"
)
return results
class CoinGeckoProvider(ExchangeRateProvider):
"""Implementation for CoinGecko API"""
BASE_URL = "https://api.coingecko.com/api/v3"
rates_inverted = True
def __init__(self, api_key: str):
super().__init__(api_key)
self.session = requests.Session()
self.session.headers.update({"x-cg-demo-api-key": api_key})
@classmethod
def requires_api_key(cls) -> bool:
return True
def get_rates(
self, target_currencies: QuerySet, exchange_currencies: set
) -> List[Tuple[Currency, Currency, Decimal]]:
results = []
all_currencies = set(currency.code.lower() for currency in target_currencies)
all_currencies.update(currency.code.lower() for currency in exchange_currencies)
try:
response = self.session.get(
f"{self.BASE_URL}/simple/price",
params={
"ids": ",".join(all_currencies),
"vs_currencies": ",".join(all_currencies),
},
)
response.raise_for_status()
rates_data = response.json()
for target_currency in target_currencies:
if target_currency.exchange_currency in exchange_currencies:
try:
rate = Decimal(
str(
rates_data[target_currency.code.lower()][
target_currency.exchange_currency.code.lower()
]
)
)
# The rate is already inverted, so we don't need to invert it again
results.append(
(target_currency.exchange_currency, target_currency, rate)
)
except KeyError:
logger.error(
f"Rate not found for {target_currency.code} or {target_currency.exchange_currency.code}"
)
except Exception as e:
logger.error(
f"Error calculating rate for {target_currency.code}: {e}"
)
time.sleep(1) # CoinGecko allows 10-30 calls/minute for free tier
except requests.RequestException as e:
logger.error(f"Error fetching rates from CoinGecko API: {e}")
return results

View File

@@ -0,0 +1,32 @@
# Generated by Django 5.1.5 on 2025-02-02 20:35
import django.core.validators
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('currencies', '0006_currency_exchange_currency'),
]
operations = [
migrations.CreateModel(
name='ExchangeRateService',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=255, unique=True, verbose_name='Service Name')),
('service_type', models.CharField(choices=[('synth_finance', 'Synth Finance'), ('coingecko', 'CoinGecko')], max_length=255, verbose_name='Service Type')),
('is_active', models.BooleanField(default=True, verbose_name='Active')),
('api_key', models.CharField(blank=True, help_text='API key for the service (if required)', max_length=255, null=True, verbose_name='API Key')),
('fetch_interval_hours', models.PositiveIntegerField(default=24, validators=[django.core.validators.MinValueValidator(1)], verbose_name='Fetch Interval (hours)')),
('last_fetch', models.DateTimeField(blank=True, null=True, verbose_name='Last Successful Fetch')),
('target_currencies', models.ManyToManyField(help_text='Select currencies to fetch exchange rates for. Rates will be fetched for each currency against their exchange_currency.', related_name='exchange_services', to='currencies.currency', verbose_name='Target Currencies')),
],
options={
'verbose_name': 'Exchange Rate Service',
'verbose_name_plural': 'Exchange Rate Services',
'ordering': ['name'],
},
),
]

View File

@@ -0,0 +1,24 @@
# Generated by Django 5.1.5 on 2025-02-03 01:52
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('accounts', '0008_alter_account_name'),
('currencies', '0007_exchangerateservice'),
]
operations = [
migrations.AddField(
model_name='exchangerateservice',
name='target_accounts',
field=models.ManyToManyField(help_text="Select accounts to fetch exchange rates for. Rates will be fetched for each account's currency against their set exchange currency.", related_name='exchange_services', to='accounts.account', verbose_name='Target Accounts'),
),
migrations.AlterField(
model_name='exchangerateservice',
name='target_currencies',
field=models.ManyToManyField(help_text='Select currencies to fetch exchange rates for. Rates will be fetched for each currency against their set exchange currency.', related_name='exchange_services', to='currencies.currency', verbose_name='Target Currencies'),
),
]

View File

@@ -0,0 +1,24 @@
# Generated by Django 5.1.5 on 2025-02-03 01:55
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('accounts', '0008_alter_account_name'),
('currencies', '0008_exchangerateservice_target_accounts_and_more'),
]
operations = [
migrations.AlterField(
model_name='exchangerateservice',
name='target_accounts',
field=models.ManyToManyField(blank=True, help_text="Select accounts to fetch exchange rates for. Rates will be fetched for each account's currency against their set exchange currency.", related_name='exchange_services', to='accounts.account', verbose_name='Target Accounts'),
),
migrations.AlterField(
model_name='exchangerateservice',
name='target_currencies',
field=models.ManyToManyField(blank=True, help_text='Select currencies to fetch exchange rates for. Rates will be fetched for each currency against their set exchange currency.', related_name='exchange_services', to='currencies.currency', verbose_name='Target Currencies'),
),
]

View File

@@ -1,8 +1,12 @@
import logging
from django.core.exceptions import ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models
from django.utils.translation import gettext_lazy as _
logger = logging.getLogger(__name__)
class Currency(models.Model):
code = models.CharField(max_length=10, unique=True, verbose_name=_("Currency Code"))
@@ -78,3 +82,66 @@ class ExchangeRate(models.Model):
raise ValidationError(
{"to_currency": _("From and To currencies cannot be the same.")}
)
class ExchangeRateService(models.Model):
"""Configuration for exchange rate services"""
class ServiceType(models.TextChoices):
SYNTH_FINANCE = "synth_finance", "Synth Finance"
COINGECKO = "coingecko", "CoinGecko"
name = models.CharField(max_length=255, unique=True, verbose_name=_("Service Name"))
service_type = models.CharField(
max_length=255, choices=ServiceType.choices, verbose_name=_("Service Type")
)
is_active = models.BooleanField(default=True, verbose_name=_("Active"))
api_key = models.CharField(
max_length=255,
blank=True,
null=True,
verbose_name=_("API Key"),
help_text=_("API key for the service (if required)"),
)
fetch_interval_hours = models.PositiveIntegerField(
default=24,
validators=[MinValueValidator(1)],
verbose_name=_("Fetch Interval (hours)"),
)
last_fetch = models.DateTimeField(
null=True, blank=True, verbose_name=_("Last Successful Fetch")
)
target_currencies = models.ManyToManyField(
Currency,
verbose_name=_("Target Currencies"),
help_text=_(
"Select currencies to fetch exchange rates for. Rates will be fetched for each currency against their set exchange currency."
),
related_name="exchange_services",
blank=True,
)
target_accounts = models.ManyToManyField(
"accounts.Account",
verbose_name=_("Target Accounts"),
help_text=_(
"Select accounts to fetch exchange rates for. Rates will be fetched for each account's currency against their set exchange currency."
),
related_name="exchange_services",
blank=True,
)
class Meta:
verbose_name = _("Exchange Rate Service")
verbose_name_plural = _("Exchange Rate Services")
ordering = ["name"]
def __str__(self):
return self.name
def get_provider(self):
from apps.currencies.exchange_rates.fetcher import PROVIDER_MAPPING
provider_class = PROVIDER_MAPPING[self.service_type]
return provider_class(self.api_key)

View File

@@ -0,0 +1,31 @@
import logging
from procrastinate.contrib.django import app
from apps.currencies.exchange_rates.fetcher import ExchangeRateFetcher
from apps.currencies.models import ExchangeRateService
from django.utils import timezone
logger = logging.getLogger(__name__)
@app.periodic(cron="0 * * * *") # Run every hour
@app.task(name="automatic_fetch_exchange_rates")
def automatic_fetch_exchange_rates(timestamp=None):
"""Fetch exchange rates for all due services"""
fetcher = ExchangeRateFetcher()
try:
fetcher.fetch_due_rates()
except Exception as e:
logger.error(e, exc_info=True)
@app.task(name="manual_fetch_exchange_rates")
def manual_fetch_exchange_rates(timestamp=None):
"""Fetch exchange rates for all due services"""
fetcher = ExchangeRateFetcher()
try:
fetcher.fetch_due_rates(force=True)
except Exception as e:
logger.error(e, exc_info=True)