Source code for pyqqq.data.realtime

import asyncio
import datetime as dtm
import inspect
import json
import os
from typing import List, Union

import websockets

import pyqqq.config as c
from pyqqq.datatypes import DataExchange, Exchange
from pyqqq.utils.api_client import raise_for_status, send_request
from pyqqq.utils.logger import get_logger
from pyqqq.utils.singleton import singleton
from pyqqq.utils.market_schedule import is_trading_time, get_next_trading_day, get_market_schedule

logger = get_logger(__name__)


[docs] def get_all_last_trades( codes: List[str] = None, exchange: Union[str, DataExchange] = "KRX", ): """ 모든 종목의 최근 체결 정보를 반환합니다. Note: - KRX, UN 거래소의 최근 체결 정보를 조회할 수 있습니다. NXT 거래소는 지원되지 않습니다. Args: codes (List[str], optional): 종목 코드 리스트. 모든 종목의 체결 정보를 반환하려면 None을 전달합니다. exchange (str): 거래소. 기본값은 "KRX"입니다. 지원하는 거래소는 "KRX", "UN" 입니다. Returns: list: - dict: - sign (int): 전일대비구분 - change (int): 전일대비가격 - drate (float): 전일대비등락율 - price (int): 체결가 - open (int): 시가 - high (int): 고가 - low (int): 저가 - cgubun (str): 체결구분 - cvolume (int): 체결량 - volume (int): 누적거래량 - value (int): 누적거래대금(백만) - mdvolume (int): 매도체결수량 - mdchecnt (int): 매도체결건수 - msvolume (int): 매수체결수량 - mschecnt (int): 매수체결건수 - cpower (float): 체결강도 - offerho (int): 매도호가 - bidho (int): 매수호가 - status (str): 장정보 - jnilvolume (int): 전일동시간대거래량 - shcode (str): 종목코드 - exchname (str): 거래소명 - date (str): 체결일자 - update_time (str): 업데이트 시간 Raises: ValueError: 지원하지 않는 거래소 코드가 전달된 경우. """ exchange = DataExchange.validate(exchange) if exchange == DataExchange.NXT: raise ValueError("NXT 거래소는 지원되지 않습니다.") params = {"exchange": exchange.value} if codes: params["codes"] = ",".join(codes) if isinstance(codes, list) else codes r = send_request("GET", f"{c.PYQQQ_API_URL}/domestic-stock/trades", params=params) raise_for_status(r) data = r.json() result = [data[k] for k in data.keys()] return result
def get_all_last_orderbooks(): """ 모든 종목의 최근 호가 정보를 반환합니다. Returns: list: - dict: - hotime (str): 호가 시간 (HHMMSS) - date (str): 호가 날짜 (YYYYMMDD) - bidho[1~10] (int): 매도 호가 - bidrem[1~10] (int): 매도 호가 잔량 - totbidrem (int): 총 매수 호가 잔량 - offerho[1~10] (int): 매수 호가 - offerrem[1~10] (int): 매수 호가 잔량 - totofferrem (int): 총 매수 호가 잔량 - donsigubun (str): 동시 호가 구분 - alloc_gubun (str or None): 배분 적용 구분 - volume (int): 누적 거래량 - shcode (str): 종목 코드 """ r = send_request("GET", f"{c.PYQQQ_API_URL}/domestic-stock/orderbooks") raise_for_status(r) data = r.json() result = [data[k] for k in data.keys()] return result @singleton class TickEventListener: """ 틱 데이터를 실시간으로 구독하는 서버와 websocket으로 연결되오, 원하는 조건이 충족되었을 때 콜백을 호출하는 클래스입니다. 객체를 생성 후, start() 하면 웹소켓이 연결됩니다. start() 전, 후에 관계없이 append_event()로 이벤트를 추가할 수 있습니다. Args: client_id (str): 클라이언트 ID. 고유한 값이어야 합니다. url (str): 웹소켓 서버 주소. TICKEVENT_URL 환경변수로 설정할 수 있습니다. config (dict): 설정값 event_add_delay (float): 이벤트 추가 딜레이 시간. 기본값은 1초입니다. """ LOG_TAG = "[TickEventListener]" def __init__(self, client_id, url=None, config={}): self.ws_connected = False self.events = {} self.pending_event_ids = [] self.event_add_delay = 1.0 self.health_ping_delay = 60 self.retry_cnt = 0 self.ws = None self.tot_retry_cnt = 0 self.reconnect_datetime = None logger.info(f"{self.LOG_TAG}init") if "event_add_delay" in config: self.event_add_delay = config["event_add_delay"] if "health_ping_delay" in config: self.health_ping_delay = config["health_ping_delay"] now_int_str = str(int(dtm.datetime.now().timestamp())) if client_id is not None: api_key = os.getenv("PYQQQ_API_KEY") or "" self.client_id = client_id + api_key + "_" + now_int_str else: self.client_id = api_key + "_" + now_int_str self.url = url if self.url is None: self.url = os.getenv("TICKEVENT_URL") or c.PYQQQ_EVENT_WS_URL async def start(self): """ 틱데이터 이벤트 리스너를 시작합니다 """ self.tasks = [asyncio.create_task(self.connect_ws()), asyncio.create_task(self.health_ping())] async def stop(self): """ 틱데이터 이벤트 리스너를 종료합니다. """ for t in self.tasks: t.cancel() await asyncio.gather(*self.tasks) async def connect_ws(self): """ 웹소켓 서버에 연결하여 틱데이터 이벤트를 수신합니다. """ try: async with websockets.connect(self.url) as websocket: self.retry_cnt = 0 self.ws_connected = True self.ws = websocket await self.register(websocket) asyncio.create_task(self.process_events_appending()) try: async for message in websocket: # logger.debug(f"{self.LOG_TAG}message: {message}") await self.handle_response(message) logger.warning(f"{self.LOG_TAG}Websocket connection closed by server. code: {websocket.close_code}, reason: {websocket.close_reason}") except websockets.exceptions.ConnectionClosed: logger.info(f"{self.LOG_TAG}Connection closed") if websocket.close_code == 4409: logger.info(f"{self.LOG_TAG}Connection closed by server with code: {websocket.close_code}, reason: {websocket.close_reason}") self.move_events_to_pending() if is_trading_time(dtm.datetime.now()): logger.info("try reconnect next minute") self.reconnect_datetime = dtm.datetime.now() + dtm.timedelta(minutes=1) else: logger.info("not trading time") next_trading_day = get_next_trading_day(dtm.datetime.now().date()) next_market_schedule = get_market_schedule(next_trading_day, Exchange.NXT) next_start_time = dtm.datetime.combine(next_trading_day, next_market_schedule.open_time) logger.info(f"reconnect at {next_start_time}") self.reconnect_datetime = next_start_time await asyncio.sleep(self.reconnect_datetime.timestamp() - dtm.datetime.now().timestamp()) await self.connect_ws() else: await self.on_connect_failed() except Exception as e: logger.info(f"{self.LOG_TAG}Connection failed. error: {e}") await self.on_connect_failed() finally: logger.info(f"{self.LOG_TAG}Websocket connection closed") self.ws_connected = False async def health_ping(self): """ 웹소켓 서버에 헬스체크를 보냅니다. """ if self.ws_connected: await self.ws.send(json.dumps({"type": "ping", "client_id": self.client_id})) await asyncio.sleep(self.health_ping_delay) async def on_connect_failed(self): self.ws_connected = False self.move_events_to_pending() self.retry_cnt += 1 sleep_time = min(60, self.retry_cnt * 2) logger.info(f"{self.LOG_TAG}retry cnt: {self.retry_cnt}. sleep_time: {sleep_time} sec") await asyncio.sleep(self.retry_cnt * 2) await self.connect_ws() def append_event(self, ticker, event_id, price, once, side, price_comparison, listen_callback, is_unified=False): """ 사용자로부터 틱데이터 이벤트 추가 요청을 등록합니다. 즉시 처리되진 않습니다. Args: ticker (str): 종목 코드 event_id (str): 이벤트 ID. 고유한 값이어야 합니다. price (int): 가격 once (bool): 한번만 실행할지 여부 side (int | OrderSide): 매도/매수 구분 (0: all, 1: sell[cgubun: "-"], 2: buy[cgubun: "+"]) default: 0 price_comparison (str): 가격 비교 방식 ("<", "<=", "==", ">=", ">") listen_callback (callable): 이벤트 발생시 호출할 콜백 함수 is_unified (bool): 통합 이벤트인지 여부. 기본값은 False입니다. """ self.events[event_id] = TickEvent( ticker=ticker, event_listener=self, client_id=self.client_id, event_id=event_id, price=price, once=once, side=side, price_comparison=price_comparison, listen_callback=listen_callback, is_unified=is_unified ) self.pending_event_ids.append(event_id) async def append_event_async(self, ticker, event_id, price, once, side, price_comparison, listen_callback, is_unified=False): """ 사용자로부터 틱데이터 이벤트 추가 요청을 등록합니다. 비동기로 가능한 한 즉시 처리됩니다. """ if self.ws_connected: self.events[event_id] = TickEvent( ticker=ticker, event_listener=self, client_id=self.client_id, event_id=event_id, price=price, once=once, side=side, price_comparison=price_comparison, listen_callback=listen_callback, is_unified=is_unified ) logger.info(f"{self.LOG_TAG}append_event_async {event_id}") await self.send_subscribe(event_id) else: logger.info(f"{self.LOG_TAG}append_event_async {event_id} failed") self.append_event(ticker, event_id, price, once, side, price_comparison, listen_callback, is_unified) async def close_event(self, event_id): """ 사용자로부터 틱데이터 이벤트 삭제 요청을 등록합니다. 비동기로 가능한 한 즉시 처리됩니다. """ await self.send_unsubscribe(event_id) self.remove_event(event_id) async def send_subscribe(self, event_id): """ 틱데이터 이벤트 구독 요청을 보냅니다. """ event = self.events[event_id] if event is not None and not event.check_removed(): dumpdata = self.events[event_id].get_subscribe_dump_data() await self.ws.send(dumpdata) async def send_unsubscribe(self, event_id): """ 틱데이터 이벤트 구독 해지 요청을 보냅니다. """ event = self.events[event_id] if event is not None and not event.check_removed(): dumpdata = event.get_unsubscribe_dump_data() await self.ws.send(dumpdata) event.remove_event() def remove_event(self, event_id): """ 특정 이벤트를 리스너에서 삭제합니다. 사용자가 직접 호출하는건 권장하지 않습니다. """ if event_id in self.events: event = self.events[event_id] del self.events[event_id] del event if event_id in self.pending_event_ids: self.pending_event_ids.remove(event_id) async def process_events_appending(self): """ 사용자로부터 받았던 틱데이터 이벤트 요청을 모아서 처리합니다. 웹소켓 연결 해제 후 재연결시에서도 사용됩니다. """ self.tot_retry_cnt += 1 cur_retry_cnt = self.tot_retry_cnt logger.info(f"{self.LOG_TAG} process_events_appending") while True: if cur_retry_cnt < self.tot_retry_cnt: logger.info(f"{self.LOG_TAG} old process task exit. {cur_retry_cnt}, {self.tot_retry_cnt}") return if self.ws_connected: res = [] # 날짜가 바뀌면 이벤트를 모두 날립니다 keys = list(self.events.keys()) for event_id in keys: if self.events[event_id].check_day_changed(): await self.close_event(event_id) cur_pending_event_ids = list(self.pending_event_ids) for pending_event_id in cur_pending_event_ids: if not self.events[pending_event_id].check_removed(): await self.send_subscribe(pending_event_id) res.append(pending_event_id) self.pending_event_ids.remove(pending_event_id) if len(res) > 0 or len(cur_pending_event_ids) > 0: logger.debug(f"{self.LOG_TAG} try connect events:{cur_pending_event_ids}, connected: {res}") else: logger.info(f"{self.LOG_TAG} not connected. stop process_events_appending") return await asyncio.sleep(self.event_add_delay) def move_events_to_pending(self): self.pending_event_ids = list(self.events.keys()) async def register(self, ws): """ 서버에 유저를 등록합니다. """ data = json.dumps({"type": "register", "client_id": self.client_id}) logger.info(f"{self.LOG_TAG}registering. data:{data}") await ws.send(data) async def handle_response(self, response): """ 서버로부터 받은 틱데이터 이벤트를 처리합니다. """ data = json.loads(response) if "event_id" not in data: # res_data = data["data"] # if res_data["message"] == "pong": # logger.debug(f"{self.LOG_TAG} pong response") return event_id = data["event_id"] if event_id in self.events: event = self.events[event_id] if event.check_removed(): self.events.remove(event_id) return else: await event.handle_tick_data(data) else: logger.warning(f"{self.LOG_TAG}event_id {event_id} not found") class TickEvent: """ TickEventListener 클래스에서 사용하는 틱데이터 이벤트 클래스입니다. """ LOG_TAG = "[TickEvent]" CLOSE_TIME = dtm.time(18, 0) def __init__(self, event_listener, ticker, client_id, event_id, price, once, side, price_comparison, listen_callback, is_unified=False): logger.info(f"{self.LOG_TAG} create {event_id}, {ticker}, {price}, {price_comparison}") self.removed = False self.event_listener = event_listener self.ticker = ticker self.client_id = client_id self.event_id = event_id self.price = price self.once = once self.side = side self.price_comparison = price_comparison self.listen_callback = listen_callback self.is_unified = is_unified # 통합 이벤트인지 여부 self.date = dtm.datetime.now().strftime("%Y%m%d") def get_subscribe_dump_data(self): logger.info(f"{self.LOG_TAG} subscribe {self.event_id}, {self.ticker}, {self.price}, {self.price_comparison}, {self.side}") ticker = "U" + self.ticker if self.is_unified else self.ticker return json.dumps({"type": "subscribe", "ticker": ticker, "client_id": self.client_id, "event_id": self.event_id, "price": self.price, "once": self.once, "side": self.side, "price_comparison": self.price_comparison}) def get_unsubscribe_dump_data(self): ticker = "U" + self.ticker if self.is_unified else self.ticker return json.dumps({"type": "unsubscribe", "ticker": ticker, "client_id": self.client_id, "event_id": self.event_id}) async def handle_tick_data(self, data): if self.listen_callback: if inspect.iscoroutinefunction(self.listen_callback): await self.listen_callback(data) else: self.listen_callback(data) if self.once: self.remove_event() self.event_listener.remove_event(self.event_id) def remove_event(self): self.removed = True def check_removed(self): return self.removed def check_day_changed(self): now = dtm.datetime.now() if self.date != now.strftime("%Y%m%d"): return True else: if now.time() >= self.CLOSE_TIME: return True else: return False