Source code for pyqqq.data.minutes

from typing import Dict
from pyqqq.utils.api_client import raise_for_status, send_request
from pyqqq.utils.local_cache import DiskCacheManager
from pyqqq.utils.logger import get_logger
import datetime as dtm
import numpy as np
import pandas as pd
import pyqqq.config as c
import pytz


logger = get_logger(__name__)
minuteCache = DiskCacheManager("minute_cache")


[docs] @minuteCache.memoize() def get_all_minute_data(time: dtm.datetime, source: str = "ebest", adjusted: bool = True) -> pd.DataFrame: """ 모든 종목의 분봉 데이터를 반환합니다. 2024년 4월 9일 데이터 부터 조회 가능합니다. Args: time (dtm.datetime): 조회할 시간 source (str): 데이터를 검색할 API. 'ebest' 또는 'kis'를 지정할 수 있습니다. 기본값은 'ebest'입니다. adjusted (bool): 수정주가 여부. 기본값은 True. Returns: pd.DataFrame: 모든 종목의 분봉 데이터가 포함된 pandas DataFrame. DataFrame의 열은 다음과 같습니다: - open (int): 시가 - high (int): 고가 - low (int): 저가 - close (int): 종가 - volume (int): 누적거래량 - sign (str): 대비부호 (1:상한가 2:상승, 3:보합 4:하한가, 5:하락) - change (str): 전일 대비 가격 변화 - diff (float): 전일 대비 등락율 - chdegree (float): 체결강도 - mdvolume (int): 매도체결수량 - msvolume (int): 매수체결수량 - revolume (int): 순매수체결량 - mdchecnt (int): 매도체결건수 - mschecnt (int): 매수체결건수 - rechecnt (int): 순체결건수 - cvolume (int): 체결량 - mdchecnttm (int): 시간별매도체결건수 - mschecnttm (int): 시간별매수체결건수 - totofferrem (int): 매도잔량 - totbidrem (int): 매수잔량 - mdvolumetm (int): 시간별매도체결량 - msvolumetm (int): 시간별매수체결량 Examples: >>> df = get_all_minute_data(dtm.datetime(2024, 5, 2, 15, 30)) >>> print(df) time open high low ... totofferrem totbidrem mdvolumetm msvolumetm code ... 000020 2024-05-02 15:30:00 8700 8700 8700 ... 8404 4015 35 1 000040 2024-05-02 15:30:00 1058 1058 1058 ... 13597 24738 20 0 000050 2024-05-02 15:30:00 7620 7620 7620 ... 2534 2866 0 0 000070 2024-05-02 15:30:00 69400 69400 69400 ... 606 724 0 3 000075 2024-05-02 15:30:00 54900 54900 54900 ... 315 308 0 0 [5 rows x 23 columns] """ tz = pytz.timezone("Asia/Seoul") url = f"{c.PYQQQ_API_URL}/domestic-stock/ohlcv/minutes/all/{time.date()}/{time.strftime('%H%M')}" params = { "brokerage": source, "adjusted": "true" if adjusted else "false", "current_date": dtm.date.today(), } r = send_request("GET", url, params=params) if r.status_code != 200 and r.status_code != 201: logger.error(f"Failed to get minute data: {r.text}") return rows = r.json() for data in rows: time = data["time"].replace("Z", "+00:00") time = dtm.datetime.fromisoformat(time).astimezone(tz).replace(tzinfo=None) data["time"] = time df = pd.DataFrame(rows) if not df.empty: dtypes = df.dtypes for k in [ "open", "high", "low", "close", "volume", "change", "totofferrem", "totbidrem", ]: if k in dtypes: dtypes[k] = np.dtype("int64") for k in ["diff", "chdegree"]: if k in dtypes: dtypes[k] = np.dtype("float64") df = df.astype(dtypes) df.set_index("code", inplace=True) return df
[docs] @minuteCache.memoize() def get_all_day_data( date: dtm.date, codes: list[str] | str, period: dtm.timedelta = dtm.timedelta(minutes=1), source: str = "ebest", adjusted: bool = True, ascending: bool = True, ) -> dict[str, pd.DataFrame] | pd.DataFrame: """ 지정된 날짜에 대해 하나 이상의 주식 코드에 대한 전체 분별 OHLCV(시가, 고가, 저가, 종가, 거래량) 데이터를 검색하여 반환합니다. 2024년 4월 26일 데이터 부터 조회 가능합니다. Args: date (dtm.date): 데이터를 검색할 날짜. codes (list[str]): 조회할 주식 코드들의 리스트. 최대 20개까지 지정할 수 있습니다. period (dtm.timedelta, optional): 반환된 데이터의 시간 간격. 기본값은 1분입니다. 30초 이상의 값을 30초간격으로 지정할 수 있습니다. source (str, optional): 데이터를 검색할 API. 'ebest' 또는 'kis'를 지정할 수 있습니다. 기본값은 'ebest'입니다. adjusted (bool): 수정주가 여부. 기본값은 True. ascending (bool): 오름차순 여부. 기본값은 True. Returns: dict[str, pd.DataFrame]: 주식 코드를 키로 하고, 해당 주식의 일일 OHLCV 데이터가 포함된 pandas DataFrame을 값으로 하는 딕셔너리. 각 DataFrame에는 변환된 'time' 열이 포함되어 있으며, 이는 조회된 데이터의 시간을 나타냅니다. 'time' 열은 DataFrame의 인덱스로 설정됩니다. DataFrame의 열은 다음과 같습니다: - open (int): 시가 - high (int): 고가 - low (int): 저가 - close (int): 종가 - volume (int): 누적거래량 - sign (str): 대비부호 (1:상한가 2:상승, 3:보합 4:하한가, 5:하락) - change (str): 전일 대비 가격 변화 - diff (float): 전일 대비 등락율 - chdegree (float): 체결강도 - mdvolume (int): 매도체결수량 - msvolume (int): 매수체결수량 - revolume (int): 순매수체결량 - mdchecnt (int): 매도체결건수 - mschecnt (int): 매수체결건수 - rechecnt (int): 순체결건수 - cvolume (int): 체결량 - mdchecnttm (int): 시간별매도체결건수 - mschecnttm (int): 시간별매수체결건수 - totofferrem (int): 매도잔량 - totbidrem (int): 매수잔량 - mdvolumetm (int): 시간별매도체결량 - msvolumetm (int): 시간별매수체결량 Raises: requests.exceptions.RequestException: PYQQQ API로부터 데이터를 검색하는 과정에서 오류가 발생한 경우. Examples: >>> result = get_all_day_data(dtm.date(2024, 4, 26), ["005930", "319640"], dtm.timedelta(minutes=1)) >>> print(result["069500"]) open high low close volume sign change diff \ time 2024-04-26 09:00:00 77800 77900 77400 77600 1629535 2 1300 1.70 2024-04-26 09:01:00 77500 77700 77300 77600 2155263 2 1300 1.70 2024-04-26 09:02:00 77600 77700 77400 77500 2600420 2 1200 1.57 2024-04-26 09:03:00 77500 77500 77200 77500 3033307 2 1200 1.57 2024-04-26 09:04:00 77400 77600 77400 77500 3268502 2 1200 1.57 """ assert isinstance(date, dtm.date), "date must be a datetime.date object" assert isinstance(codes, list) or isinstance(codes, str), "codes must be a list of strings or single code" if isinstance(codes, list): assert all(isinstance(code, str) for code in codes), "codes must be a list of strings" assert len(codes) > 0, "codes must not be empty" assert len(codes) <= 20, "codes must not exceed 20" if period is not None: assert period >= dtm.timedelta(seconds=30), "period must be at least 30 seconds" assert period.total_seconds() % 30 == 0, "period must be a multiple of 30 seconds" tz = pytz.timezone("Asia/Seoul") target_codes = codes if isinstance(codes, list) else [codes] if source == "kis": url = f"{c.PYQQQ_API_URL}/domestic-stock/ohlcv/minutes/{date}" else: url = f"{c.PYQQQ_API_URL}/domestic-stock/ohlcv/half-minutes/{date}" r = send_request( "GET", url, params={ "codes": ",".join(target_codes) if target_codes else None, "brokerage": source, "adjusted": "true" if adjusted else "false", "current_date": dtm.date.today(), }, ) if r.status_code != 200 and r.status_code != 201: logger.error(f"Failed to get day data: {r.text}") r.raise_for_status() result = {} for code in target_codes: result[code] = pd.DataFrame() entries = r.json() cols = entries["cols"] if len(cols) == 0: return result time_index = cols.index("time") multirows = entries["rows"] for code in multirows.keys(): rows = multirows[code] for row in rows: time = row[time_index].replace("Z", "+00:00") time = dtm.datetime.fromisoformat(time).astimezone(tz).replace(tzinfo=None) row[time_index] = time rows.reverse() df = pd.DataFrame(rows, columns=cols) if source == "kis": df = resample_kis_data(df, period) else: df = resample_ebest_data(df, period) df.sort_index(ascending=ascending, inplace=True) result[code] = df if isinstance(codes, str): return result[codes] else: return result
def resample_ebest_data(df, period): if period is not None and period.total_seconds() != 30: df["time"] = df["time"] - dtm.timedelta(seconds=30) df.set_index("time", inplace=True) minutes = period.total_seconds() / 60 op_dict = { "open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum", "sign": "last", "change": "last", "diff": "last", "chdegree": "last", "mdvolume": "sum", "msvolume": "sum", "revolume": "sum", "mdchecnt": "sum", "mschecnt": "sum", "rechecnt": "sum", "cvolume": "sum", "mdchecnttm": "sum", "mschecnttm": "sum", "totofferrem": "last", "totbidrem": "last", "mdvolumetm": "sum", "msvolumetm": "sum", } df = df.resample(f"{minutes}min").apply(op_dict) df.dropna(inplace=True) df.reset_index(inplace=True) dtypes = df.dtypes for k in [ "open", "high", "low", "close", "volume", "change", "totofferrem", "totbidrem", ]: dtypes[k] = np.dtype("int64") dtypes["diff"] = np.dtype("float64") dtypes["chdegree"] = np.dtype("float64") df = df.astype(dtypes) df.set_index("time", inplace=True) return df def resample_kis_data(df, period): if period is not None and period.total_seconds() != 60: minutes = period.total_seconds() // 60 op_dict = { "open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum", "value": "sum", "cum_volume": "sum", "cum_value": "sum", } df = df.resample(f"{minutes}min").apply(op_dict) df.dropna(inplace=True) df.reset_index(inplace=True) dtypes = df.dtypes for k in [ "open", "high", "low", "close", "volume", "value", "cum_volume", "cum_value", ]: dtypes[k] = np.dtype("int64") df = df.astype(dtypes) df = df[["time", "open", "high", "low", "close", "volume", "value", "cum_volume", "cum_value"]] df.set_index("time", inplace=True) return df
[docs] def get_orderbook(code: str, time: dtm.datetime) -> Dict: """ 주식 종목의 주문 호가 정보를 반환합니다. Args: code (str): 종목 코드 time (dtm.datetime): 조회할 시간 Returns: dict: 호가 정보가 포함된 사전. - total_bid_volume (int): 총 매수 잔량. - total_ask_volume (int): 총 매도 잔량. - ask_price (int): 1차 매도 호가 가격. - ask_volume (int): 1차 매도 호가 잔량. - bid_price (int): 1차 매수 호가 가격. - bid_volume (int): 1차 매수 호가 잔량. - time (dtm.datetime): 현지 기준 호가 정보 조회 시간. - bids (list): 매수 호가 목록 (각 항목은 price와 volume을 포함하는 dict). - asks (list): 매도 호가 목록 (각 항목은 price과 volume을 포함하는 dict). """ url = f"{c.PYQQQ_API_URL}/domestic-stock/orderbook/minutes/{code}/{time.date()}/{time.strftime('%H%M')}" r = send_request("GET", url) if r.status_code == 404: return None else: raise_for_status(r) data = r.json() data.pop("code") data["time"] = dtm.datetime.fromisoformat(data["time"]).astimezone(pytz.timezone("Asia/Seoul")).replace(tzinfo=None) return data