Source code for pyqqq.brokerage.kis.oauth

from pyqqq import get_api_key
from pyqqq.utils.kvstore import KVStore
from tinydb import TinyDB, Query
from typing import Optional, Tuple
import datetime as dtm
import pyqqq.config as c
import requests


[docs] class KISAuth: ''' 한국투자증권 인증 정보를 담고 있는 객체 Args: appkey (str): 앱 키 appsecret (str): 앱 시크릿 paper_trading (bool): 모의 투자 여부 '''
[docs] def __init__(self, appkey: str, appsecret: str, paper_trading: bool = False): assert appkey is not None, "App key must be set" assert appsecret is not None, "App secret must be set" self.appkey = appkey self.appsecret = appsecret self.paper_trading = paper_trading self.repo = KISTokenRepository()
@property def host_url(self) -> str: ''' 모의/실전 투자 환경에 따른 API 호스트 URL을 반환합니다. Returns: str: API 호스트 URL ''' if self.paper_trading: return 'https://openapivts.koreainvestment.com:29443' else: return 'https://openapi.koreainvestment.com:9443' @property def websocket_url(self) -> str: """ 모의/실전 투자 환경에 따른 웹소켓 호스트 URL을 반환합니다. Returns: str: 웹소켓 호스트 URL """ if self.paper_trading: return "ws://ops.koreainvestment.com:31000" else: return "ws://ops.koreainvestment.com:21000"
[docs] def get_token(self, refresh=False) -> str: ''' 액세스 토큰을 반환합니다. Args: refresh (bool): 강제 재발급 Returns: str: 액세스 토큰 ''' token = self.repo.find(KISKeyTypes.ACCESS_TOKEN, self.appkey, self.appsecret) if token is None or refresh is True: token, expiry = self.issue_access_token() self.repo.save( KISKeyTypes.ACCESS_TOKEN, self.appkey, self.appsecret, token, expiry ) return token
[docs] def issue_access_token(self): ''' 액세스 토큰을 발급합니다. Returns: tuple: 액세스 토큰과 만료 시간을 담은 튜플 ''' if self.appkey is None or self.appsecret is None: raise ValueError('App key and app secret must be set') headers = { 'content-type': 'application/json' } body = { 'grant_type': 'client_credentials', 'appkey': self.appkey, 'appsecret': self.appsecret } r = requests.post(f"{self.host_url}/oauth2/tokenP", headers=headers, json=body) r.raise_for_status() data = r.json() access_token = data['access_token'] expires_in = data['expires_in'] expiry = int(dtm.datetime.now().timestamp() + expires_in) return access_token, expiry
def get_approval_key(self, refresh=False) -> str: """ 승인 키를 반환합니다. Args: refresh (bool): 강제 재발급 Returns: str: 승인 키 """ approval_key = self.repo.find( KISKeyTypes.APPROVAL_KEY, self.appkey, self.appsecret ) if approval_key is None or refresh is True: approval_key, expiry = self.issue_approval_key() self.repo.save( KISKeyTypes.APPROVAL_KEY, self.appkey, self.appsecret, approval_key, expiry, ) return approval_key def issue_approval_key(self) -> Tuple[str, str]: """ 승인 키를 발급합니다. Returns: tuple: 승인 키와 만료 시간을 담은 튜플 """ headers = {"content-type": "application/json; utf-8"} body = { "grant_type": "client_credentials", "appkey": self.appkey, "secretkey": self.appsecret, } r = requests.post( f"{self.host_url}/oauth2/Approval", headers=headers, json=body ) r.raise_for_status() data = r.json() approval_key = data["approval_key"] expiry = int( (dtm.datetime.now() + dtm.timedelta(hours=23, minutes=30)).timestamp() ) return approval_key, expiry
class KISTokenRepository: def __init__(self): if get_api_key(): self.repo = _RemoteTokenRepository() else: self.repo = _LocalTokenRepository() def find(self, type, appkey, appsecret) -> Optional[str]: """ 앱 키와 앱 시크릿에 해당하는 액세스 토큰을 반환합니다. Args: type (str): 토큰 타입 (access_token, approval_key) appkey (str): 앱 키 appsecret (str): 앱 시크릿 Returns: str|None: 액세스 토큰 """ return self.repo.find(type, appkey, appsecret) def save(self, type, appkey, appsecret, token, expiry): """ 앱 키와 앱 시크릿에 해당하는 액세스 토큰을 저장합니다. Args: type (str): 토큰 타입 (access_token, approval_key) appkey (str): 앱 키 appsecret (str): 앱 시크릿 token (str): 액세스 토큰 expiry (int): 만료 시간(UNIX 타임스탬프) """ return self.repo.save(type, appkey, appsecret, token, expiry) class KISKeyTypes: ACCESS_TOKEN = "access_token" APPROVAL_KEY = "approval_key" class _LocalTokenRepository: def __init__(self): self.db = TinyDB(c.get_tiny_db_path()) def find(self, type, appkey, appsecret) -> Optional[str]: TokenCache = Query() result = self.db.search( TokenCache.type == type and TokenCache.appkey == appkey and TokenCache.appsecret == appsecret and TokenCache.expiry > dtm.datetime.now().timestamp() ) if len(result) > 0: return result[0]['token'] else: return None def save(self, type, appkey, appsecret, token, expiry): TokenCache = Query() self.db.upsert( { "type": type, "appkey": appkey, "appsecret": appsecret, "token": token, "expiry": expiry, }, TokenCache.appkey == appkey and TokenCache.appsecret == appsecret, ) class _RemoteTokenRepository: def __init__(self): self.kvstore = KVStore("kis_token_cache") self.cache = {} def _cache_key(self, type, appkey, appsecret): return f"{type}_{appkey}_{appsecret}" def find(self, type, appkey, appsecret) -> Optional[str]: key = self._cache_key(type, appkey, appsecret) # 1st. check local data local_data = self.cache.get(key, None) if local_data: if local_data["expiry"] > dtm.datetime.now().timestamp(): return local_data["token"] else: self.cache.pop(key) # 2nd. check remote data if self.kvstore.get(key): remote_data = self.kvstore.get(key) if remote_data["expiry"] > dtm.datetime.now().timestamp(): self.cache[key] = remote_data return remote_data["token"] else: self.kvstore.delete(key) return None def save(self, type, appkey, appsecret, token, expiry): key = self._cache_key(type, appkey, appsecret) data = {"token": token, "expiry": expiry} self.cache[key] = data self.kvstore.set(key, data)