Source code for pyqqq.backtest.broker

import datetime as dtm
import os
from abc import ABC, abstractmethod
from dataclasses import asdict
from decimal import Decimal
from functools import lru_cache
from typing import List, Literal, Optional, Union

import numpy as np
import pandas as pd

from pyqqq.backtest.logger import Logger, get_logger
from pyqqq.backtest.positionprovider import BasePositionProvider
from pyqqq.backtest.wallclock import WallClock
from pyqqq.brokerage.kis.simple import KISSimpleDomesticStock
from pyqqq.brokerage.kis.simple_overseas import KISSimpleOverseasStock
from pyqqq.data.daily import get_ohlcv_by_codes_for_period as get_kr_daily_data
from pyqqq.data.domestic import get_ticker_info as get_kr_ticker_info
from pyqqq.data.minutes import get_all_day_data as get_kr_minute_data
from pyqqq.data.us_stocks import get_all_day_data as get_us_minute_data
from pyqqq.data.us_stocks import \
    get_ohlcv_by_codes_for_period as get_us_daily_data
from pyqqq.data.us_stocks import get_ticker_info as get_us_ticker_info
from pyqqq.datatypes import *
from pyqqq.utils.casting import casting
from pyqqq.utils.compute import quantize_krx_price
from pyqqq.utils.market_schedule import (get_last_trading_day,
                                         get_market_schedule,
                                         get_trading_day_with_offset)

MarketType = Literal["kr_stock", "us_stock"]


