from dotenv import load_dotenv
from pyqqq.brokerage.ebest.simple import EBestSimpleDomesticStock
from pyqqq.brokerage.kis.simple import KISSimpleDomesticStock
from pyqqq.datatypes import *
from pyqqq.utils.kvstore import KVStore
from pyqqq.utils.logger import get_logger
from pyqqq.brokerage.tracker import TradingTracker
load_dotenv()
[docs]
class PositionClassifier:
"""주문, 포지션의 주체를 분류하기 위한 클래스입니다.
자동 거래 프로그램에서 사용중인 계좌로 직접 HTS, MTS 등으로 거래를 하는 유저를 위해 만들어진 분류기 입니다.
"""
logger = get_logger(__name__ + ".PositionClassifier")
DEF_DIRECT_POSITION_KEY = "direct_position"
DEF_DIRECT_ORDER_KEY = "direct_order"
DEF_AUTO_POSITION_KEY = "auto_position"
DEF_AUTO_ORDER_KEY = "auto_order"
DEF_TAG_AUTO_ORDER_KEY = "tag_auto_order"
[docs]
def __init__(
self,
simple_data_api: EBestSimpleDomesticStock | KISSimpleDomesticStock,
kv_store_collection,
default_type="auto",
):
"""OrderClassifier 초기화 메서드입니다
Args:
data_api (KISSimpleDomesticStock):
시세 조회 및 시장 데이터 조회를 위한 API 인터페이스
kv_store_collection (String):
kv_store에서 사용할 콜렉션
default_type (String):
태깅되지 않은 종목을 무엇으로 간주할지.
'auto', 'direct' 둘 중 한 값을 가짐
Note:
생성된 인스턴스는 즉시 거래가 가능한 상태가 되며,
모든 거래 관련 작업은 자동으로 로깅됩니다.
"""
# key: asset_code(str), value: quantity(int)
self.auto_positions = {}
self.direct_positions = {}
# key: order_no(str), value: remain_quantity(int)
self.auto_pending_orders = {}
self.direct_pending_orders = {}
# list of order_no(str)
self.tagged_auto_pending_orders = []
self.kv_store = KVStore(kv_store_collection)
self.api = simple_data_api
if default_type in ['auto', 'direct']:
self.default_type = default_type
else:
self.default_type = 'auto'
self.tracker = TradingTracker(simple_data_api)
self.tracker.add_pending_order_update_callback(self._on_pending_order_update)
self.set_initial_position()
[docs]
def set_initial_position(self) -> None:
"""코드 시작시 현재 account에 있는 position과 kv_store를 참고해서
기존 포지션을 분류한다.
Args:
"""
cur_pos = self.api.get_positions()
kv_auto_pos = self.kv_store.get(self.DEF_AUTO_POSITION_KEY) or {}
kv_direct_pos = self.kv_store.get(self.DEF_DIRECT_POSITION_KEY) or {}
self.logger.info(f"set_initial_position. kv_auto_pos: {kv_auto_pos}, kv_direct_pos: {kv_direct_pos}")
for pos in cur_pos:
asset_code = pos.asset_code
quantity = pos.quantity
if kv_auto_pos is not None and asset_code in kv_auto_pos:
self.auto_positions[asset_code] = min([quantity, kv_auto_pos[asset_code]])
quantity -= self.auto_positions[asset_code]
if asset_code in kv_direct_pos:
self.direct_positions[asset_code] = min(quantity, kv_direct_pos[asset_code])
quantity -= self.direct_positions[asset_code]
if quantity > 0:
if self.default_type == "auto":
if asset_code in self.auto_positions:
self.auto_positions[asset_code] += quantity
else:
self.auto_positions[asset_code] = quantity
elif self.default_type == "direct":
if asset_code in self.direct_positions:
self.direct_positions[asset_code] += quantity
else:
self.direct_positions[asset_code] = quantity
self.update_positions()
self.logger.info(f"set_initial_position.\npositions: {cur_pos}\nauto_positions: {self.auto_positions}\ndirect_positions: {self.direct_positions}")
[docs]
def set_initial_order(self):
cur_order = self.api.get_pending_orders()
kv_auto_orders = self.kv_store.get(self.DEF_AUTO_ORDER_KEY) or {}
kv_direct_orders = self.kv_store.get(self.DEF_DIRECT_ORDER_KEY) or {}
kv_auto_tag_orders = self.kv_store.get(self.DEF_TAG_AUTO_ORDER_KEY) or []
self.logger.info(f"set_initial_order. kv_auto_orders: {kv_auto_orders}, kv_direct_orders: {kv_direct_orders}")
for order in cur_order:
order_no = cur_order.order_no
quantity = cur_order.pending_quantity
if order_no in kv_auto_tag_orders:
self.tagged_auto_pending_orders.append(order_no)
if order_no in kv_auto_orders:
self.auto_pending_orders[order_no] = min(quantity, kv_auto_orders[order_no])
elif order_no in kv_direct_orders:
self.direct_pending_orders[order_no] = min(quantity, kv_direct_orders[order_no])
else:
if self.default_type == "auto":
self.auto_pending_orders[order_no] = quantity
elif self.default_type == "direct":
self.direct_pending_orders[order_no] = quantity
self.update_orders()
self.logger.info(f"set_initial_order.\orders: {cur_order} \nauto_pending_orders: {self.auto_pending_orders} \ndirect_pending_orders: {self.direct_pending_orders}")
[docs]
def update_positions(self) -> None:
self.kv_store.set(self.DEF_AUTO_POSITION_KEY, self.auto_positions)
self.kv_store.set(self.DEF_DIRECT_POSITION_KEY, self.direct_positions)
[docs]
def update_orders(self):
self.kv_store.set(self.DEF_AUTO_ORDER_KEY, self.auto_pending_orders)
self.kv_store.set(self.DEF_TAG_AUTO_ORDER_KEY, self.tagged_auto_pending_orders)
self.kv_store.set(self.DEF_DIRECT_ORDER_KEY, self.direct_pending_orders)
[docs]
def clear_orders(self) -> None:
"""
날 바뀌고 kv store에 남아있는 오더는 모두 유효지 않음
장 시작 전에 호출해주면 좋은 함수
"""
self.logger.info("clear_orders")
self.auto_pending_orders = {}
self.direct_pending_orders = {}
self.tagged_auto_pending_orders = []
self.update_orders()
[docs]
def get_sellable_quantity_by_auto(self, asset_code, quantity) -> int:
"""pyqqq로 매수했던 수량을 체크해서, 매도간에 pyqqq로 샀던게 몇 주인지 확인하는 함수
Args:
asset_code (String): 종목코드 6자리
quantity (int): 체크 하려는 수량.
만약 프로그램으로 5주, 손으로 5주 매수해서 10주 들고있는 상황에서 quantity가 8이면
5를 return 한다.
"""
ret = 0
positions = self.api.get_positions()
for position in positions:
if asset_code == position.asset_code:
remained = min(quantity, position.sell_possible_quantity)
if asset_code in self.auto_positions:
diff = min(self.auto_positions[asset_code], remained)
ret += diff
remained -= diff
if self.default_type == "auto":
if asset_code in self.direct_positions:
diff = min(self.direct_positions[asset_code], remained)
remained -= diff
ret += remained
break
return ret
# 매수, 매도 할때 명시적으로 호출해줘야함
[docs]
def tagging_order_auto(self, order_no: str):
self.logger.info(f"tagging_order_auto. order_no: {order_no}")
if order_no in self.auto_pending_orders:
self.logger.info(f"tagging_order_auto. {order_no} already exist")
return
elif order_no in self.direct_pending_orders:
self.logger.info(f"tagging_order_auto. {order_no} has been in direct_pending_orders")
self.auto_pending_orders[order_no] = self.direct_pending_orders[order_no]
del self.direct_pending_orders[order_no]
elif order_no not in self.tagged_auto_pending_orders:
self.logger.info(f"tagging_order_auto. {order_no} is tagged.")
self.tagged_auto_pending_orders.append(order_no)
self.update_orders()
def _on_pending_order_update(self, status, order: StockOrder):
"""Tracker를 통해
TradingTracker의 add_pending_order_update_callback 함수에 인자로 넣으면 정상 동작함.
"""
asset_code = order.asset_code
order_no = order.order_no
pending_quantity = order.pending_quantity
self.logger.info(f"_on_pending_order_update. status: {status} order: {order}")
# 주문 처리시
if order.side == OrderSide.BUY:
if status in ["partial", "completed"]:
if order_no in self.auto_pending_orders:
prev_filled_quantity = order.quantity - self.auto_pending_orders[order_no]
filled_quantity = order.filled_quantity - prev_filled_quantity
self._add_dict_with_key(self.auto_pending_orders, order_no, -filled_quantity)
self._add_dict_with_key(self.auto_positions, asset_code, filled_quantity)
elif order_no in self.direct_pending_orders:
prev_filled_quantity = order.quantity - self.direct_pending_orders[order_no]
filled_quantity = order.filled_quantity - prev_filled_quantity
self._add_dict_with_key(self.direct_pending_orders, order_no, -filled_quantity)
self._add_dict_with_key(self.direct_positions, asset_code, filled_quantity)
elif order_no in self.tagged_auto_pending_orders or self.default_type == "auto":
self.auto_pending_orders[order_no] = pending_quantity
self._add_dict_with_key(self.auto_positions, asset_code, order.filled_quantity)
elif self.default_type == "direct":
self.direct_pending_orders[order_no] = pending_quantity
self._add_dict_with_key(self.direct_positions, asset_code, order.filled_quantity)
# 주문 접수시
elif status in ["accepted"]:
if order_no in self.tagged_auto_pending_orders:
self.auto_pending_orders[order_no] = order.pending_quantity
else:
self.direct_pending_orders[order_no] = order.pending_quantity
elif status in ["cancelled"]:
if order_no in self.auto_pending_orders:
self.auto_pending_orders[order_no] -= order.quantity
elif order_no in self.direct_pending_orders:
self.direct_pending_orders[order_no] -= order.quantity
elif order.side == OrderSide.SELL:
if status in ["accepted"]:
# 매도시엔 보유 잔량을 주문 접수시 미리 차감한다.
if order_no in self.tagged_auto_pending_orders:
self.auto_pending_orders[order_no] = pending_quantity
remain = self._add_dict_with_key(self.auto_positions, asset_code, -pending_quantity)
if remain < 0:
self._add_dict_with_key(self.direct_positions, asset_code, remain)
else:
self.direct_pending_orders[order_no] = pending_quantity
remain = self._add_dict_with_key(self.direct_positions, asset_code, -pending_quantity)
if remain < 0:
self._add_dict_with_key(self.auto_positions, asset_code, remain)
elif status in ["cancelled"]:
# 주문 취소시엔 보유잔량을 복구한다.
if order_no in self.auto_pending_orders:
# order.quantity가 주문 취소한 수량임
self.auto_pending_orders[order_no] -= order.quantity
self._add_dict_with_key(self.auto_positions, asset_code, order.quantity)
elif order_no in self.direct_pending_orders:
self.direct_pending_orders[order_no] -= order.quantity
self._add_dict_with_key(self.direct_positions, asset_code, order.quantity)
elif status in ["partial"]:
# 부분 체결시엔 미체결분량으로 pending order 잔량을 업데이트한다.
if order_no in self.auto_pending_orders:
self.auto_pending_orders[order_no] = pending_quantity
elif order_no in self.direct_pending_orders:
self.direct_pending_orders[order_no] = pending_quantity
elif status in ["completed"]:
# 전체 체결시엔 메모리 정리를 위해 해당 주문번호를 비워준다. 진행하지 않아도 무방함
if order_no in self.auto_pending_orders:
del self.auto_pending_orders[order_no]
elif order_no in self.direct_pending_orders:
del self.direct_pending_orders[order_no]
# clear
self._clear_below_zero_key_value(self.auto_pending_orders, order_no)
self._clear_below_zero_key_value(self.direct_pending_orders, order_no)
self._clear_below_zero_key_value(self.auto_positions, asset_code)
self._clear_below_zero_key_value(self.direct_positions, asset_code)
self.update_orders()
self.update_positions()
[docs]
def print_current_status(self):
self.logger.info(
f"print_current_status. auto positions: {self.auto_positions} direct positions: {self.direct_positions}.\nauto orders: {self.auto_pending_orders} direct_orders: {self.direct_pending_orders}.\n tagged order_nos: {self.tagged_auto_pending_orders}"
)
[docs]
async def start(self):
return await self.tracker.start()
def _add_dict_with_key(self, target_dict, key, diff):
"""
target dict가 비어있든 아니든 적절히 diff를 더해주는 연산.
만약 연산 결과가 음수라면 그 음수값을 return하여 적절히 사용할 수 있도록 함
"""
if key in target_dict:
target_dict[key] += diff
else:
target_dict[key] = diff
if target_dict[key] < 0:
return target_dict[key]
else:
return 0
def _clear_below_zero_key_value(self, target_dict, key):
if key in target_dict and target_dict[key] <= 0:
del target_dict[key]