from dataclasses import dataclass
from decimal import Decimal
from pyqqq.brokerage.ebest.domestic_stock import EBestDomesticStock
from pyqqq.brokerage.ebest.oauth import EBestAuth
from pyqqq.data.realtime import get_all_last_trades
from pyqqq.datatypes import *
from pyqqq.utils.logger import get_logger
from pyqqq.utils.mock_api import with_mock
from typing import AsyncGenerator, List, Set
import asyncio
import datetime as dtm
import pandas as pd
import time
[docs]
@dataclass
class EBestStockPosition(StockPosition):
pass
[docs]
@dataclass
class EBestStockOrder(StockOrder):
pass
[docs]
class EBestSimpleDomesticStock:
"""
LS(구 이베스트투자)증권 국내 주식 API 사용하여 주식 거래를 하기 위한 클래스입니다.
기존 EBestDomesticStock 클래스를 감싸고, 간단한 주문/조회 기능을 제공합니다.
Attributes:
auth (EBestAuth): 인증 정보
"""
[docs]
def __init__(self, auth: EBestAuth):
self.stock_api = EBestDomesticStock(auth)
self.logger = get_logger(__name__ + ".EBestSimpleDomesticStock")
[docs]
@with_mock()
def get_account(self) -> dict:
"""
계좌 정보를 조회합니다.
Returns:
dict: 계좌 정보
- account_no (str): 계좌 번호
- total_balance (int): 총 평가 금액
- purchase_amount (int): 매입 금액
- evaluated_amount (int): 평가 금액
- pnl_amount (int): 손익 금액
- pnl_rate (Decimal): 손익률
"""
r = self.stock_api.get_stock_balance()
data = r["output1"]
purchase_amount = data["mamt"]
evaluated_amount = data["tappamt"]
pnl_amount = data["tdtsunik"]
pnl_rate = Decimal(pnl_amount / purchase_amount * 100) if purchase_amount != 0 else Decimal(0)
result = {
"total_balance": data["sunamt"],
"purchase_amount": purchase_amount,
"evaluated_amount": evaluated_amount,
"pnl_amount": pnl_amount,
"pnl_rate": pnl_rate,
}
r = self.stock_api.get_account_deposit_orderable_total_evaluation()
result["account_no"] = r["output1"]["AcntNo"]
result["investable_cash"] = r["output2"]["MnyOrdAbleAmt"]
return result
[docs]
@with_mock()
def get_possible_quantity(self, asset_code: str, order_type: OrderType = OrderType.MARKET, price: int = 0) -> dict:
"""
주문 가능 수량을 조회합니다.
Args:
asset_code (str): 종목 코드
order_type (OrderType): 주문 유형
price (int): 주문 가격 (지정가 주문일 경우에만 필요)
Returns:
dict: 주문 가능 수량 정보
- investable_cash (int): 주문 가능 현금
- reusable_amount (int): 재사용 가능 금액
- price (int): 계산 기준 단가
- quantity (int): 주문 가능 수량
- amount (int): 주문 시 소요 금액
"""
resp = self.stock_api.get_account_orderable_quantity(
2,
asset_code,
price,
)
output1 = resp["output1"]
output2 = resp["output2"]
result = {
"investable_cash": output2["MnyOrdAbleAmt"],
"reusable_amount": output2["RuseObjAmt"],
"price": int(Decimal(output1["OrdPrc"])),
"quantity": output2["OrdAbleQty"],
"amount": output2["OrdAbleAmt"],
}
return result
[docs]
@with_mock()
def get_positions(self) -> List[StockPosition]:
"""
보유 종목을 조회합니다.
Returns:
List[StockPosition]: 보유 종목 정보 리스트
"""
tr_cont_key = ""
cts_expcode = ""
fetching = True
result = []
while fetching:
r = self.stock_api.get_stock_balance(tr_cont_key=tr_cont_key, cts_expcode=cts_expcode)
output1 = r["output1"]
output2 = r["output2"]
tr_cont_key = output1.get("tr_cont_key", "")
cts_expcode = output1.get("cts_expcode", "")
for el in output2:
position = StockPosition(
asset_code=el["expcode"],
asset_name=el["hname"],
quantity=int(el["janqty"]),
sell_possible_quantity=int(el["mdposqt"]),
average_purchase_price=Decimal(el["pamt"]),
current_price=int(el["price"]),
current_value=int(el["appamt"]),
current_pnl=Decimal(el["sunikrt"]),
current_pnl_value=int(el["dtsunik"]),
)
if position.quantity > 0:
result.append(position)
if cts_expcode.strip() == "":
fetching = False
return result
[docs]
@with_mock()
def get_historical_daily_data(
self,
asset_code: str,
first_date: dtm.date,
last_date: dtm.date,
adjusted_price: bool = True,
) -> pd.DataFrame:
"""
일봉 데이터 검색
Args:
asset_code(str): 종목코드
first_date(datetime.date): 조회 시작일자
last_date(datetime.date): 조회 종료일자
adjusted_price(bool): 수정 주가 여부
Returns:
pd.DataFrame: 일봉 데이터
"""
assert first_date <= last_date, "last_date는 first_date와 같거나, 이후 날짜여야 합니다"
assert last_date <= dtm.date.today(), "last_date는 오늘과 같거나 이전이어야 합니다."
cts_date = ""
tr_cont_key = ""
fetching = True
result = []
while fetching:
r = self.stock_api.get_stock_chart_dwmy(
asset_code,
"d",
last_date,
first_date,
cts_date=cts_date,
tr_cont_key=tr_cont_key,
sujung_yn=adjusted_price,
)
output1 = r["output1"]
tr_cont_key = r.get("tr_cont_key", "")
cts_date = output1.get("cts_date", "").strip()
output2 = r["output2"]
output2.reverse()
for item in output2:
if not item:
continue
result.append(
{
"date": dtm.datetime.strptime(item["date"], "%Y%m%d").date(),
"open": item["open"],
"high": item["high"],
"low": item["low"],
"close": item["close"],
"volume": item["jdiff_vol"],
}
)
if r["tr_cont"] == "N" or cts_date == "":
fetching = False
df = pd.DataFrame(result)
df["date"] = pd.to_datetime(df["date"])
df.set_index("date", inplace=True)
return df
[docs]
def get_today_minute_data(self, asset_code: str) -> pd.DataFrame:
"""
분봉 데이터 검색
Args:
asset_code(str): 종목코드
Returns:
pd.DataFrame: 분봉 데이터
"""
fetching = True
tr_cont_key = ""
cts_date = ""
cts_time = ""
result = []
while fetching:
r = self.stock_api.get_stock_chart_minutes(
asset_code,
1,
500,
0,
dtm.date.today(),
dtm.date.today(),
cts_date,
cts_time,
tr_cont_key=tr_cont_key,
)
tr_cont = r["tr_cont"]
tr_cont_key = r["tr_cont_key"]
output1 = r["output1"]
cts_date = output1.get("cts_date", "")
cts_time = output1.get("cts_time", "")
output2 = r["output2"]
output2.reverse()
for item in output2:
if not item:
continue
datetime = dtm.datetime.strptime(f"{item['date']} {item['time']}", "%Y%m%d %H%M%S")
result.append(
{
"time": datetime,
"open": item["open"],
"high": item["high"],
"low": item["low"],
"close": item["close"],
"volume": item["jdiff_vol"],
}
)
if tr_cont == "N" or (cts_date == "" and cts_time == ""):
fetching = False
df = pd.DataFrame(result)
df["time"] = pd.to_datetime(df["time"], format="%Y-%m-%d %H:%M:%S")
df.set_index("time", inplace=True)
return df
[docs]
def get_price(self, asset_code: str) -> dict:
"""
주식 현재 가격 조회
Args:
asset_code(str): 종목코드
Returns:
dict: 현재 가격 정보
- code (str): 종목 코드
- current_price (int): 현재 가격
- volume (int): 거래량
- open_price (int): 시가
- high_price (int): 고가
- low_price (int): 저가
- max_price (int): 상한가
- min_price (int): 하한가
- diff (int): 전일 대비 가격 변화
- diff_rate (float): 전일 대비 가격 변화율
"""
r = self.stock_api.get_price(asset_code)
data = r["output"]
result = {
"code": asset_code,
"current_price": data["price"],
"volume": data["volume"],
"open_price": data["open"],
"high_price": data["high"],
"low_price": data["low"],
"max_price": data["uplmtprice"],
"min_price": data["dnlmtprice"],
"diff": data["change"],
"diff_rate": float(data["diff"]),
}
return result
[docs]
def get_price_for_multiple_stock(self, asset_codes: Set[str]) -> pd.DataFrame:
"""
여러 종목의 현재 가격 조회
Args:
asset_codes(Set[str]): 종목 코드 리스트
Returns:
pd.DataFrame: 현재 가격 정보
- code (str): 종목 코드
- current_price (int): 현재 가격
- volume (int): 거래량
- open_price (int): 시가
- high_price (int): 고가
- low_price (int): 저가
- diff (int): 전일 대비 가격 변화
- diff_rate (float): 전일 대비 가격 변화율
"""
r = get_all_last_trades()
result = []
for item in r:
if item["shcode"] in asset_codes:
result.append(
{
"code": item["shcode"],
"current_price": item["price"],
"volume": item["volume"],
"open_price": item["open"],
"high_price": item["high"],
"low_price": item["low"],
"diff": item["change"],
"diff_rate": round(item["drate"], 2),
}
)
df = pd.DataFrame(result)
df.set_index("code", inplace=True)
return df
[docs]
@with_mock()
def create_order(
self,
asset_code: str,
side: OrderSide,
quantity: int,
order_type: OrderType = OrderType.MARKET,
price: int = 0,
) -> str:
"""
주문을 생성합니다.
Args:
asset_code (str): 종목 코드
side (OrderSide): 주문 방향
quantity (int): 주문 수량
order_type (OrderType): 주문 유형
price (int): 주문 가격 (지정가 주문일 경우에만 필요)
Returns:
str: 주문 번호
"""
def __get_ord_prc_ptn_code():
if order_type == OrderType.MARKET:
return "03"
elif order_type == OrderType.LIMIT:
return "00"
elif order_type == OrderType.LIMIT_IOC:
return "00"
elif order_type == OrderType.LIMIT_FOK:
return "00"
elif order_type == OrderType.MARKET_IOC:
return "03"
elif order_type == OrderType.MARKET_FOK:
return "03"
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
def __get_ord_cndi_tp_code() -> int:
if order_type in [OrderType.MARKET, OrderType.LIMIT]:
return 0
elif order_type in [OrderType.MARKET_IOC, OrderType.LIMIT_IOC]:
return 1
elif order_type in [OrderType.MARKET_FOK, OrderType.LIMIT_FOK]:
return 2
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
def __get_bns_tp_code() -> int:
if side == OrderSide.BUY:
return 2
elif side == OrderSide.SELL:
return 1
else:
raise ValueError("지원하지 않는 주문 방향입니다.")
r = self.stock_api.create_order(
asset_code,
quantity,
price,
__get_bns_tp_code(),
__get_ord_prc_ptn_code(),
ord_cndi_tp_code=__get_ord_cndi_tp_code(),
)
self.logger.info(f"create_order result - {r['rsp_cd']} {r['rsp_msg']}")
if "OrdNo" not in r["output2"]:
self.logger.error(f"create_order failed. {asset_code} {side} {quantity} {order_type} {price} - {r['rsp_cd']} {r['rsp_msg']}")
return r["output2"].get("OrdNo", None)
[docs]
@with_mock()
def update_order(
self,
org_order_no: str,
asset_code: str,
order_type: OrderType,
price: int,
quantity: int = 0,
) -> str:
"""
주문을 수정합니다.
Args:
org_order_no (str): 원주문번호
order_type (OrderType): 주문 유형
price (int): 정정 가격
quantity (int): 주문 수량
Returns:
str: 주문 번호
"""
def __get_ord_prc_ptn_code():
if order_type == OrderType.MARKET:
return "03"
elif order_type == OrderType.LIMIT:
return "00"
elif order_type == OrderType.LIMIT_IOC:
return "00"
elif order_type == OrderType.LIMIT_FOK:
return "00"
elif order_type == OrderType.MARKET_IOC:
return "03"
elif order_type == OrderType.MARKET_FOK:
return "03"
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
def __get_ord_cndi_tp_code() -> int:
if order_type in [OrderType.MARKET, OrderType.LIMIT]:
return 0
elif order_type in [OrderType.MARKET_IOC, OrderType.LIMIT_IOC]:
return 1
elif order_type in [OrderType.MARKET_FOK, OrderType.LIMIT_FOK]:
return 2
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
r = self.stock_api.update_order(
org_order_no,
asset_code,
quantity,
__get_ord_prc_ptn_code(),
__get_ord_cndi_tp_code(),
price,
)
return r["output2"]["OrdNo"]
[docs]
@with_mock()
def cancel_order(self, org_order_no: str, asset_code: str, quantity: int) -> str:
"""
주문을 취소합니다.
Args:
org_order_no (str): 원주문번호
quantity (int): 취소 수량
"""
r = self.stock_api.cancel_order(org_order_no, asset_code, quantity)
return r["output2"]["OrdNo"]
[docs]
def get_pending_orders(self) -> List[StockOrder]:
"""
미체결 주문을 조회합니다.
Returns:
List[StockOrder]: 미체결 주문 리스트
"""
fetching = True
tr_cont_key = ""
cts_ordno = ""
result: List[StockOrder] = []
asset_codes = set()
def __find_req_type(medosu: str) -> OrderRequestType:
if "취소" in medosu:
return OrderRequestType.CANCEL
elif "정정" in medosu:
return OrderRequestType.MODIFY
else:
return OrderRequestType.NEW
while fetching:
r = self.stock_api.get_order_list(
chegb="2",
cts_ordno=cts_ordno,
tr_cont_key=tr_cont_key,
)
output1 = r["output1"]
output2 = r["output2"]
tr_cont_key = r["tr_cont_key"]
cts_ordno = output1.get("cts_ordno", "")
for item in output2:
asset_code = item["expcode"]
ord_time = dtm.datetime.strptime(item["ordtime"], "%H%M%S%f")
order = StockOrder(
order_no=str(item["ordno"]),
asset_code=asset_code,
side=OrderSide.SELL if "매도" in item["medosu"] else OrderSide.BUY,
price=int(item["price"]),
filled_price=item["cheprice"],
current_price=0,
quantity=int(item["qty"]),
filled_quantity=item["cheqty"],
pending_quantity=item["ordrem"],
order_time=dtm.datetime.combine(dtm.date.today(), ord_time.time()),
org_order_no=item.get("orgordno", None),
is_pending=True,
req_type=__find_req_type(item["medosu"]),
)
asset_codes.add(asset_code)
result.append(order)
if cts_ordno == "":
fetching = False
if len(asset_codes) > 0:
price_df = self.get_price_for_multiple_stock(asset_codes)
for order in result:
order.current_price = int(price_df.loc[order.asset_code, "current_price"])
return result
[docs]
def get_today_order_history(self) -> List[StockOrder]:
"""
오늘 주문 내역을 조회합니다.
Returns:
List[StockOrder]: 오늘 주문 내역 리스트
"""
fetching = True
tr_cont_key = ""
cts_ordno = ""
result: List[StockOrder] = []
asset_codes = set()
detail_info_dict = self._get_today_order_details()
while fetching:
r = self.stock_api.get_order_list(
cts_ordno=cts_ordno,
tr_cont_key=tr_cont_key,
)
output1 = r["output1"]
output2 = r["output2"]
tr_cont_key = r["tr_cont_key"]
cts_ordno = output1.get("cts_ordno", "")
for item in output2:
asset_code = item["expcode"]
ord_time = dtm.datetime.strptime(item["ordtime"], "%H%M%S%f")
order_no = str(item["ordno"])
order = StockOrder(
order_no=order_no,
asset_code=asset_code,
side=detail_info_dict[order_no]["side"],
price=int(item["price"]),
filled_price=item["cheprice"],
current_price=0,
quantity=int(item["qty"]),
filled_quantity=item["cheqty"],
pending_quantity=item["ordrem"],
order_time=dtm.datetime.combine(dtm.date.today(), ord_time.time()),
org_order_no=item.get("orgordno", None),
is_pending=(not item["status"] in ["완료", "체결", "취소확인"]),
req_type=detail_info_dict[order_no]["req_type"],
order_type=detail_info_dict[order_no]["order_type"],
)
asset_codes.add(asset_code)
result.append(order)
if cts_ordno == "":
fetching = False
if len(asset_codes) > 0:
price_df = self.get_price_for_multiple_stock(asset_codes)
for order in result:
order.current_price = int(price_df.loc[order.asset_code, "current_price"])
return result
def _get_today_order_details(self) -> dict:
r = self.stock_api.get_deposit_order_list()
result = {}
def __find_req_type(mrc_tp_code: str) -> OrderRequestType:
if mrc_tp_code == "2":
return OrderRequestType.CANCEL
elif mrc_tp_code == "1":
return OrderRequestType.MODIFY
else:
return OrderRequestType.NEW
def __find_side(bns_tp_code: str) -> OrderSide:
if bns_tp_code == "2":
return OrderSide.BUY
elif bns_tp_code == "1":
return OrderSide.SELL
else:
raise ValueError("지원하지 않는 주문 방향입니다.")
def __find_order_type(ord_prc_ptn_code: str, ord_cndi_tp_code: str) -> OrderType:
if ord_prc_ptn_code == "00":
if ord_cndi_tp_code == "0":
return OrderType.LIMIT
elif ord_cndi_tp_code == "1":
return OrderType.LIMIT_IOC
elif ord_cndi_tp_code == "2":
return OrderType.LIMIT_FOK
elif ord_prc_ptn_code == "03":
if ord_cndi_tp_code == "0":
return OrderType.MARKET
elif ord_cndi_tp_code == "1":
return OrderType.MARKET_IOC
elif ord_cndi_tp_code == "2":
return OrderType.MARKET_FOK
else:
raise ValueError("지원하지 않는 주문 유형입니다.")
for item in r["output3"]:
order_no = str(item["OrdNo"])
result[order_no] = {
"side": __find_side(item["BnsTpCode"]),
"req_type": __find_req_type(item["MrcTpCode"]),
"order_type": __find_order_type(item["OrdprcPtnCode"], item["OrdCndiTpCode"]),
}
return result
[docs]
def get_tick_data(self, asset_code) -> list:
fetching = True
tr_cont_key = ""
tr_cont = ""
cts_time = ""
result = []
while fetching:
r = self.stock_api.get_stock_tick_data_today_yesterday(
daygb=0,
timegb=1,
shcode=asset_code,
endtime=dtm.datetime.now().time(),
cts_time=cts_time,
tr_cont_key=tr_cont_key,
)
output1 = r["output1"]
output2 = r["output2"]
cts_time = output1.get("cts_time", "")
tr_cont_key = output1.get("tr_cont_key", "")
for item in output2:
result.append(item)
if tr_cont == "N" or cts_time == "":
fetching = False
else:
tr_cont = "Y"
self.logger.debug(f"get_tick_data: sleep 1 for next page request. current length: {len(result)} cts_time={cts_time} tr_cont_key={tr_cont_key}")
time.sleep(1)
return result
[docs]
async def listen_order_event(self, stop_event: asyncio.Event) -> AsyncGenerator:
"""
계좌 주문 이벤트를 수신하는 메서드
Args:
stop_event (asyncio.Event): 종료 이벤트
Yields:
OrderEvent: 주문 이벤트
"""
async for data in self.stock_api.listen_order_event(stop_event):
order_event = self._map_order_event(data)
if order_event is not None:
yield order_event
def _map_order_event(self, data: dict) -> OrderEvent:
if data.get("ordchegb") == "03" and data["event_type"] == "accepted":
# 주문 취소 요청에 대한 접수 이벤트는 무시
return
if data.get("ordchegb") == "02" and data["event_type"] == "accepted":
# 주문 정정 요청에 대한 접수 이벤트는 무시
return
def __find_order_type(data: dict) -> str:
value_map = {
"00": OrderType.LIMIT,
"03": OrderType.MARKET,
"05": OrderType.LIMIT_CONDITIONAL,
"06": OrderType.BEST_PRICE,
"07": OrderType.PRIMARY_PRICE,
"13": OrderType.MARKET_IOC,
"16": OrderType.BEST_PRICE_IOC,
"26": OrderType.BEST_PRICE_FOK,
}
if "ordprcptncode" in data:
code = data["ordprcptncode"]
return value_map.get(code, code)
elif "etfhogagb" in data:
code = data["etfhogagb"]
return value_map.get(code, code)
def __find_status(data: dict) -> str:
event_type = data["event_type"]
if event_type == "accepted" or event_type == "updated":
return "accepted"
elif event_type == "executed":
return "executed"
elif event_type == "cancelled":
return "cancelled"
elif event_type == "denied":
return "rejected"
else:
return event_type
asset_code = ""
if "shtcode" in data:
asset_code = data["shtcode"]
elif "shtnIsuno" in data:
asset_code = data["shtnIsuno"]
if asset_code[0] in ["A", "J"]:
asset_code = asset_code[1:]
account_no = data["accno"]
# 취소 주문의 경우 원주문번호를 사용
order_no = str(data["orgordno"] if data["event_type"] == "cancelled" else data["ordno"])
side = OrderSide.BUY if data["bnstp"] == "2" else OrderSide.SELL
order_type = __find_order_type(data)
quantity = int(data["ordqty"])
price = int(data["ordprice"]) if "ordprice" in data else int(data["ordprc"])
event_type = __find_status(data)
filled_quantity = int(data["execqty"]) if data.get("execqty", "").strip() != "" else None
filled_price = int(data["execprc"]) if data.get("execprc", "").strip() != "" else None
pending_quantity = int(data["unercqty"]) if data.get("unercqty", "").strip() != "" else None
filled_time = data.get("rcptexectime")
org_order_no = str(data.get("orgordno")) if data["event_type"] != "cancelled" else None
self.logger.debug(f"order_event: order_no={order_no} / filled={filled_quantity} order={quantity} pending={pending_quantity} event_type={event_type}")
if filled_time is not None:
t = dtm.datetime.strptime(filled_time, "%H%M%S%f").time()
filled_time = dtm.datetime.combine(dtm.date.today(), t)
order_event = OrderEvent(
asset_code,
order_no,
side,
order_type,
quantity,
price,
event_type,
account_no,
filled_quantity,
filled_price,
filled_time,
org_order_no,
)
return order_event