고급 Hook 활용하기#

app.yaml#

app.yaml 에는 전략의 실행 환경 관련 설정을 나열합니다.

executor: hook # hook 또는 simple (기본값: simple)
  • executor: 전략 실행 방식을 지정합니다. Hook 기반의 전략의 경우 값을 hook 으로 지정합니다.

.env 설정#

.env 파일에는 증권사 API 관련 설정을 나열합니다.

증권사 API Key 설정#

한국투자증권

KIS_APP_KEY=<한투 Open API App Key>
KIS_APP_SECRET=<한투 Open API App Secret>
KIS_CANO=<한투 계좌번호>
KIS_ACNT_PRDT_CD=<한투 계좌 상품번호>

LS(구 이베스트투자)증권

EBEST_APP_KEY=<이베스트 Open API App Key>
EBEST_APP_SECRET=<이베스트 Open API App Secret>

초기화#

전략 실행 관련 세부 설정을 위한 Hook 입니다. (Optional) 이 Hook을 직접 구현하지 않을 경우 아래 주석에 적혀있는 기본값이 적용됩니다.

def on_initialize() -> dict:
    return {
        # 모의투자 여부 (기본값: False)
        'paper_trading': False,

        # 전략 실행 주기 (기본값: 60초)
        'interval': dtm.timedelta(seconds=60),
        # (정규장)개장시간과 거래시간 사이의 여유시간 (기본값: 5분)
        'open_market_time_margin': dtm.timedelta(minutes=5),
        # (정규장)폐장시간과 거래시간 사이의 여유시간 (기본값: 5분)
        'market_close_time_margin': dtm.timedelta(minutes=5),
        # 장시작 전 호출되는 'on_before_market_open'과 개장 시간과의 간격 (기본값: 1시간)
        'before_market_open_schedule': dtm.timedelta(hours=1),
        # 장종료 후 호출되는 'on_after_market_close'과 폐장 시간과의 간격 (기본값: 1시간)
        'after_market_close_schedule': dtm.timedelta(hours=1),
        # 개장 시간 강제 설정 (기본값: None, 정규장 시작 시간 이후로만 지정 가능)
        'market_open_time': dtm.time(9, 0),
        # 폐장 시간 강제 설정 (기본값: None, 정규장 종료 시간 이전으로만 지정 가능)
        'market_close_time': dtm.time(15, 20),


        # 익절 조건 설정 (기본값: None)
        'take_profit_config': {
            '005930': 2, # 삼성전자 수익률이 2%에 도달하면 익절
        },
        # 손절 조건 설정 (기본값: None)
        'stop_loss_config': {
            '005930': -2, # 삼성전자 수익률이 -2%에 도달하면 손절
        },
    }

거래 지시#

# 거래 시간 시작 시 1회 호출되는 함수
def on_market_open(account_info, pending_orders, positions, broker_api) -> List[Tuple]
    ...

# 거래 시간 종료 시 1회 호출되는 함수
def on_market_close(account_info, pending_orders, positions, broker_api) -> List[Tuple]
    ...

# 장 중에 설정된 주기마다 호출되는 함수
def trade_func(account_info, pending_orders, positions, broker_api) -> List[Tuple]:
    return [
        ('005930', 81700, 10 ), # 삼성전자 10주를 81700에 지정가 매수,
        ('035420', 0, -5 ), # NAVER 5주를 시장가에 매도
    ]
  • 전략을 실행하고 거래할 내역을 반환합니다. (3가지 함수 모두 호출되는 시점만 다를 뿐 같은 역할을 합니다)

  • 가격을 0으로 지정하면 시장가로, 0보다 큰 값으로 지정하면 지정가로 주문을 넣습니다.

Parameters#

account_info (dict)

계좌정보를 포함하고 있는 딕셔너리 객체입니다.

