Files
WYGIWYH/app/apps/currencies/exchange_rates/providers.py
2025-08-17 03:54:10 -03:00

506 lines
19 KiB
Python

import logging
import time
from collections import deque
from decimal import Decimal
from typing import Tuple, List, Optional, Dict

import requests
from django.db.models import QuerySet

from apps.currencies.models import Currency, ExchangeRate
from apps.currencies.exchange_rates.base import ExchangeRateProvider
# Module-level logger shared by every provider below, named after this module.
logger = logging.getLogger(__name__)
class CoinGeckoFreeProvider(ExchangeRateProvider):
    """Exchange rate provider backed by the CoinGecko free (demo) API.

    All prices are fetched with a single ``/simple/price`` request per
    ``get_rates`` call; the API key is sent in the ``x-cg-demo-api-key``
    header of the provider's session.
    """

    BASE_URL = "https://api.coingecko.com/api/v3"
    # Seconds to wait for the HTTP request. ``requests`` applies no timeout by
    # default, so a stalled connection would otherwise block the caller forever.
    REQUEST_TIMEOUT = 30
    rates_inverted = True

    def __init__(self, api_key: str):
        """Create the provider and a session that sends ``api_key`` as demo key."""
        super().__init__(api_key)
        self.session = requests.Session()
        self.session.headers.update({"x-cg-demo-api-key": api_key})

    @classmethod
    def requires_api_key(cls) -> bool:
        """Return ``True``: this provider must be given an API key."""
        return True

    def get_rates(
        self, target_currencies: QuerySet, exchange_currencies: set
    ) -> List[Tuple[Currency, Currency, Decimal]]:
        """Fetch prices for every target whose exchange currency was requested.

        Args:
            target_currencies: Currencies to price. Each one is looked up in
                the response by its lower-cased ``code``, against the
                lower-cased code of its ``exchange_currency``.
            exchange_currencies: Exchange currencies the caller cares about;
                targets whose ``exchange_currency`` is not in this set are
                ignored.

        Returns:
            A list of ``(exchange_currency, target_currency, rate)`` tuples.
            Targets missing from the response are logged and skipped; a failed
            HTTP request is logged and produces an empty list.
        """
        results = []

        all_currencies = {currency.code.lower() for currency in target_currencies}
        all_currencies.update(currency.code.lower() for currency in exchange_currencies)
        if not all_currencies:
            # Nothing to ask for: avoid spending an API call on an empty query.
            return results

        # Sorted only to make the outgoing request deterministic.
        joined_codes = ",".join(sorted(all_currencies))

        try:
            response = self.session.get(
                f"{self.BASE_URL}/simple/price",
                params={
                    "ids": joined_codes,
                    "vs_currencies": joined_codes,
                },
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
            rates_data = response.json()

            for target_currency in target_currencies:
                if target_currency.exchange_currency not in exchange_currencies:
                    continue
                try:
                    rate = Decimal(
                        str(
                            rates_data[target_currency.code.lower()][
                                target_currency.exchange_currency.code.lower()
                            ]
                        )
                    )
                    # The rate is already inverted, so we don't need to invert it again
                    results.append(
                        (target_currency.exchange_currency, target_currency, rate)
                    )
                except KeyError:
                    logger.error(
                        f"Rate not found for {target_currency.code} or {target_currency.exchange_currency.code}"
                    )
                except Exception as e:
                    logger.error(
                        f"Error calculating rate for {target_currency.code}: {e}"
                    )

            time.sleep(1)  # CoinGecko allows 10-30 calls/minute for free tier
        except requests.RequestException as e:
            logger.error(f"Error fetching rates from CoinGecko API: {e}")

        return results
class CoinGeckoProProvider(CoinGeckoFreeProvider):
    """Exchange rate provider backed by the CoinGecko Pro API.

    Reuses the ``get_rates`` implementation inherited from
    ``CoinGeckoFreeProvider``; only the API root and the authentication header
    differ.
    """

    # API root only. The inherited ``get_rates`` appends "/simple/price" to
    # BASE_URL itself, so including that path here would yield the broken URL
    # ".../simple/price/simple/price".
    BASE_URL = "https://pro-api.coingecko.com/api/v3"
    rates_inverted = True

    def __init__(self, api_key: str):
        """Create the provider with a session that sends ``api_key`` as Pro key."""
        super().__init__(api_key)
        # The parent constructor built a session carrying the demo-key header;
        # replace it so that only the Pro header is sent.
        self.session = requests.Session()
        self.session.headers.update({"x-cg-pro-api-key": api_key})
class TransitiveRateProvider(ExchangeRateProvider):
    """Derives exchange rates by chaining rates that are already stored.

    Stored ``ExchangeRate`` rows are turned into a bidirectional graph keyed by
    currency id; a rate between two currencies is the product of the edge rates
    along the path with the fewest hops.
    """

    rates_inverted = True

    def __init__(self, api_key: str = None):
        super().__init__(api_key)  # API key not needed but maintaining interface

    @classmethod
    def requires_api_key(cls) -> bool:
        """Return ``False``: no external API is contacted."""
        return False

    def get_rates(
        self, target_currencies: QuerySet, exchange_currencies: set
    ) -> List[Tuple[Currency, Currency, Decimal]]:
        """Compute a rate for every target reachable from its exchange currency.

        Args:
            target_currencies: Currencies to price against their
                ``exchange_currency``.
            exchange_currencies: Exchange currencies the caller cares about;
                targets without an exchange currency, or with one outside this
                set, are skipped.

        Returns:
            A list of ``(exchange_currency, target_currency, rate)`` tuples for
            the targets where a conversion path with a non-zero rate exists.
        """
        results = []

        # Every stored rate takes part in the graph; no filtering is applied.
        stored_rates = ExchangeRate.objects.all()
        currency_graph = self._build_currency_graph(stored_rates)

        for target in target_currencies:
            if (
                not target.exchange_currency
                or target.exchange_currency not in exchange_currencies
            ):
                continue

            from_id = target.exchange_currency.id
            to_id = target.id
            path, rate = self._find_conversion_path(currency_graph, from_id, to_id)

            if path and rate:
                # Resolve all codes of the path in one query instead of one per hop.
                currencies_by_id = Currency.objects.in_bulk(path)
                path_codes = [
                    currencies_by_id[cid].code if cid in currencies_by_id else str(cid)
                    for cid in path
                ]
                logger.info(
                    f"Found conversion path: {' -> '.join(path_codes)}, rate: {rate}"
                )
                results.append((target.exchange_currency, target, rate))
            else:
                logger.debug(
                    f"No conversion path found for {target.exchange_currency.code}->{target.code}"
                )

        return results

    @staticmethod
    def _build_currency_graph(rates) -> Dict[int, Dict[int, Decimal]]:
        """Build a bidirectional graph of currency ids from stored rates.

        Args:
            rates: Iterable of objects exposing ``from_currency_id``,
                ``to_currency_id`` and ``rate``.

        Returns:
            ``graph[a][b]`` is the rate for converting ``a`` to ``b``. Each
            stored rate contributes the forward edge and its reciprocal. When
            several rows describe the same pair, the one iterated last wins.
        """
        graph: Dict[int, Dict[int, Decimal]] = {}
        for rate in rates:
            if not rate.rate:
                # A zero (or missing) rate has no reciprocal and would zero out
                # every path going through it, so it cannot be used as an edge.
                logger.warning(
                    "Ignoring unusable exchange rate %s -> %s: %r",
                    rate.from_currency_id,
                    rate.to_currency_id,
                    rate.rate,
                )
                continue

            # Add both directions to make the graph bidirectional
            graph.setdefault(rate.from_currency_id, {})[rate.to_currency_id] = rate.rate
            graph.setdefault(rate.to_currency_id, {})[rate.from_currency_id] = (
                Decimal("1") / rate.rate
            )
        return graph

    @staticmethod
    def _find_conversion_path(
        graph, from_id, to_id
    ) -> Tuple[Optional[list], Optional[Decimal]]:
        """Find the path with the fewest hops between two currencies (BFS).

        Args:
            graph: Mapping produced by ``_build_currency_graph``.
            from_id: Id of the currency to convert from.
            to_id: Id of the currency to convert to.

        Returns:
            ``(path, rate)`` where ``path`` is the list of currency ids from
            ``from_id`` to ``to_id`` and ``rate`` is the product of the edge
            rates along it, or ``(None, None)`` when either currency is absent
            from the graph or no path connects them.
        """
        if from_id not in graph or to_id not in graph:
            return None, None

        # deque gives O(1) pops from the left; list.pop(0) shifts every element.
        queue = deque([(from_id, [from_id], Decimal("1"))])
        visited = {from_id}

        while queue:
            current, path, current_rate = queue.popleft()
            if current == to_id:
                return path, current_rate

            for neighbor, rate in graph.get(current, {}).items():
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append((neighbor, path + [neighbor], current_rate * rate))

        return None, None
class FrankfurterProvider(ExchangeRateProvider):
    """Implementation for the Frankfurter API (frankfurter.dev).

    Targets are grouped by their exchange (base) currency so that one HTTP
    request is made per base currency.
    """

    BASE_URL = "https://api.frankfurter.dev/v1/latest"
    # Seconds to wait for each HTTP request. ``requests`` applies no timeout by
    # default, so a stalled connection would otherwise block the caller forever.
    REQUEST_TIMEOUT = 30
    # Frankfurter returns non-inverted rates (e.g., 1 EUR = 1.1 USD)
    rates_inverted = False

    def __init__(self, api_key: str = None):
        """
        Initializes the provider. The Frankfurter API does not require an API key,
        so the api_key parameter is only forwarded to the base class.
        """
        super().__init__(api_key)
        self.session = requests.Session()

    @classmethod
    def requires_api_key(cls) -> bool:
        """Return ``False``: requests are sent without any credentials."""
        return False

    def get_rates(
        self, target_currencies: QuerySet, exchange_currencies: set
    ) -> List[Tuple[Currency, Currency, Decimal]]:
        """Fetch the latest rates for every requested (base, target) pair.

        Args:
            target_currencies: Currencies to price against their
                ``exchange_currency``.
            exchange_currencies: Exchange currencies the caller cares about;
                targets whose ``exchange_currency`` is not in this set are
                ignored.

        Returns:
            A list of ``(exchange_currency, target_currency, rate)`` tuples. A
            target equal to its own base gets a rate of 1 without an API call.
            Failures for one base currency are logged and do not affect the
            other bases; a symbol missing from a response is logged and only
            that target is skipped.
        """
        results = []
        currency_groups = {}

        # Group target currencies by their exchange (base) currency to minimize API calls
        for currency in target_currencies:
            if currency.exchange_currency in exchange_currencies:
                currency_groups.setdefault(currency.exchange_currency.code, []).append(
                    currency
                )

        # Make one API call for each base currency
        for base_currency, currencies in currency_groups.items():
            try:
                # Comma-separated list of target codes, excluding the base itself
                to_currencies = ",".join(
                    currency.code
                    for currency in currencies
                    if currency.code != base_currency
                )

                if not to_currencies:
                    # Only the base-to-itself rate was requested (e.g., USD to USD):
                    # answer locally and skip the API call.
                    for currency in currencies:
                        if currency.code == base_currency:
                            results.append(
                                (currency.exchange_currency, currency, Decimal("1"))
                            )
                    continue

                response = self.session.get(
                    self.BASE_URL,
                    params={"base": base_currency, "symbols": to_currencies},
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                rates = response.json()["rates"]

                for currency in currencies:
                    if currency.code == base_currency:
                        # The rate for the base currency to itself is always 1
                        rate = Decimal("1")
                    elif currency.code in rates:
                        rate = Decimal(str(rates[currency.code]))
                    else:
                        # Skip only this target; a KeyError here would abandon
                        # the remaining currencies of the same base as well.
                        logger.error(
                            f"Rate for {currency.code} missing from Frankfurter API response for base {base_currency}"
                        )
                        continue
                    results.append((currency.exchange_currency, currency, rate))
            except requests.RequestException as e:
                logger.error(
                    f"Error fetching rates from Frankfurter API for base {base_currency}: {e}"
                )
            except KeyError as e:
                logger.error(
                    f"Unexpected response structure from Frankfurter API for base {base_currency}: {e}"
                )
            except Exception as e:
                logger.error(
                    f"Unexpected error processing Frankfurter data for base {base_currency}: {e}"
                )

        return results
class TwelveDataProvider(ExchangeRateProvider):
    """Implementation for the Twelve Data API (twelvedata.com).

    One ``exchange_rate`` request is made per currency pair, followed by a
    pause so consecutive pairs stay within the per-minute request limit.
    """

    BASE_URL = "https://api.twelvedata.com/exchange_rate"
    # Seconds to wait for each HTTP request. ``requests`` applies no timeout by
    # default, so a stalled connection would otherwise block the caller forever.
    REQUEST_TIMEOUT = 30
    # Seconds to pause after each pair that hit the API.
    RATE_LIMIT_DELAY = 60
    # The API returns direct rates, e.g., for EUR/USD it's 1 EUR = X USD
    rates_inverted = False

    def __init__(self, api_key: str):
        """
        Initializes the provider with an API key and a requests session.
        """
        super().__init__(api_key)
        self.session = requests.Session()

    @classmethod
    def requires_api_key(cls) -> bool:
        """This provider requires an API key."""
        return True

    def get_rates(
        self, target_currencies: QuerySet, exchange_currencies: set
    ) -> List[Tuple[Currency, Currency, Decimal]]:
        """
        Fetches exchange rates from the Twelve Data API for the given currency pairs.

        This provider makes one API call for each requested currency pair.

        Args:
            target_currencies: Currencies to price against their
                ``exchange_currency``.
            exchange_currencies: Exchange currencies the caller cares about;
                targets whose ``exchange_currency`` is not in this set are
                ignored.

        Returns:
            A list of ``(exchange_currency, target_currency, rate)`` tuples. A
            pair of identical currencies gets a rate of 1 without an API call.
            Failed pairs are logged and skipped.
        """
        results = []

        for target_currency in target_currencies:
            # Ensure the target currency's exchange currency is one we're interested in
            if target_currency.exchange_currency not in exchange_currencies:
                continue

            base_currency = target_currency.exchange_currency

            # The exchange rate for the same currency is always 1
            if base_currency.code == target_currency.code:
                results.append((base_currency, target_currency, Decimal("1")))
                continue

            # Construct the symbol in the format "BASE/TARGET", e.g., "EUR/USD"
            symbol = f"{base_currency.code}/{target_currency.code}"

            try:
                # NOTE(review): the key travels in the query string, so it may
                # show up in URLs quoted by request error messages — confirm
                # whether log redaction is needed.
                params = {
                    "symbol": symbol,
                    "apikey": self.api_key,
                }
                response = self.session.get(
                    self.BASE_URL, params=params, timeout=self.REQUEST_TIMEOUT
                )
                response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
                data = response.json()

                # The API may return an error message in a JSON object
                if "rate" not in data:
                    error_message = data.get("message", "Rate not found in response.")
                    logger.error(
                        f"Could not fetch rate for {symbol} from Twelve Data: {error_message}"
                    )
                    continue

                # Convert the rate to a Decimal for precision
                rate = Decimal(str(data["rate"]))
                results.append((base_currency, target_currency, rate))
                logger.info(f"Successfully fetched rate for {symbol} from Twelve Data.")
            except requests.RequestException as e:
                logger.error(
                    f"Error fetching rate from Twelve Data API for symbol {symbol}: {e}"
                )
            except KeyError as e:
                logger.error(
                    f"Unexpected response structure from Twelve Data API for symbol {symbol}: Missing key {e}"
                )
            except Exception as e:
                logger.error(
                    f"An unexpected error occurred while processing Twelve Data for {symbol}: {e}"
                )
            finally:
                # We sleep every pair as to not step over TwelveData's minute
                # limit. This also runs on the failure paths: skipping the pause
                # after an error payload would send the next request at once.
                time.sleep(self.RATE_LIMIT_DELAY)

        return results
class TwelveDataMarketsProvider(ExchangeRateProvider):
    """
    Provides prices for market instruments (stocks, ETFs, etc.) using the Twelve Data API.

    This provider performs a multi-step process:
    1. Parses instrument codes which can be symbols, FIGI, CUSIP, or ISIN.
    2. For CUSIPs, it defaults the currency to USD. For all others, it searches
       for the instrument to determine its native trading currency.
    3. Fetches the latest price for the instrument in its native currency.
    4. Converts the price to the requested target exchange currency.
    """

    SYMBOL_SEARCH_URL = "https://api.twelvedata.com/symbol_search"
    PRICE_URL = "https://api.twelvedata.com/price"
    EXCHANGE_RATE_URL = "https://api.twelvedata.com/exchange_rate"
    # Seconds to wait for each HTTP request. ``requests`` applies no timeout by
    # default, so a stalled connection would otherwise block the caller forever.
    REQUEST_TIMEOUT = 30
    # Seconds to pause after each asset that hit the API.
    RATE_LIMIT_DELAY = 60
    rates_inverted = True

    def __init__(self, api_key: str):
        """Initializes the provider with an API key and a requests session."""
        super().__init__(api_key)
        self.session = requests.Session()

    @classmethod
    def requires_api_key(cls) -> bool:
        """This provider requires an API key."""
        return True

    def _parse_code(self, raw_code: str) -> Tuple[str, str]:
        """Parses the raw code to determine its type and value.

        Returns:
            ``(code_type, code_value)`` where ``code_type`` is ``"figi"``,
            ``"cusip"`` or ``"isin"`` when ``raw_code`` starts with that name
            followed by ``":"`` (the prefix is stripped from the value), and
            ``"symbol"`` with the unchanged code otherwise.
        """
        for code_type in ("figi", "cusip", "isin"):
            prefix = f"{code_type}:"
            if raw_code.startswith(prefix):
                return code_type, raw_code.removeprefix(prefix)
        return "symbol", raw_code

    def _get_json(self, url: str, params: dict) -> dict:
        """GET ``url`` with ``params`` and return the decoded JSON body.

        Raises:
            requests.RequestException: On connection problems, timeouts or a
                4xx/5xx status.
        """
        response = self.session.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
        response.raise_for_status()
        return response.json()

    def get_rates(
        self, target_currencies: QuerySet, exchange_currencies: set
    ) -> List[Tuple[Currency, Currency, Decimal]]:
        """Fetch the price of each asset expressed in its exchange currency.

        Args:
            target_currencies: Assets to price; ``code`` holds the instrument
                identifier understood by ``_parse_code``.
            exchange_currencies: Exchange currencies the caller cares about;
                assets whose ``exchange_currency`` is not in this set are
                ignored.

        Returns:
            A list of ``(exchange_currency, asset, price)`` tuples. Assets for
            which any step fails are logged and skipped.
        """
        results = []

        for asset in target_currencies:
            if asset.exchange_currency not in exchange_currencies:
                continue

            code_type, code_value = self._parse_code(asset.code)

            try:
                # Determine the instrument's native currency
                if code_type == "cusip":
                    # CUSIP codes always default to USD
                    original_currency_code = "USD"
                    logger.info(f"Defaulting CUSIP {code_value} to USD currency.")
                else:
                    # For all other types, find currency via symbol search.
                    # NOTE(review): this call uses the literal "demo" key rather
                    # than self.api_key — presumably deliberate; confirm.
                    search_data = self._get_json(
                        self.SYMBOL_SEARCH_URL,
                        {"symbol": code_value, "apikey": "demo"},
                    )
                    if not search_data.get("data"):
                        logger.warning(
                            f"TwelveDataMarkets: Symbol search for '{code_value}' returned no results."
                        )
                        continue
                    # The first search hit is taken as the instrument.
                    original_currency_code = search_data["data"][0].get("currency")

                if not original_currency_code:
                    logger.error(
                        f"TwelveDataMarkets: Could not determine original currency for '{code_value}'."
                    )
                    continue

                # Get the instrument's price in its native currency
                price_data = self._get_json(
                    self.PRICE_URL, {code_type: code_value, "apikey": self.api_key}
                )
                if "price" not in price_data:
                    error_message = price_data.get(
                        "message", "Price key not found in response"
                    )
                    logger.error(
                        f"TwelveDataMarkets: Could not get price for {code_type} '{code_value}': {error_message}"
                    )
                    continue

                # str() first, as for every other rate in this module: building a
                # Decimal straight from a JSON float would carry over its binary
                # representation error.
                price_in_original_currency = Decimal(str(price_data["price"]))

                # Convert price to the target exchange currency
                target_exchange_currency = asset.exchange_currency
                if (
                    original_currency_code.upper()
                    == target_exchange_currency.code.upper()
                ):
                    final_price = price_in_original_currency
                else:
                    rate_symbol = (
                        f"{original_currency_code}/{target_exchange_currency.code}"
                    )
                    rate_data = self._get_json(
                        self.EXCHANGE_RATE_URL,
                        {"symbol": rate_symbol, "apikey": self.api_key},
                    )
                    if "rate" not in rate_data:
                        error_message = rate_data.get(
                            "message", "Rate key not found in response"
                        )
                        logger.error(
                            f"TwelveDataMarkets: Could not get conversion rate for '{rate_symbol}': {error_message}"
                        )
                        continue
                    conversion_rate = Decimal(str(rate_data["rate"]))
                    final_price = price_in_original_currency * conversion_rate

                results.append((target_exchange_currency, asset, final_price))
                logger.info(
                    f"Successfully processed price for {asset.code} as {final_price} {target_exchange_currency.code}"
                )
            except requests.RequestException as e:
                logger.error(
                    f"TwelveDataMarkets: API request failed for {code_value}: {e}"
                )
            except (KeyError, IndexError) as e:
                logger.error(
                    f"TwelveDataMarkets: Error processing API response for {code_value}: {e}"
                )
            except Exception as e:
                logger.error(
                    f"TwelveDataMarkets: An unexpected error occurred for {code_value}: {e}"
                )
            finally:
                # We sleep every asset as to not step over TwelveData's minute
                # limit. This also runs on the failure paths: skipping the pause
                # after an error payload would send the next request at once.
                time.sleep(self.RATE_LIMIT_DELAY)

        return results