from decimal import Decimal
from pyqqq.brokerage.kis.domestic_stock import KISDomesticStock
from pyqqq.brokerage.kis.oauth import KISAuth
from pyqqq.data.realtime import get_all_last_trades
from pyqqq.datatypes import *
from pyqqq.utils.market_schedule import get_market_schedule
from pyqqq.utils.mock_api import with_mock
from typing import AsyncGenerator, Dict, List, Optional
import asyncio
import datetime as dtm
import pandas as pd
[docs]
class KISStockPosition(StockPosition):
pass
[docs]
class KISStockOrder(StockOrder):
pass
[docs]
class KISSimpleDomesticStock:
"""
한국투자증권 국내 주식 API 사용하여 주식 거래를 하기 위한 클래스입니다.
기존 KISDomesticStock 클래스를 감싸고, 간단한 주문/조회 기능을 제공합니다.
Attributes:
auth (KISAuth): 인증 정보
account_no (str): 계좌 번호
account_product_code (str): 계좌 상품 코드
hts_id (Optional[str]): HTS ID
corp_data (Optional[dict]): 기업 고객의 경우 추가로 필요한 정보를 담고 있는 객체
"""
[docs]
def __init__(
self,
auth: KISAuth,
account_no: str,
account_product_code: str,
hts_id: Optional[str] = None,
corp_data: Optional[dict] = None,
):
self.account_no = account_no
self.account_product_code = account_product_code
self.stock_api = KISDomesticStock(auth, corp_data)
self.hts_id = hts_id
[docs]
def get_account(self) -> dict:
"""
계좌 정보를 조회합니다.
Returns:
dict: 계좌 정보
- total_balance (int): 총 평가 금액
- purchase_amount (int): 매입 금액
- evaluated_amount (int): 평가 금액
- pnl_amount (int): 손익 금액
- pnl_rate (Decimal): 손익률
"""
r = self.stock_api.inquire_balance(self.account_no, self.account_product_code, "01")
data = r["output2"][0]
purchase_amount = int(data["pchs_amt_smtl_amt"])
pnl_amount = int(data["evlu_pfls_smtl_amt"])
pnl_rate = Decimal(pnl_amount / purchase_amount) * 100 if purchase_amount != 0 else Decimal(0)
result = {
"total_balance": int(data["tot_evlu_amt"]),
"purchase_amount": purchase_amount,
"evaluated_amount": int(data["evlu_amt_smtl_amt"]),
"pnl_amount": pnl_amount,
"pnl_rate": pnl_rate,
}
return result
[docs]
def get_possible_quantity(
self,
asset_code: str,
order_type: OrderType = OrderType.MARKET,
price: int = 0,
) -> dict:
"""
주문 가능 수량을 조회합니다.
Args:
asset_code (str): 종목 코드
order_type (OrderType): 주문 유형
price (int): 주문 가격 (지정가 주문일 경우에만 필요)
Returns:
dict: 주문 가능 수량 정보
- investable_cash (int): 주문 가능 현금
- reusable_amount (int): 재사용 가능 금액
- price (int): 계산 기준 단가
- quantity (int): 주문 가능 수량
- amount (int): 주문 시 소요 금액
"""
def __get_order_type_code():
if order_type == OrderType.MARKET:
return "01"
elif order_type == OrderType.LIMIT:
return "00"
elif order_type == OrderType.LIMIT_IOC:
return "11"
elif order_type == OrderType.LIMIT_FOK:
return "12"
elif order_type == OrderType.MARKET_IOC:
return "13"
elif order_type == OrderType.MARKET_FOK:
return "14"
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
resp = self.stock_api.inquire_psbl_order(
self.account_no,
self.account_product_code,
asset_code,
__get_order_type_code(),
price,
)
data = resp["output"]
result = {
"investable_cash": data["ord_psbl_cash"],
"reusable_amount": data["ruse_psbl_amt"],
"price": data["psbl_qty_calc_unpr"],
"quantity": data["max_buy_qty"],
"amount": data["max_buy_amt"],
}
return result
[docs]
def get_positions(self) -> List[StockPosition]:
"""
보유 종목을 조회합니다.
Returns:
List[StockPosition]: 보유 종목 정보 리스트
"""
fetching = True
ctx_area_fk100 = ""
ctx_area_nk100 = ""
tr_cont = ""
result = []
while fetching:
r = self.stock_api.inquire_balance(
self.account_no,
self.account_product_code,
"02",
tr_cont=tr_cont,
ctx_area_fk100=ctx_area_fk100,
ctx_area_nk100=ctx_area_nk100,
)
for el in r["output1"]:
position = StockPosition(
asset_code=el["pdno"],
asset_name=el["prdt_name"],
quantity=int(el["hldg_qty"]),
sell_possible_quantity=int(el["ord_psbl_qty"]),
average_purchase_price=Decimal(el["pchs_avg_pric"]),
current_price=int(el["prpr"]),
current_value=int(el["evlu_amt"]),
current_pnl=Decimal(el["evlu_pfls_rt"]),
current_pnl_value=el["evlu_pfls_amt"],
)
if position.quantity > 0:
result.append(position)
if r["tr_cont"] in ["F", "M"]:
tr_cont = "N"
ctx_area_fk100 = r["ctx_area_fk100"]
ctx_area_nk100 = r["ctx_area_nk100"]
else:
fetching = False
return result
[docs]
def get_historical_daily_data(self, asset_code: str, first_date: dtm.date, last_date: dtm.date, adjusted_price: bool = True) -> pd.DataFrame:
"""
일봉 데이터 검색
Args:
asset_code(str): 종목코드
first_date(datetime.date): 조회 시작일자
last_date(datetime.date): 조회 종료일자
adjusted_price(bool): 수정 주가 여부
Returns:
pd.DataFrame: 일봉 데이터
"""
assert first_date <= last_date, "last_date는 first_date와 같거나, 이후 날짜여야 합니다"
assert last_date <= dtm.date.today(), "last_date는 오늘과 같거나 이전이어야 합니다."
max_days_per_request = 100
total_days = (last_date - first_date).days
result = []
for i in range(0, total_days + 1, max_days_per_request + 1):
search_start = first_date + dtm.timedelta(days=i)
search_end = min(first_date + dtm.timedelta(days=i + max_days_per_request), last_date)
r = self.stock_api.inquire_daily_itemchartprice(asset_code, search_start, search_end, fid_period_div_code="D", fid_org_adj_prc="0" if adjusted_price else "1")
chunk = []
for item in r["output2"]:
if not item:
continue
chunk.append(
{
"date": item["stck_bsop_date"],
"open": item["stck_oprc"],
"high": item["stck_hgpr"],
"low": item["stck_lwpr"],
"close": item["stck_clpr"],
"volume": item["acml_vol"],
}
)
chunk.reverse()
result.extend(chunk)
df = pd.DataFrame(result)
df["date"] = pd.to_datetime(df["date"])
df.set_index("date", inplace=True)
return df
[docs]
def get_today_minute_data(self, asset_code: str) -> pd.DataFrame:
"""
분봉 데이터 검색
Args:
asset_code(str): 종목코드
Returns:
pd.DataFrame: 분봉 데이터
"""
request_time = dtm.datetime.now().replace(second=0, microsecond=0)
result = []
schedule = get_market_schedule(dtm.date.today())
while True:
r = self.stock_api.inquire_time_itemchartprice(asset_code, request_time.time(), fid_pw_data_incu_yn="N")
output = r["output2"]
if len(output) == 0:
break
for item in output:
if not item:
continue
result.append(
{
"time": dtm.datetime.combine(item["stck_bsop_date"], item["stck_cntg_hour"]),
"open": item["stck_oprc"],
"high": item["stck_hgpr"],
"low": item["stck_lwpr"],
"close": item["stck_prpr"],
"volume": item["cntg_vol"],
"cum_value": item["acml_tr_pbmn"],
}
)
last_item_time = result[-1]["time"]
request_time = last_item_time - dtm.timedelta(minutes=1)
if request_time.time() < schedule.open_time:
break
prev_cum_value = None
prev_cum_volume = None
for i in range(len(result)):
idx = len(result) - i - 1
curr = result[idx]
if prev_cum_value is None:
curr["value"] = curr["cum_value"]
else:
curr["value"] = curr["cum_value"] - prev_cum_value
if prev_cum_volume is None:
curr["cum_volume"] = curr["volume"]
else:
curr["cum_volume"] = curr["volume"] + prev_cum_volume
prev_cum_value = curr["cum_value"]
prev_cum_volume = curr["cum_volume"]
df = pd.DataFrame(result, columns=["time", "open", "high", "low", "close", "volume", "value", "cum_volume", "cum_value"])
df["time"] = pd.to_datetime(df["time"], format="%H:%M:%S")
df.set_index("time", inplace=True)
return df
[docs]
def get_price(self, asset_code: str) -> dict:
"""
주식 현재 가격 조회
Args:
asset_code(str): 종목코드
Returns:
dict: 현재 가격 정보
- code (str): 종목 코드
- current_price (int): 현재 가격
- volume (int): 거래량
- open_price (int): 시가
- high_price (int): 고가
- low_price (int): 저가
- max_price (int): 상한가
- min_price (int): 하한가
- diff (int): 전일대비
- diff_rate (float): 전일대비율
"""
r = self.stock_api.get_price(asset_code)
data = r["output"]
result = {
"code": asset_code,
"current_price": data["stck_prpr"],
"volume": data["acml_vol"],
"open_price": data["stck_oprc"],
"high_price": data["stck_hgpr"],
"low_price": data["stck_lwpr"],
"max_price": data["stck_mxpr"],
"min_price": data["stck_llam"],
"diff": data["prdy_vrss"],
"diff_rate": float(data["prdy_ctrt"]),
}
return result
[docs]
def get_price_for_multiple_stock(self, asset_codes: List[str]) -> pd.DataFrame:
"""
여러 종목의 현재 가격 조회
Args:
asset_codes(List[str]): 종목 코드 리스트
Returns:
pd.DataFrame: 현재 가격 정보
- code (str): 종목 코드
- current_price (int): 현재 가격
- volume (int): 거래량
- open_price (int): 시가
- high_price (int): 고가
- low_price (int): 저가
- diff (int): 전일대비
- diff_rate (float): 전일대비율
"""
r = get_all_last_trades()
result = []
for item in r:
if item["shcode"] in asset_codes:
result.append(
{
"code": item["shcode"],
"current_price": item["price"],
"volume": item["volume"],
"open_price": item["open"],
"high_price": item["high"],
"low_price": item["low"],
"diff": item["change"],
"diff_rate": round(item["drate"], 2),
}
)
df = pd.DataFrame(result)
df.set_index("code", inplace=True)
return df
[docs]
@with_mock()
def create_order(self, asset_code: str, side: OrderSide, quantity: int, order_type: OrderType = OrderType.MARKET, price: int = 0) -> str:
"""
주문을 생성합니다.
Args:
asset_code (str): 종목 코드
side (OrderSide): 주문 방향
quantity (int): 주문 수량
order_type (OrderType): 주문 유형
price (int): 주문 가격 (지정가 주문일 경우에만 필요)
Returns:
str: 주문 번호
"""
def __get_order_type_code():
if order_type == OrderType.LIMIT:
return "00"
elif order_type == OrderType.MARKET:
return "01"
elif order_type == OrderType.LIMIT_CONDITIONAL:
return "02"
elif order_type == OrderType.BEST_PRICE:
return "03"
elif order_type == OrderType.PRIMARY_PRICE:
return "04"
elif order_type == OrderType.PRE_MARKET:
return "05"
elif order_type == OrderType.AFTER_MARKET:
return "06"
elif order_type == OrderType.AFTER_MARKET_SINGLE_PRICE:
return "07"
elif order_type == OrderType.LIMIT_IOC:
return "11"
elif order_type == OrderType.LIMIT_FOK:
return "12"
elif order_type == OrderType.MARKET_IOC:
return "13"
elif order_type == OrderType.MARKET_FOK:
return "14"
elif order_type == OrderType.BEST_PRICE_IOC:
return "15"
elif order_type == OrderType.BEST_PRICE_FOK:
return "16"
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
def __get_order_side_code():
if side == OrderSide.BUY:
return "buy"
elif side == OrderSide.SELL:
return "sell"
else:
raise ValueError("지원하지 않는 주문 방향입니다.")
r = self.stock_api.order_cash(
self.account_no,
self.account_product_code,
__get_order_side_code(),
asset_code,
__get_order_type_code(),
quantity,
price,
)
return r["output"]["ODNO"]
[docs]
@with_mock()
def update_order(self, org_order_no: str, order_type: OrderType, price: int, quantity: int = 0) -> str:
"""
주문을 수정합니다.
Args:
org_order_no (str): 원주문번호
order_type (OrderType): 주문 유형
price (int): 정정 가격
quantity (int): 주문 수량
Returns:
str: 주문 번호
"""
def __get_order_type_code():
if order_type == OrderType.LIMIT:
return "00"
elif order_type == OrderType.MARKET:
return "01"
elif order_type == OrderType.LIMIT_CONDITIONAL:
return "02"
elif order_type == OrderType.BEST_PRICE:
return "03"
elif order_type == OrderType.PRIMARY_PRICE:
return "04"
elif order_type == OrderType.PRE_MARKET:
return "05"
elif order_type == OrderType.AFTER_MARKET:
return "06"
elif order_type == OrderType.AFTER_MARKET_SINGLE_PRICE:
return "07"
elif order_type == OrderType.LIMIT_IOC:
return "11"
elif order_type == OrderType.LIMIT_FOK:
return "12"
elif order_type == OrderType.MARKET_IOC:
return "13"
elif order_type == OrderType.MARKET_FOK:
return "14"
elif order_type == OrderType.BEST_PRICE_IOC:
return "15"
elif order_type == OrderType.BEST_PRICE_FOK:
return "16"
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
qty_all_ord_yn = "Y" if quantity == 0 else "N"
r = self.stock_api.order_rvsecncl(self.account_no, self.account_product_code, org_order_no, __get_order_type_code(), "01", quantity, price, qty_all_ord_yn) # 정정
return r["output"]["ODNO"]
[docs]
@with_mock()
def cancel_order(self, org_order_no: str, quantity: int = 0) -> str:
"""
주문을 취소합니다.
Args:
org_order_no (str): 원주문번호
quantity (int): 취소 수량 (일부 취소의 경우 지정)
"""
r = self.stock_api.order_rvsecncl(self.account_no, self.account_product_code, org_order_no, "00", "02", quantity, 0, "Y" if quantity == 0 else "N") # 취소
return r["output"]["ODNO"]
[docs]
def get_pending_orders(self) -> List[StockOrder]:
"""
미체결 주문을 조회합니다.
Returns:
List[StockOrder]: 미체결 주문 리스트
"""
fetching = True
ctx_area_fk100 = ""
ctx_area_nk100 = ""
tr_cont = ""
result: List[StockOrder] = []
asset_codes = set()
while fetching:
r = self.stock_api.inquire_daily_ccld(
self.account_no,
self.account_product_code,
inqr_strt_dt=dtm.date.today(),
inqr_end_dt=dtm.date.today(),
ccld_dvsn="02", # 미체결
ctx_area_fk100=ctx_area_fk100,
ctx_area_nk100=ctx_area_nk100,
tr_cont=tr_cont,
)
output1 = r["output1"]
for item in output1:
quantity = int(item["ord_qty"])
filled_quantity = int(item["tot_ccld_qty"])
filled_amount = int(item["tot_ccld_amt"])
pending_quantity = int(item["rmn_qty"])
filled_price = filled_amount // filled_quantity if filled_quantity > 0 else 0
order = StockOrder(
order_no=item["odno"],
asset_code=item["pdno"],
side=self._get_order_side_code(item["sll_buy_dvsn_cd"]),
quantity=quantity,
price=int(item["ord_unpr"]),
filled_quantity=filled_quantity,
filled_price=filled_price,
pending_quantity=pending_quantity,
order_time=item["ord_tmd"],
current_price=0,
is_pending=True,
org_order_no=item["orgn_odno"],
order_type=self._get_order_type(item["ord_dvsn_cd"]),
req_type=self._get_req_type(item["sll_buy_dvsn_cd_name"]),
)
if order.org_order_no == "0000000000":
order.org_order_no = None
asset_codes.add(item["pdno"])
result.append(order)
if r["tr_cont"] in ["F", "M"]:
ctx_area_fk100 = r["ctx_area_fk100"]
ctx_area_nk100 = r["ctx_area_nk100"]
tr_cont = "N"
else:
fetching = False
if len(asset_codes) > 0:
price_df = self.get_price_for_multiple_stock(list(asset_codes))
for order in result:
order.current_price = int(price_df.loc[order.asset_code, "current_price"])
return result
def get_pending_orders_old(self) -> List[StockOrder]:
"""
미체결 주문을 조회합니다.
Returns:
List[StockOrder]: 미체결 주문 리스트
"""
fetching = True
ctx_area_fk100 = ""
ctx_area_nk100 = ""
result: List[StockOrder] = []
asset_codes = []
while fetching:
r = self.stock_api.inquire_psbl_rvsecncl(
self.account_no,
self.account_product_code,
ctx_area_fk100=ctx_area_fk100,
ctx_area_nk100=ctx_area_nk100,
)
for item in r["output"]:
filled_amount = int(item.get("tot_ccld_amt", 0))
filled_quantity = int(item.get("tot_ccld_qty", 0))
filled_price = filled_amount // filled_quantity if filled_quantity > 0 else 0
asset_code = item["pdno"]
order = StockOrder(
order_no=item["odno"],
asset_code=asset_code,
side=self._get_order_side_code(item["sll_buy_dvsn_cd"]),
price=int(item["ord_unpr"]),
filled_price=filled_price,
current_price=0,
quantity=int(item["ord_qty"]),
filled_quantity=filled_quantity,
pending_quantity=item["psbl_qty"],
order_time=item["ord_tmd"],
order_type=self._get_order_type(item["ord_dvsn_cd"]),
req_type=self._get_req_type(item["rvse_cncl_dvsn_name"]),
)
asset_codes.append(asset_code)
result.append(order)
if r["tr_cont"] in ["F", "M"]:
ctx_area_fk100 = r["ctx_area_fk100"]
ctx_area_nk100 = r["ctx_area_nk100"]
else:
fetching = False
if len(asset_codes) > 0:
price_df = self.get_price_for_multiple_stock(asset_codes)
for order in result:
order.current_price = int(price_df.loc[order.asset_code, "current_price"])
return result
[docs]
def get_today_order_history(
self,
target_date: dtm.date = None,
order_no: str = "",
) -> List[StockOrder]:
"""
오늘 주문 내역을 조회합니다.
Returns:
List[StockOrder]: 오늘 주문 내역 리스트
"""
result = []
fetching = True
ctx_area_fk100 = ""
ctx_area_nk100 = ""
tr_cont = ""
pending_order_list = [item.order_no for item in self.get_pending_orders()]
if target_date is None:
target_date = dtm.date.today()
while fetching:
r = self.stock_api.inquire_daily_ccld(
self.account_no,
self.account_product_code,
inqr_strt_dt=target_date,
inqr_end_dt=target_date,
odno=order_no,
ctx_area_fk100=ctx_area_fk100,
ctx_area_nk100=ctx_area_nk100,
tr_cont=tr_cont,
)
for item in r["output1"]:
item["pending"] = item["odno"] in pending_order_list
quantity = int(item["ord_qty"])
filled_quantity = int(item["tot_ccld_qty"])
pending_quantity = quantity - filled_quantity
filled_price = int(item["avg_prvs"]) if "avg_prvs" in item else 0
order = StockOrder(
order_no=item["odno"],
asset_code=item["pdno"],
side=self._get_order_side_code(item["sll_buy_dvsn_cd"]),
quantity=quantity,
price=int(item["ord_unpr"]),
filled_quantity=filled_quantity,
filled_price=filled_price,
pending_quantity=pending_quantity,
order_time=item["ord_tmd"],
current_price=0,
is_pending=item["odno"] in pending_order_list,
org_order_no=item["orgn_odno"],
order_type=self._get_order_type(item["ord_dvsn_cd"]),
req_type=self._get_req_type(item["sll_buy_dvsn_cd_name"]),
)
result.append(order)
if r["tr_cont"] in ["F", "M"]:
ctx_area_fk100 = r["ctx_area_fk100"]
ctx_area_nk100 = r["ctx_area_nk100"]
tr_cont = "N"
else:
fetching = False
return result
[docs]
def get_order(self, odno: str) -> StockOrder:
"""
주문 번호로 주문 정보를 조회합니다.
당일 주문만 조회 가능합니다.
Args:
odno (str): 주문 번호
Returns:
StockOrder: 주문 정보
"""
result = self.get_today_order_history(order_no=odno)
if len(result) > 0:
return result[0]
else:
return None
def _get_order_side_code(self, sll_buy_dvsn_cd: str) -> OrderSide:
if sll_buy_dvsn_cd == "01":
return OrderSide.SELL
elif sll_buy_dvsn_cd == "02":
return OrderSide.BUY
else:
raise ValueError("지원하지 않는 주문 방향입니다.")
def _get_req_type(self, sll_buy_dvsn_cd_name: str) -> OrderRequestType:
if "취소" in sll_buy_dvsn_cd_name:
return OrderRequestType.CANCEL
elif "정정" in sll_buy_dvsn_cd_name:
return OrderRequestType.MODIFY
else:
return OrderRequestType.NEW
def _get_order_type(self, ord_dvsn_cd: str) -> OrderType:
if ord_dvsn_cd == "00":
return OrderType.LIMIT
elif ord_dvsn_cd == "01":
return OrderType.MARKET
elif ord_dvsn_cd == "02":
return OrderType.LIMIT_CONDITIONAL
elif ord_dvsn_cd == "03":
return OrderType.BEST_PRICE
elif ord_dvsn_cd == "04":
return OrderType.PRIMARY_PRICE
elif ord_dvsn_cd == "05":
return OrderType.PRE_MARKET
elif ord_dvsn_cd == "06":
return OrderType.AFTER_MARKET
elif ord_dvsn_cd == "07":
return OrderType.AFTER_MARKET_SINGLE_PRICE
elif ord_dvsn_cd == "08":
return OrderType.SELF_STOCK
elif ord_dvsn_cd == "09":
return OrderType.SELF_STOCK_S_OPTION
elif ord_dvsn_cd == "10":
return OrderType.SELF_STOCK_MONETARY_TRUST
elif ord_dvsn_cd == "11":
return OrderType.LIMIT_IOC
elif ord_dvsn_cd == "12":
return OrderType.LIMIT_FOK
elif ord_dvsn_cd == "13":
return OrderType.MARKET_IOC
elif ord_dvsn_cd == "14":
return OrderType.MARKET_FOK
elif ord_dvsn_cd == "15":
return OrderType.BEST_PRICE_IOC
elif ord_dvsn_cd == "16":
return OrderType.BEST_PRICE_FOK
else:
raise ValueError(f"지원하지 않는 주문 유형입니다. {ord_dvsn_cd}")
[docs]
async def listen_order_event(self, stop_event: Optional[asyncio.Event] = None) -> AsyncGenerator:
"""
계좌 주문 이벤트를 수신하는 메서드
Args:
stop_event (asyncio.Event): 종료 이벤트
Yields:
OrderEvent: 주문 이벤트
"""
assert self.hts_id is not None, "HTS ID가 필요합니다."
async for data in self.stock_api.listen_order_event(self.hts_id, stop_event):
order_event = self.map_order_event(data)
yield order_event
def map_order_event(self, data: dict) -> OrderEvent:
def __find_order_type(value):
value_map = {
"00": OrderType.LIMIT,
"01": OrderType.MARKET,
"02": OrderType.LIMIT_CONDITIONAL,
"03": OrderType.BEST_PRICE,
"04": OrderType.PRIMARY_PRICE,
"11": OrderType.LIMIT_IOC,
"12": OrderType.LIMIT_FOK,
"13": OrderType.MARKET_IOC,
"14": OrderType.MARKET_FOK,
"15": OrderType.BEST_PRICE_IOC,
"16": OrderType.BEST_PRICE_FOK,
}
# 장전후 시간외 및 단일거 거래는 시장가로 반환
return value_map.get(value, OrderType.MARKET)
acpt_yn = data.get("acpt_yn") # 1:주문접수 2:확인 3:취소(FOK/IOC)
cntg_yn = data.get("cntg_yn") # 1:주문,정정,취소,거부 2:체결
account_no = data["acnt_no"]
asset_code = data["stck_shrn_iscd"]
order_no = data["oder_no"]
side = OrderSide.SELL if data["seln_byov_cls"] == "01" else OrderSide.BUY
order_type = None
quantity = 0
price = None
event_type = ""
filled_quantity = 0
filled_price = None
filled_time = data["stck_cntg_hour"]
org_order_no = data["ooder_no"]
rejected = data.get("rfus_yn") == "1"
rctf_cls = data.get("rctf_cls") # 0: 신규, 1: 정정, 2: 취소
cancelled = rctf_cls == "2" and acpt_yn == "2" and cntg_yn == "1"
updated = rctf_cls == "1" and acpt_yn == "2" and cntg_yn == "1"
accepted = rctf_cls == "0" and acpt_yn == "1" and cntg_yn == "1"
executed = cntg_yn == "2"
if cntg_yn == "1": # 주문,정정,취소,거부
price = data["cntg_unpr"]
quantity = data["cntg_qty"]
order_type = __find_order_type(data["oder_kind"])
elif cntg_yn == "2": # 체결
price = int(data["oder_prc"])
quantity = data["oder_qty"]
filled_price = data["cntg_unpr"]
filled_quantity = data["cntg_qty"]
if rejected:
event_type = "rejected"
elif cancelled:
event_type = "cancelled"
order_no = org_order_no # 취소 주문의 경우 원주문번호로 변경
org_order_no = None
elif accepted or updated:
event_type = "accepted"
elif executed:
event_type = "executed"
if filled_time is not None:
t = dtm.datetime.strptime(filled_time, "%H%M%S").time()
filled_time = dtm.datetime.combine(dtm.date.today(), t)
print(f"- rctf_cls={rctf_cls} acpt_yn={acpt_yn} cntg_yn={cntg_yn} accepted={accepted} executed={executed} rejected={rejected} status={event_type}")
return OrderEvent(
asset_code,
order_no,
side,
order_type,
quantity,
price,
event_type,
account_no,
filled_quantity,
filled_price,
filled_time,
org_order_no,
)
[docs]
def get_orderbook(self, asset_code: str) -> Dict:
"""
특정 종목의 호가/잔량 정보를 조회하여 반환합니다.
Args:
asset_code (str): 조회할 자산의 종목번호.
Returns:
dict: 호가 정보가 포함된 사전.
- total_bid_volume (int): 총 매수 잔량.
- total_ask_volume (int): 총 매도 잔량.
- ask_price (int): 1차 매도 호가 가격.
- ask_volume (int): 1차 매도 호가 잔량.
- bid_price (int): 1차 매수 호가 가격.
- bid_volume (int): 1차 매수 호가 잔량.
- time (dtm.datetime): 현지 기준 호가 정보 조회 시간.
- bids (list): 매수 호가 목록 (각 항목은 price와 volume을 포함하는 dict).
- asks (list): 매도 호가 목록 (각 항목은 price과 volume을 포함하는 dict).
Raises:
ValueError: 주어진 티커가 유효하지 않거나 찾을 수 없는 경우 발생.
"""
r = self.stock_api.inquire_asking_price_exp_ccn(asset_code)
o1 = r["output1"]
result = {
"total_bid_volume": o1["total_bidp_rsqn"],
"total_ask_volume": o1["total_askp_rsqn"],
"ask_price": o1["askp1"],
"ask_volume": o1["askp_rsqn1"],
"bid_price": o1["bidp1"],
"bid_volume": o1["bidp_rsqn1"],
"time": dtm.datetime.combine(dtm.date.today(), o1["aspr_acpt_hour"]),
"bids": [{"price": o1[f"bidp{i}"], "volume": o1[f"bidp_rsqn{i}"]} for i in range(1, 11)],
"asks": [{"price": o1[f"askp{i}"], "volume": o1[f"askp_rsqn{i}"]} for i in range(1, 11)],
}
return result