{
    'total_balance': 999289, # 계좌 평가 금액 (현금+주식)
    'purchase_amount': 912690, # 주식 매입 금액
    'evaluated_amount': 913360, # 주식 평가 금액
    'pnl_amount': 670, # 수익
    'pnl_rate': Decimal('0.07340937229508376566169958188') # 수익률(%)
}

위 값을 얻는 각 증권사 별 API는 다음과 같습니다.

pending_orders (list)

미체결 주문 내역을 전달합니다. 각 주문 객체는 다음과 같은 값들을 포함하고 있습니다.

@dataclass
class KISStockOrder:
    """
    주식 주문 정보를 담는 데이터 클래스입니다.
    """

    order_no: str
    """ 주문 번호 """
    asset_code: str
    """ 종목 코드 """
    side: OrderSide
    """ 주문 방향 """
    quantity: int
    """ 주문 수량 """
    price: int
    """ 주문 가격 """
    filled_quantity: int
    """ 체결 수량 """
    filled_price: int
    """ 체결 가격 """
    pending_quantity: int
    """ 미체결 수량 """
    order_time: dtm.datetime
    """ 주문 시각 """
    current_price: int
    """ 현재 가격 """
    is_pending: Optional[bool] = None
    """ 미체결 여부 """
    org_order_no: Optional[str] = None
    """ 원주문번호 """
    order_type: Optional[OrderType] = None
    """ 주문 유형 """
    req_type: Optional[OrderRequestType] = None
    """ 주문 요청 유형 """

각 증권사별 객체 정의는 다음과 같습니다.

각 증권사별 해당 값을 얻기 위한 API는 다음과 같습니다.

positions (list)

보유 포지션 내역을 전달합니다. 각 항목은 다음과 같은 값들을 포함하고 있습니다.

@dataclass
class KISStockPosition:
    """
    주식 보유 종목 정보를 담는 데이터 클래스입니다.
    """

    asset_code: str
    """ 종목 코드 """
    asset_name: str
    """ 종목 이름 """
    quantity: int
    """ 보유 수량 """
    average_purchase_price: Decimal
    """ 평균 매입 가격 """
    current_price: int
    """ 현재 가격 """
    current_value: int
    """ 현재 가치 """
    current_pnl: Decimal
    """ 현재 손익률 """
    current_pnl_value: int
    """ 현재 손익 금액 """

각 증권사별 객체 정의는 다음과 같습니다.

각 증권사별 해당 값을 얻기 위한 API는 다음과 같습니다.

broker_api (KISSimpleDomesticStock|EBestSimpleDomesticStock)

증권사 API wrapper를 전달합니다. 시세나 주문상태 조회를 위해 사용할 수 있습니다.

시세 조회

  • 시세: get_price, get_price_for_multiple_stock

  • 일봉: get_historical_daily_data

  • 분봉: get_today_minute_data

주문 내역

  • 당일 전체 주문 내역: get_today_order_history

  • 미체결 주문 내역: get_pending_orders

  • 주문번호로 조회: get_order

각 증권사별 클래스는 다음과 같습니다.

미체결 주문 처리#

미체결 주문을 확인하고 정정/취소할 내역을 반환합니다. 매 주기마다 trade_func 가 호출되기 전 미체결주문이 남아있으면 호출됩니다.

Note

이 hook을 구현하지 않은 경우 기본 동작은 미체결 주문을 모두 취소하는 것입니다.

def handle_pending_orders(pending_orders, broker_api) -> List:
    result = []

    for o in pending_orders:
        if o.price != o.current_price:
            if o.side == OrderSide.SELL:
                # 매도 주문의 경우 현재 가격으로 정정주문을 제출합니다
                result.append((o.asset_code, o.order_no, o.current_price, o.pending_quantity))
            elif o.side == OrderSide.BUY:
                # 매수 주문의 경우 취소주문을 제출합니다
                result.append((o.asset_code, o.order_no, None, -1 * o.pending_quantity))

    return result
  • 응답의 각 항목은 (종목코드, 원주문번호, 정정가격, 정정수량)을 갖는 Tuple 형태여야 합니다.

  • 전량/일부 취소의 경우 가격은 None 으로 지정하고, 취소하고자 하는 수량을 음수로 지정합니다

  • 정정의 경우 가격과 수량을 지정합니다.

