Add contextual logging where applicable (#124)
* Add contextual logging where applicable * Compress device id in log output
This commit is contained in:
@@ -104,7 +104,40 @@ class TuyaLoggingAdapter(logging.LoggerAdapter):
|
||||
|
||||
def process(self, msg, kwargs):
|
||||
"""Process log point and return output."""
|
||||
return f"[{self.extra['device_id']}] {msg}", kwargs
|
||||
dev_id = self.extra["device_id"]
|
||||
return f"[{dev_id[0:3]}...{dev_id[-3:]}] {msg}", kwargs
|
||||
|
||||
|
||||
class ContextualLogger:
|
||||
"""Contextual logger adding device id to log points."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize a new ContextualLogger."""
|
||||
self._logger = None
|
||||
|
||||
def set_logger(self, logger, device_id):
|
||||
"""Set base logger to use."""
|
||||
self._logger = TuyaLoggingAdapter(logger, {"device_id": device_id})
|
||||
|
||||
def debug(self, msg, *args):
|
||||
"""Debug level log."""
|
||||
return self._logger.log(logging.DEBUG, msg, *args)
|
||||
|
||||
def info(self, msg, *args):
|
||||
"""Info level log."""
|
||||
return self._logger.log(logging.INFO, msg, *args)
|
||||
|
||||
def warning(self, msg, *args):
|
||||
"""Warning method log."""
|
||||
return self._logger.log(logging.WARNING, msg, *args)
|
||||
|
||||
def error(self, msg, *args):
|
||||
"""Error level log."""
|
||||
return self._logger.log(logging.ERROR, msg, *args)
|
||||
|
||||
def exception(self, msg, *args):
|
||||
"""Exception level log."""
|
||||
return self._logger.log(logging.EXCEPTION, msg, *args)
|
||||
|
||||
|
||||
def pack_message(msg):
|
||||
@@ -171,19 +204,19 @@ class AESCipher:
|
||||
return s[: -ord(s[len(s) - 1 :])]
|
||||
|
||||
|
||||
class MessageDispatcher:
|
||||
class MessageDispatcher(ContextualLogger):
|
||||
"""Buffer and dispatcher for Tuya messages."""
|
||||
|
||||
# Heartbeats always respond with sequence number 0, so they can't be waited for like
|
||||
# other messages. This is a hack to allow waiting for heartbeats.
|
||||
HEARTBEAT_SEQNO = -100
|
||||
|
||||
def __init__(self, log, listener):
|
||||
def __init__(self, dev_id, listener):
|
||||
"""Initialize a new MessageBuffer."""
|
||||
self.log = log
|
||||
self.buffer = b""
|
||||
self.listeners = {}
|
||||
self.listener = listener
|
||||
self.set_logger(_LOGGER, dev_id)
|
||||
|
||||
def abort(self):
|
||||
"""Abort all waiting clients."""
|
||||
@@ -198,9 +231,9 @@ class MessageDispatcher:
|
||||
async def wait_for(self, seqno, timeout=5):
|
||||
"""Wait for response to a sequence number to be received and return it."""
|
||||
if seqno in self.listeners:
|
||||
raise Exception(f"listener exists for {seqno} (id: {self.id})")
|
||||
raise Exception(f"listener exists for {seqno}")
|
||||
|
||||
self.log.debug("Waiting for sequence number %d", seqno)
|
||||
self.debug("Waiting for sequence number %d", seqno)
|
||||
self.listeners[seqno] = asyncio.Semaphore(0)
|
||||
try:
|
||||
await asyncio.wait_for(self.listeners[seqno].acquire(), timeout=timeout)
|
||||
@@ -246,23 +279,23 @@ class MessageDispatcher:
|
||||
|
||||
def _dispatch(self, msg):
|
||||
"""Dispatch a message to someone that is listening."""
|
||||
_LOGGER.debug("Dispatching message %s", msg)
|
||||
self.debug("Dispatching message %s", msg)
|
||||
if msg.seqno in self.listeners:
|
||||
self.log.debug("Dispatching sequence number %d", msg.seqno)
|
||||
self.debug("Dispatching sequence number %d", msg.seqno)
|
||||
sem = self.listeners[msg.seqno]
|
||||
self.listeners[msg.seqno] = msg
|
||||
sem.release()
|
||||
elif msg.cmd == 0x09:
|
||||
self.log.debug("Got heartbeat response")
|
||||
self.debug("Got heartbeat response")
|
||||
if self.HEARTBEAT_SEQNO in self.listeners:
|
||||
sem = self.listeners[self.HEARTBEAT_SEQNO]
|
||||
self.listeners[self.HEARTBEAT_SEQNO] = msg
|
||||
sem.release()
|
||||
elif msg.cmd == 0x08:
|
||||
self.log.debug("Got status update")
|
||||
self.debug("Got status update")
|
||||
self.listener(msg)
|
||||
else:
|
||||
self.log.debug(
|
||||
self.debug(
|
||||
"Got message type %d for unknown listener %d: %s",
|
||||
msg.cmd,
|
||||
msg.seqno,
|
||||
@@ -292,7 +325,7 @@ class EmptyListener(TuyaListener):
|
||||
"""Device disconnected."""
|
||||
|
||||
|
||||
class TuyaProtocol(asyncio.Protocol):
|
||||
class TuyaProtocol(asyncio.Protocol, ContextualLogger):
|
||||
"""Implementation of the Tuya protocol."""
|
||||
|
||||
def __init__(self, dev_id, local_key, protocol_version, on_connected, listener):
|
||||
@@ -308,7 +341,7 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
port (int): The port to connect to.
|
||||
"""
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.log = TuyaLoggingAdapter(_LOGGER, {"device_id": dev_id})
|
||||
self.set_logger(_LOGGER, dev_id)
|
||||
self.id = dev_id
|
||||
self.local_key = local_key.encode("latin1")
|
||||
self.version = protocol_version
|
||||
@@ -333,22 +366,22 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
if listener is not None:
|
||||
listener.status_updated(self.dps_cache)
|
||||
|
||||
return MessageDispatcher(self.log, _status_update)
|
||||
return MessageDispatcher(self.id, _status_update)
|
||||
|
||||
def connection_made(self, transport):
|
||||
"""Did connect to the device."""
|
||||
|
||||
async def heartbeat_loop():
|
||||
"""Continuously send heart beat updates."""
|
||||
self.log.debug("Started heartbeat loop")
|
||||
self.debug("Started heartbeat loop")
|
||||
while True:
|
||||
try:
|
||||
await self.heartbeat()
|
||||
except Exception as ex:
|
||||
self.log.exception("Heartbeat failed (%s), disconnecting", ex)
|
||||
self.exception("Heartbeat failed (%s), disconnecting", ex)
|
||||
break
|
||||
await asyncio.sleep(HEARTBEAT_INTERVAL)
|
||||
self.log.debug("Stopped heartbeat loop")
|
||||
self.debug("Stopped heartbeat loop")
|
||||
self.close()
|
||||
|
||||
self.transport = transport
|
||||
@@ -361,22 +394,22 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
|
||||
def connection_lost(self, exc):
|
||||
"""Disconnected from device."""
|
||||
self.log.debug("Connection lost: %s", exc)
|
||||
self.debug("Connection lost: %s", exc)
|
||||
try:
|
||||
self.close()
|
||||
except Exception:
|
||||
self.log.exception("Failed to close connection")
|
||||
self.exception("Failed to close connection")
|
||||
finally:
|
||||
try:
|
||||
listener = self.listener()
|
||||
if listener is not None:
|
||||
listener.disconnected(exc)
|
||||
except Exception:
|
||||
self.log.exception("Failed to call disconnected callback")
|
||||
self.exception("Failed to call disconnected callback")
|
||||
|
||||
def close(self):
|
||||
"""Close connection and abort all outstanding listeners."""
|
||||
self.log.debug("Closing connection")
|
||||
self.debug("Closing connection")
|
||||
if self.heartbeater is not None:
|
||||
self.heartbeater.cancel()
|
||||
if self.dispatcher is not None:
|
||||
@@ -388,7 +421,7 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
|
||||
async def exchange(self, command, dps=None):
|
||||
"""Send and receive a message, returning response from device."""
|
||||
self.log.debug(
|
||||
self.debug(
|
||||
"Sending command %s (device type: %s)",
|
||||
command,
|
||||
self.dev_type,
|
||||
@@ -406,7 +439,7 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
self.transport.write(payload)
|
||||
msg = await self.dispatcher.wait_for(seqno)
|
||||
if msg is None:
|
||||
self.log.debug("Wait was aborted for seqno %d", seqno)
|
||||
self.debug("Wait was aborted for seqno %d", seqno)
|
||||
return None
|
||||
|
||||
# TODO: Verify stuff, e.g. CRC sequence number?
|
||||
@@ -414,7 +447,7 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
|
||||
# Perform a new exchange (once) if we switched device type
|
||||
if dev_type != self.dev_type:
|
||||
self.log.debug(
|
||||
self.debug(
|
||||
"Re-send %s due to device type change (%s -> %s)",
|
||||
command,
|
||||
dev_type,
|
||||
@@ -465,14 +498,14 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
try:
|
||||
data = await self.status()
|
||||
except Exception as e:
|
||||
self.log.exception("Failed to get status: %s", e)
|
||||
self.exception("Failed to get status: %s", e)
|
||||
raise
|
||||
if "dps" in data:
|
||||
self.dps_cache.update(data["dps"])
|
||||
|
||||
if self.dev_type == "type_0a":
|
||||
return self.dps_cache
|
||||
self.log.debug("Detected dps: %s", self.dps_cache)
|
||||
self.debug("Detected dps: %s", self.dps_cache)
|
||||
return self.dps_cache
|
||||
|
||||
def add_dps_to_request(self, dp_indicies):
|
||||
@@ -501,17 +534,17 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
|
||||
if "data unvalid" in payload:
|
||||
self.dev_type = "type_0d"
|
||||
self.log.debug(
|
||||
self.debug(
|
||||
"switching to dev_type %s",
|
||||
self.dev_type,
|
||||
)
|
||||
return None
|
||||
else:
|
||||
raise Exception(f"Unexpected payload={payload} (id: {self.id})")
|
||||
raise Exception(f"Unexpected payload={payload}")
|
||||
|
||||
if not isinstance(payload, str):
|
||||
payload = payload.decode()
|
||||
self.log.debug("Decrypted payload: %s", payload)
|
||||
self.debug("Decrypted payload: %s", payload)
|
||||
return json.loads(payload)
|
||||
|
||||
def _generate_payload(self, command, data=None):
|
||||
@@ -543,7 +576,7 @@ class TuyaProtocol(asyncio.Protocol):
|
||||
json_data["dps"] = self.dps_to_request
|
||||
|
||||
payload = json.dumps(json_data).replace(" ", "").encode("utf-8")
|
||||
self.log.debug("Send payload: %s", payload)
|
||||
self.debug("Send payload: %s", payload)
|
||||
|
||||
if self.version == 3.3:
|
||||
payload = self.cipher.encrypt(payload, False)
|
||||
|
Reference in New Issue
Block a user