Skip to content

Commit

Permalink
[DCA][Staggered] refactor optimize portfolio
Browse files Browse the repository at this point in the history
  • Loading branch information
GuillaumeDSM committed Oct 11, 2023
1 parent d52608d commit af81c92
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 63 deletions.
12 changes: 6 additions & 6 deletions Trading/Mode/dca_trading_mode/dca_trading.py
Original file line number Diff line number Diff line change
Expand Up @@ -664,28 +664,28 @@ def get_current_state(self) -> (str, float):
else self.producers[0].final_eval
)

async def optimize_initial_portfolio(self, sellable_assets: list, tickers: dict = None) -> (list, dict):
async def optimize_initial_portfolio(self, sellable_assets: list, tickers: dict) -> list:
if not self.producers:
# nothing to do
return [], tickers
return []
target_asset = exchange_util.get_common_traded_quote(self.exchange_manager)
if target_asset is None:
self.logger.error(f"Impossible to optimize initial portfolio with different quotes in traded pairs")
return [], tickers
return []
async with self.producers[0].trading_mode_trigger():
if self.producers[0].producer_exchange_wide_lock(self.exchange_manager).locked():
# already locked by another trading mode instance: this other trading mode will do the rebalancing
self.logger.info(
f"Skipping portfolio optimization for trading mode with symbol {self.symbol}: "
f"portfolio optimization already in progress"
)
return [], tickers
return []
async with self.producers[0].producer_exchange_wide_lock(self.exchange_manager):
self.logger.info(f"Optimizing portfolio: selling {sellable_assets} to buy {target_asset}")
created_orders, tickers = await trading_modes.convert_assets_to_target_asset(
created_orders = await trading_modes.convert_assets_to_target_asset(
self, sellable_assets, target_asset, tickers
)
if not created_orders:
self.logger.info("Optimizing portfolio: no order to create")
await trading_modes.notify_portfolio_optimization_complete()
return created_orders, tickers
return created_orders
149 changes: 92 additions & 57 deletions Trading/Mode/staggered_orders_trading_mode/staggered_orders_trading.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,15 @@ def get_is_symbol_wildcard(cls) -> bool:
def set_default_config(self):
raise RuntimeError(f"Impossible to start {self.get_name()} without a valid configuration file.")

async def optimize_initial_portfolio(self, sellable_assets: list, tickers: dict = None) -> (list, dict):
async def optimize_initial_portfolio(self, sellable_assets: list, tickers: dict) -> list:
if not self.producers:
# nothing to do
return [], tickers
return []
producer = self.producers[0]
common_quote = exchange_util.get_common_traded_quote(self.exchange_manager)
if common_quote is None:
self.logger.error(f"Impossible to optimize initial portfolio with different quotes in traded pairs")
return [], tickers
return []
portfolio = self.exchange_manager.exchange_personal_data.portfolio_manager.portfolio
# first acquire trading mode lock to be sure we are not in during init phase
async with producer.trading_mode_trigger():
Expand All @@ -271,73 +271,108 @@ async def optimize_initial_portfolio(self, sellable_assets: list, tickers: dict
f"Skipping portfolio optimization for trading mode with symbol {self.symbol}: "
f"portfolio optimization already initialized"
)
return [], tickers
return []
async with producer.producer_exchange_wide_lock(self.exchange_manager):
self.logger.info(f"Starting portfolio optimization using trading mode with symbol {self.symbol}")
self.logger.info(f"Optimizing portfolio: cancelling existing open orders on "
f"{self.exchange_manager.exchange_config.traded_symbol_pairs}")
to_sell_assets = set(sellable_assets)
pair_bases = set()
configured_pairs = []
# 1. cancel open orders
cancelled_orders = []
for symbol in self.exchange_manager.exchange_config.traded_symbol_pairs:
if producer.get_symbol_trading_config(symbol) is not None:
configured_pairs.append(symbol)
pair_bases.add(symbol_util.parse_symbol(symbol).base)
for order in self.exchange_manager.exchange_personal_data.orders_manager.get_open_orders(
symbol=symbol
):
if not (order.is_cancelled() or order.is_closed()):
await self.cancel_order(order)
cancelled_orders.append(order)
try:
cancelled_orders = await self._cancel_associated_orders(producer, pair_bases)
except Exception as err:
self.logger.exception(err, True, f"Error during portfolio optimization cancel orders step: {err}")
cancelled_orders = []

# 2. convert assets to sell funds into target assets
to_sell_assets = to_sell_assets.union(pair_bases)
self.logger.info(f"Optimizing portfolio: selling {to_sell_assets} to buy {common_quote}")
# need portfolio available to be up-to-date with cancelled orders
part_1_orders, tickers = await trading_modes.convert_assets_to_target_asset(
self, list(to_sell_assets), common_quote, tickers
)
if part_1_orders:
await asyncio.gather(
*[
trading_personal_data.wait_for_order_fill(
order, producer.MISSING_MIRROR_ORDERS_MARKET_REBALANCE_TIMEOUT, True
) for order in part_1_orders
]
try:
part_1_orders = await self._convert_assets_into_target(
producer, pair_bases, common_quote, set(sellable_assets), tickers
)
except Exception as err:
self.logger.exception(
err, True, f"Error during portfolio optimization convert into target step: {err}"
)
part_1_orders = []

# 3. compute necessary funds for each configured_pairs
trading_pairs_count = len(pair_bases)
# need portfolio available to be up-to-date with balancing orders
kept_quote_amount = portfolio.portfolio[common_quote].available / decimal.Decimal(2)
converted_quote_amount_per_symbol = (
(portfolio.portfolio[common_quote].available - kept_quote_amount) /
decimal.Decimal(trading_pairs_count)
converted_quote_amount_per_symbol = self._get_converted_quote_amount_per_symbol(
portfolio, pair_bases, common_quote
)

# 4. buy assets
part_2_orders = []
for base in pair_bases:
self.logger.info(
f"Optimizing portfolio: buying {base} with "
f"{float(converted_quote_amount_per_symbol)} {common_quote}"
)
orders, tickers = await trading_modes.convert_asset_to_target_asset(
self, common_quote, base, asset_amount=converted_quote_amount_per_symbol, tickers=tickers
)
part_2_orders += orders
if part_2_orders:
await asyncio.gather(
*[
trading_personal_data.wait_for_order_fill(
order, producer.MISSING_MIRROR_ORDERS_MARKET_REBALANCE_TIMEOUT, True
) for order in part_2_orders
]
)
part_2_orders = await self._buy_assets(
producer, pair_bases, common_quote, converted_quote_amount_per_symbol, tickers
)

await trading_modes.notify_portfolio_optimization_complete()
return [cancelled_orders, part_1_orders, part_2_orders], tickers
return [cancelled_orders, part_1_orders, part_2_orders]

async def _cancel_associated_orders(self, producer, pair_bases) -> list:
cancelled_orders = []
self.logger.info(f"Optimizing portfolio: cancelling existing open orders on "
f"{self.exchange_manager.exchange_config.traded_symbol_pairs}")
for symbol in self.exchange_manager.exchange_config.traded_symbol_pairs:
if producer.get_symbol_trading_config(symbol) is not None:
pair_bases.add(symbol_util.parse_symbol(symbol).base)
for order in self.exchange_manager.exchange_personal_data.orders_manager.get_open_orders(
symbol=symbol
):
if not (order.is_cancelled() or order.is_closed()):
await self.cancel_order(order)
cancelled_orders.append(order)
return cancelled_orders

async def _convert_assets_into_target(self, producer, pair_bases, common_quote, to_sell_assets, tickers) -> list:
to_sell_assets = to_sell_assets.union(pair_bases)
self.logger.info(f"Optimizing portfolio: selling {to_sell_assets} to buy {common_quote}")
# need portfolio available to be up-to-date with cancelled orders
orders = await trading_modes.convert_assets_to_target_asset(
self, list(to_sell_assets), common_quote, tickers
)
if orders:
await asyncio.gather(
*[
trading_personal_data.wait_for_order_fill(
order, producer.MISSING_MIRROR_ORDERS_MARKET_REBALANCE_TIMEOUT, True
) for order in orders
]
)
return orders

async def _buy_assets(self, producer, pair_bases, common_quote, converted_quote_amount_per_symbol, tickers) -> list:
created_orders = []
for base in pair_bases:
self.logger.info(
f"Optimizing portfolio: buying {base} with "
f"{float(converted_quote_amount_per_symbol)} {common_quote}"
)
try:
created_orders += await trading_modes.convert_asset_to_target_asset(
self, common_quote, base, tickers, asset_amount=converted_quote_amount_per_symbol
)
except Exception as err:
self.logger.exception(err, True, f"Error when creating order to buy {base}: {err}")
if created_orders:
await asyncio.gather(
*[
trading_personal_data.wait_for_order_fill(
order, producer.MISSING_MIRROR_ORDERS_MARKET_REBALANCE_TIMEOUT, True
) for order in created_orders
]
)
return created_orders

def _get_converted_quote_amount_per_symbol(self, portfolio, pair_bases, common_quote) -> decimal.Decimal:
trading_pairs_count = len(pair_bases)
# need portfolio available to be up-to-date with balancing orders
kept_quote_amount = portfolio.portfolio[common_quote].available / decimal.Decimal(2)
try:
return (
(portfolio.portfolio[common_quote].available - kept_quote_amount) /
decimal.Decimal(trading_pairs_count)
)
except (decimal.DivisionByZero, decimal.InvalidOperation):
# no pair_bases
return trading_constants.ZERO


class StaggeredOrdersTradingModeConsumer(trading_modes.AbstractTradingModeConsumer):
Expand Down

0 comments on commit af81c92

Please sign in to comment.