장전후 처리를 위한 함수#

def on_before_market_open(account_info, pending_orders, positions, broker_api)
def on_after_market_close(account_info, pending_orders, positions, broker_api)
  • 매일 장전/후로 처리해야할 작업이 있으면 위 함수를 구현해 처리할 수 있습니다.

  • 각 함수가 호출되는 시점은 on_initialize 에서 반환하는 before_market_open_scheduleafter_market_close_schedule 값을 지정하여 설정할 수 있습니다.

Full example#

올웨더 포트폴리오 전략을 구현한 예제 코드입니다.

app.yaml#

executor: hook

.env#

KIS_APP_KEY=
KIS_APP_SECRET=
KIS_CANO=
KIS_ACNT_PRDT_CD=

requirements.txt#

외부 패키지를 사용하는 경우 requirements.txt 에 나열합니다. 이 전략에서는 ta 패키지를 사용합니다.

ta>=0.11.0

all_weather_portfolio.py#

from decimal import Decimal
from pyqqq.utils.logger import get_logger
from pyqqq.utils.market_schedule import get_market_schedule
from pyqqq.utils.compute import get_krx_tick_size, quantize_krx_price
from ta.volume import VolumeWeightedAveragePrice
from typing import Dict, List, Tuple
import datetime as dtm
import time

logger = get_logger("all_weather_port")

portfolio = {
    "143850": 0.175,  # TIGER 미국S&P500 선물
    "294400": 0.175,  # KODEF 200TR
    "148070": 0.25,  # KOSEF 국고채10년
    "305080": 0.25,  # TIGER 미국채10년선물
    "319640": 0.15,  # TIGER 골드 선물
}


def on_initialize() -> dict:
    return {
        "paper_trading": False,
        "open_market_time_margin": dtm.timedelta(minutes=60),
        "interval": dtm.timedelta(seconds=60),
    }


def on_market_open(account_info, pending_orders, positions, broker_api) -> List[Tuple]:
    today = dtm.date.today()
    start_of_month = today.replace(day=1)
    first_trading_date_of_month = get_closest_trading_day(start_of_month)

    if today != first_trading_date_of_month:
        # 매월 첫 거래일에만 리밸런싱을 수행합니다.
        return []

    # 지난 20분간의 vwap을 기준으로 매수가를 결정합니다
    vwaps = get_vwaps(broker_api, list(portfolio.keys()), window=20)

    # 목표 수량을 계산
    target_quantities = get_target_quantities(account_info, portfolio, vwaps)
    # 보유 중인 수량 확인
    current_quantities = get_current_quantities(list(portfolio.keys()), positions)
    # 주문 수량을 계산 하고 주문 순서를 정합니다
    order_sequence = calculate_order_sequence(current_quantities, target_quantities)

    result = []
    for asset_code, quantity in order_sequence:
        result.append((asset_code, vwaps[asset_code], quantity))

    logger.info(f"on_market_open: order instructions - {result}")

    return result


async def handle_pending_orders(pending_orders, broker_api) -> List[Tuple]:
    result = []

    now = dtm.datetime.now()
    today = now.date()
    schedule = get_market_schedule(today)
    market_close_time = dtm.datetime.combine(today, schedule.close_time)

    # 장 마감 30분 전까지 체결이 안된 경우, 시장가로 정정주문을 재출합니다.
    market_order_time = market_close_time - dtm.timedelta(minutes=30)

    for o in pending_orders:
        logger.info(f"pending_orders: {o}")

        if now < market_order_time:
            vwap = get_vwap(broker_api, o.asset_code, window=20)
            tick_size = get_krx_tick_size(o.price, etf_etn=True)
            diff = abs(o.price - vwap)  # 주문가격과 vwap의 차이
            depth = diff // tick_size

            if depth >= 2:  # 주문가격과 vwap이 2틱 이상 차이나면 정정주문 재출
                result.append((o.asset_code, o.order_no, vwap, o.pending_quantity))
        else:
            # 장 마감 30분 전에도 체결이 안된 주문은 시장가로 제출합니다.
            result.append((o.asset_code, o.order_no, 0, o.pending_quantity))

    return result


