고급 Hook 활용하기#
app.yaml
#
app.yaml 에는 전략의 실행 환경 관련 설정을 나열합니다.
executor: hook # hook 또는 simple (기본값: simple)
executor: 전략 실행 방식을 지정합니다. Hook 기반의 전략의 경우 값을
hook
으로 지정합니다.
.env
설정#
.env
파일에는 증권사 API 관련 설정을 나열합니다.
증권사 API Key 설정#
한국투자증권
KIS_APP_KEY=<한투 Open API App Key>
KIS_APP_SECRET=<한투 Open API App Secret>
KIS_CANO=<한투 계좌번호>
KIS_ACNT_PRDT_CD=<한투 계좌 상품번호>
LS(구 이베스트투자)증권
EBEST_APP_KEY=<이베스트 Open API App Key>
EBEST_APP_SECRET=<이베스트 Open API App Secret>
초기화#
전략 실행 관련 세부 설정을 위한 Hook 입니다. (Optional) 이 Hook을 직접 구현하지 않을 경우 아래 주석에 적혀있는 기본값이 적용됩니다.
def on_initialize() -> dict:
return {
# 모의투자 여부 (기본값: False)
'paper_trading': False,
# 전략 실행 주기 (기본값: 60초)
'interval': dtm.timedelta(seconds=60),
# (정규장)개장시간과 거래시간 사이의 여유시간 (기본값: 5분)
'open_market_time_margin': dtm.timedelta(minutes=5),
# (정규장)폐장시간과 거래시간 사이의 여유시간 (기본값: 5분)
'market_close_time_margin': dtm.timedelta(minutes=5),
# 장시작 전 호출되는 'on_before_market_open'과 개장 시간과의 간격 (기본값: 1시간)
'before_market_open_schedule': dtm.timedelta(hours=1),
# 장종료 후 호출되는 'on_after_market_close'과 폐장 시간과의 간격 (기본값: 1시간)
'after_market_close_schedule': dtm.timedelta(hours=1),
# 개장 시간 강제 설정 (기본값: None, 정규장 시작 시간 이후로만 지정 가능)
'market_open_time': dtm.time(9, 0),
# 폐장 시간 강제 설정 (기본값: None, 정규장 종료 시간 이전으로만 지정 가능)
'market_close_time': dtm.time(15, 20),
# 익절 조건 설정 (기본값: None)
'take_profit_config': {
'005930': 2, # 삼성전자 수익률이 2%에 도달하면 익절
},
# 손절 조건 설정 (기본값: None)
'stop_loss_config': {
'005930': -2, # 삼성전자 수익률이 -2%에 도달하면 손절
},
}
거래 지시#
# 거래 시간 시작 시 1회 호출되는 함수
def on_market_open(account_info, pending_orders, positions, broker_api) -> List[Tuple]
...
# 거래 시간 종료 시 1회 호출되는 함수
def on_market_close(account_info, pending_orders, positions, broker_api) -> List[Tuple]
...
# 장 중에 설정된 주기마다 호출되는 함수
def trade_func(account_info, pending_orders, positions, broker_api) -> List[Tuple]:
return [
('005930', 81700, 10 ), # 삼성전자 10주를 81700에 지정가 매수,
('035420', 0, -5 ), # NAVER 5주를 시장가에 매도
]
전략을 실행하고 거래할 내역을 반환합니다. (3가지 함수 모두 호출되는 시점만 다를 뿐 같은 역할을 합니다)
가격을 0으로 지정하면 시장가로, 0보다 큰 값으로 지정하면 지정가로 주문을 넣습니다.
Parameters#
account_info (dict)
계좌정보를 포함하고 있는 딕셔너리 객체입니다.
{
'total_balance': 999289, # 계좌 평가 금액 (현금+주식)
'purchase_amount': 912690, # 주식 매입 금액
'evaluated_amount': 913360, # 주식 평가 금액
'pnl_amount': 670, # 수익
'pnl_rate': Decimal('0.07340937229508376566169958188') # 수익률(%)
}
위 값을 얻는 각 증권사 별 API는 다음과 같습니다.
pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_account
pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_account
pending_orders (list)
미체결 주문 내역을 전달합니다. 각 주문 객체는 다음과 같은 값들을 포함하고 있습니다.
@dataclass
class KISStockOrder:
"""
주식 주문 정보를 담는 데이터 클래스입니다.
"""
order_no: str
""" 주문 번호 """
asset_code: str
""" 종목 코드 """
side: OrderSide
""" 주문 방향 """
quantity: int
""" 주문 수량 """
price: int
""" 주문 가격 """
filled_quantity: int
""" 체결 수량 """
filled_price: int
""" 체결 가격 """
pending_quantity: int
""" 미체결 수량 """
order_time: dtm.datetime
""" 주문 시각 """
current_price: int
""" 현재 가격 """
is_pending: Optional[bool] = None
""" 미체결 여부 """
org_order_no: Optional[str] = None
""" 원주문번호 """
order_type: Optional[OrderType] = None
""" 주문 유형 """
req_type: Optional[OrderRequestType] = None
""" 주문 요청 유형 """
각 증권사별 객체 정의는 다음과 같습니다.
각 증권사별 해당 값을 얻기 위한 API는 다음과 같습니다.
pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_pending_orders
pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_pending_orders
positions (list)
보유 포지션 내역을 전달합니다. 각 항목은 다음과 같은 값들을 포함하고 있습니다.
@dataclass
class KISStockPosition:
"""
주식 보유 종목 정보를 담는 데이터 클래스입니다.
"""
asset_code: str
""" 종목 코드 """
asset_name: str
""" 종목 이름 """
quantity: int
""" 보유 수량 """
average_purchase_price: Decimal
""" 평균 매입 가격 """
current_price: int
""" 현재 가격 """
current_value: int
""" 현재 가치 """
current_pnl: Decimal
""" 현재 손익률 """
current_pnl_value: int
""" 현재 손익 금액 """
각 증권사별 객체 정의는 다음과 같습니다.
각 증권사별 해당 값을 얻기 위한 API는 다음과 같습니다.
pyqqq.brokerage.kis.simple.KISSimpleDomesticStock.get_positions
pyqqq.brokerage.ebest.simple.EBestSimpleDomesticStock.get_positions
broker_api (KISSimpleDomesticStock|EBestSimpleDomesticStock)
증권사 API wrapper를 전달합니다. 시세나 주문상태 조회를 위해 사용할 수 있습니다.
시세 조회
시세: get_price, get_price_for_multiple_stock
일봉: get_historical_daily_data
분봉: get_today_minute_data
주문 내역
당일 전체 주문 내역: get_today_order_history
미체결 주문 내역: get_pending_orders
주문번호로 조회: get_order
각 증권사별 클래스는 다음과 같습니다.
미체결 주문 처리#
미체결 주문을 확인하고 정정/취소할 내역을 반환합니다. 매 주기마다 trade_func 가 호출되기 전 미체결주문이 남아있으면 호출됩니다.
Note
이 hook을 구현하지 않은 경우 기본 동작은 미체결 주문을 모두 취소하는 것입니다.
def handle_pending_orders(pending_orders, broker_api) -> List:
result = []
for o in pending_orders:
if o.price != o.current_price:
if o.side == OrderSide.SELL:
# 매도 주문의 경우 현재 가격으로 정정주문을 제출합니다
result.append((o.asset_code, o.order_no, o.current_price, o.pending_quantity))
elif o.side == OrderSide.BUY:
# 매수 주문의 경우 취소주문을 제출합니다
result.append((o.asset_code, o.order_no, None, -1 * o.pending_quantity))
return result
응답의 각 항목은
(종목코드, 원주문번호, 정정가격, 정정수량)
을 갖는 Tuple 형태여야 합니다.전량/일부 취소의 경우 가격은
None
으로 지정하고, 취소하고자 하는 수량을 음수로 지정합니다정정의 경우 가격과 수량을 지정합니다.
장전후 처리를 위한 함수#
def on_before_market_open(account_info, pending_orders, positions, broker_api)
def on_after_market_close(account_info, pending_orders, positions, broker_api)
매일 장전/후로 처리해야할 작업이 있으면 위 함수를 구현해 처리할 수 있습니다.
각 함수가 호출되는 시점은
on_initialize
에서 반환하는before_market_open_schedule
과after_market_close_schedule
값을 지정하여 설정할 수 있습니다.
Full example#
올웨더 포트폴리오 전략을 구현한 예제 코드입니다.
app.yaml
#
executor: hook
.env
#
KIS_APP_KEY=
KIS_APP_SECRET=
KIS_CANO=
KIS_ACNT_PRDT_CD=
requirements.txt
#
외부 패키지를 사용하는 경우 requirements.txt
에 나열합니다. 이 전략에서는 ta 패키지를 사용합니다.
ta>=0.11.0
all_weather_portfolio.py#
from decimal import Decimal
from pyqqq.utils.logger import get_logger
from pyqqq.utils.market_schedule import get_market_schedule
from pyqqq.utils.compute import get_krx_tick_size, quantize_krx_price
from ta.volume import VolumeWeightedAveragePrice
from typing import Dict, List, Tuple
import datetime as dtm
import time
logger = get_logger("all_weather_port")
portfolio = {
"143850": 0.175, # TIGER 미국S&P500 선물
"294400": 0.175, # KODEF 200TR
"148070": 0.25, # KOSEF 국고채10년
"305080": 0.25, # TIGER 미국채10년선물
"319640": 0.15, # TIGER 골드 선물
}
def on_initialize() -> dict:
return {
"paper_trading": False,
"open_market_time_margin": dtm.timedelta(minutes=60),
"interval": dtm.timedelta(seconds=60),
}
def on_market_open(account_info, pending_orders, positions, broker_api) -> List[Tuple]:
today = dtm.date.today()
start_of_month = today.replace(day=1)
first_trading_date_of_month = get_closest_trading_day(start_of_month)
if today != first_trading_date_of_month:
# 매월 첫 거래일에만 리밸런싱을 수행합니다.
return []
# 지난 20분간의 vwap을 기준으로 매수가를 결정합니다
vwaps = get_vwaps(broker_api, list(portfolio.keys()), window=20)
# 목표 수량을 계산
target_quantities = get_target_quantities(account_info, portfolio, vwaps)
# 보유 중인 수량 확인
current_quantities = get_current_quantities(list(portfolio.keys()), positions)
# 주문 수량을 계산 하고 주문 순서를 정합니다
order_sequence = calculate_order_sequence(current_quantities, target_quantities)
result = []
for asset_code, quantity in order_sequence:
result.append((asset_code, vwaps[asset_code], quantity))
logger.info(f"on_market_open: order instructions - {result}")
return result
async def handle_pending_orders(pending_orders, broker_api) -> List[Tuple]:
result = []
now = dtm.datetime.now()
today = now.date()
schedule = get_market_schedule(today)
market_close_time = dtm.datetime.combine(today, schedule.close_time)
# 장 마감 30분 전까지 체결이 안된 경우, 시장가로 정정주문을 재출합니다.
market_order_time = market_close_time - dtm.timedelta(minutes=30)
for o in pending_orders:
logger.info(f"pending_orders: {o}")
if now < market_order_time:
vwap = get_vwap(broker_api, o.asset_code, window=20)
tick_size = get_krx_tick_size(o.price, etf_etn=True)
diff = abs(o.price - vwap) # 주문가격과 vwap의 차이
depth = diff // tick_size
if depth >= 2: # 주문가격과 vwap이 2틱 이상 차이나면 정정주문 재출
result.append((o.asset_code, o.order_no, vwap, o.pending_quantity))
else:
# 장 마감 30분 전에도 체결이 안된 주문은 시장가로 제출합니다.
result.append((o.asset_code, o.order_no, 0, o.pending_quantity))
return result
def trade_func(account_info, pending_orders, positions, broker_api) -> List[Tuple]:
# 장 중에는 매수/매도를 하지 않습니다.
logger.info("")
logger.info("# STATISTICS")
logger.info("{:-^110}".format(""))
logger.info(
f"{'Code':10} {'PnL':5} {'Quantity':10} {'Avg Price':10} {'Current Price':10} {'Current Value':15} Name"
)
for p in positions:
logger.info(
f"{p.asset_code:10} {p.current_pnl:5} {p.quantity:10} {p.average_purchase_price:10} {p.current_price:10} {p.current_value:15} {p.asset_name:30} "
)
logger.info("{:-^110}".format(""))
logger.info(f"- Total balance: {account_info['total_balance']:,}원")
logger.info(f"- Purchase amount: {account_info['purchase_amount']:,}원")
logger.info(f"- Evaluated amount: {account_info['evaluated_amount']:,}원")
logger.info(
f"- PnL rate: {account_info['pnl_rate'].quantize(Decimal('0.01'))}% ({account_info['pnl_amount']:,}원)"
)
def calculate_quantities(account_info, ratio: float, vwap: int):
"""계좌의 총 평가 자산을 기준으로 주어진 비중으로 목표 수량을 계산합니다."""
total_balance = account_info["total_balance"]
budget = round(total_balance * ratio)
price = vwap
quantity = budget // price
return quantity
def get_current_quantities(asset_codes: List[str], positions) -> Dict:
"""포트폴리오 자산의 현재 보유 수량을 조회합니다"""
result = {}
for p in positions:
if p.asset_code in asset_codes:
result[p.asset_code] = p.quantity
return result
def get_target_quantities(
account_info, asset_allocations: Dict[str, float], vwaps: Dict[str, int]
) -> Dict:
"""포트폴리오 자산의 목표 보유 수량을 계산합니다"""
result = {}
for asset_code, ratio in asset_allocations.items():
result[asset_code] = calculate_quantities(
account_info, ratio, vwaps[asset_code]
)
return result
def calculate_order_sequence(
current_quantities: Dict[str, int], target_quantities: Dict[str, int]
) -> List[Tuple[str, int]]:
"""주문을 실행하기 위한 순서를 계산합니다"""
result = []
for asset_code, target_quantity in target_quantities.items():
current_quantity = current_quantities.get(asset_code, 0)
if current_quantity != target_quantity:
result.append((asset_code, target_quantity - current_quantity))
# 수량이 작은 순으로 정렬
result = sorted(result, key=lambda x: x[1])
return result
def get_closest_trading_day(date: dtm.date, direction: str = "forward") -> dtm.date:
"""가장 가까운 거래일을 구함"""
while True:
schedule = get_market_schedule(date)
if schedule.full_day_closed:
if direction == "foward":
date += dtm.timedelta(days=1)
else:
date -= dtm.timedelta(days=1)
time.sleep(0.01)
else:
break
return date
def get_vwaps(broker_api, asset_codes, window=20) -> dict[str, int]:
"""주어진 자산들의 최근 VWAP을 조회합니다."""
result = {}
for asset_code in asset_codes:
vwap = get_vwap(broker_api, asset_code, window)
result[asset_code] = vwap
return result
def get_vwap(broker_api, asset_code, window=20) -> int:
"""주어진 자산의 최근 VWAP을 조회합니다."""
df = broker_api.get_today_minute_data(asset_code)
df = df.iloc[:window]
df = df.iloc[::-1]
vwap = VolumeWeightedAveragePrice(
high=df["high"],
low=df["low"],
close=df["close"],
volume=df["volume"],
window=window,
)
vwap_price = vwap.volume_weighted_average_price()[-1]
return quantize_krx_price(vwap_price, etf_etn=True, rounding="round")
로컬 테스트#
qqq run
명령어를 사용하여 실행 해볼 수 있습니다.
all_weather_portfolio $ qqq run ./all_weather_portfolio.py
Looking in indexes: https://pypi.org/simple, https://_json_key_base64:****@asia-northeast3-python.pkg.dev/qupiato/qupiato-python/simple/
Requirement already satisfied: ta>=0.11.0 in /Users/tester/miniconda3/lib/python3.11/site-packages (from -r requirements.txt (line 1)) (0.11.0)
Requirement already satisfied: numpy in /Users/tester/miniconda3/lib/python3.11/site-packages (from ta>=0.11.0->-r requirements.txt (line 1)) (1.26.0)
Requirement already satisfied: pandas in /Users/tester/miniconda3/lib/python3.11/site-packages (from ta>=0.11.0->-r requirements.txt (line 1)) (1.5.3)
Requirement already satisfied: python-dateutil>=2.8.1 in /Users/tester/miniconda3/lib/python3.11/site-packages (from pandas->ta>=0.11.0->-r requirements.txt (line 1)) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /Users/tester/miniconda3/lib/python3.11/site-packages (from pandas->ta>=0.11.0->-r requirements.txt (line 1)) (2023.3.post1)
Requirement already satisfied: six>=1.5 in /Users/tester/miniconda3/lib/python3.11/site-packages (from python-dateutil>=2.8.1->pandas->ta>=0.11.0->-r requirements.txt (line 1)) (1.16.0)
2024-04-19 11:18:59,990 I executor: [EXECUTOR] Call on_initialize hook
2024-04-19 11:18:59,990 I executor: Update interval to 0:01:00
2024-04-19 11:18:59,991 I executor: New day. 2024-04-19
2024-04-19 11:19:00,108 I executor: - before_open_call_time: 09:00:00
2024-04-19 11:19:00,109 I executor: - market_open_time: 09:00:00
2024-04-19 11:19:00,109 I executor: - user_market_open_time: 10:00:00
2024-04-19 11:19:00,109 I executor: - user_market_close_time: 15:15:00
2024-04-19 11:19:00,109 I executor: - market_close_time: 15:20:00
2024-04-19 11:19:00,109 I executor: - after_close_call_time: 16:15:00
2024-04-19 11:19:00,109 I executor: - interval: 60 seconds
2024-04-19 11:19:00,109 I executor: - start_margin: 3600 seconds
2024-04-19 11:19:00,109 I executor: - end_margin: 300 seconds
2024-04-19 11:19:00,109 I executor: brokerage: kis
2024-04-19 11:19:00,109 I executor: paper_trading: False
2024-04-19 11:19:00,109 I executor:
2024-04-19 11:19:01,270 I executor: [EXECUTOR] Call handle_pending_orders hook
배포#
qqq deploy
명령어를 사용하여 배포할 수 있습니다.
all_weather_portfolio % qqq deploy ./all_weather_portfolio.py
Deploying ./all_weather_portfolio.py as all-weather-portfolio
Uploading ./all_weather_portfolio.py to GCS bucket
done. aa58144124684897b3485b9753bb230a.zip
Wait for deployment ready...
Deployment is ready - name: rpuvosbv60yhg6cuzrclfixd5cx2-all-weather-portfolio
Done. You can show logs with following command:
$ qqq logs rpuvosbv60yhg6cuzrclfixd5cx2-all-weather-portfolio