cros_ec_python.devices.mec

  1import struct
  2import warnings
  3import errno
  4import sys
  5
  6from ..baseclass import CrosEcClass
  7from ..constants.COMMON import *
  8from ..constants.LPC import *
  9from ..constants.MEC import *
 10from ..constants.MEMMAP import *
 11from ..exceptions import ECError
 12
 13from ..ioports import PortIO
 14
 15
 16class CrosEcMec(CrosEcClass):
 17    """
 18    Class to interact with the EC using the MEC LPC interface.
 19    """
 20
 21    def __init__(
 22        self, init: bool = True, portio: PortIO | None = None
 23    ):
 24        """
 25        Detect and initialise the EC.
 26        :param init: Whether to initialise the EC on creation. Default is True.
 27        :param address: Specify a custom memmap address, will be detected if not specified.
 28        :param portio: PortIO object to use. Default is auto-detected.
 29        """
 30
 31        if portio is None:
 32            portio = PortIO()
 33
 34        self.portio: PortIO = portio
 35        """PortIO object to use."""
 36
 37        if init:
 38            self.ec_init()
 39
 40    @staticmethod
 41    def detect() -> bool:
 42        """
 43        Checks for known CrOS EC memory map addresses in `/proc/ioports`.
 44        """
 45        if sys.platform == "linux":
 46            try:
 47                with open("/proc/ioports", "r") as f:
 48                    for line in f:
 49                        if line.lstrip()[:4] == format(EC_HOST_CMD_REGION0, "04x"):
 50                            return True
 51                return False
 52            except FileNotFoundError:
 53                pass
 54        elif sys.platform == "win32":
 55            try:
 56                from wmi import WMI
 57
 58                c = WMI()
 59                for resource in c.Win32_PortResource():
 60                    start_address = int(resource.StartingAddress)
 61                    if start_address == EC_HOST_CMD_REGION0:
 62                        return True
 63                return False
 64            except ImportError:
 65                pass
 66
 67        # Probe for the EC as a last resort
 68        return CrosEcMec.probe()
 69
 70    @staticmethod
 71    def probe(portio: PortIO | None = None) -> bool:
 72        """
 73        Probe the EC memory map address.
 74        :return: True if the EC is found, False otherwise.
 75        """
 76        if portio is None:
 77            portio = PortIO()
 78
 79        # Request I/O permissions
 80        if (
 81            (res := portio.ioperm(EC_LPC_ADDR_HOST_DATA, 1, True))
 82            or (res := portio.ioperm(EC_LPC_ADDR_HOST_CMD, 1, True))
 83            or (
 84                res := portio.ioperm(
 85                    EC_HOST_CMD_REGION0, EC_HOST_CMD_MEC_REGION_SIZE, True
 86                )
 87            )
 88        ):
 89            if res == errno.EPERM:
 90                raise PermissionError("Permission denied. Try running as root.")
 91            warnings.warn(f"ioperm returned {errno.errorcode[res]} ({res})!")
 92
 93        status = 0xFF
 94
 95        # Read status bits, at least one should be 0
 96        status &= portio.inb(EC_LPC_ADDR_HOST_CMD)
 97        status &= portio.inb(EC_LPC_ADDR_HOST_DATA)
 98
 99        if status != 0xFF:
100            # Check for 'EC' in memory map
101            portio.outw(
102                EC_MEC_ADDR_MEMMAP + EC_MEMMAP_ID & 0xFFFC | MEC_ACCESS_TYPE_WORD,
103                MEC_EMI_EC_ADDRESS_B0,
104            )
105            if portio.inw(MEC_EMI_EC_DATA_B0).to_bytes(2, "little") == b"EC":
106                # Found it!
107                return True
108
109        # Nothing here
110        portio.ioperm(EC_LPC_ADDR_HOST_DATA, 1, False)
111        portio.ioperm(EC_LPC_ADDR_HOST_CMD, 1, False)
112        portio.ioperm(EC_HOST_CMD_REGION0, EC_HOST_CMD_MEC_REGION_SIZE, False)
113        return False
114
115    def ec_init(self) -> None:
116        """
117        Initialise the EC.
118        """
119        if not self.probe():
120            raise OSError("Could not find EC!")
121
122    def ec_exit(self) -> None:
123        pass
124
125    def wait_for_ec(self, status_addr: Int32 = EC_LPC_ADDR_HOST_CMD) -> None:
126        """
127        Wait for the EC to be ready after sending a command.
128        :param status_addr: The status register to read.
129        """
130        while self.portio.inb(status_addr) & EC_LPC_STATUS_BUSY_MASK:
131            pass
132
133    def command(
134        self,
135        version: UInt8,
136        command: UInt32,
137        outsize: UInt16,
138        insize: UInt32,
139        data: bytes = None,
140        warn: bool = True,
141    ) -> bytes:
142        """
143        Send a command to the EC and return the response. Uses the v3 command protocol over LPC.
144        :param version: Command version number (often 0).
145        :param command: Command to send (EC_CMD_...).
146        :param outsize: Outgoing length in bytes.
147        :param insize: Max number of bytes to accept from the EC.
148        :param data: Outgoing data to EC.
149        :param warn: Whether to warn if the response size is not as expected. Default is True.
150        :return: Response from the EC.
151        """
152        csum = 0
153        request = bytearray(
154            struct.pack(
155                "BBHBxH", EC_HOST_REQUEST_VERSION, csum, command, version, outsize
156            )
157        )
158        # (struct_version: UInt8, checksum: UInt8, command: UInt16,
159        # command_version: UInt8, reserved: UInt8, data_len: UInt16)
160
161        # Fail if output size is too big
162        if outsize + len(request) > EC_LPC_HOST_PACKET_SIZE:
163            raise ValueError("Output size too big!")
164
165        packet = request + (data or bytes())
166        # Calculate checksum
167        for i in packet:
168            csum += i
169
170        # Write checksum field so the entire packet sums to 0
171        packet[1] = (-csum) & 0xFF
172
173        # Copy data
174        self.mec_xfer(0, len(packet), write=True, data=packet)
175
176        # Start the command
177        self.portio.outb(EC_COMMAND_PROTOCOL_3, EC_LPC_ADDR_HOST_CMD)
178
179        self.wait_for_ec()
180
181        # Check result
182        i = self.portio.inb(EC_LPC_ADDR_HOST_DATA)
183        if i:
184            raise ECError(i)
185
186        # Read back response and start checksum
187        csum = 0
188        data_out = self.mec_xfer(0, struct.calcsize("BBHHH"), write=False)
189        for i in data_out:
190            csum += i
191
192        response = struct.unpack("BBHHH", data_out)
193        # (struct_version: UInt8, checksum: UInt8, result: UInt16, data_len: UInt16, reserved: UInt16)
194
195        if response[0] != EC_HOST_RESPONSE_VERSION:
196            raise IOError("Invalid response version!")
197
198        if response[4]:
199            # Reserved should be 0
200            raise IOError("Invalid response!")
201
202        if response[3] != insize and warn:
203            warnings.warn(
204                f"Expected {insize} bytes, got {response[3]} back from EC",
205                RuntimeWarning,
206            )
207
208        # Read back data
209        if response[3] != 0:
210            data = self.mec_xfer(struct.calcsize("BBHHH"), response[3], write=False)
211            for i in data:
212                csum += i
213        else:
214            data = bytes()
215
216        if csum & 0xFF:
217            raise IOError("Checksum error!")
218
219        return bytes(data)
220
221    def memmap(self, offset: Int32, num_bytes: Int32) -> bytes:
222        """
223        Read memory from the EC.
224        :param offset: Offset to read from.
225        :param num_bytes: Number of bytes to read.
226        :param address: Address of the EC memory map.
227        :return: Bytes read from the EC.
228        """
229        return self.mec_xfer(EC_MEC_ADDR_MEMMAP + offset, num_bytes)
230    
231    def mec_xfer_direct(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
232        """
233        Transfer data to/from the MEC EC using the MEC-specific LPC access method. Direct method for single access.
234        :param address: Address to read/write.
235        :param length: Number of bytes to read/write.
236        :param write: True to write, False to read. Default is False (read).
237        :param data: Data to write if write is True.
238        :return: Data read from the EC if write is False, otherwise None.
239        """
240        if write and (data is None or len(data) < length):
241            raise ValueError("Insufficient data provided for write operation.")
242
243        if address % 4 + length <= 4:
244            access_type = (MEC_ACCESS_TYPE_BYTE, MEC_ACCESS_TYPE_WORD, MEC_ACCESS_TYPE_BYTE, MEC_ACCESS_TYPE_LONG)[length - 1]
245            self.portio.outw(address & 0xFFFC | access_type, MEC_EMI_EC_ADDRESS_B0)
246            if write:
247                match length:
248                    case 1:
249                        self.portio.outb(data[0], MEC_EMI_EC_DATA_B0 + (address % 4))
250                    case 2:
251                        self.portio.outw(int.from_bytes(data[0:2], "little"), MEC_EMI_EC_DATA_B0 + (address % 4))
252                    case 3:
253                        self.portio.outw(int.from_bytes(data[0:2], "little"), MEC_EMI_EC_DATA_B0 + (address % 4))
254                        self.portio.outb(data[2], MEC_EMI_EC_DATA_B2 + (address % 4))
255                    case 4:
256                        self.portio.outl(int.from_bytes(data[0:4], "little"), MEC_EMI_EC_DATA_B0)
257            else:
258                match length:
259                    case 1:
260                        return self.portio.inb(
261                            MEC_EMI_EC_DATA_B0 + (address % 4)
262                        ).to_bytes(1, "little")
263                    case 2:
264                        return self.portio.inw(MEC_EMI_EC_DATA_B0 + (address % 4)).to_bytes(2, "little")
265                    case 3:
266                        word1 = self.portio.inw(MEC_EMI_EC_DATA_B0 + (address % 4)).to_bytes(2, "little")
267                        byte1 = self.portio.inb(MEC_EMI_EC_DATA_B2 + (address % 4)).to_bytes(1, "little")
268                        return word1 + byte1
269                    case 4:
270                        return self.portio.inl(MEC_EMI_EC_DATA_B0).to_bytes(4, "little")
271        else:
272            raise ValueError("Direct access only supports single 1-4 byte accesses.")
273
274    def mec_xfer_aligned_64(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
275        """
276        Transfer data to/from the MEC EC using the MEC-specific LPC access method. Aligned 64-bit method.
277        :param address: Address to read/write.
278        :param length: Number of bytes to read/write.
279        :param write: True to write, False to read. Default is False (read).
280        :param data: Data to write if write is True.
281        :return: Data read from the EC if write is False, otherwise None.
282        """
283        if write and (data is None or len(data) < length):
284            raise ValueError("Insufficient data provided for write operation.")
285
286        if address % 4 == 0 and length % 4 == 0:
287            result = bytearray()
288            self.portio.outw(address & 0xFFFC | MEC_ACCESS_TYPE_LONG_AUTOINCREMENT, MEC_EMI_EC_ADDRESS_B0)
289            for i in range(0, length, 4):
290                if write:
291                    self.portio.outl(int.from_bytes(data[i:i+4], "little"), MEC_EMI_EC_DATA_B0)
292                else:
293                    result.extend(self.portio.inl(MEC_EMI_EC_DATA_B0).to_bytes(4, "little"))
294            if not write:
295                return bytes(result)
296        else:
297            raise ValueError("Aligned 64-bit access requires 4-byte aligned address and length.")
298
299    def mec_xfer(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
300        """
301        Transfer data to/from the MEC EC using the MEC-specific LPC access method.
302        :param address: Address to read/write.
303        :param length: Number of bytes to read/write.
304        :param write: True to write, False to read. Default is False (read).
305        :param data: Data to write if write is True.
306        :return: Data read from the EC if write is False, otherwise None.
307        """
308        if write and (data is None or len(data) < length):
309            raise ValueError("Insufficient data provided for write operation.")
310
311        result = bytearray()
312        # Use some combination of the two methods for unaligned accesses
313        if address % 4 != 0:
314            # Handle unaligned start
315            start_align = 4 - (address % 4)
316            if write:
317                self.mec_xfer_direct(address, start_align, write, data[0:start_align])
318                address += start_align
319                length -= start_align
320                data = data[start_align:]
321            else:
322                part = self.mec_xfer_direct(address, start_align, write)
323                address += start_align
324                length -= start_align
325                result = bytearray(part)
326        # Handle aligned middle
327        middle_length = length - (length % 4)
328        if middle_length > 0:
329            if write:
330                self.mec_xfer_aligned_64(address, middle_length, write, data[0:middle_length])
331                address += middle_length
332                length -= middle_length
333                data = data[middle_length:]
334            else:
335                part = self.mec_xfer_aligned_64(address, middle_length, write)
336                address += middle_length
337                length -= middle_length
338                result.extend(part)
339        # Handle unaligned end
340        if length > 0:
341            if write:
342                self.mec_xfer_direct(address, length, write, data[0:length])
343            else:
344                part = self.mec_xfer_direct(address, length, write)
345                result.extend(part)
346        if not write:
347            return bytes(result)
class CrosEcMec(cros_ec_python.baseclass.CrosEcClass):
 17class CrosEcMec(CrosEcClass):
 18    """
 19    Class to interact with the EC using the MEC LPC interface.
 20    """
 21
 22    def __init__(
 23        self, init: bool = True, portio: PortIO | None = None
 24    ):
 25        """
 26        Detect and initialise the EC.
 27        :param init: Whether to initialise the EC on creation. Default is True.
 28        :param address: Specify a custom memmap address, will be detected if not specified.
 29        :param portio: PortIO object to use. Default is auto-detected.
 30        """
 31
 32        if portio is None:
 33            portio = PortIO()
 34
 35        self.portio: PortIO = portio
 36        """PortIO object to use."""
 37
 38        if init:
 39            self.ec_init()
 40
 41    @staticmethod
 42    def detect() -> bool:
 43        """
 44        Checks for known CrOS EC memory map addresses in `/proc/ioports`.
 45        """
 46        if sys.platform == "linux":
 47            try:
 48                with open("/proc/ioports", "r") as f:
 49                    for line in f:
 50                        if line.lstrip()[:4] == format(EC_HOST_CMD_REGION0, "04x"):
 51                            return True
 52                return False
 53            except FileNotFoundError:
 54                pass
 55        elif sys.platform == "win32":
 56            try:
 57                from wmi import WMI
 58
 59                c = WMI()
 60                for resource in c.Win32_PortResource():
 61                    start_address = int(resource.StartingAddress)
 62                    if start_address == EC_HOST_CMD_REGION0:
 63                        return True
 64                return False
 65            except ImportError:
 66                pass
 67
 68        # Probe for the EC as a last resort
 69        return CrosEcMec.probe()
 70
 71    @staticmethod
 72    def probe(portio: PortIO | None = None) -> bool:
 73        """
 74        Probe the EC memory map address.
 75        :return: True if the EC is found, False otherwise.
 76        """
 77        if portio is None:
 78            portio = PortIO()
 79
 80        # Request I/O permissions
 81        if (
 82            (res := portio.ioperm(EC_LPC_ADDR_HOST_DATA, 1, True))
 83            or (res := portio.ioperm(EC_LPC_ADDR_HOST_CMD, 1, True))
 84            or (
 85                res := portio.ioperm(
 86                    EC_HOST_CMD_REGION0, EC_HOST_CMD_MEC_REGION_SIZE, True
 87                )
 88            )
 89        ):
 90            if res == errno.EPERM:
 91                raise PermissionError("Permission denied. Try running as root.")
 92            warnings.warn(f"ioperm returned {errno.errorcode[res]} ({res})!")
 93
 94        status = 0xFF
 95
 96        # Read status bits, at least one should be 0
 97        status &= portio.inb(EC_LPC_ADDR_HOST_CMD)
 98        status &= portio.inb(EC_LPC_ADDR_HOST_DATA)
 99
100        if status != 0xFF:
101            # Check for 'EC' in memory map
102            portio.outw(
103                EC_MEC_ADDR_MEMMAP + EC_MEMMAP_ID & 0xFFFC | MEC_ACCESS_TYPE_WORD,
104                MEC_EMI_EC_ADDRESS_B0,
105            )
106            if portio.inw(MEC_EMI_EC_DATA_B0).to_bytes(2, "little") == b"EC":
107                # Found it!
108                return True
109
110        # Nothing here
111        portio.ioperm(EC_LPC_ADDR_HOST_DATA, 1, False)
112        portio.ioperm(EC_LPC_ADDR_HOST_CMD, 1, False)
113        portio.ioperm(EC_HOST_CMD_REGION0, EC_HOST_CMD_MEC_REGION_SIZE, False)
114        return False
115
116    def ec_init(self) -> None:
117        """
118        Initialise the EC.
119        """
120        if not self.probe():
121            raise OSError("Could not find EC!")
122
123    def ec_exit(self) -> None:
124        pass
125
126    def wait_for_ec(self, status_addr: Int32 = EC_LPC_ADDR_HOST_CMD) -> None:
127        """
128        Wait for the EC to be ready after sending a command.
129        :param status_addr: The status register to read.
130        """
131        while self.portio.inb(status_addr) & EC_LPC_STATUS_BUSY_MASK:
132            pass
133
134    def command(
135        self,
136        version: UInt8,
137        command: UInt32,
138        outsize: UInt16,
139        insize: UInt32,
140        data: bytes = None,
141        warn: bool = True,
142    ) -> bytes:
143        """
144        Send a command to the EC and return the response. Uses the v3 command protocol over LPC.
145        :param version: Command version number (often 0).
146        :param command: Command to send (EC_CMD_...).
147        :param outsize: Outgoing length in bytes.
148        :param insize: Max number of bytes to accept from the EC.
149        :param data: Outgoing data to EC.
150        :param warn: Whether to warn if the response size is not as expected. Default is True.
151        :return: Response from the EC.
152        """
153        csum = 0
154        request = bytearray(
155            struct.pack(
156                "BBHBxH", EC_HOST_REQUEST_VERSION, csum, command, version, outsize
157            )
158        )
159        # (struct_version: UInt8, checksum: UInt8, command: UInt16,
160        # command_version: UInt8, reserved: UInt8, data_len: UInt16)
161
162        # Fail if output size is too big
163        if outsize + len(request) > EC_LPC_HOST_PACKET_SIZE:
164            raise ValueError("Output size too big!")
165
166        packet = request + (data or bytes())
167        # Calculate checksum
168        for i in packet:
169            csum += i
170
171        # Write checksum field so the entire packet sums to 0
172        packet[1] = (-csum) & 0xFF
173
174        # Copy data
175        self.mec_xfer(0, len(packet), write=True, data=packet)
176
177        # Start the command
178        self.portio.outb(EC_COMMAND_PROTOCOL_3, EC_LPC_ADDR_HOST_CMD)
179
180        self.wait_for_ec()
181
182        # Check result
183        i = self.portio.inb(EC_LPC_ADDR_HOST_DATA)
184        if i:
185            raise ECError(i)
186
187        # Read back response and start checksum
188        csum = 0
189        data_out = self.mec_xfer(0, struct.calcsize("BBHHH"), write=False)
190        for i in data_out:
191            csum += i
192
193        response = struct.unpack("BBHHH", data_out)
194        # (struct_version: UInt8, checksum: UInt8, result: UInt16, data_len: UInt16, reserved: UInt16)
195
196        if response[0] != EC_HOST_RESPONSE_VERSION:
197            raise IOError("Invalid response version!")
198
199        if response[4]:
200            # Reserved should be 0
201            raise IOError("Invalid response!")
202
203        if response[3] != insize and warn:
204            warnings.warn(
205                f"Expected {insize} bytes, got {response[3]} back from EC",
206                RuntimeWarning,
207            )
208
209        # Read back data
210        if response[3] != 0:
211            data = self.mec_xfer(struct.calcsize("BBHHH"), response[3], write=False)
212            for i in data:
213                csum += i
214        else:
215            data = bytes()
216
217        if csum & 0xFF:
218            raise IOError("Checksum error!")
219
220        return bytes(data)
221
222    def memmap(self, offset: Int32, num_bytes: Int32) -> bytes:
223        """
224        Read memory from the EC.
225        :param offset: Offset to read from.
226        :param num_bytes: Number of bytes to read.
227        :param address: Address of the EC memory map.
228        :return: Bytes read from the EC.
229        """
230        return self.mec_xfer(EC_MEC_ADDR_MEMMAP + offset, num_bytes)
231    
232    def mec_xfer_direct(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
233        """
234        Transfer data to/from the MEC EC using the MEC-specific LPC access method. Direct method for single access.
235        :param address: Address to read/write.
236        :param length: Number of bytes to read/write.
237        :param write: True to write, False to read. Default is False (read).
238        :param data: Data to write if write is True.
239        :return: Data read from the EC if write is False, otherwise None.
240        """
241        if write and (data is None or len(data) < length):
242            raise ValueError("Insufficient data provided for write operation.")
243
244        if address % 4 + length <= 4:
245            access_type = (MEC_ACCESS_TYPE_BYTE, MEC_ACCESS_TYPE_WORD, MEC_ACCESS_TYPE_BYTE, MEC_ACCESS_TYPE_LONG)[length - 1]
246            self.portio.outw(address & 0xFFFC | access_type, MEC_EMI_EC_ADDRESS_B0)
247            if write:
248                match length:
249                    case 1:
250                        self.portio.outb(data[0], MEC_EMI_EC_DATA_B0 + (address % 4))
251                    case 2:
252                        self.portio.outw(int.from_bytes(data[0:2], "little"), MEC_EMI_EC_DATA_B0 + (address % 4))
253                    case 3:
254                        self.portio.outw(int.from_bytes(data[0:2], "little"), MEC_EMI_EC_DATA_B0 + (address % 4))
255                        self.portio.outb(data[2], MEC_EMI_EC_DATA_B2 + (address % 4))
256                    case 4:
257                        self.portio.outl(int.from_bytes(data[0:4], "little"), MEC_EMI_EC_DATA_B0)
258            else:
259                match length:
260                    case 1:
261                        return self.portio.inb(
262                            MEC_EMI_EC_DATA_B0 + (address % 4)
263                        ).to_bytes(1, "little")
264                    case 2:
265                        return self.portio.inw(MEC_EMI_EC_DATA_B0 + (address % 4)).to_bytes(2, "little")
266                    case 3:
267                        word1 = self.portio.inw(MEC_EMI_EC_DATA_B0 + (address % 4)).to_bytes(2, "little")
268                        byte1 = self.portio.inb(MEC_EMI_EC_DATA_B2 + (address % 4)).to_bytes(1, "little")
269                        return word1 + byte1
270                    case 4:
271                        return self.portio.inl(MEC_EMI_EC_DATA_B0).to_bytes(4, "little")
272        else:
273            raise ValueError("Direct access only supports single 1-4 byte accesses.")
274
275    def mec_xfer_aligned_64(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
276        """
277        Transfer data to/from the MEC EC using the MEC-specific LPC access method. Aligned 64-bit method.
278        :param address: Address to read/write.
279        :param length: Number of bytes to read/write.
280        :param write: True to write, False to read. Default is False (read).
281        :param data: Data to write if write is True.
282        :return: Data read from the EC if write is False, otherwise None.
283        """
284        if write and (data is None or len(data) < length):
285            raise ValueError("Insufficient data provided for write operation.")
286
287        if address % 4 == 0 and length % 4 == 0:
288            result = bytearray()
289            self.portio.outw(address & 0xFFFC | MEC_ACCESS_TYPE_LONG_AUTOINCREMENT, MEC_EMI_EC_ADDRESS_B0)
290            for i in range(0, length, 4):
291                if write:
292                    self.portio.outl(int.from_bytes(data[i:i+4], "little"), MEC_EMI_EC_DATA_B0)
293                else:
294                    result.extend(self.portio.inl(MEC_EMI_EC_DATA_B0).to_bytes(4, "little"))
295            if not write:
296                return bytes(result)
297        else:
298            raise ValueError("Aligned 64-bit access requires 4-byte aligned address and length.")
299
300    def mec_xfer(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
301        """
302        Transfer data to/from the MEC EC using the MEC-specific LPC access method.
303        :param address: Address to read/write.
304        :param length: Number of bytes to read/write.
305        :param write: True to write, False to read. Default is False (read).
306        :param data: Data to write if write is True.
307        :return: Data read from the EC if write is False, otherwise None.
308        """
309        if write and (data is None or len(data) < length):
310            raise ValueError("Insufficient data provided for write operation.")
311
312        result = bytearray()
313        # Use some combination of the two methods for unaligned accesses
314        if address % 4 != 0:
315            # Handle unaligned start
316            start_align = 4 - (address % 4)
317            if write:
318                self.mec_xfer_direct(address, start_align, write, data[0:start_align])
319                address += start_align
320                length -= start_align
321                data = data[start_align:]
322            else:
323                part = self.mec_xfer_direct(address, start_align, write)
324                address += start_align
325                length -= start_align
326                result = bytearray(part)
327        # Handle aligned middle
328        middle_length = length - (length % 4)
329        if middle_length > 0:
330            if write:
331                self.mec_xfer_aligned_64(address, middle_length, write, data[0:middle_length])
332                address += middle_length
333                length -= middle_length
334                data = data[middle_length:]
335            else:
336                part = self.mec_xfer_aligned_64(address, middle_length, write)
337                address += middle_length
338                length -= middle_length
339                result.extend(part)
340        # Handle unaligned end
341        if length > 0:
342            if write:
343                self.mec_xfer_direct(address, length, write, data[0:length])
344            else:
345                part = self.mec_xfer_direct(address, length, write)
346                result.extend(part)
347        if not write:
348            return bytes(result)

Class to interact with the EC using the MEC LPC interface.

CrosEcMec( init: bool = True, portio: cros_ec_python.ioports.x86portio.IoPortIo | None = None)
22    def __init__(
23        self, init: bool = True, portio: PortIO | None = None
24    ):
25        """
26        Detect and initialise the EC.
27        :param init: Whether to initialise the EC on creation. Default is True.
28        :param address: Specify a custom memmap address, will be detected if not specified.
29        :param portio: PortIO object to use. Default is auto-detected.
30        """
31
32        if portio is None:
33            portio = PortIO()
34
35        self.portio: PortIO = portio
36        """PortIO object to use."""
37
38        if init:
39            self.ec_init()

Detect and initialise the EC.

Parameters
  • init: Whether to initialise the EC on creation. Default is True.
  • address: Specify a custom memmap address, will be detected if not specified.
  • portio: PortIO object to use. Default is auto-detected.

PortIO object to use.

@staticmethod
def detect() -> bool:
41    @staticmethod
42    def detect() -> bool:
43        """
44        Checks for known CrOS EC memory map addresses in `/proc/ioports`.
45        """
46        if sys.platform == "linux":
47            try:
48                with open("/proc/ioports", "r") as f:
49                    for line in f:
50                        if line.lstrip()[:4] == format(EC_HOST_CMD_REGION0, "04x"):
51                            return True
52                return False
53            except FileNotFoundError:
54                pass
55        elif sys.platform == "win32":
56            try:
57                from wmi import WMI
58
59                c = WMI()
60                for resource in c.Win32_PortResource():
61                    start_address = int(resource.StartingAddress)
62                    if start_address == EC_HOST_CMD_REGION0:
63                        return True
64                return False
65            except ImportError:
66                pass
67
68        # Probe for the EC as a last resort
69        return CrosEcMec.probe()

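Checks whether the OS lists the known CrOS EC host command I/O region: /proc/ioports is scanned on Linux and the WMI port resources on Windows. If neither listing is available, the EC is probed directly as a last resort.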

@staticmethod
def probe(portio: cros_ec_python.ioports.x86portio.IoPortIo | None = None) -> bool:
 71    @staticmethod
 72    def probe(portio: PortIO | None = None) -> bool:
 73        """
 74        Probe the EC memory map address.
 75        :return: True if the EC is found, False otherwise.
 76        """
 77        if portio is None:
 78            portio = PortIO()
 79
 80        # Request I/O permissions
 81        if (
 82            (res := portio.ioperm(EC_LPC_ADDR_HOST_DATA, 1, True))
 83            or (res := portio.ioperm(EC_LPC_ADDR_HOST_CMD, 1, True))
 84            or (
 85                res := portio.ioperm(
 86                    EC_HOST_CMD_REGION0, EC_HOST_CMD_MEC_REGION_SIZE, True
 87                )
 88            )
 89        ):
 90            if res == errno.EPERM:
 91                raise PermissionError("Permission denied. Try running as root.")
 92            warnings.warn(f"ioperm returned {errno.errorcode[res]} ({res})!")
 93
 94        status = 0xFF
 95
 96        # Read status bits, at least one should be 0
 97        status &= portio.inb(EC_LPC_ADDR_HOST_CMD)
 98        status &= portio.inb(EC_LPC_ADDR_HOST_DATA)
 99
100        if status != 0xFF:
101            # Check for 'EC' in memory map
102            portio.outw(
103                EC_MEC_ADDR_MEMMAP + EC_MEMMAP_ID & 0xFFFC | MEC_ACCESS_TYPE_WORD,
104                MEC_EMI_EC_ADDRESS_B0,
105            )
106            if portio.inw(MEC_EMI_EC_DATA_B0).to_bytes(2, "little") == b"EC":
107                # Found it!
108                return True
109
110        # Nothing here
111        portio.ioperm(EC_LPC_ADDR_HOST_DATA, 1, False)
112        portio.ioperm(EC_LPC_ADDR_HOST_CMD, 1, False)
113        portio.ioperm(EC_HOST_CMD_REGION0, EC_HOST_CMD_MEC_REGION_SIZE, False)
114        return False

Probe the EC memory map address.

Returns

True if the EC is found, False otherwise.

def ec_init(self) -> None:
116    def ec_init(self) -> None:
117        """
118        Initialise the EC.
119        """
120        if not self.probe():
121            raise OSError("Could not find EC!")

Initialise the EC.

def ec_exit(self) -> None:
123    def ec_exit(self) -> None:
124        pass

Close connection to the EC.

def wait_for_ec(self, status_addr: int = 516) -> None:
126    def wait_for_ec(self, status_addr: Int32 = EC_LPC_ADDR_HOST_CMD) -> None:
127        """
128        Wait for the EC to be ready after sending a command.
129        :param status_addr: The status register to read.
130        """
131        while self.portio.inb(status_addr) & EC_LPC_STATUS_BUSY_MASK:
132            pass

Wait for the EC to be ready after sending a command.

Parameters
  • status_addr: The status register to read.
def command( self, version: int, command: int, outsize: int, insize: int, data: bytes = None, warn: bool = True) -> bytes:
134    def command(
135        self,
136        version: UInt8,
137        command: UInt32,
138        outsize: UInt16,
139        insize: UInt32,
140        data: bytes = None,
141        warn: bool = True,
142    ) -> bytes:
143        """
144        Send a command to the EC and return the response. Uses the v3 command protocol over LPC.
145        :param version: Command version number (often 0).
146        :param command: Command to send (EC_CMD_...).
147        :param outsize: Outgoing length in bytes.
148        :param insize: Max number of bytes to accept from the EC.
149        :param data: Outgoing data to EC.
150        :param warn: Whether to warn if the response size is not as expected. Default is True.
151        :return: Response from the EC.
152        """
153        csum = 0
154        request = bytearray(
155            struct.pack(
156                "BBHBxH", EC_HOST_REQUEST_VERSION, csum, command, version, outsize
157            )
158        )
159        # (struct_version: UInt8, checksum: UInt8, command: UInt16,
160        # command_version: UInt8, reserved: UInt8, data_len: UInt16)
161
162        # Fail if output size is too big
163        if outsize + len(request) > EC_LPC_HOST_PACKET_SIZE:
164            raise ValueError("Output size too big!")
165
166        packet = request + (data or bytes())
167        # Calculate checksum
168        for i in packet:
169            csum += i
170
171        # Write checksum field so the entire packet sums to 0
172        packet[1] = (-csum) & 0xFF
173
174        # Copy data
175        self.mec_xfer(0, len(packet), write=True, data=packet)
176
177        # Start the command
178        self.portio.outb(EC_COMMAND_PROTOCOL_3, EC_LPC_ADDR_HOST_CMD)
179
180        self.wait_for_ec()
181
182        # Check result
183        i = self.portio.inb(EC_LPC_ADDR_HOST_DATA)
184        if i:
185            raise ECError(i)
186
187        # Read back response and start checksum
188        csum = 0
189        data_out = self.mec_xfer(0, struct.calcsize("BBHHH"), write=False)
190        for i in data_out:
191            csum += i
192
193        response = struct.unpack("BBHHH", data_out)
194        # (struct_version: UInt8, checksum: UInt8, result: UInt16, data_len: UInt16, reserved: UInt16)
195
196        if response[0] != EC_HOST_RESPONSE_VERSION:
197            raise IOError("Invalid response version!")
198
199        if response[4]:
200            # Reserved should be 0
201            raise IOError("Invalid response!")
202
203        if response[3] != insize and warn:
204            warnings.warn(
205                f"Expected {insize} bytes, got {response[3]} back from EC",
206                RuntimeWarning,
207            )
208
209        # Read back data
210        if response[3] != 0:
211            data = self.mec_xfer(struct.calcsize("BBHHH"), response[3], write=False)
212            for i in data:
213                csum += i
214        else:
215            data = bytes()
216
217        if csum & 0xFF:
218            raise IOError("Checksum error!")
219
220        return bytes(data)

Send a command to the EC and return the response. Uses the v3 command protocol over LPC.

Parameters
  • version: Command version number (often 0).
  • command: Command to send (EC_CMD_...).
  • outsize: Outgoing length in bytes.
  • insize: Max number of bytes to accept from the EC.
  • data: Outgoing data to EC.
  • warn: Whether to warn if the response size is not as expected. Default is True.
Returns

Response from the EC.

def memmap(self, offset: int, num_bytes: int) -> bytes:
222    def memmap(self, offset: Int32, num_bytes: Int32) -> bytes:
223        """
224        Read memory from the EC.
225        :param offset: Offset to read from.
226        :param num_bytes: Number of bytes to read.
227        :param address: Address of the EC memory map.
228        :return: Bytes read from the EC.
229        """
230        return self.mec_xfer(EC_MEC_ADDR_MEMMAP + offset, num_bytes)

Read memory from the EC.

Parameters
  • offset: Offset to read from.
  • num_bytes: Number of bytes to read.
  • address: Address of the EC memory map.
Returns

Bytes read from the EC.

def mec_xfer_direct( self, address: int, length: int, write: bool = False, data: bytes = None) -> bytes | None:
232    def mec_xfer_direct(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
233        """
234        Transfer data to/from the MEC EC using the MEC-specific LPC access method. Direct method for single access.
235        :param address: Address to read/write.
236        :param length: Number of bytes to read/write.
237        :param write: True to write, False to read. Default is False (read).
238        :param data: Data to write if write is True.
239        :return: Data read from the EC if write is False, otherwise None.
240        """
241        if write and (data is None or len(data) < length):
242            raise ValueError("Insufficient data provided for write operation.")
243
244        if address % 4 + length <= 4:
245            access_type = (MEC_ACCESS_TYPE_BYTE, MEC_ACCESS_TYPE_WORD, MEC_ACCESS_TYPE_BYTE, MEC_ACCESS_TYPE_LONG)[length - 1]
246            self.portio.outw(address & 0xFFFC | access_type, MEC_EMI_EC_ADDRESS_B0)
247            if write:
248                match length:
249                    case 1:
250                        self.portio.outb(data[0], MEC_EMI_EC_DATA_B0 + (address % 4))
251                    case 2:
252                        self.portio.outw(int.from_bytes(data[0:2], "little"), MEC_EMI_EC_DATA_B0 + (address % 4))
253                    case 3:
254                        self.portio.outw(int.from_bytes(data[0:2], "little"), MEC_EMI_EC_DATA_B0 + (address % 4))
255                        self.portio.outb(data[2], MEC_EMI_EC_DATA_B2 + (address % 4))
256                    case 4:
257                        self.portio.outl(int.from_bytes(data[0:4], "little"), MEC_EMI_EC_DATA_B0)
258            else:
259                match length:
260                    case 1:
261                        return self.portio.inb(
262                            MEC_EMI_EC_DATA_B0 + (address % 4)
263                        ).to_bytes(1, "little")
264                    case 2:
265                        return self.portio.inw(MEC_EMI_EC_DATA_B0 + (address % 4)).to_bytes(2, "little")
266                    case 3:
267                        word1 = self.portio.inw(MEC_EMI_EC_DATA_B0 + (address % 4)).to_bytes(2, "little")
268                        byte1 = self.portio.inb(MEC_EMI_EC_DATA_B2 + (address % 4)).to_bytes(1, "little")
269                        return word1 + byte1
270                    case 4:
271                        return self.portio.inl(MEC_EMI_EC_DATA_B0).to_bytes(4, "little")
272        else:
273            raise ValueError("Direct access only supports single 1-4 byte accesses.")

Transfer data to/from the MEC EC using the MEC-specific LPC access method. Direct method for single access.

Parameters
  • address: Address to read/write.
  • length: Number of bytes to read/write.
  • write: True to write, False to read. Default is False (read).
  • data: Data to write if write is True.
Returns

Data read from the EC if write is False, otherwise None.

def mec_xfer_aligned_64( self, address: int, length: int, write: bool = False, data: bytes = None) -> bytes | None:
275    def mec_xfer_aligned_64(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
276        """
277        Transfer data to/from the MEC EC using the MEC-specific LPC access method. Aligned 64-bit method.
278        :param address: Address to read/write.
279        :param length: Number of bytes to read/write.
280        :param write: True to write, False to read. Default is False (read).
281        :param data: Data to write if write is True.
282        :return: Data read from the EC if write is False, otherwise None.
283        """
284        if write and (data is None or len(data) < length):
285            raise ValueError("Insufficient data provided for write operation.")
286
287        if address % 4 == 0 and length % 4 == 0:
288            result = bytearray()
289            self.portio.outw(address & 0xFFFC | MEC_ACCESS_TYPE_LONG_AUTOINCREMENT, MEC_EMI_EC_ADDRESS_B0)
290            for i in range(0, length, 4):
291                if write:
292                    self.portio.outl(int.from_bytes(data[i:i+4], "little"), MEC_EMI_EC_DATA_B0)
293                else:
294                    result.extend(self.portio.inl(MEC_EMI_EC_DATA_B0).to_bytes(4, "little"))
295            if not write:
296                return bytes(result)
297        else:
298            raise ValueError("Aligned 64-bit access requires 4-byte aligned address and length.")

Transfer data to/from the MEC EC using the MEC-specific LPC access method. Aligned 64-bit method.

Parameters
  • address: Address to read/write.
  • length: Number of bytes to read/write.
  • write: True to write, False to read. Default is False (read).
  • data: Data to write if write is True.
Returns

Data read from the EC if write is False, otherwise None.

def mec_xfer( self, address: int, length: int, write: bool = False, data: bytes = None) -> bytes | None:
300    def mec_xfer(self, address: Int32, length: Int32, write: bool = False, data: bytes = None) -> bytes | None:
301        """
302        Transfer data to/from the MEC EC using the MEC-specific LPC access method.
303        :param address: Address to read/write.
304        :param length: Number of bytes to read/write.
305        :param write: True to write, False to read. Default is False (read).
306        :param data: Data to write if write is True.
307        :return: Data read from the EC if write is False, otherwise None.
308        """
309        if write and (data is None or len(data) < length):
310            raise ValueError("Insufficient data provided for write operation.")
311
312        result = bytearray()
313        # Use some combination of the two methods for unaligned accesses
314        if address % 4 != 0:
315            # Handle unaligned start
316            start_align = 4 - (address % 4)
317            if write:
318                self.mec_xfer_direct(address, start_align, write, data[0:start_align])
319                address += start_align
320                length -= start_align
321                data = data[start_align:]
322            else:
323                part = self.mec_xfer_direct(address, start_align, write)
324                address += start_align
325                length -= start_align
326                result = bytearray(part)
327        # Handle aligned middle
328        middle_length = length - (length % 4)
329        if middle_length > 0:
330            if write:
331                self.mec_xfer_aligned_64(address, middle_length, write, data[0:middle_length])
332                address += middle_length
333                length -= middle_length
334                data = data[middle_length:]
335            else:
336                part = self.mec_xfer_aligned_64(address, middle_length, write)
337                address += middle_length
338                length -= middle_length
339                result.extend(part)
340        # Handle unaligned end
341        if length > 0:
342            if write:
343                self.mec_xfer_direct(address, length, write, data[0:length])
344            else:
345                part = self.mec_xfer_direct(address, length, write)
346                result.extend(part)
347        if not write:
348            return bytes(result)

Transfer data to/from the MEC EC using the MEC-specific LPC access method.

Parameters
  • address: Address to read/write.
  • length: Number of bytes to read/write.
  • write: True to write, False to read. Default is False (read).
  • data: Data to write if write is True.
Returns

Data read from the EC if write is False, otherwise None.