def trade_func(account_info, pending_orders, positions, broker_api) -> List[Tuple]:
    # 장 중에는 매수/매도를 하지 않습니다.

    logger.info("")
    logger.info("# STATISTICS")
    logger.info("{:-^110}".format(""))
    logger.info(
        f"{'Code':10} {'PnL':5} {'Quantity':10} {'Avg Price':10} {'Current Price':10} {'Current Value':15} Name"
    )
    for p in positions:
        logger.info(
            f"{p.asset_code:10} {p.current_pnl:5} {p.quantity:10} {p.average_purchase_price:10} {p.current_price:10} {p.current_value:15}    {p.asset_name:30} "
        )

    logger.info("{:-^110}".format(""))

    logger.info(f"- Total balance: {account_info['total_balance']:,}원")
    logger.info(f"- Purchase amount: {account_info['purchase_amount']:,}원")
    logger.info(f"- Evaluated amount: {account_info['evaluated_amount']:,}원")
    logger.info(
        f"- PnL rate: {account_info['pnl_rate'].quantize(Decimal('0.01'))}% ({account_info['pnl_amount']:,}원)"
    )


def calculate_quantities(account_info, ratio: float, vwap: int):
    """계좌의 총 평가 자산을 기준으로 주어진 비중으로 목표 수량을 계산합니다."""
    total_balance = account_info["total_balance"]
    budget = round(total_balance * ratio)

    price = vwap
    quantity = budget // price

    return quantity


def get_current_quantities(asset_codes: List[str], positions) -> Dict:
    """포트폴리오 자산의 현재 보유 수량을 조회합니다"""
    result = {}

    for p in positions:
        if p.asset_code in asset_codes:
            result[p.asset_code] = p.quantity

    return result


def get_target_quantities(
    account_info, asset_allocations: Dict[str, float], vwaps: Dict[str, int]
) -> Dict:
    """포트폴리오 자산의 목표 보유 수량을 계산합니다"""
    result = {}

    for asset_code, ratio in asset_allocations.items():
        result[asset_code] = calculate_quantities(
            account_info, ratio, vwaps[asset_code]
        )

    return result


def calculate_order_sequence(
    current_quantities: Dict[str, int], target_quantities: Dict[str, int]
) -> List[Tuple[str, int]]:
    """주문을 실행하기 위한 순서를 계산합니다"""

    result = []

    for asset_code, target_quantity in target_quantities.items():
        current_quantity = current_quantities.get(asset_code, 0)

        if current_quantity != target_quantity:
            result.append((asset_code, target_quantity - current_quantity))

    # 수량이 작은 순으로 정렬
    result = sorted(result, key=lambda x: x[1])

    return result


def get_closest_trading_day(date: dtm.date, direction: str = "forward") -> dtm.date:
    """가장 가까운 거래일을 구함"""

    while True:
        schedule = get_market_schedule(date)

        if schedule.full_day_closed:
            if direction == "foward":
                date += dtm.timedelta(days=1)
            else:
                date -= dtm.timedelta(days=1)

            time.sleep(0.01)
        else:
            break

    return date


def get_vwaps(broker_api, asset_codes, window=20) -> dict[str, int]:
    """주어진 자산들의 최근 VWAP을 조회합니다."""
    result = {}
    for asset_code in asset_codes:
        vwap = get_vwap(broker_api, asset_code, window)
        result[asset_code] = vwap

    return result


