Source code for pyqqq.data.realtime

from typing import List
from pyqqq.utils.api_client import raise_for_status, send_request
from pyqqq.utils.logger import get_logger
from pyqqq.utils.singleton import singleton
import pyqqq.config as c
import asyncio
import os
import websockets
import json
import inspect
import datetime as dtm


logger = get_logger(__name__)


[docs] def get_all_last_trades(codes: List[str] = None): """ 모든 종목의 최근 체결 정보를 반환합니다. Args: codes (List[str], optional): 종목 코드 리스트. 모든 종목의 체결 정보를 반환하려면 None을 전달합니다. Returns: list: - dict: - chetime (str): 체결시간 - sign (str): 전일대비구분 - change (int): 전일대비가격 - drate (float): 전일대비등락율 - price (int): 체결가 - opentime (str): 시가시간 - open (int): 시가 - hightime (str): 고가시간 - high (int): 고가 - lowtime (str): 저가시간 - low (int): 저가 - cgubun (str): 체결구분 - cvolume (int): 체결량 - volume (int): 누적거래량 - value (int): 누적거래대금(백만) - mdvolume (int): 매도체결수량 - mdchecnt (int): 매도체결건수 - msvolume (int): 매수체결수량 - mschecnt (int): 매수체결건수 - cpower (float): 체결강도 - offerho (int): 매도호가 - bidho (int): 매수호가 - status (str): 장정보 - jnilvolume (int): 전일동시간대거래량 - shcode (str): 종목코드 """ params = None if codes: params = {"codes": ",".join(codes) if isinstance(codes, list) else codes} r = send_request("GET", f"{c.PYQQQ_API_URL}/domestic-stock/trades", params=params) raise_for_status(r) data = r.json() result = [data[k] for k in data.keys()] return result
def get_all_last_orderbooks(): """ 모든 종목의 최근 호가 정보를 반환합니다. Returns: list: - dict: - hotime (str): 호가 시간 (HHMMSS) - date (str): 호가 날짜 (YYYYMMDD) - bidho[1~10] (int): 매도 호가 - bidrem[1~10] (int): 매도 호가 잔량 - totbidrem (int): 총 매수 호가 잔량 - offerho[1~10] (int): 매수 호가 - offerrem[1~10] (int): 매수 호가 잔량 - totofferrem (int): 총 매수 호가 잔량 - donsigubun (str): 동시 호가 구분 - alloc_gubun (str or None): 배분 적용 구분 - volume (int): 누적 거래량 - shcode (str): 종목 코드 """ r = send_request("GET", f"{c.PYQQQ_API_URL}/domestic-stock/orderbooks") raise_for_status(r) data = r.json() result = [data[k] for k in data.keys()] return result @singleton class TickEventListener: """ 틱 데이터를 실시간으로 구독하는 서버와 websocket으로 연결되오, 원하는 조건이 충족되었을 때 콜백을 호출하는 클래스입니다. 객체를 생성 후, start() 하면 웹소켓이 연결됩니다. start() 전, 후에 관계없이 append_event()로 이벤트를 추가할 수 있습니다. Args: client_id (str): 클라이언트 ID. 고유한 값이어야 합니다. url (str): 웹소켓 서버 주소. TICKEVENT_URL 환경변수로 설정할 수 있습니다. config (dict): 설정값 event_add_delay (float): 이벤트 추가 딜레이 시간. 기본값은 1초입니다. """ LOG_TAG = "[TickEventListener]" def __init__(self, client_id, url=None, config={}): self.ws_connected = False self.events = {} self.pending_event_ids = [] self.event_add_delay = 1.0 self.health_ping_delay = 60 self.retry_cnt = 0 self.ws = None self.tot_retry_cnt = 0 logger.info(f"{self.LOG_TAG}init") if "event_add_delay" in config: self.event_add_delay = config["event_add_delay"] if "health_ping_delay" in config: self.health_ping_delay = config["health_ping_delay"] if client_id is not None: api_key = os.getenv("PYQQQ_API_KEY") or "" self.client_id = client_id + api_key else: self.client_id = api_key self.url = url if self.url is None: self.url = os.getenv("TICKEVENT_URL") or c.PYQQQ_EVENT_WS_URL async def start(self): """ 틱데이터 이벤트 리스너를 시작합니다 """ self.tasks = [ asyncio.create_task(self.connect_ws()), asyncio.create_task(self.health_ping()) ] async def stop(self): """ 틱데이터 이벤트 리스너를 종료합니다. """ for t in self.tasks: t.cancel() await asyncio.gather(*self.tasks) async def connect_ws(self): """ 웹소켓 서버에 연결하여 틱데이터 이벤트를 수신합니다. """ try: async with websockets.connect(self.url) as websocket: self.retry_cnt = 0 self.ws_connected = True self.ws = websocket await self.register(websocket) asyncio.create_task(self.process_events_appending()) try: async for message in websocket: # logger.debug(f"{self.LOG_TAG}message: {message}") await self.handle_response(message) except websockets.exceptions.ConnectionClosed: logger.info(f"{self.LOG_TAG}Connection closed") await self.on_connect_failed() except Exception as e: logger.info(f"{self.LOG_TAG}Connection failed. error: {e}") await self.on_connect_failed() finally: self.ws_connected = False async def health_ping(self): """ 웹소켓 서버에 헬스체크를 보냅니다. """ if self.ws_connected: await self.ws.send(json.dumps({"type": "ping", "client_id": self.client_id})) await asyncio.sleep(self.health_ping_delay) async def on_connect_failed(self): self.ws_connected = False self.move_events_to_pending() self.retry_cnt += 1 sleep_time = min(60, self.retry_cnt * 2) logger.info(f"{self.LOG_TAG}retry cnt: {self.retry_cnt}. sleep_time: {sleep_time} sec") await asyncio.sleep(self.retry_cnt * 2) await self.connect_ws() def append_event(self, ticker, event_id, price, once, side, price_comparison, listen_callback): """ 사용자로부터 틱데이터 이벤트 추가 요청을 등록합니다. 즉시 처리되진 않습니다. """ self.events[event_id] = TickEvent(ticker=ticker, event_listener=self, client_id=self.client_id, event_id=event_id, price=price, once=once, side=side, price_comparison=price_comparison, listen_callback=listen_callback) self.pending_event_ids.append(event_id) async def append_event_async(self, ticker, event_id, price, once, side, price_comparison, listen_callback): """ 사용자로부터 틱데이터 이벤트 추가 요청을 등록합니다. 비동기로 가능한 한 즉시 처리됩니다. """ if self.ws_connected: self.events[event_id] = TickEvent(ticker=ticker, event_listener=self, client_id=self.client_id, event_id=event_id, price=price, once=once, side=side, price_comparison=price_comparison, listen_callback=listen_callback) logger.info(f"{self.LOG_TAG}append_event_async {event_id}") await self.send_subscribe(event_id) else: logger.info(f"{self.LOG_TAG}append_event_async {event_id} failed") self.append_event(ticker, event_id, price, once, side, price_comparison, listen_callback) async def close_event(self, event_id): """ 사용자로부터 틱데이터 이벤트 삭제 요청을 등록합니다. 비동기로 가능한 한 즉시 처리됩니다. """ await self.send_unsubscribe(event_id) self.remove_event(event_id) async def send_subscribe(self, event_id): """ 틱데이터 이벤트 구독 요청을 보냅니다. """ event = self.events[event_id] if event is not None and not event.check_removed(): dumpdata = self.events[event_id].get_subscribe_dump_data() await self.ws.send(dumpdata) async def send_unsubscribe(self, event_id): """ 틱데이터 이벤트 구독 해지 요청을 보냅니다. """ event = self.events[event_id] if event is not None and not event.check_removed(): dumpdata = event.get_unsubscribe_dump_data() await self.ws.send(dumpdata) event.remove_event() def remove_event(self, event_id): """ 특정 이벤트를 리스너에서 삭제합니다. 사용자가 직접 호출하는건 권장하지 않습니다. """ if event_id in self.events: event = self.events[event_id] del self.events[event_id] del event if event_id in self.pending_event_ids: self.pending_event_ids.remove(event_id) async def process_events_appending(self): """ 사용자로부터 받았던 틱데이터 이벤트 요청을 모아서 처리합니다. 웹소켓 연결 해제 후 재연결시에서도 사용됩니다. """ self.tot_retry_cnt += 1 cur_retry_cnt = self.tot_retry_cnt logger.info(f"{self.LOG_TAG} process_events_appending") while True: if cur_retry_cnt < self.tot_retry_cnt: logger.info(f"{self.LOG_TAG} old process task exit. {cur_retry_cnt}, {self.tot_retry_cnt}") return if self.ws_connected: res = [] # 날짜가 바뀌면 이벤트를 모두 날립니다 keys = list(self.events.keys()) for event_id in keys: if self.events[event_id].check_day_changed(): await self.close_event(event_id) cur_pending_event_ids = list(self.pending_event_ids) for pending_event_id in cur_pending_event_ids: if not self.events[pending_event_id].check_removed(): await self.send_subscribe(pending_event_id) res.append(pending_event_id) self.pending_event_ids.remove(pending_event_id) if len(res) > 0 or len(cur_pending_event_ids) > 0: logger.debug(f"{self.LOG_TAG} try connect events:{cur_pending_event_ids}, connected: {res}") else: logger.debug(f"{self.LOG_TAG}not connected") await asyncio.sleep(self.event_add_delay) def move_events_to_pending(self): self.pending_event_ids = list(self.events.keys()) async def register(self, ws): """ 서버에 유저를 등록합니다. """ data = json.dumps({"type": "register", "client_id": self.client_id}) logger.info(f"{self.LOG_TAG}registering. data:{data}") await ws.send(data) async def handle_response(self, response): """ 서버로부터 받은 틱데이터 이벤트를 처리합니다. """ data = json.loads(response) if "event_id" not in data: # res_data = data["data"] # if res_data["message"] == "pong": # logger.debug(f"{self.LOG_TAG} pong response") return event_id = data["event_id"] if event_id in self.events: event = self.events[event_id] if event.check_removed(): self.events.remove(event_id) return else: await event.handle_tick_data(data) else: logger.warning(f"{self.LOG_TAG}event_id {event_id} not found") class TickEvent: """ TickEventListener 클래스에서 사용하는 틱데이터 이벤트 클래스입니다. """ LOG_TAG = "[TickEvent]" CLOSE_TIME = dtm.time(18, 0) def __init__(self, event_listener, ticker, client_id, event_id, price, once, side, price_comparison, listen_callback): logger.info(f"{self.LOG_TAG} create {event_id}, {ticker}, {price}, {price_comparison}") self.removed = False self.event_listener = event_listener self.ticker = ticker self.client_id = client_id self.event_id = event_id self.price = price self.once = once self.side = side self.price_comparison = price_comparison self.listen_callback = listen_callback self.date = dtm.datetime.now().strftime("%Y%m%d") async def listen_tick_event(self, ws): logger.info(f"{self.LOG_TAG} listen {self.event_id}") self.removed = False await ws.send(json.dumps({"type": "subscribe", "ticker": self.ticker, "client_id": self.client_id, "event_id": self.event_id, "price": self.price, "once": self.once, "side": self.side, "price_comparison": self.price_comparison})) def get_subscribe_dump_data(self): logger.info(f"{self.LOG_TAG} subscribe {self.event_id}, {self.ticker}, {self.price}, {self.price_comparison}, {self.side}") return json.dumps({"type": "subscribe", "ticker": self.ticker, "client_id": self.client_id, "event_id": self.event_id, "price": self.price, "once": self.once, "side": self.side, "price_comparison": self.price_comparison}) def get_unsubscribe_dump_data(self): return json.dumps({"type": "unsubscribe", "ticker": self.ticker, "client_id": self.client_id, "event_id": self.event_id}) async def handle_tick_data(self, data): if self.listen_callback: if inspect.iscoroutinefunction(self.listen_callback): await self.listen_callback(data) else: self.listen_callback(data) if self.once: self.remove_event() self.event_listener.remove_event(self.event_id) def remove_event(self): self.removed = True def check_removed(self): return self.removed def check_day_changed(self): now = dtm.datetime.now() if self.date != now.strftime("%Y%m%d"): return True else: if now.time() >= self.CLOSE_TIME: return True else: return False