def get(self, key: str) -> Union[str, bool, int, float, dict, list, None]:
    """
    Fetch the value stored under ``key`` in the KV store.

    Args:
        key (str): Key to look up; must be a non-empty ``str``.

    Returns:
        str | bool | int | float | dict | list | None: The JSON-decoded
        value, or None when the response body is empty or the decoded
        response payload is falsy.

    Raises:
        AssertionError: If ``key`` is not a ``str`` or is empty.
    """
    assert type(key) is str, "key must be a string"
    assert len(key) > 0, "key must not be empty"

    response = send_request(
        "GET",
        f"{c.PYQQQ_API_URL}/kvstore/get-value",
        params={"contextName": self.name, "key": key},
    )
    raise_for_status(response)

    # An empty body is treated as "no value" without attempting to parse it.
    if len(response.text) == 0:
        return None

    # The response payload is itself a JSON document (set() stores the
    # output of json.dumps), so it is decoded a second time here.
    payload = response.json()
    return json.loads(payload) if payload else None
def set(self, key: str, value: Union[str, bool, int, float, dict, list, None]) -> bool:
    """
    Store ``value`` under ``key`` in the KV store.

    The value is serialized with ``json.dumps`` before it is sent. Passing
    None as the value deletes the key (same effect as ``delete``).

    Args:
        key (str): Key to write; must be a non-empty ``str``.
        value (str | bool | int | float | dict | list | None): Value to
            store. Subclasses of the listed types (for example
            ``collections.OrderedDict``) are accepted as well.

    Returns:
        bool: True when the response's ``message`` field equals
        ``"success"`` (or, for a None value, whatever ``delete`` returns).

    Raises:
        AssertionError: If ``key`` is not a non-empty ``str`` or ``value``
            is not one of the supported types.
        ValueError: If ``value`` cannot be serialized to JSON; the
            underlying error is attached as ``__cause__``.
    """
    assert type(key) is str, "key must be a string"
    assert len(key) > 0, "key must not be empty"
    # isinstance (rather than an exact type match) so that subclasses that
    # json.dumps can serialize are not rejected.
    assert value is None or isinstance(
        value, (str, bool, int, float, dict, list)
    ), "value must be a string, bool, int, float, dict, list, or None"

    if value is None:
        return self.delete(key)

    try:
        serialized = json.dumps(value)
    except Exception as err:
        # Kept broad so every serialization failure is still reported as
        # ValueError; chaining preserves the original reason for debugging.
        raise ValueError("value must be serializable to JSON") from err

    req_body = {
        "contextName": self.name,
        "key": key,
        "value": serialized,
    }
    response = send_request(
        "PUT", f"{c.PYQQQ_API_URL}/kvstore/set-value", json=req_body
    )
    raise_for_status(response)

    data = response.json()
    return data.get("message") == "success"
def delete(self, key: str) -> bool:
    """
    Delete the value stored under ``key`` from the KV store.

    Args:
        key (str): Key to delete; must be a non-empty ``str``.

    Returns:
        bool: True when the response's ``message`` field equals
        ``"success"``.

    Raises:
        AssertionError: If ``key`` is not a ``str`` or is empty.
    """
    # Same key validation as get() and set(), so an invalid key is rejected
    # before any request is sent.
    assert type(key) is str, "key must be a string"
    assert len(key) > 0, "key must not be empty"

    req_body = {
        "contextName": self.name,
        "key": key,
    }
    r = send_request(
        "DELETE", f"{c.PYQQQ_API_URL}/kvstore/delete-value", json=req_body
    )
    raise_for_status(r)

    # Responses that pass raise_for_status but are not plain 200 have their
    # body echoed to stdout.
    # NOTE(review): looks like a debugging aid — consider logging instead.
    if r.status_code != 200:
        print(r.text)

    data = r.json()
    return data.get("message") == "success"
def keys(self) -> List[str]:
    """
    List every key currently stored in the KV store.

    Returns:
        List[str]: The decoded response payload, or an empty list when
        that payload is falsy.
    """
    response = send_request(
        "GET",
        f"{c.PYQQQ_API_URL}/kvstore/list-keys",
        params={"contextName": self.name},
    )
    raise_for_status(response)
    # Normalize a falsy payload (e.g. null or an empty list) to [].
    return response.json() or []
def clear(self):
    """
    Delete every key-value pair stored in the KV store.
    """
    endpoint = f"{c.PYQQQ_API_URL}/kvstore/clear"
    response = send_request(
        "DELETE",
        endpoint,
        json={"contextName": self.name},
    )
    raise_for_status(response)
class MockKVStore(KVStore):
    """
    Mock KVStore for testing.

    Values are kept in the ``store`` dict on the instance; ``get``, ``set``,
    ``delete``, ``keys`` and ``clear`` operate on that dict only.
    """

    def __init__(self, context_name: str = "default"):
        super().__init__(context_name)
        # In-memory backing storage: key -> value exactly as passed to set().
        self.store = {}

    def get(self, key: str) -> Union[str, bool, int, float, dict, list, None]:
        """Return the value stored under ``key``, or None when it is absent."""
        return self.store.get(key, None)

    def set(self, key: str, value: Union[str, bool, int, float, dict, list, None]) -> bool:
        """Store ``value`` under ``key``; a None value deletes the key instead."""
        if value is not None:
            self.store[key] = value
            return True
        return self.delete(key)

    def delete(self, key: str) -> bool:
        """Remove ``key``; return True if it existed, False otherwise."""
        try:
            del self.store[key]
        except KeyError:
            return False
        return True

    def keys(self) -> List[str]:
        """Return a list of all stored keys."""
        return list(self.store)

    def clear(self):
        """Drop all stored entries by replacing the backing dict."""
        self.store = {}