def get_vwap(broker_api, asset_code, window=20) -> int:
    """주어진 자산의 최근 VWAP을 조회합니다."""
    df = broker_api.get_today_minute_data(asset_code)
    df = df.iloc[:window]
    df = df.iloc[::-1]

    vwap = VolumeWeightedAveragePrice(
        high=df["high"],
        low=df["low"],
        close=df["close"],
        volume=df["volume"],
        window=window,
    )

    vwap_price = vwap.volume_weighted_average_price()[-1]
    return quantize_krx_price(vwap_price, etf_etn=True, rounding="round")

로컬 테스트#

qqq run 명령어를 사용하여 실행 해볼 수 있습니다.

all_weather_portfolio $ qqq run ./all_weather_portfolio.py

Looking in indexes: https://pypi.org/simple, https://_json_key_base64:****@asia-northeast3-python.pkg.dev/qupiato/qupiato-python/simple/
Requirement already satisfied: ta>=0.11.0 in /Users/tester/miniconda3/lib/python3.11/site-packages (from -r requirements.txt (line 1)) (0.11.0)
Requirement already satisfied: numpy in /Users/tester/miniconda3/lib/python3.11/site-packages (from ta>=0.11.0->-r requirements.txt (line 1)) (1.26.0)
Requirement already satisfied: pandas in /Users/tester/miniconda3/lib/python3.11/site-packages (from ta>=0.11.0->-r requirements.txt (line 1)) (1.5.3)
Requirement already satisfied: python-dateutil>=2.8.1 in /Users/tester/miniconda3/lib/python3.11/site-packages (from pandas->ta>=0.11.0->-r requirements.txt (line 1)) (2.8.2)
Requirement already satisfied: pytz>=2020.1 in /Users/tester/miniconda3/lib/python3.11/site-packages (from pandas->ta>=0.11.0->-r requirements.txt (line 1)) (2023.3.post1)
Requirement already satisfied: six>=1.5 in /Users/tester/miniconda3/lib/python3.11/site-packages (from python-dateutil>=2.8.1->pandas->ta>=0.11.0->-r requirements.txt (line 1)) (1.16.0)
2024-04-19 11:18:59,990 I executor: [EXECUTOR] Call on_initialize hook
2024-04-19 11:18:59,990 I executor: Update interval to 0:01:00
2024-04-19 11:18:59,991 I executor: New day. 2024-04-19
2024-04-19 11:19:00,108 I executor: -  before_open_call_time: 09:00:00
2024-04-19 11:19:00,109 I executor: -       market_open_time: 09:00:00
2024-04-19 11:19:00,109 I executor: -  user_market_open_time: 10:00:00
2024-04-19 11:19:00,109 I executor: - user_market_close_time: 15:15:00
2024-04-19 11:19:00,109 I executor: -      market_close_time: 15:20:00
2024-04-19 11:19:00,109 I executor: -  after_close_call_time: 16:15:00
2024-04-19 11:19:00,109 I executor: -               interval: 60 seconds
2024-04-19 11:19:00,109 I executor: -           start_margin: 3600 seconds
2024-04-19 11:19:00,109 I executor: -             end_margin: 300 seconds
2024-04-19 11:19:00,109 I executor:                brokerage: kis
2024-04-19 11:19:00,109 I executor:            paper_trading: False
2024-04-19 11:19:00,109 I executor:
2024-04-19 11:19:01,270 I executor: [EXECUTOR] Call handle_pending_orders hook

배포#

qqq deploy 명령어를 사용하여 배포할 수 있습니다.

all_weather_portfolio % qqq deploy ./all_weather_portfolio.py
Deploying ./all_weather_portfolio.py as all-weather-portfolio
Uploading ./all_weather_portfolio.py to GCS bucket
done. aa58144124684897b3485b9753bb230a.zip
Wait for deployment ready...
Deployment is ready - name: rpuvosbv60yhg6cuzrclfixd5cx2-all-weather-portfolio
Done. You can show logs with following command:
$ qqq logs rpuvosbv60yhg6cuzrclfixd5cx2-all-weather-portfolio