def testLoopback(self): with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: for loopback in (0, 1): s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK, loopback) self.assertEqual(loopback, s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_LOOPBACK))
def _have_socket_can(): """Check whether CAN sockets are supported on this host.""" try: s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) except (AttributeError, socket.error, OSError): return False else: s.close() return True
def setUp(self): self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) self.addCleanup(self.s.close) try: self.s.bind((self.interface,)) except socket.error: self.skipTest('network interface `%s` does not exist' % self.interface)
def clientSetUp(self): self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) try: self.cli.bind((self.interface,)) except socket.error: # skipTest should not be called here, and will be called in the # server instead pass
def testCreateSocket(self): with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: pass
def testBindAny(self): with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: s.bind(('', ))
def testFilter(self): can_id, can_mask = 0x200, 0x700 can_filter = struct.pack("=II", can_id, can_mask) with socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) as s: s.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, can_filter) self.assertEqual(can_filter, s.getsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_FILTER, 8))
def _have_socket_can(): """Check whether CAN sockets are supported on this host.""" try: s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) except (AttributeError, OSError): return False else: s.close() return True
def setUp(self): self.s = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) self.addCleanup(self.s.close) try: self.s.bind((self.interface,)) except OSError: self.skipTest('network interface `%s` does not exist' % self.interface)
def clientSetUp(self): self.cli = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) try: self.cli.bind((self.interface,)) except OSError: # skipTest should not be called here, and will be called in the # server instead pass
def __init__(self, interface, name=None): if name is None: name = interface self.name = name super().__init__(socket.AF_CAN, socket.SOCK_RAW, socket.CAN_RAW) self.bind((interface,)) # Throws OSError if interface doesn't exist
def do_start(self, params): if self.device and not self._run: try: self.socket = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW) self.socket.setblocking(0) self.socket.bind((self.device,)) self._run = True except Exception as e: self._run = False self.dprint(0, "ERROR: " + str(e)) self.set_error_text("ERROR: " + str(e)) traceback.print_exc()