from typing import List
from pyqqq.utils.api_client import raise_for_status, send_request
from pyqqq.utils.logger import get_logger
from pyqqq.utils.singleton import singleton
import pyqqq.config as c
import asyncio
import os
import websockets
import json
import inspect
import datetime as dtm
logger = get_logger(__name__)
[docs]
def get_all_last_trades(codes: List[str] = None):
"""
모든 종목의 최근 체결 정보를 반환합니다.
Args:
codes (List[str], optional): 종목 코드 리스트. 모든 종목의 체결 정보를 반환하려면 None을 전달합니다.
Returns:
list:
- dict:
- chetime (str): 체결시간
- sign (str): 전일대비구분
- change (int): 전일대비가격
- drate (float): 전일대비등락율
- price (int): 체결가
- opentime (str): 시가시간
- open (int): 시가
- hightime (str): 고가시간
- high (int): 고가
- lowtime (str): 저가시간
- low (int): 저가
- cgubun (str): 체결구분
- cvolume (int): 체결량
- volume (int): 누적거래량
- value (int): 누적거래대금(백만)
- mdvolume (int): 매도체결수량
- mdchecnt (int): 매도체결건수
- msvolume (int): 매수체결수량
- mschecnt (int): 매수체결건수
- cpower (float): 체결강도
- offerho (int): 매도호가
- bidho (int): 매수호가
- status (str): 장정보
- jnilvolume (int): 전일동시간대거래량
- shcode (str): 종목코드
"""
params = None
if codes:
params = {"codes": ",".join(codes) if isinstance(codes, list) else codes}
r = send_request("GET", f"{c.PYQQQ_API_URL}/domestic-stock/trades", params=params)
raise_for_status(r)
data = r.json()
result = [data[k] for k in data.keys()]
return result
def get_all_last_orderbooks():
"""
모든 종목의 최근 호가 정보를 반환합니다.
Returns:
list:
- dict:
- hotime (str): 호가 시간 (HHMMSS)
- date (str): 호가 날짜 (YYYYMMDD)
- bidho[1~10] (int): 매도 호가
- bidrem[1~10] (int): 매도 호가 잔량
- totbidrem (int): 총 매수 호가 잔량
- offerho[1~10] (int): 매수 호가
- offerrem[1~10] (int): 매수 호가 잔량
- totofferrem (int): 총 매수 호가 잔량
- donsigubun (str): 동시 호가 구분
- alloc_gubun (str or None): 배분 적용 구분
- volume (int): 누적 거래량
- shcode (str): 종목 코드
"""
r = send_request("GET", f"{c.PYQQQ_API_URL}/domestic-stock/orderbooks")
raise_for_status(r)
data = r.json()
result = [data[k] for k in data.keys()]
return result
@singleton
class TickEventListener:
"""
틱 데이터를 실시간으로 구독하는 서버와 websocket으로 연결되오,
원하는 조건이 충족되었을 때 콜백을 호출하는 클래스입니다.
객체를 생성 후, start() 하면 웹소켓이 연결됩니다.
start() 전, 후에 관계없이 append_event()로 이벤트를 추가할 수 있습니다.
Args:
client_id (str): 클라이언트 ID. 고유한 값이어야 합니다.
url (str): 웹소켓 서버 주소. TICKEVENT_URL 환경변수로 설정할 수 있습니다.
config (dict): 설정값
event_add_delay (float): 이벤트 추가 딜레이 시간. 기본값은 1초입니다.
"""
LOG_TAG = "[TickEventListener]"
def __init__(self, client_id, url=None, config={}):
self.ws_connected = False
self.events = {}
self.pending_event_ids = []
self.event_add_delay = 1.0
self.health_ping_delay = 60
self.retry_cnt = 0
self.ws = None
self.tot_retry_cnt = 0
logger.info(f"{self.LOG_TAG}init")
if "event_add_delay" in config:
self.event_add_delay = config["event_add_delay"]
if "health_ping_delay" in config:
self.health_ping_delay = config["health_ping_delay"]
if client_id is not None:
api_key = os.getenv("PYQQQ_API_KEY") or ""
self.client_id = client_id + api_key
else:
self.client_id = api_key
self.url = url
if self.url is None:
self.url = os.getenv("TICKEVENT_URL") or c.PYQQQ_EVENT_WS_URL
async def start(self):
"""
틱데이터 이벤트 리스너를 시작합니다
"""
self.tasks = [
asyncio.create_task(self.connect_ws()),
asyncio.create_task(self.health_ping())
]
async def stop(self):
"""
틱데이터 이벤트 리스너를 종료합니다.
"""
for t in self.tasks:
t.cancel()
await asyncio.gather(*self.tasks)
async def connect_ws(self):
"""
웹소켓 서버에 연결하여 틱데이터 이벤트를 수신합니다.
"""
try:
async with websockets.connect(self.url) as websocket:
self.retry_cnt = 0
self.ws_connected = True
self.ws = websocket
await self.register(websocket)
asyncio.create_task(self.process_events_appending())
try:
async for message in websocket:
# logger.debug(f"{self.LOG_TAG}message: {message}")
await self.handle_response(message)
except websockets.exceptions.ConnectionClosed:
logger.info(f"{self.LOG_TAG}Connection closed")
await self.on_connect_failed()
except Exception as e:
logger.info(f"{self.LOG_TAG}Connection failed. error: {e}")
await self.on_connect_failed()
finally:
self.ws_connected = False
async def health_ping(self):
"""
웹소켓 서버에 헬스체크를 보냅니다.
"""
if self.ws_connected:
await self.ws.send(json.dumps({"type": "ping", "client_id": self.client_id}))
await asyncio.sleep(self.health_ping_delay)
async def on_connect_failed(self):
self.ws_connected = False
self.move_events_to_pending()
self.retry_cnt += 1
sleep_time = min(60, self.retry_cnt * 2)
logger.info(f"{self.LOG_TAG}retry cnt: {self.retry_cnt}. sleep_time: {sleep_time} sec")
await asyncio.sleep(self.retry_cnt * 2)
await self.connect_ws()
def append_event(self, ticker, event_id, price, once, side, price_comparison, listen_callback):
"""
사용자로부터 틱데이터 이벤트 추가 요청을 등록합니다. 즉시 처리되진 않습니다.
"""
self.events[event_id] = TickEvent(ticker=ticker, event_listener=self, client_id=self.client_id, event_id=event_id, price=price, once=once, side=side, price_comparison=price_comparison, listen_callback=listen_callback)
self.pending_event_ids.append(event_id)
async def append_event_async(self, ticker, event_id, price, once, side, price_comparison, listen_callback):
"""
사용자로부터 틱데이터 이벤트 추가 요청을 등록합니다. 비동기로 가능한 한 즉시 처리됩니다.
"""
if self.ws_connected:
self.events[event_id] = TickEvent(ticker=ticker, event_listener=self, client_id=self.client_id, event_id=event_id, price=price, once=once, side=side, price_comparison=price_comparison, listen_callback=listen_callback)
logger.info(f"{self.LOG_TAG}append_event_async {event_id}")
await self.send_subscribe(event_id)
else:
logger.info(f"{self.LOG_TAG}append_event_async {event_id} failed")
self.append_event(ticker, event_id, price, once, side, price_comparison, listen_callback)
async def close_event(self, event_id):
"""
사용자로부터 틱데이터 이벤트 삭제 요청을 등록합니다. 비동기로 가능한 한 즉시 처리됩니다.
"""
await self.send_unsubscribe(event_id)
self.remove_event(event_id)
async def send_subscribe(self, event_id):
"""
틱데이터 이벤트 구독 요청을 보냅니다.
"""
event = self.events[event_id]
if event is not None and not event.check_removed():
dumpdata = self.events[event_id].get_subscribe_dump_data()
await self.ws.send(dumpdata)
async def send_unsubscribe(self, event_id):
"""
틱데이터 이벤트 구독 해지 요청을 보냅니다.
"""
event = self.events[event_id]
if event is not None and not event.check_removed():
dumpdata = event.get_unsubscribe_dump_data()
await self.ws.send(dumpdata)
event.remove_event()
def remove_event(self, event_id):
"""
특정 이벤트를 리스너에서 삭제합니다.
사용자가 직접 호출하는건 권장하지 않습니다.
"""
if event_id in self.events:
event = self.events[event_id]
del self.events[event_id]
del event
if event_id in self.pending_event_ids:
self.pending_event_ids.remove(event_id)
async def process_events_appending(self):
"""
사용자로부터 받았던 틱데이터 이벤트 요청을 모아서 처리합니다.
웹소켓 연결 해제 후 재연결시에서도 사용됩니다.
"""
self.tot_retry_cnt += 1
cur_retry_cnt = self.tot_retry_cnt
logger.info(f"{self.LOG_TAG} process_events_appending")
while True:
if cur_retry_cnt < self.tot_retry_cnt:
logger.info(f"{self.LOG_TAG} old process task exit. {cur_retry_cnt}, {self.tot_retry_cnt}")
return
if self.ws_connected:
res = []
# 날짜가 바뀌면 이벤트를 모두 날립니다
keys = list(self.events.keys())
for event_id in keys:
if self.events[event_id].check_day_changed():
await self.close_event(event_id)
cur_pending_event_ids = list(self.pending_event_ids)
for pending_event_id in cur_pending_event_ids:
if not self.events[pending_event_id].check_removed():
await self.send_subscribe(pending_event_id)
res.append(pending_event_id)
self.pending_event_ids.remove(pending_event_id)
if len(res) > 0 or len(cur_pending_event_ids) > 0:
logger.debug(f"{self.LOG_TAG} try connect events:{cur_pending_event_ids}, connected: {res}")
else:
logger.debug(f"{self.LOG_TAG}not connected")
await asyncio.sleep(self.event_add_delay)
def move_events_to_pending(self):
self.pending_event_ids = list(self.events.keys())
async def register(self, ws):
"""
서버에 유저를 등록합니다.
"""
data = json.dumps({"type": "register", "client_id": self.client_id})
logger.info(f"{self.LOG_TAG}registering. data:{data}")
await ws.send(data)
async def handle_response(self, response):
"""
서버로부터 받은 틱데이터 이벤트를 처리합니다.
"""
data = json.loads(response)
if "event_id" not in data:
# res_data = data["data"]
# if res_data["message"] == "pong":
# logger.debug(f"{self.LOG_TAG} pong response")
return
event_id = data["event_id"]
if event_id in self.events:
event = self.events[event_id]
if event.check_removed():
self.events.remove(event_id)
return
else:
await event.handle_tick_data(data)
else:
logger.warning(f"{self.LOG_TAG}event_id {event_id} not found")
class TickEvent:
"""
TickEventListener 클래스에서 사용하는 틱데이터 이벤트 클래스입니다.
"""
LOG_TAG = "[TickEvent]"
CLOSE_TIME = dtm.time(18, 0)
def __init__(self, event_listener, ticker, client_id, event_id, price, once, side, price_comparison, listen_callback):
logger.info(f"{self.LOG_TAG} create {event_id}, {ticker}, {price}, {price_comparison}")
self.removed = False
self.event_listener = event_listener
self.ticker = ticker
self.client_id = client_id
self.event_id = event_id
self.price = price
self.once = once
self.side = side
self.price_comparison = price_comparison
self.listen_callback = listen_callback
self.date = dtm.datetime.now().strftime("%Y%m%d")
async def listen_tick_event(self, ws):
logger.info(f"{self.LOG_TAG} listen {self.event_id}")
self.removed = False
await ws.send(json.dumps({"type": "subscribe", "ticker": self.ticker, "client_id": self.client_id, "event_id": self.event_id, "price": self.price, "once": self.once, "side": self.side, "price_comparison": self.price_comparison}))
def get_subscribe_dump_data(self):
logger.info(f"{self.LOG_TAG} subscribe {self.event_id}, {self.ticker}, {self.price}, {self.price_comparison}, {self.side}")
return json.dumps({"type": "subscribe", "ticker": self.ticker, "client_id": self.client_id, "event_id": self.event_id, "price": self.price, "once": self.once, "side": self.side, "price_comparison": self.price_comparison})
def get_unsubscribe_dump_data(self):
return json.dumps({"type": "unsubscribe", "ticker": self.ticker, "client_id": self.client_id, "event_id": self.event_id})
async def handle_tick_data(self, data):
if self.listen_callback:
if inspect.iscoroutinefunction(self.listen_callback):
await self.listen_callback(data)
else:
self.listen_callback(data)
if self.once:
self.remove_event()
self.event_listener.remove_event(self.event_id)
def remove_event(self):
self.removed = True
def check_removed(self):
return self.removed
def check_day_changed(self):
now = dtm.datetime.now()
if self.date != now.strftime("%Y%m%d"):
return True
else:
if now.time() >= self.CLOSE_TIME:
return True
else:
return False