[docs] class BaseBroker(ABC): """브로커 인터페이스를 구현하기 위한 기본 클래스입니다. 이 클래스는 모든 브로커 구현체가 따라야 하는 표준 인터페이스를 정의합니다. 계좌 관리, 시장 데이터 조회, 주문 실행을 위한 메서드들을 제공합니다. 구체적인 구현체는 모든 메서드를 오버라이드해야 합니다. Attributes: 없음 Note: 이 클래스는 추상 기본 클래스이므로 직접 인스턴스화해서는 안 됩니다. 모든 메서드는 NotImplementedError를 발생시키며 구체적인 하위 클래스에서 구현되어야 합니다. """
[docs] @abstractmethod def get_account(self) -> dict: """현재 계좌 정보를 조회합니다. Returns: dict: 다음 정보를 포함하는 계좌 정보: - total_balance (int|Decimal): 총 평가 금액 - purchase_amount (int|Decimal): 매입 금액 - evaluated_amount (int|Decimal): 평가 금액 - pnl_amount (int|Decimal): 손익 금액 - pnl_rate (Decimal): 손익률 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. """ raise NotImplementedError
[docs] @abstractmethod def get_price(self, code: str) -> Decimal: """특정 종목의 현재 가격을 조회합니다. Args: code (str): 가격을 조회할 종목 코드 Returns: Decimal: 해당 종목의 현재 가격 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. """ raise NotImplementedError
[docs] @abstractmethod def get_minute_price(self, code: str) -> pd.DataFrame: """특정 종목의 분단위 가격 데이터를 조회합니다. Args: code (str): 가격 데이터를 조회할 종목 코드 Returns: pd.DataFrame: 다음 열을 포함하는 분단위 가격 데이터: - time (datetime): 시간 - open (Decimal): 시가 - high (Decimal): 고가 - low (Decimal): 저가 - close (Decimal): 종가 - volume (int): 거래량 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. """ raise NotImplementedError
[docs] @abstractmethod def get_daily_price(self, code: str, from_date: dtm.date, to_date: dtm.date) -> pd.DataFrame: """특정 기간 동안의 일별 가격 데이터를 조회합니다. Args: code (str): 가격 데이터를 조회할 종목 코드 from_date (datetime.date): 조회 시작일 (포함) to_date (datetime.date): 조회 종료일 (포함) Returns: pd.DataFrame: 다음 열을 포함하는 일별 가격 데이터: - date (datetime): 거래일 - open (Decimal): 시가 - high (Decimal): 고가 - low (Decimal): 저가 - close (Decimal): 종가 - volume (int): 거래량 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. """ raise NotImplementedError
[docs] @abstractmethod def get_pending_orders(self) -> List[StockOrder]: """미체결된 모든 주문을 조회합니다. Returns: List[StockOrder]: 미체결 주문 객체 리스트 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. """ raise NotImplementedError
[docs] @abstractmethod def get_positions(self) -> List[StockPosition]: """모든 종목의 현재 포지션을 조회합니다. Returns: List[StockPosition]: 현재 보유 중인 포지션을 나타내는 객체 리스트 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. """ raise NotImplementedError
[docs] @abstractmethod def create_order(self, asset_code: str, side: OrderSide, quantity: int, order_type: OrderType = OrderType.MARKET, price: int | Decimal = 0) -> str: """새로운 주문을 생성합니다. Args: asset_code (str): 주문할 종목 코드 side (OrderSide): 매수/매도 구분 quantity (int): 주문 수량 order_type (OrderType, optional): 주문 타입 (기본값: MARKET) price (int, optional): 지정가 주문 시 주문 가격 (기본값: 0) Returns: str: 생성된 주문 번호 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. ValueError: 잘못된 주문 정보가 제공된 경우 """ raise NotImplementedError
[docs] @abstractmethod def update_order(self, org_order_no: str, order_type: OrderType, price: int | Decimal, quantity: int = 0) -> str: """기존 주문을 수정합니다. Args: org_order_no (str): 원래 주문 번호 order_type (OrderType): 변경할 주문 타입 price (int): 변경할 주문 가격 quantity (int, optional): 변경할 주문 수량 (기본값: 0, 전량) Returns: str: 새로 생성된 주문 번호 Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. ValueError: 잘못된 주문 정보가 제공된 경우 Exception: 원래 주문을 찾을 수 없는 경우 """ raise NotImplementedError
[docs] @abstractmethod def cancel_order(self, order_no: str, quantity: int = 0) -> str: """주문을 취소합니다. Args: order_no (str): 취소할 주문 번호 quantity (int, optional): 취소할 수량 (기본값: 0, 전량 취소) Raises: NotImplementedError: 이 메서드는 구체적인 하위 클래스에서 구현되어야 합니다. Exception: 취소할 주문을 찾을 수 없는 경우 """ raise NotImplementedError
# 모의 계좌나 실제 계좌로 트레이딩 할때 사용되는 브로커
[docs] class TradingBroker(BaseBroker): """실제 거래를 수행하는 브로커 클래스입니다. 이 클래스는 실제 계좌나 모의 계좌를 통해 주식 거래를 수행합니다. 데이터 조회용 API와 거래용 API를 분리하여 관리하며, KIS (한국투자증권)의 국내주식과 해외주식 API를 지원합니다. Note: - data_api와 trading_api는 서로 다른 인스턴스일 수 있으며, 이를 통해 시세 조회와 거래를 분리하여 처리할 수 있습니다. - KIS의 국내주식과 해외주식 API를 모두 지원하므로, 필요에 따라 적절한 API를 선택하여 사용할 수 있습니다. - 모든 거래 관련 작업은 로그로 기록되며, logging 레벨은 DEBUG로 설정됩니다. Example: .. highlight:: python .. code-block:: python # 국내주식 거래의 경우 data_api = KISSimpleDomesticStock(...) trading_api = KISSimpleDomesticStock(...) broker = TradingBroker(data_api, trading_api) # 해외주식 거래의 경우 data_api = KISSimpleOverseasStock(...) trading_api = KISSimpleOverseasStock(...) broker = TradingBroker(data_api, trading_api) """ data_api: Union[KISSimpleDomesticStock, KISSimpleOverseasStock] """ 시세 조회 및 시장 데이터 조회를 위한 API 인터페이스 """ trading_api: Union[KISSimpleDomesticStock, KISSimpleOverseasStock] """ 실제 주문 및 거래 실행을 위한 API 인터페이스 """ logger: Logger """ 로깅을 위한 logger 인스턴스 """
[docs] def __init__( self, data_api: Union[KISSimpleDomesticStock, KISSimpleOverseasStock], trading_api: Union[KISSimpleDomesticStock, KISSimpleOverseasStock], clock: WallClock, ): """TradingBroker 클래스의 초기화 메서드입니다. Args: data_api (Union[KISSimpleDomesticStock, KISSimpleOverseasStock]): 시세 조회 및 시장 데이터 조회를 위한 API 인터페이스 trading_api (Union[KISSimpleDomesticStock, KISSimpleOverseasStock]): 실제 주문 및 거래 실행을 위한 API 인터페이스 Note: 생성된 인스턴스는 즉시 거래가 가능한 상태가 되며, 모든 거래 관련 작업은 자동으로 로깅됩니다. """ self.logger = get_logger("TradingBroker", clock) self.data_api = data_api self.trading_api = trading_api
[docs] def get_account(self) -> dict: return self.trading_api.get_account()
[docs] def get_price(self, code: str) -> Decimal: price_data = self.data_api.get_price(code) result = price_data.get('current_price') if result is None: raise ValueError(f"Current price not found: {code}") return Decimal(str(result))
[docs] def get_minute_price(self, code) -> pd.DataFrame: return self.data_api.get_today_minute_data(code)
[docs] def get_daily_price(self, code: str, from_date: dtm.date, to_date: dtm.date) -> pd.DataFrame: return self.data_api.get_historical_daily_data(code, from_date, to_date, adjusted=True)
[docs] def get_orderbook(self): return self.data_api.get_orderbook()
[docs] def get_pending_orders(self): return self.trading_api.get_pending_orders()
def _get_pending_order(self, order_no: str) -> StockOrder | None: orders = self.get_pending_orders() return next((o for o in orders if o["order_no"] == order_no), None)
[docs] def get_positions(self): return self.trading_api.get_positions()
[docs] def create_order(self, asset_code: str, side: OrderSide, quantity: int, order_type: OrderType = OrderType.MARKET, price: int | Decimal = 0): self.logger.debug(f"create_order: {asset_code} {side} {quantity} {order_type} {price}") return self.trading_api.create_order(asset_code, side, quantity, order_type, price)
[docs] def update_order(self, org_order_no: str, order_type: OrderType, price: int | Decimal, quantity: int = 0): if isinstance(self.trading_api, KISSimpleOverseasStock): order = self._get_pending_order(org_order_no) if order is not None: self.logger.debug(f"update_order: ({order.asset_code}) {org_order_no} {order_type} {price} {quantity}") return self.trading_api.update_order(order.asset_code, org_order_no, order_type, price, quantity) else: raise ValueError(f"order not found: {org_order_no}") else: self.logger.debug(f"update_order: {org_order_no} {order_type} {price} {quantity}") return self.trading_api.update_order(org_order_no, order_type, price, quantity)
[docs] def cancel_order(self, order_no: str, quantity: int = 0): if isinstance(self.trading_api, KISSimpleOverseasStock): order = self._get_pending_order(order_no) if order is not None: self.logger.debug(f"cancel_order: ({order.asset_code}) {order_no} {quantity}") return self.trading_api.cancel_order(order.asset_code, order_no, quantity) else: raise ValueError(f"order not found: {order_no}") else: self.logger.debug(f"cancel_order: {order_no} {quantity}") return self.trading_api.cancel_order(order_no, quantity)
# 백테스팅 할때 사용되는 브로커
[docs] class MockBroker(BaseBroker): """백테스팅을 위한 가상의 브로커 클래스입니다. 이 클래스는 과거 데이터를 기반으로 모의 거래를 수행하고 그 결과를 분석할 수 있게 해줍니다. 가상의 시계를 통해 시간을 제어하며, 실제 시장과 유사한 조건에서의 거래를 시뮬레이션합니다. 특징: - 한국/미국 주식 시장 지원 - 시장가/지정가/조건부지정가 주문 지원 - 실제 거래와 동일한 수수료 체계 적용 - 상세한 거래 이력 관리 및 분석 기능 - 포지션 추적 및 손익 계산 - 디버깅을 위한 상세 로깅 Example: >>> # 백테스팅 환경 설정 >>> clock = WallClock(start_date=date(2023, 1, 1)) >>> position_provider = MyPositionProvider() >>> broker = MockBroker(clock, position_provider, market="kr_stock") >>> >>> # 초기 자본금 설정 >>> broker.set_initial_cash(100_000_000) >>> >>> # 거래 실행 >>> broker.create_order("005930", OrderSide.BUY, 100, OrderType.LIMIT, 70000) """ DBG_POSTFIX = "" """ 디버그 파일 접미사 """ DEBUG_FILE_PATH = "./debug" """ 디버그 파일 저장 경로 """ logger: Logger """ 로깅을 위한 logger 인스턴스 """ clock: WallClock """ 백테스팅 시간 제어를 위한 가상 시계 """ cash: int """ 현재 보유 현금 """ positions: List[StockPosition] """ 현재 보유 중인 포지션 목록 """ pending_orders: List[StockOrder] """ 미체결 주문 목록 """ trading_history: List[TradingHistory] """ 거래 이력 """ position_provider: BasePositionProvider """ 초기 포지션 정보 제공자 """ time_unit: str """ 거래 시간 단위 ("minutes" 또는 "days") """ market: MarketType """ 거래 시장 구분 ("kr_stock" 또는 "us_stock") """ current_data_handling: Literal["include", "virtual", "exclude"] """ 현재 시점의 데이터(e.g. 분봉) 처리 방법 """
[docs] def __init__(self, clock: WallClock, position_provider: BasePositionProvider, market: MarketType = "kr_stock", time_unit="minutes", current_data_handling: Literal["include", "virtual", "exclude"] = "virtual"): """MockBroker 클래스의 초기화 메서드입니다. Args: clock (WallClock): 백테스팅 시간을 제어할 가상 시계 객체 position_provider: 초기 포지션 정보를 제공할 객체 market (MarketType): 거래 시장 구분 ("kr_stock" 또는 "us_stock") time_unit (str, optional): 거래 시간 단위. Defaults to "minutes". "minutes" 또는 "days" 사용 가능 current_data_handling (Literal["include", "virtual", "exclude"], optional): 현재 시점의 데이터(e.g. 분봉) 처리 방법. Defaults to "virtual". - "include": 현재(분)까지의 실제 데이터 반환 - "virtual": 직전(1분전)까지의 데이터 + 현재(분)의 시가로 통일한 가상 데이터 반환 (기본값) - "exclude": 직전(1분전)까지의 데이터만 반환 """ self.logger = get_logger("MockBroker", clock) self.clock = clock self.clock.on_time_change = self.on_time_change self.market = market self.next_order_no = 1000 self.cash = 1_000_000_000 self.pending_orders: List[StockOrder] = [] self.trading_history: List[TradingHistory] = [] self.position_provider = position_provider self.positions: List[StockPosition] = position_provider.get_positions(self.clock.today()) if position_provider else [] self.time_unit = time_unit self.current_data_handling = current_data_handling
[docs] def set_initial_cash(self, cash): """초기 자본금을 설정합니다. Args: cash (int): 설정할 초기 자본금 """ self.cash = cash
def increase_cash(self, amount): self.cash += casting(self.cash, amount) def decrease_cash(self, amount): self.cash -= casting(self.cash, amount) def get_account(self) -> dict: """ 계좌 요약 정보를 조회하여 총 잔고, 투자 가능 현금, 매입 금액 및 손익 정보를 반환합니다. """ positions = self.get_positions() purchase_amount = sum([p.average_purchase_price * p.quantity for p in positions]) evaluated_amount = sum([p.current_value for p in positions]) pnl_amount = evaluated_amount - purchase_amount pnl_rate = pnl_amount / purchase_amount * 100 if purchase_amount != 0 else 0 pending_orders = self.get_pending_orders() pending_amount = 0 for order in pending_orders: if order.side == OrderSide.BUY: pending_amount += order.price * order.pending_quantity total_balance = self.cash + casting(self.cash, evaluated_amount) + casting(self.cash, pending_amount) return { "total_balance": total_balance, "investable_cash": self.cash, "purchase_amount": purchase_amount, "evaluated_amount": evaluated_amount, "pnl_amount": pnl_amount, "pnl_rate": pnl_rate, } def _get_exchange_code(self) -> str: """시장 일정을 조회할 때 사용하는 시장 코드를 반환합니다""" if self.market == "us_stock": return "NYSE" else: return "KRX"
[docs] def get_price(self, code: str) -> Decimal: now = self.clock.now() today = now.date() def typed_result(value) -> Decimal: return Decimal(value).quantize(Decimal("0.0000")) if self.market == "us_stock" else Decimal(int(value)) schedule = get_market_schedule(today, exchange=self._get_exchange_code()) if now.time() < schedule.open_time: last_trading_day = get_last_trading_day(today, exchange=self._get_exchange_code()) df = self.get_daily_price(code, last_trading_day, last_trading_day) # 전일 종가는 일봉, 분봉 구분이 필요 없다. return typed_result(df["close"].iloc[-1]) else: if self.time_unit == "minutes": df = self.get_minute_price(code) if not df.empty: return typed_result(df["close"].iloc[-1]) else: # 현재 시간 데이터가 없을 경우 전일 종가로 대체 last_trading_day = get_last_trading_day(today, exchange=self._get_exchange_code()) df = self.get_daily_price(code, last_trading_day, last_trading_day) # 전일 종가는 일봉, 분봉 구분이 필요 없다. return typed_result(df["close"].iloc[-1]) elif self.time_unit == "days": df = self.get_daily_price(code, today, today) return typed_result(df["close"].iloc[-1]) else: raise ValueError(f"Invalid time unit: {self.time_unit}")
[docs] def get_minute_price(self, code: str) -> pd.DataFrame: """ self.clock 기준 오늘 정규장 시작부터 현재 시각 (이전) 까지의 분봉 데이터를 조회합니다. Args: code (str): 종목 코드 Returns: pd.DataFrame: 분봉 데이터 """ today = self.clock.today() df = self._get_minute_price_with_cache(today, code) if df.empty: return df # 정규장 정보로만 거르기 market_schedule = get_market_schedule(today, "NYSE" if self.market == "us_stock" else "KRX") open_time = dtm.datetime.combine(today, market_schedule.open_time) close_time = dtm.datetime.combine(today, market_schedule.close_time) if df.index.tz is not None: df.index = df.index.tz_localize(None) df = df[(df.index >= open_time) & (df.index <= close_time)] # 미래 정보 없애기 now = pd.Timestamp(self.clock.now()) include_df = df[df.index <= now].copy() if include_df.empty: return include_df if self.current_data_handling == "include": return include_df else: min_ago = min(now, close_time) - pd.Timedelta(minutes=1) exclude_df = df[df.index <= min_ago].copy() last_minute_df = include_df.iloc[-1] last_minute_df = last_minute_df.to_frame().T if self.current_data_handling == "exclude": if now <= close_time: return exclude_df else: return pd.concat([exclude_df, last_minute_df]) elif self.current_data_handling == "virtual": # 정규장 종료 시각 이전은 N분 0초 기준으로 해당 분의 시가로 통일함 # 다만, 정규장 종료 시각 이후는 동시호가 시장가 종가로 체결되므로 원 데이터를 그대로 사용함 if now < close_time: current_price = last_minute_df["open"].iloc[0] virtual_data = { "open": current_price, "high": current_price, "low": current_price, "close": current_price, "volume": 1, "value": 1, "cum_volume": exclude_df["cum_volume"].iloc[-1] if not exclude_df.empty else 1, "cum_value": exclude_df["cum_value"].iloc[-1] if not exclude_df.empty else 1, } last_minute_df = pd.DataFrame([virtual_data], index=[last_minute_df.index[0]]) return pd.concat([exclude_df, last_minute_df]) else: raise ValueError(f"Invalid current_data_handling: {self.current_data_handling}")
@lru_cache(maxsize=40) def _get_minute_price_with_cache(self, date, code) -> pd.DataFrame: if self.market == "us_stock": dfs = get_us_minute_data(date, [code]) else: dfs = get_kr_minute_data(date, [code], dtm.timedelta(minutes=1), source="kis", adjusted=True) df = dfs[code] if df.empty: return df return df
[docs] def get_daily_price(self, code: str, from_date: Optional[dtm.date] = None, end_date: Optional[dtm.date] = None): today = self.clock.today() def _exclude_today(df: pd.DataFrame) -> pd.DataFrame: if self.market == "kr_stock": return df[df.index < pd.Timestamp(today)] elif self.market == "us_stock": return df[df.index < pd.Timestamp(today).date()] else: raise ValueError(f"Invalid market: {self.market}") if from_date is None: from_date = get_trading_day_with_offset(today, -10, exchange=self._get_exchange_code()) if end_date is None: end_date = today if self.market == "us_stock": dfs = get_us_daily_data([code], from_date, end_date, adjusted=True, ascending=True) else: dfs = get_kr_daily_data([code], from_date, end_date, adjusted=True, ascending=True) df = dfs[code] if not df.empty and end_date == today: # 마지막 데이터가 오늘인 경우 장중 여부와 설정에따라 구분되어 처리될 필요가 있다. now = self.clock.now() schedule = get_market_schedule(today, exchange=self._get_exchange_code()) if schedule.full_day_closed: pass # end_date 가 폐장일이면 해당 날짜의 데이터는 이미 없음 else: if self.current_data_handling == "include": pass elif self.current_data_handling == "virtual": if now.time() < schedule.open_time: # 장 오픈 전은 전일 데이터 까지만 반환 df = _exclude_today(df) elif now.time() <= schedule.close_time: # 장중에는 분봉을 적절히 활용 minute_df = self.get_minute_price(code) if minute_df.empty: df = _exclude_today(df) else: # 현재까지의 분봉 df 를 이용해 오늘의 ohlcv 를 생성 today_index = pd.Timestamp(today) if self.market == "kr_stock" else pd.Timestamp(today).date() today_ohlcv = pd.Series({ 'open': minute_df.iloc[0]['open'], 'high': minute_df['high'].max(), 'low': minute_df['low'].min(), 'close': minute_df.iloc[-1]['close'], 'volume': minute_df.iloc[-1]['cum_volume'], 'value': minute_df.iloc[-1]['cum_value'], }, name=today_index) # 오늘 데이터를 새로운 OHLCV로 교체 df.loc[today_index] = today_ohlcv else: # 정규장 마감 이후는 조회된 일봉 그대로 반환 pass elif self.current_data_handling == "exclude": df = _exclude_today(df) return df
[docs] def get_pending_orders(self): return self.pending_orders
[docs] def get_positions(self): for p in self.positions: price = self.get_price(p.asset_code) p.current_price = price p.current_value = price * p.quantity p.current_pnl_value = int((price - p.average_purchase_price) * p.quantity) p.current_pnl = Decimal(0) if p.quantity == 0 else Decimal((p.current_pnl_value / (p.average_purchase_price * p.quantity)) * 100).quantize(Decimal("0.00")) positions = [p for p in self.positions if p.quantity > 0] return positions
[docs] def create_order(self, asset_code: str, side: OrderSide, quantity: int, order_type: OrderType = OrderType.MARKET, price: int = 0): price = self.get_price(asset_code) if order_type == OrderType.MARKET else price self.logger.info(f"CREATE ORDER: {self._get_asset_name(asset_code)} side:{side} price:{price} quantity:{quantity} order_type:{order_type}") order_no = str(self.next_order_no) self.next_order_no += 1 if side is OrderSide.BUY: total_amount = int(price * quantity) if self.cash < total_amount: raise ValueError("not enough cash") else: self.decrease_cash(total_amount) if isinstance(price, np.int64): raise ValueError(f"price must be int, not np.int64: {price} {type(price)}") order = StockOrder( order_no=order_no, asset_code=asset_code, side=side, price=price, quantity=quantity, filled_quantity=0, pending_quantity=quantity, order_type=order_type, order_time=self.clock.now(), ) self.pending_orders.append(order) self.logger.info(f"ORDER CREATED: {order}\n{self.pending_orders}") return order_no
[docs] def update_order(self, org_order_no: str, order_type: OrderType, price: int, quantity: int = 0): order = next((order for order in self.pending_orders if order.order_no == org_order_no), None) if order is None: raise Exception(f"order not found: {org_order_no}") if quantity > order.pending_quantity: raise ValueError("quantity must be less than pending quantity") if quantity == 0: quantity = order.pending_quantity if order.side == OrderSide.BUY: price_diff = price - order.price diff_value = int(price_diff * quantity) if self.cash < diff_value: raise ValueError("not enough cash") else: self.decrease_cash(diff_value) new_order_no = str(self.next_order_no) self.next_order_no += 1 new_order = StockOrder( org_order_no=order.order_no, order_no=new_order_no, asset_code=order.asset_code, side=order.side, price=price, quantity=quantity, filled_quantity=0, pending_quantity=quantity, order_type=order_type, order_time=self.clock.now(), ) order.pending_quantity -= quantity self.pending_orders = [o for o in self.pending_orders if o.pending_quantity > 0] self.pending_orders.append(new_order)
[docs] def cancel_order(self, order_no: str, quantity: int = 0): order = next((order for order in self.pending_orders if order.order_no == order_no), None) if order is None: raise Exception(f"order not found: {order_no}") if quantity == 0: self.pending_orders = [o for o in self.pending_orders if o.order_no != order_no] else: order.pending_quantity -= quantity if order.pending_quantity == 0: self.pending_orders = [o for o in self.pending_orders if o.order_no != order_no] if order.side == OrderSide.BUY: if quantity == 0: quantity = order.pending_quantity self.increase_cash(int(order.price * quantity * 1.00015))
def _cancel_order(self, order: StockOrder): """ on_time_change에서 데이터 누락 등의 오류 상황일 때 미리 뺀 cash를 원복한다. """ if order.side == OrderSide.BUY: self.increase_cash(int(order.price * order.pending_quantity)) self.pending_orders = [o for o in self.pending_orders if o.order_no != order.order_no] def on_time_change(self, current_time, before_time): today = current_time.date() market_schedule = get_market_schedule(today) preclose_auction_start_time = dtm.datetime.combine(today, market_schedule.close_time) - dtm.timedelta(minutes=10) close_time = dtm.datetime.combine(today, market_schedule.close_time) just_closed = current_time.replace(second=0, microsecond=0) == close_time.replace(second=0, microsecond=0) self.logger.info(f"just_closed: {just_closed} {self.pending_orders}") if market_schedule.full_day_closed and not just_closed: self.logger.warning(f"on_time_change: {before_time} ~ {current_time} market is closed") return for order in list(self.pending_orders): if self.time_unit == "minutes": if preclose_auction_start_time <= current_time < close_time: self.logger.info(f"ORDER SKIPPED: {order.asset_code} preclose auction time") continue if current_time < close_time and current_time - before_time < dtm.timedelta(seconds=60): # 강제로 1분 차이를 만들어 줌 before_time = current_time - dtm.timedelta(seconds=60) df = self.get_minute_price(order.asset_code) if df.empty: self.logger.warning(f"ORDER CANCELED: {order.asset_code} minute data is empty") self._cancel_order(order) continue df = df[(df.index >= before_time) & (df.index <= current_time)].copy() if df.empty: self.logger.warning(f"ORDER CANCELED: {order.asset_code} minute range data is empty: {before_time} ~ {current_time}") self._cancel_order(order) continue elif self.time_unit == "days": from_date = before_time.date() to_date = current_time.date() df = self.get_daily_price(order.asset_code, from_date, to_date) if df.empty: self.logger.info(f"ORDER CANCELED: {order.asset_code} daily data is empty") self._cancel_order(order) continue else: self._cancel_order(order) raise ValueError(f"Invalid time unit: {self.time_unit}") if df["volume"].iloc[-1] == 0 and current_time != close_time: self.logger.info(f"ORDER CANCELED: {order.asset_code} volume is 0") self._cancel_order(order) continue if "cvolume" in df.columns and df["cvolume"].iloc[-1] == 0: self.logger.info(f"ORDER CANCELED: {order.asset_code} cvolume is 0") self._cancel_order(order) continue if "cum_volume" in df.columns and df["cum_volume"].iloc[-1] == 0: self.logger.info(f"ORDER CANCELED: {order.asset_code} cum_volume is 0") self._cancel_order(order) continue if order.order_type == OrderType.MARKET or (order.order_type == OrderType.LIMIT_CONDITIONAL and current_time >= close_time): df = df[(df.index >= before_time) & (df.index <= current_time)].copy() self.logger.debug(f"on_time_change {before_time} => {current_time} asset_code={order.asset_code} df:\n{df}") if df.empty: self.logger.info(f"ORDER CANCELED: {order.asset_code} no data") self._cancel_order(order) continue filled_price = int(df["close"].iloc[-1]) order.quantity = int(order.quantity) if order.quantity: self.logger.info(f"ORDER FILLED: {order.asset_code}, price:{filled_price}, quantity:{order.quantity}, side:{order.side}, current_time:{current_time}, before_time:{before_time}") if order.side == OrderSide.BUY: self._buy_position(order.order_no, order.asset_code, order.quantity, filled_price, current_time) else: self._sell_position(order.order_no, order.asset_code, order.quantity, filled_price, current_time) else: df = df[(df.index >= before_time) & (df.index <= current_time)].copy() self.logger.debug(f"on_time_change {before_time} => {current_time} asset_code={order.asset_code} df:\n{df}") if df.empty: self.logger.info(f"ORDER CANCELED: {order.asset_code} no data") self._cancel_order(order) continue high = int(df["high"].max()) low = int(df["low"].min()) close = int(df["close"].iloc[-1]) typical_price = quantize_krx_price((high + low + close) / 3, etf_etn=False) filled_price = None if order.side == OrderSide.BUY: if order.price > high: filled_price = typical_price elif order.price < low: # not fill self.logger.info(f"BUY ORDER NOT FILLED: {self._get_asset_name(order.asset_code)} price:{order.price} low:{low}") pass else: filled_price = order.price elif order.side == OrderSide.SELL: if order.price > high: # not fill self.logger.info(f"SELL ORDER NOT FILLED: {self._get_asset_name(order.asset_code)} price:{order.price} high:{high}") pass elif order.price < low: filled_price = typical_price else: filled_price = order.price order.quantity = int(order.quantity) if filled_price and order.quantity: filled_price = int(filled_price) if order.side == OrderSide.BUY: self._buy_position(order.order_no, order.asset_code, order.quantity, filled_price, current_time) elif order.side == OrderSide.SELL: self._sell_position(order.order_no, order.asset_code, order.quantity, filled_price, current_time) def _sell_position(self, order_no: str, asset_code: str, quantity: int, price: int, executed_time: dtm.datetime): self.logger.info(f"SELL POSITION: {asset_code} price:{price} quantity:{quantity}\n{self.positions}") for pos in self.positions: if pos.asset_code == asset_code: pos.quantity -= quantity pos.quantity = max(0, pos.quantity) pos.sell_possible_quantity -= quantity pos.sell_possible_quantity = max(0, pos.sell_possible_quantity) pos.current_value = pos.current_price * pos.quantity pos.current_pnl_value = (pos.current_price - pos.average_purchase_price) * pos.quantity pos.current_pnl = Decimal(0) if pos.quantity == 0 else Decimal(pos.current_pnl_value / (pos.average_purchase_price * pos.quantity) * 100).quantize(Decimal("0.00")) sell_value = price * quantity # 시장별 수수료 및 세금 계산 if self.market == "us_stock": tax = 0 # 미국 주식은 매도 시 거래세 없음 fee = sell_value * Decimal("0.00025") # 수수료 0.025% else: tax = sell_value * Decimal("0.0023") # 거래세 0.23% fee = sell_value * Decimal("0.00015") # 수수료 0.015% buy_value = pos.average_purchase_price * quantity buy_fee = buy_value * Decimal(0.00015) pnl = sell_value - buy_value - fee - tax - buy_fee pnl_rate = pnl / buy_value * 100 if buy_value != 0 else 0 self.add_trading_history( TradingHistory( date=self.clock.today().strftime("%Y%m%d"), order_no=order_no, side=OrderSide.SELL, asset_code=asset_code, quantity=quantity, filled_price=price, average_purchase_price=pos.average_purchase_price, tax=tax, fee=fee, pnl=pnl, pnl_rate=pnl_rate, executed_time=int(executed_time.timestamp() * 1000), ) ) self.increase_cash(sell_value - fee - tax) self.positions = [p for p in self.positions if p.quantity > 0] self.pending_orders = [o for o in self.pending_orders if o.order_no != order_no] def _buy_position(self, order_no: str, asset_code: str, quantity: int, price: int, executed_time: dtm.datetime): self.logger.info(f"BUY POSITION: {asset_code} price:{price} quantity:{quantity}\n{self.positions}") if quantity is None or quantity < 1: self.logger.error(f"BUY_POSITION: quantity must be greater than 0: {quantity}") return if price is None or price <= 0: self.logger.error(f"BUY_POSITION: price must be greater than 0: {price}") return found = False for pos in self.positions: if pos.asset_code == asset_code: pos.average_purchase_price = (pos.average_purchase_price * pos.quantity + price * quantity) / Decimal(str(pos.quantity + quantity)) pos.quantity += quantity pos.sell_possible_quantity += quantity pos.current_value = pos.current_price * pos.quantity pos.current_pnl_value = (pos.current_price - pos.average_purchase_price) * pos.quantity pos.current_pnl = Decimal(0) if pos.quantity == 0 else Decimal(pos.current_pnl_value / (pos.average_purchase_price * pos.quantity) * 100).quantize(Decimal("0.00")) found = True break if not found: pos = StockPosition( asset_code=asset_code, asset_name=self._get_asset_name(asset_code), quantity=quantity, sell_possible_quantity=quantity, average_purchase_price=Decimal(price), current_price=price, current_value=price * quantity, current_pnl=Decimal(0), current_pnl_value=0, ) self.positions.append(pos) order = next((order for order in self.pending_orders if order.order_no == order_no), None) order_buy_value = order.price * order.pending_quantity buy_value = price * quantity # 시장별 수수료 계산 if self.market == "us_stock": fee = buy_value * Decimal("0.00025") # 수수료 0.025% else: fee = buy_value * Decimal("0.00015") # 수수료 0.015% # 주문가와 체결가가 다를 수 있음 if order_buy_value != buy_value: diff_amount = order_buy_value - buy_value self.increase_cash(diff_amount) self.add_trading_history( TradingHistory( date=self.clock.today().strftime("%Y%m%d"), order_no=order_no, side=OrderSide.BUY, asset_code=asset_code, quantity=quantity, filled_price=price, average_purchase_price=Decimal(price), fee=fee, tax=0, pnl=0, pnl_rate=0, executed_time=int(executed_time.timestamp() * 1000), ) ) self.pending_orders = [o for o in self.pending_orders if o.order_no != order_no] def add_trading_history(self, history): self.trading_history.append(history)
[docs] def show_trading_history_report(self, make_file: bool = False, filter_side: OrderSide = None, sort: str = None): """거래 이력 보고서를 생성하고 표시합니다. Args: make_file (bool, optional): CSV 파일로 저장할지 여부. Defaults to False. filter_side (OrderSide, optional): 특정 거래 방향(매수/매도)으로 필터링. Defaults to None. sort (str, optional): 정렬 기준. Defaults to None. Returns: dict: 거래 분석 결과를 포함하는 딕셔너리 - count (int): 총 거래 건수 - total_pnl (float): 총 손익률 - avg_pnl (float): 평균 손익률 - buy_sum (int): 총 매수 금액 - earn_sum (int): 총 실현 손익 - roi (float): 투자수익률 (ROI) Note: CSV 파일은 DEBUG_FILE_PATH 경로에 저장되며, 파일명은 날짜와 시간을 포함합니다. """ empty_ret = { "count": 0, "total_pnl": 0, "avg_pnl": 0, "buy_sum": 0, "earn_sum": 0, "roi": 0, } if len(self.trading_history) == 0: return empty_ret dict_list = [] for trade in self.trading_history: d = asdict(trade) if filter_side and d["side"] != filter_side: continue d["side"] = "BUY" if d["side"] == OrderSide.BUY else "SELL" d["name"] = self._get_asset_name(d["asset_code"]) d["time"] = dtm.datetime.fromtimestamp(d["executed_time"] / 1000).strftime("%Y-%m-%d %H:%M:%S") d["buy_value"] = d["average_purchase_price"] * d["quantity"] d.pop("executed_time") d.pop("date") d.pop("partial") d["average_purchase_price"] = d["average_purchase_price"].quantize(Decimal("0.00")) d["buy_value"] = int(d["buy_value"]) if d["buy_value"] else 0 d["tax"] = int(d["tax"]) if d["tax"] else 0 d["fee"] = int(d["fee"]) if d["fee"] else 0 d["pnl"] = int(d["pnl"]) if d["pnl"] else 0 d["pnl_rate"] = d["pnl_rate"].quantize(Decimal("0.00")) if d["pnl_rate"] else Decimal("0.00") minute_data = self.get_minute_price(d["asset_code"]) if not minute_data.empty: d["max_price"] = minute_data["high"].max() d["max_time"] = minute_data["high"].idxmax() d["max_buy_rate"] = (Decimal(str(d["max_price"])) - d["average_purchase_price"]) / d["average_purchase_price"] * 100 d["max_buy_rate"] = d["max_buy_rate"].quantize(Decimal("0.00")) d["min_price"] = minute_data["low"].min() d["min_time"] = minute_data["low"].idxmin() dict_list.append(d) df = pd.DataFrame(dict_list) if df.empty: return empty_ret df.set_index("time", inplace=True) df = df.sort_values(by="time") if make_file: # BacktestEnvironment의 end_time이 휴장일인 경우 분봉 데이터가 없음 if not minute_data.empty: df = df.sort_values(by="max_buy_rate", ascending=False) filename = f"{self.DEBUG_FILE_PATH}/trading_history_{self.clock.today().strftime('%Y%m%d')}_{int(dtm.datetime.now().timestamp())}_{self.DBG_POSTFIX}.csv" try: df.to_csv(filename) except IOError: os.makedirs(self.DEBUG_FILE_PATH, exist_ok=True) df.to_csv(filename) pd.set_option("display.max_rows", None) if not sort: df = df.sort_index() else: df = df.sort_values(by=sort, ascending=False) print(df[["order_no", "side", "asset_code", "quantity", "average_purchase_price", "buy_value", "filled_price", "tax", "fee", "pnl", "pnl_rate", "name"]]) pd.reset_option("display.max_rows") df.rename( columns={ "order_no": "주문번호", "asset_code": "종목코드", "name": "종목명", "side": "매매구분", "quantity": "수량", "filled_price": "체결가", "average_purchase_price": "평단가", "buy_value": "매수금액", "tax": "세금", "fee": "수수료", "pnl": "손익", "pnl_rate": "손익률", }, inplace=True, ) df.index.name = "체결시간" # 컬럼 순서 변경 df = df[["주문번호", "종목코드", "종목명", "매매구분", "수량", "체결가", "평단가", "매수금액", "세금", "수수료", "손익", "손익률"]] count = len(df) total_pnl = df["손익률"].sum() avg_pnl = df["손익률"].mean() buy_sum = df["매수금액"].sum() earn_sum = df["손익"].sum() roi_pnl = earn_sum / buy_sum * 100 if buy_sum != 0 else 0 print(f"Total {count} trades, Total PnL: {total_pnl:.2f}%, Avg PnL: {avg_pnl:.2f}% ROI: {roi_pnl:>7.2f}%") return { "count": count, "total_pnl": total_pnl, "avg_pnl": avg_pnl, "buy_sum": buy_sum, "earn_sum": earn_sum, "roi": roi_pnl / 100, }
def show_pnl_timeline(self): """시간별 손익 현황을 표시합니다. 각 포지션의 시간별 가치 변화와 전체 포트폴리오의 손익 변화를 분석하여 표시합니다. 최대/최소 손익 시점과 금액도 함께 표시됩니다. """ df = pd.DataFrame() for pos in self.position_provider.get_positions(self.clock.today()): print("Fetch minute price for", pos.asset_code) minute_price_df = self._get_minute_price_with_cache(self.clock.today(), pos.asset_code) minute_price_df.set_index("time", inplace=True) minute_price_df = minute_price_df.loc[: self.clock.now().replace(hour=15, minute=20, second=0, microsecond=0)].copy() df2 = pd.DataFrame() df2[f"{pos.asset_code}_current_price"] = minute_price_df["close"] df2[f"{pos.asset_code}_purchase_price"] = pos.average_purchase_price df2[f"{pos.asset_code}_quantity"] = pos.quantity df2[f"{pos.asset_code}_current_value"] = df2[f"{pos.asset_code}_current_price"] * pos.quantity df2[f"{pos.asset_code}_purchase_value"] = int(pos.average_purchase_price * pos.quantity) df2[f"{pos.asset_code}_pnl_value"] = df2[f"{pos.asset_code}_current_value"] - df2[f"{pos.asset_code}_purchase_value"] df = pd.concat([df, df2], axis=1) df["total_current_value"] = df.filter(like="_current_value").sum(axis=1) df["total_purchase_value"] = df.filter(like="_purchase_value").sum(axis=1) df["total_pnl_value"] = df.filter(like="_pnl_value").sum(axis=1) df["total_pnl_rate"] = df["total_pnl_value"] / df["total_purchase_value"] * 100 df = df[["total_current_value", "total_purchase_value", "total_pnl_value", "total_pnl_rate"]].copy() if len(df) > 0: pd.set_option("display.max_rows", None) print(df) pd.reset_option("display.max_rows") print(f"- Max Pnl Time: {df['total_pnl_value'].idxmax()} {df['total_pnl_value'].max()} ({df['total_pnl_rate'].max()}%)") print(f"- Min Pnl Time: {df['total_pnl_value'].idxmin()} {df['total_pnl_value'].min()} ({df['total_pnl_rate'].min()}%)")
[docs] def show_positions(self, sort: str = None): """현재 보유 중인 포지션 현황을 표시합니다. Args: sort (str, optional): 정렬 기준. Defaults to None. Returns: dict: 포지션 분석 결과를 포함하는 딕셔너리 - buy_value (int): 총 매수 금액 - pnl_value (int): 총 평가 손익 - pnl (float): 총 수익률 - current_value (int): 현재 평가 금액 """ positions = [p for p in self.positions if p.quantity > 0] data = [] buy_value = 0 pnl_value = 0 current_value = 0 pnl = 0 for pos in positions: d = asdict(pos) d["average_purchase_price"] = round(float(pos.average_purchase_price), 2) if pos.average_purchase_price else 0 d["pnl_rate"] = round(float(pos.current_pnl), 2) if pos.current_pnl else 0 d["current_pnl_value"] = int(pos.current_pnl_value) data.append(d) buy_value += d["average_purchase_price"] * d["quantity"] pnl_value += d["current_pnl_value"] current_value += d["current_value"] df = pd.DataFrame(data) if len(df) > 0: pnl = pnl_value / buy_value * 100 if buy_value != 0 else 0 if not sort: df = df.sort_index() else: df = df.sort_values(by=sort, ascending=False) print(df[["asset_code", "quantity", "sell_possible_quantity", "average_purchase_price", "current_price", "current_value", "current_pnl", "current_pnl_value", "pnl_rate", "asset_name"]]) print(f"Total Buy Value: {buy_value} Total PnL: {pnl_value} ({pnl}%)") return {"buy_value": buy_value, "pnl_value": pnl_value, "pnl": pnl, "current_value": current_value}
@lru_cache def _get_asset_name(self, code: str): if self.market == "us_stock": df = get_us_ticker_info(code) else: df = get_kr_ticker_info(code) return df["name"].iloc[-1]
if __name__ == "__main__": from pyqqq.backtest.positionprovider import ManualPositionProvider clock = WallClock(live_mode=False, start_time=dtm.datetime(2025, 2, 21, 8, 0, 0), end_time=dtm.datetime(2025, 2, 22, 17, 20, 0)) position_provider = ManualPositionProvider([ # StockPosition(asset_code="005930", asset_name="삼성전자", quantity=100, sell_possible_quantity=100, average_purchase_price=70000, current_price=70000, current_value=7000000, current_pnl=0, current_pnl_value=0) ]) broker = MockBroker(clock, position_provider, market="kr_stock", current_data_handling="virtual") # broker.create_order("005930", OrderSide.BUY, 100, OrderType.LIMIT, 70000) # broker.show_trading_history_report(make_file=True) # print(broker.get_minute_price("005930")) # print(broker.get_daily_price("005930", dtm.date(2025, 2, 13), dtm.date(2025, 2, 21))) os.environ["TZ"] = "America/New_York" broker2 = MockBroker(clock, position_provider, market="us_stock", current_data_handling="virtual") # print(broker2.get_minute_price("AAPL")) # print(broker2.get_daily_price("AAPL", dtm.date(2025, 2, 13), dtm.date(2025, 2, 21)))