feat: internal code for automatic exchange rate fetching

This commit is contained in:
Herculino Trotta
2025-02-03 00:26:00 -03:00
parent aa8abe0e1c
commit f7768c8658
9 changed files with 525 additions and 1 deletions
@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import List, Tuple, Optional
from django.db.models import QuerySet
from apps.currencies.models import Currency
class ExchangeRateProvider(ABC):
rates_inverted = False
def __init__(self, api_key: Optional[str] = None):
self.api_key = api_key
@abstractmethod
def get_rates(
self, target_currencies: QuerySet, exchange_currencies: set
) -> List[Tuple[Currency, Currency, Decimal]]:
"""Fetch exchange rates for multiple currency pairs"""
raise NotImplementedError("Subclasses must implement get_rates method")
@classmethod
def requires_api_key(cls) -> bool:
"""Return True if the service requires an API key"""
return True
@staticmethod
def invert_rate(rate: Decimal) -> Decimal:
"""Invert the given rate."""
return Decimal("1") / rate
@@ -0,0 +1,162 @@
import logging
from datetime import timedelta
from django.db.models import QuerySet
from django.utils import timezone
from apps.currencies.exchange_rates.providers import (
SynthFinanceProvider,
CoinGeckoProvider,
)
from apps.currencies.models import ExchangeRateService, ExchangeRate, Currency
logger = logging.getLogger(__name__)
# Map service types to provider classes
PROVIDER_MAPPING = {
"synth_finance": SynthFinanceProvider,
"coingecko": CoinGeckoProvider,
}
class ExchangeRateFetcher:
@staticmethod
def fetch_due_rates(force: bool = False) -> None:
"""
Fetch rates for all services that are due for update.
Args:
force (bool): If True, fetches all active services regardless of their last fetch time.
If False, only fetches services that are due according to their interval.
"""
services = ExchangeRateService.objects.filter(is_active=True)
current_time = timezone.now()
for service in services:
try:
if force:
logger.info(f"Force fetching rates for {service.name}")
ExchangeRateFetcher._fetch_service_rates(service)
continue
# Regular schedule-based fetching
if service.last_fetch is None:
logger.info(f"First fetch for service: {service.name}")
ExchangeRateFetcher._fetch_service_rates(service)
continue
# Calculate when the next fetch should occur
next_fetch_due = service.last_fetch + timedelta(
hours=service.fetch_interval_hours
)
# Check if it's time for the next fetch
if current_time >= next_fetch_due:
logger.info(
f"Fetching rates for {service.name}. "
f"Last fetch: {service.last_fetch}, "
f"Interval: {service.fetch_interval_hours}h"
)
ExchangeRateFetcher._fetch_service_rates(service)
else:
logger.debug(
f"Skipping {service.name}. "
f"Next fetch due at: {next_fetch_due}"
)
except Exception as e:
logger.error(f"Error checking fetch schedule for {service.name}: {e}")
@staticmethod
def _get_unique_currency_pairs(
service: ExchangeRateService,
) -> tuple[QuerySet, set]:
"""
Get unique currency pairs from both target_currencies and target_accounts
Returns a tuple of (target_currencies QuerySet, exchange_currencies set)
"""
# Get currencies from target_currencies
target_currencies = set(service.target_currencies.all())
# Add currencies from target_accounts
for account in service.target_accounts.all():
if account.currency and account.exchange_currency:
target_currencies.add(account.currency)
# Convert back to QuerySet for compatibility with existing code
target_currencies_qs = Currency.objects.filter(
id__in=[curr.id for curr in target_currencies]
)
# Get unique exchange currencies
exchange_currencies = set()
# From target_currencies
for currency in target_currencies:
if currency.exchange_currency:
exchange_currencies.add(currency.exchange_currency)
# From target_accounts
for account in service.target_accounts.all():
if account.exchange_currency:
exchange_currencies.add(account.exchange_currency)
return target_currencies_qs, exchange_currencies
@staticmethod
def _fetch_service_rates(service: ExchangeRateService) -> None:
"""Fetch rates for a specific service"""
try:
provider = service.get_provider()
# Check if API key is required but missing
if provider.requires_api_key() and not service.api_key:
logger.error(f"API key required but not provided for {service.name}")
return
# Get unique currency pairs from both sources
target_currencies, exchange_currencies = (
ExchangeRateFetcher._get_unique_currency_pairs(service)
)
# Skip if no currencies to process
if not target_currencies or not exchange_currencies:
logger.info(f"No currency pairs to process for service {service.name}")
return
rates = provider.get_rates(target_currencies, exchange_currencies)
# Track processed currency pairs to avoid duplicates
processed_pairs = set()
for from_currency, to_currency, rate in rates:
# Create a unique identifier for this currency pair
pair_key = (from_currency.id, to_currency.id)
if pair_key in processed_pairs:
continue
if provider.rates_inverted:
# If rates are inverted, we need to swap currencies
ExchangeRate.objects.create(
from_currency=to_currency,
to_currency=from_currency,
rate=rate,
date=timezone.now(),
)
processed_pairs.add((to_currency.id, from_currency.id))
else:
# If rates are not inverted, we can use them as is
ExchangeRate.objects.create(
from_currency=from_currency,
to_currency=to_currency,
rate=rate,
date=timezone.now(),
)
processed_pairs.add((from_currency.id, to_currency.id))
service.last_fetch = timezone.now()
service.save()
except Exception as e:
logger.error(f"Error fetching rates for {service.name}: {e}")
@@ -0,0 +1,140 @@
import logging
import time
import requests
from decimal import Decimal
from typing import Tuple, List
from django.db.models import QuerySet
from apps.currencies.models import Currency
from apps.currencies.exchange_rates.base import ExchangeRateProvider
logger = logging.getLogger(__name__)
class SynthFinanceProvider(ExchangeRateProvider):
"""Implementation for Synth Finance API (synthfinance.com)"""
BASE_URL = "https://api.synthfinance.com/rates/live"
rates_inverted = False # SynthFinance returns non-inverted rates
def __init__(self, api_key: str = None):
super().__init__(api_key)
self.session = requests.Session()
self.session.headers.update({"Authorization": f"Bearer {self.api_key}"})
def get_rates(
self, target_currencies: QuerySet, exchange_currencies: set
) -> List[Tuple[Currency, Currency, Decimal]]:
results = []
currency_groups = {}
for currency in target_currencies:
if currency.exchange_currency in exchange_currencies:
group = currency_groups.setdefault(currency.exchange_currency.code, [])
group.append(currency)
for base_currency, currencies in currency_groups.items():
try:
to_currencies = ",".join(
currency.code
for currency in currencies
if currency.code != base_currency
)
response = self.session.get(
f"{self.BASE_URL}",
params={"from": base_currency, "to": to_currencies},
)
response.raise_for_status()
data = response.json()
rates = data["data"]["rates"]
for currency in currencies:
if currency.code == base_currency:
rate = Decimal("1")
else:
rate = Decimal(str(rates[currency.code]))
# Return the rate as is, without inversion
results.append((currency.exchange_currency, currency, rate))
credits_used = data["meta"]["credits_used"]
credits_remaining = data["meta"]["credits_remaining"]
logger.info(
f"Synth Finance API call: {credits_used} credits used, {credits_remaining} remaining"
)
except requests.RequestException as e:
logger.error(
f"Error fetching rates from Synth Finance API for base {base_currency}: {e}"
)
except KeyError as e:
logger.error(
f"Unexpected response structure from Synth Finance API for base {base_currency}: {e}"
)
except Exception as e:
logger.error(
f"Unexpected error processing Synth Finance data for base {base_currency}: {e}"
)
return results
class CoinGeckoProvider(ExchangeRateProvider):
"""Implementation for CoinGecko API"""
BASE_URL = "https://api.coingecko.com/api/v3"
rates_inverted = True
def __init__(self, api_key: str):
super().__init__(api_key)
self.session = requests.Session()
self.session.headers.update({"x-cg-demo-api-key": api_key})
@classmethod
def requires_api_key(cls) -> bool:
return True
def get_rates(
self, target_currencies: QuerySet, exchange_currencies: set
) -> List[Tuple[Currency, Currency, Decimal]]:
results = []
all_currencies = set(currency.code.lower() for currency in target_currencies)
all_currencies.update(currency.code.lower() for currency in exchange_currencies)
try:
response = self.session.get(
f"{self.BASE_URL}/simple/price",
params={
"ids": ",".join(all_currencies),
"vs_currencies": ",".join(all_currencies),
},
)
response.raise_for_status()
rates_data = response.json()
for target_currency in target_currencies:
if target_currency.exchange_currency in exchange_currencies:
try:
rate = Decimal(
str(
rates_data[target_currency.code.lower()][
target_currency.exchange_currency.code.lower()
]
)
)
# The rate is already inverted, so we don't need to invert it again
results.append(
(target_currency.exchange_currency, target_currency, rate)
)
except KeyError:
logger.error(
f"Rate not found for {target_currency.code} or {target_currency.exchange_currency.code}"
)
except Exception as e:
logger.error(
f"Error calculating rate for {target_currency.code}: {e}"
)
time.sleep(1) # CoinGecko allows 10-30 calls/minute for free tier
except requests.RequestException as e:
logger.error(f"Error fetching rates from CoinGecko API: {e}")
return results