# 고급 Hook 활용하기 ## `app.yaml` app.yaml 에는 전략의 실행 환경 관련 설정을 나열합니다. ```yaml executor: hook # hook 또는 simple (기본값: simple) ``` - executor: 전략 실행 방식을 지정합니다. Hook 기반의 전략의 경우 값을 `hook` 으로 지정합니다. ## `.env` 설정 `.env` 파일에는 증권사 API 관련 설정을 나열합니다. ### 증권사 API Key 설정 **한국투자증권** ``` KIS_APP_KEY=<한투 Open API App Key> KIS_APP_SECRET=<한투 Open API App Secret> KIS_CANO=<한투 계좌번호> KIS_ACNT_PRDT_CD=<한투 계좌 상품번호> ``` **LS(구 이베스트투자)증권** ``` EBEST_APP_KEY=<이베스트 Open API App Key> EBEST_APP_SECRET=<이베스트 Open API App Secret> ``` ## 초기화 전략 실행 관련 세부 설정을 위한 Hook 입니다. (Optional) 이 Hook을 직접 구현하지 않을 경우 아래 주석에 적혀있는 기본값이 적용됩니다. ```python def on_initialize() -> dict: return { # 모의투자 여부 (기본값: False) 'paper_trading': False, # 전략 실행 주기 (기본값: 60초) 'interval': dtm.timedelta(seconds=60), # (정규장)개장시간과 거래시간 사이의 여유시간 (기본값: 5분) 'open_market_time_margin': dtm.timedelta(minutes=5), # (정규장)폐장시간과 거래시간 사이의 여유시간 (기본값: 5분) 'market_close_time_margin': dtm.timedelta(minutes=5), # 장시작 전 호출되는 'on_before_market_open'과 개장 시간과의 간격 (기본값: 1시간) 'before_market_open_schedule': dtm.timedelta(hours=1), # 장종료 후 호출되는 'on_after_market_close'과 폐장 시간과의 간격 (기본값: 1시간) 'after_market_close_schedule': dtm.timedelta(hours=1), # 개장 시간 강제 설정 (기본값: None, 정규장 시작 시간 이후로만 지정 가능) 'market_open_time': dtm.time(9, 0), # 폐장 시간 강제 설정 (기본값: None, 정규장 종료 시간 이전으로만 지정 가능) 'market_close_time': dtm.time(15, 20), # 익절 조건 설정 (기본값: None) 'take_profit_config': { '005930': 2, # 삼성전자 수익률이 2%에 도달하면 익절 }, # 손절 조건 설정 (기본값: None) 'stop_loss_config': { '005930': -2, # 삼성전자 수익률이 -2%에 도달하면 손절 }, } ``` ## 거래 지시 ```python # 거래 시간 시작 시 1회 호출되는 함수 def on_market_open(account_info, pending_orders, positions, broker_api) -> List[Tuple] ... # 거래 시간 종료 시 1회 호출되는 함수 def on_market_close(account_info, pending_orders, positions, broker_api) -> List[Tuple] ... # 장 중에 설정된 주기마다 호출되는 함수 def trade_func(account_info, pending_orders, positions, broker_api) -> List[Tuple]: return [ ('005930', 81700, 10 ), # 삼성전자 10주를 81700에 지정가 매수, ('035420', 0, -5 ), # NAVER 5주를 시장가에 매도 ] ``` - 전략을 실행하고 거래할 내역을 반환합니다. (3가지 함수 모두 호출되는 시점만 다를 뿐 같은 역할을 합니다) - 가격을 0으로 지정하면 시장가로, 0보다 큰 값으로 지정하면 지정가로 주문을 넣습니다. ### Parameters **account_info (dict)** 계좌정보를 포함하고 있는 딕셔너리 객체입니다. ``` { 'total_balance': 999289, # 계좌 평가 금액 (현금+주식) 'purchase_amount': 912690, # 주식 매입 금액 'evaluated_amount': 913360, # 주식 평가 금액 'pnl_amount': 670, # 수익 'pnl_rate': Decimal('0.07340937229508376566169958188') # 수익률(%) } ``` 위 값을 얻는 각 증권사 별 API는 다음과 같습니다. - [pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_account](#pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_account) - [pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_account](#pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_account) **pending_orders (list)** 미체결 주문 내역을 전달합니다. 각 주문 객체는 다음과 같은 값들을 포함하고 있습니다. ``` @dataclass class KISStockOrder: """ 주식 주문 정보를 담는 데이터 클래스입니다. """ order_no: str """ 주문 번호 """ asset_code: str """ 종목 코드 """ side: OrderSide """ 주문 방향 """ quantity: int """ 주문 수량 """ price: int """ 주문 가격 """ filled_quantity: int """ 체결 수량 """ filled_price: int """ 체결 가격 """ pending_quantity: int """ 미체결 수량 """ order_time: dtm.datetime """ 주문 시각 """ current_price: int """ 현재 가격 """ is_pending: Optional[bool] = None """ 미체결 여부 """ org_order_no: Optional[str] = None """ 원주문번호 """ order_type: Optional[OrderType] = None """ 주문 유형 """ req_type: Optional[OrderRequestType] = None """ 주문 요청 유형 """ ``` 각 증권사별 객체 정의는 다음과 같습니다. - [pyqqq.brokerage.kis.simple.KISStockOrder](#pyqqq.brokerage.kis.simple.KISStockOrder) - [pyqqq.brokerage.ebest.simple.EBestStockOrder](#pyqqq.brokerage.ebest.simple.EBestStockOrder) 각 증권사별 해당 값을 얻기 위한 API는 다음과 같습니다. - [pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_pending_orders](#pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_pending_orders) - [pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_pending_orders](#pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_pending_orders) **positions (list)** 보유 포지션 내역을 전달합니다. 각 항목은 다음과 같은 값들을 포함하고 있습니다. ``` @dataclass class KISStockPosition: """ 주식 보유 종목 정보를 담는 데이터 클래스입니다. """ asset_code: str """ 종목 코드 """ asset_name: str """ 종목 이름 """ quantity: int """ 보유 수량 """ average_purchase_price: Decimal """ 평균 매입 가격 """ current_price: int """ 현재 가격 """ current_value: int """ 현재 가치 """ current_pnl: Decimal """ 현재 손익률 """ current_pnl_value: int """ 현재 손익 금액 """ ``` 각 증권사별 객체 정의는 다음과 같습니다. - [pyqqq.brokerage.kis.simple.KISStockPosition](#pyqqq.brokerage.kis.simple.KISStockPosition) - [pyqqq.brokerage.ebest.simple.EBestStockPosition](#pyqqq.brokerage.ebest.simple.EBestStockPosition) 각 증권사별 해당 값을 얻기 위한 API는 다음과 같습니다. - [pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_positions](#pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_positions) - [pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_positions](#pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_positions) **broker_api (KISSimpleDomesticStock|EBestSimpleDomesticStock)** 증권사 API wrapper를 전달합니다. 시세나 주문상태 조회를 위해 사용할 수 있습니다. **시세 조회** - 시세: get_price, get_price_for_multiple_stock - 일봉: get_historical_daily_data - 분봉: get_today_minute_data **주문 내역** - 당일 전체 주문 내역: get_today_order_history - 미체결 주문 내역: get_pending_orders - 주문번호로 조회: get_order 각 증권사별 클래스는 다음과 같습니다. - [pyqqq.brokerage.kis.simple.KISSimpleDomesticStock](#pyqqq.brokerage.kis.simple.KISSimpleDomesticStock) - [pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock](#pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock) ## 미체결 주문 처리 미체결 주문을 확인하고 정정/취소할 내역을 반환합니다. 매 주기마다 trade_func 가 호출되기 전 미체결주문이 남아있으면 호출됩니다. ```{note} 이 hook을 구현하지 않은 경우 기본 동작은 미체결 주문을 모두 취소하는 것입니다. ``` ```python def handle_pending_orders(pending_orders, broker_api) -> List: result = [] for o in pending_orders: if o.price != o.current_price: if o.side == OrderSide.SELL: # 매도 주문의 경우 현재 가격으로 정정주문을 제출합니다 result.append((o.asset_code, o.order_no, o.current_price, o.pending_quantity)) elif o.side == OrderSide.BUY: # 매수 주문의 경우 취소주문을 제출합니다 result.append((o.asset_code, o.order_no, None, -1 * o.pending_quantity)) return result ``` - 응답의 각 항목은 `(종목코드, 원주문번호, 정정가격, 정정수량)`을 갖는 Tuple 형태여야 합니다. - 전량/일부 취소의 경우 가격은 `None` 으로 지정하고, 취소하고자 하는 수량을 음수로 지정합니다 - 정정의 경우 가격과 수량을 지정합니다. ## 장전후 처리를 위한 함수 ```python def on_before_market_open(account_info, pending_orders, positions, broker_api) def on_after_market_close(account_info, pending_orders, positions, broker_api) ``` - 매일 장전/후로 처리해야할 작업이 있으면 위 함수를 구현해 처리할 수 있습니다. - 각 함수가 호출되는 시점은 `on_initialize` 에서 반환하는 `before_market_open_schedule` 과 `after_market_close_schedule` 값을 지정하여 설정할 수 있습니다. ## Full example 올웨더 포트폴리오 전략을 구현한 예제 코드입니다. ### `app.yaml` ```yaml executor: hook ``` ### `.env` ``` KIS_APP_KEY= KIS_APP_SECRET= KIS_CANO= KIS_ACNT_PRDT_CD= ``` ### `requirements.txt` 외부 패키지를 사용하는 경우 `requirements.txt` 에 나열합니다. 이 전략에서는 ta 패키지를 사용합니다. ``` ta>=0.11.0 ``` ### all_weather_portfolio.py ```python from decimal import Decimal from pyqqq.utils.logger import get_logger from pyqqq.utils.market_schedule import get_market_schedule from pyqqq.utils.compute import get_krx_tick_size, quantize_krx_price from ta.volume import VolumeWeightedAveragePrice from typing import Dict, List, Tuple import datetime as dtm import time logger = get_logger("all_weather_port") portfolio = { "143850": 0.175, # TIGER 미국S&P500 선물 "294400": 0.175, # KODEF 200TR "148070": 0.25, # KOSEF 국고채10년 "305080": 0.25, # TIGER 미국채10년선물 "319640": 0.15, # TIGER 골드 선물 } def on_initialize() -> dict: return { "paper_trading": False, "open_market_time_margin": dtm.timedelta(minutes=60), "interval": dtm.timedelta(seconds=60), } def on_market_open(account_info, pending_orders, positions, broker_api) -> List[Tuple]: today = dtm.date.today() start_of_month = today.replace(day=1) first_trading_date_of_month = get_closest_trading_day(start_of_month) if today != first_trading_date_of_month: # 매월 첫 거래일에만 리밸런싱을 수행합니다. return [] # 지난 20분간의 vwap을 기준으로 매수가를 결정합니다 vwaps = get_vwaps(broker_api, list(portfolio.keys()), window=20) # 목표 수량을 계산 target_quantities = get_target_quantities(account_info, portfolio, vwaps) # 보유 중인 수량 확인 current_quantities = get_current_quantities(list(portfolio.keys()), positions) # 주문 수량을 계산 하고 주문 순서를 정합니다 order_sequence = calculate_order_sequence(current_quantities, target_quantities) result = [] for asset_code, quantity in order_sequence: result.append((asset_code, vwaps[asset_code], quantity)) logger.info(f"on_market_open: order instructions - {result}") return result async def handle_pending_orders(pending_orders, broker_api) -> List[Tuple]: result = [] now = dtm.datetime.now() today = now.date() schedule = get_market_schedule(today) market_close_time = dtm.datetime.combine(today, schedule.close_time) # 장 마감 30분 전까지 체결이 안된 경우, 시장가로 정정주문을 재출합니다. market_order_time = market_close_time - dtm.timedelta(minutes=30) for o in pending_orders: logger.info(f"pending_orders: {o}") if now < market_order_time: vwap = get_vwap(broker_api, o.asset_code, window=20) tick_size = get_krx_tick_size(o.price, etf_etn=True) diff = abs(o.price - vwap) # 주문가격과 vwap의 차이 depth = diff // tick_size if depth >= 2: # 주문가격과 vwap이 2틱 이상 차이나면 정정주문 재출 result.append((o.asset_code, o.order_no, vwap, o.pending_quantity)) else: # 장 마감 30분 전에도 체결이 안된 주문은 시장가로 제출합니다. result.append((o.asset_code, o.order_no, 0, o.pending_quantity)) return result def trade_func(account_info, pending_orders, positions, broker_api) -> List[Tuple]: # 장 중에는 매수/매도를 하지 않습니다. logger.info("") logger.info("# STATISTICS") logger.info("{:-^110}".format("")) logger.info( f"{'Code':10} {'PnL':5} {'Quantity':10} {'Avg Price':10} {'Current Price':10} {'Current Value':15} Name" ) for p in positions: logger.info( f"{p.asset_code:10} {p.current_pnl:5} {p.quantity:10} {p.average_purchase_price:10} {p.current_price:10} {p.current_value:15} {p.asset_name:30} " ) logger.info("{:-^110}".format("")) logger.info(f"- Total balance: {account_info['total_balance']:,}원") logger.info(f"- Purchase amount: {account_info['purchase_amount']:,}원") logger.info(f"- Evaluated amount: {account_info['evaluated_amount']:,}원") logger.info( f"- PnL rate: {account_info['pnl_rate'].quantize(Decimal('0.01'))}% ({account_info['pnl_amount']:,}원)" ) def calculate_quantities(account_info, ratio: float, vwap: int): """계좌의 총 평가 자산을 기준으로 주어진 비중으로 목표 수량을 계산합니다.""" total_balance = account_info["total_balance"] budget = round(total_balance * ratio) price = vwap quantity = budget // price return quantity def get_current_quantities(asset_codes: List[str], positions) -> Dict: """포트폴리오 자산의 현재 보유 수량을 조회합니다""" result = {} for p in positions: if p.asset_code in asset_codes: result[p.asset_code] = p.quantity return result def get_target_quantities( account_info, asset_allocations: Dict[str, float], vwaps: Dict[str, int] ) -> Dict: """포트폴리오 자산의 목표 보유 수량을 계산합니다""" result = {} for asset_code, ratio in asset_allocations.items(): result[asset_code] = calculate_quantities( account_info, ratio, vwaps[asset_code] ) return result def calculate_order_sequence( current_quantities: Dict[str, int], target_quantities: Dict[str, int] ) -> List[Tuple[str, int]]: """주문을 실행하기 위한 순서를 계산합니다""" result = [] for asset_code, target_quantity in target_quantities.items(): current_quantity = current_quantities.get(asset_code, 0) if current_quantity != target_quantity: result.append((asset_code, target_quantity - current_quantity)) # 수량이 작은 순으로 정렬 result = sorted(result, key=lambda x: x[1]) return result def get_closest_trading_day(date: dtm.date, direction: str = "forward") -> dtm.date: """가장 가까운 거래일을 구함""" while True: schedule = get_market_schedule(date) if schedule.full_day_closed: if direction == "foward": date += dtm.timedelta(days=1) else: date -= dtm.timedelta(days=1) time.sleep(0.01) else: break return date def get_vwaps(broker_api, asset_codes, window=20) -> dict[str, int]: """주어진 자산들의 최근 VWAP을 조회합니다.""" result = {} for asset_code in asset_codes: vwap = get_vwap(broker_api, asset_code, window) result[asset_code] = vwap return result def get_vwap(broker_api, asset_code, window=20) -> int: """주어진 자산의 최근 VWAP을 조회합니다.""" df = broker_api.get_today_minute_data(asset_code) df = df.iloc[:window] df = df.iloc[::-1] vwap = VolumeWeightedAveragePrice( high=df["high"], low=df["low"], close=df["close"], volume=df["volume"], window=window, ) vwap_price = vwap.volume_weighted_average_price()[-1] return quantize_krx_price(vwap_price, etf_etn=True, rounding="round") ``` ## 로컬 테스트 `qqq run` 명령어를 사용하여 실행 해볼 수 있습니다. ``` all_weather_portfolio $ qqq run ./all_weather_portfolio.py Looking in indexes: https://pypi.org/simple, https://_json_key_base64:****@asia-northeast3-python.pkg.dev/qupiato/qupiato-python/simple/ Requirement already satisfied: ta>=0.11.0 in /Users/tester/miniconda3/lib/python3.11/site-packages (from -r requirements.txt (line 1)) (0.11.0) Requirement already satisfied: numpy in /Users/tester/miniconda3/lib/python3.11/site-packages (from ta>=0.11.0->-r requirements.txt (line 1)) (1.26.0) Requirement already satisfied: pandas in /Users/tester/miniconda3/lib/python3.11/site-packages (from ta>=0.11.0->-r requirements.txt (line 1)) (1.5.3) Requirement already satisfied: python-dateutil>=2.8.1 in /Users/tester/miniconda3/lib/python3.11/site-packages (from pandas->ta>=0.11.0->-r requirements.txt (line 1)) (2.8.2) Requirement already satisfied: pytz>=2020.1 in /Users/tester/miniconda3/lib/python3.11/site-packages (from pandas->ta>=0.11.0->-r requirements.txt (line 1)) (2023.3.post1) Requirement already satisfied: six>=1.5 in /Users/tester/miniconda3/lib/python3.11/site-packages (from python-dateutil>=2.8.1->pandas->ta>=0.11.0->-r requirements.txt (line 1)) (1.16.0) 2024-04-19 11:18:59,990 I executor: [EXECUTOR] Call on_initialize hook 2024-04-19 11:18:59,990 I executor: Update interval to 0:01:00 2024-04-19 11:18:59,991 I executor: New day. 2024-04-19 2024-04-19 11:19:00,108 I executor: - before_open_call_time: 09:00:00 2024-04-19 11:19:00,109 I executor: - market_open_time: 09:00:00 2024-04-19 11:19:00,109 I executor: - user_market_open_time: 10:00:00 2024-04-19 11:19:00,109 I executor: - user_market_close_time: 15:15:00 2024-04-19 11:19:00,109 I executor: - market_close_time: 15:20:00 2024-04-19 11:19:00,109 I executor: - after_close_call_time: 16:15:00 2024-04-19 11:19:00,109 I executor: - interval: 60 seconds 2024-04-19 11:19:00,109 I executor: - start_margin: 3600 seconds 2024-04-19 11:19:00,109 I executor: - end_margin: 300 seconds 2024-04-19 11:19:00,109 I executor: brokerage: kis 2024-04-19 11:19:00,109 I executor: paper_trading: False 2024-04-19 11:19:00,109 I executor: 2024-04-19 11:19:01,270 I executor: [EXECUTOR] Call handle_pending_orders hook ``` ## 배포 `qqq deploy` 명령어를 사용하여 배포할 수 있습니다. ``` all_weather_portfolio % qqq deploy ./all_weather_portfolio.py Deploying ./all_weather_portfolio.py as all-weather-portfolio Uploading ./all_weather_portfolio.py to GCS bucket done. aa58144124684897b3485b9753bb230a.zip Wait for deployment ready... Deployment is ready - name: rpuvosbv60yhg6cuzrclfixd5cx2-all-weather-portfolio Done. You can show logs with following command: $ qqq logs rpuvosbv60yhg6cuzrclfixd5cx2-all-weather-portfolio ```