I am not sure how i can mock the websocket.serve method and other methods of websocket module, Need help in creating a unittest for this class. if anyone help in creating unittest for these methods?
I want to wroite unit test for this python file, I am not sure how to mock a websocket and how to write a unit test for this particular python file, can anyone please help with some test cases .
import websockets
class WebSocketNotif(NotifBase):
def __init__(self):
super().__init__()
self.log = Logger(module = "modulename").create()
self.__config = CommonConfig("config.json")
self.websocket_host = "localhost"
self.websocket_port = self.__config["websocket_port"] if "websocket_port" in self.__config
else 8081
self.push_frequency_seconds = int(self.__config["notification_frequency"])*60\
if "notification_frequency" in self.__config else 600
self.serve_forever = True
def start_server(self):
start_server = websockets.serve(self.notify,self.websocket_host, self.websocket_post)
asyncio.get_event_loop().run_untill_complete(start_server)
asyncio.get_event_loop().run_forever()
def stop_server(self):
self.serve_forever = False
async def __serve_connection_status(self, websocket):
while self.serve_forever:
to_send, _ = self.get_connection_status()
try:
to_send = json.dumps(to_send)
await websocket.send(str(to_send))
await asyncio.sleep(self.push_frequency_seconds)
except websockets.ConnectionClosed:
break
async def __serve_install_status(self,websocket):
while self.serve_forever:
to_send, _ = self.get_install_status()
try:
to_send = json.dumps(to_send)
await websocket.send(str(to_send))
await asyncio.sleep(self.push_frequency_seconds)
except websockets.ConnectionClosed:
break
async def __serve_download_status(self, websocket):
while self.serve_forever:
to_send, _ = self.get_download_status()
try:
await websocket.send(str(to_send))
await asyncio.sleep(self.push_frequency_seconds)
except websockets.ConnectionClosed:
break
async def notify(self, websocket, path)
if "notify/download" in path:
await self.__serve_download_status(websocket)
elif "notify/install" in path:
await self.__serve_install_status(websocket)
elif "notify/connection" in path:
await self.__serve_connection_status(websocket)
else:
await websocket.send('{"status":"Invalid URL"')
if __name__ = "__main__"
WebSocketNotif().start_server()