flake8 cleanup of fauxmo

This commit is contained in:
Emil Lerch 2019-07-19 18:51:38 -07:00
parent 38fcf3fbae
commit 0e534d8d9d
Signed by: lobo
GPG Key ID: CEC5F37C1BE5A481

View File

@ -33,11 +33,8 @@ import socket
import struct import struct
import sys import sys
import time import time
import urllib
import uuid import uuid
# This XML is the minimum needed to define one of our virtual switches # This XML is the minimum needed to define one of our virtual switches
# to the Amazon Echo # to the Amazon Echo
@ -57,6 +54,7 @@ SETUP_XML = """<?xml version="1.0"?>
DEBUG = False DEBUG = False
def dbg(msg): def dbg(msg):
global DEBUG global DEBUG
if DEBUG: if DEBUG:
@ -76,21 +74,21 @@ class poller:
self.use_poll = False self.use_poll = False
self.targets = {} self.targets = {}
def add(self, target, fileno = None): def add(self, target, fileno=None):
if not fileno: if not fileno:
fileno = target.fileno() fileno = target.fileno()
if self.use_poll: if self.use_poll:
self.poller.register(fileno, select.POLLIN) self.poller.register(fileno, select.POLLIN)
self.targets[fileno] = target self.targets[fileno] = target
def remove(self, target, fileno = None): def remove(self, target, fileno=None):
if not fileno: if not fileno:
fileno = target.fileno() fileno = target.fileno()
if self.use_poll: if self.use_poll:
self.poller.unregister(fileno) self.poller.unregister(fileno)
del(self.targets[fileno]) del(self.targets[fileno])
def poll(self, timeout = 0): def poll(self, timeout=0):
if self.use_poll: if self.use_poll:
ready = self.poller.poll(timeout) ready = self.poller.poll(timeout)
else: else:
@ -124,8 +122,7 @@ class upnp_device(object):
dbg("got local address of %s" % upnp_device.this_host_ip) dbg("got local address of %s" % upnp_device.this_host_ip)
return upnp_device.this_host_ip return upnp_device.this_host_ip
def __init__(self, listener, poller, port, root_url, server_version, persistent_uuid, other_headers=None, ip_address=None):
def __init__(self, listener, poller, port, root_url, server_version, persistent_uuid, other_headers = None, ip_address = None):
self.listener = listener self.listener = listener
self.poller = poller self.poller = poller
self.port = port self.port = port
@ -174,17 +171,17 @@ class upnp_device(object):
def respond_to_search(self, destination, search_target): def respond_to_search(self, destination, search_target):
dbg("Responding to search for %s" % self.get_name()) dbg("Responding to search for %s" % self.get_name())
date_str = email.utils.formatdate(timeval=None, localtime=False, usegmt=True) date_str = email.utils.formatdate(timeval=None, localtime=False, usegmt=True)
location_url = self.root_url % {'ip_address' : self.ip_address, 'port' : self.port} location_url = self.root_url % {'ip_address': self.ip_address, 'port': self.port}
message = ("HTTP/1.1 200 OK\r\n" message = ("HTTP/1.1 200 OK\r\n"
"CACHE-CONTROL: max-age=86400\r\n" "CACHE-CONTROL: max-age=86400\r\n"
"DATE: %s\r\n" "DATE: %s\r\n"
"EXT:\r\n" "EXT:\r\n"
"LOCATION: %s\r\n" "LOCATION: %s\r\n"
"OPT: \"http://schemas.upnp.org/upnp/1/0/\"; ns=01\r\n" "OPT: \"http://schemas.upnp.org/upnp/1/0/\"; ns=01\r\n"
"01-NLS: %s\r\n" "01-NLS: %s\r\n"
"SERVER: %s\r\n" "SERVER: %s\r\n"
"ST: %s\r\n" "ST: %s\r\n"
"USN: uuid:%s::%s\r\n" % (date_str, location_url, self.uuid, self.server_version, search_target, self.persistent_uuid, search_target)) "USN: uuid:%s::%s\r\n" % (date_str, location_url, self.uuid, self.server_version, search_target, self.persistent_uuid, search_target))
if self.other_headers: if self.other_headers:
for header in self.other_headers: for header in self.other_headers:
message += "%s\r\n" % header message += "%s\r\n" % header
@ -193,14 +190,14 @@ class upnp_device(object):
temp_socket.sendto(message, destination) temp_socket.sendto(message, destination)
# This subclass does the bulk of the work to mimic a WeMo switch on the network. # This subclass does the bulk of the work to mimic a WeMo switch on the network
class fauxmo(upnp_device): class fauxmo(upnp_device):
@staticmethod @staticmethod
def make_uuid(name): def make_uuid(name):
return ''.join(["%x" % sum([ord(c) for c in name])] + ["%x" % ord(c) for c in "%sfauxmo!" % name])[:14] return ''.join(["%x" % sum([ord(c) for c in name])] + ["%x" % ord(c) for c in "%sfauxmo!" % name])[:14]
def __init__(self, name, listener, poller, ip_address, port, action_handler = None): def __init__(self, name, listener, poller, ip_address, port, action_handler=None):
self.serial = self.make_uuid(name) self.serial = self.make_uuid(name)
self.name = name self.name = name
self.ip_address = ip_address self.ip_address = ip_address
@ -219,7 +216,7 @@ class fauxmo(upnp_device):
def handle_request(self, data, sender, socket): def handle_request(self, data, sender, socket):
if data.find('GET /setup.xml HTTP/1.1') == 0: if data.find('GET /setup.xml HTTP/1.1') == 0:
dbg("Responding to setup.xml for %s" % self.name) dbg("Responding to setup.xml for %s" % self.name)
xml = SETUP_XML % {'device_name' : self.name, 'device_serial' : self.serial} xml = SETUP_XML % {'device_name': self.name, 'device_serial': self.serial}
date_str = email.utils.formatdate(timeval=None, localtime=False, usegmt=True) date_str = email.utils.formatdate(timeval=None, localtime=False, usegmt=True)
message = ("HTTP/1.1 200 OK\r\n" message = ("HTTP/1.1 200 OK\r\n"
"CONTENT-LENGTH: %d\r\n" "CONTENT-LENGTH: %d\r\n"
@ -291,27 +288,27 @@ class upnp_broadcast_responder(object):
self.ip = '239.255.255.250' self.ip = '239.255.255.250'
self.port = 1900 self.port = 1900
try: try:
#This is needed to join a multicast group # This is needed to join a multicast group
self.mreq = struct.pack("4sl",socket.inet_aton(self.ip),socket.INADDR_ANY) self.mreq = struct.pack("4sl", socket.inet_aton(self.ip), socket.INADDR_ANY)
#Set up server socket # Set up server socket
self.ssock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,socket.IPPROTO_UDP) self.ssock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self.ssock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) self.ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try: try:
self.ssock.bind(('',self.port)) self.ssock.bind(('', self.port))
except Exception, e: except Exception, e:
dbg("WARNING: Failed to bind %s:%d: %s" , (self.ip,self.port,e)) dbg("WARNING: Failed to bind %s:%d: %s", (self.ip, self.port, e))
ok = False ok = False
try: try:
self.ssock.setsockopt(socket.IPPROTO_IP,socket.IP_ADD_MEMBERSHIP,self.mreq) self.ssock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, self.mreq)
except Exception, e: except Exception, e:
dbg('WARNING: Failed to join multicast group:',e) dbg('WARNING: Failed to join multicast group:', e)
ok = False ok = False
except Exception, e: except Exception, e:
dbg("Failed to initialize UPnP sockets:",e) dbg("Failed to initialize UPnP sockets:", e)
return False return False
if ok: if ok:
dbg("Listening for UPnP broadcasts") dbg("Listening for UPnP broadcasts")
@ -329,8 +326,8 @@ class upnp_broadcast_responder(object):
else: else:
pass pass
#Receive network data # Receive network data
def recvfrom(self,size): def recvfrom(self, size):
if self.TIMEOUT: if self.TIMEOUT:
self.ssock.setblocking(0) self.ssock.setblocking(0)
ready = select.select([self.ssock], [], [], self.TIMEOUT)[0] ready = select.select([self.ssock], [], [], self.TIMEOUT)[0]
@ -408,7 +405,7 @@ for one_faux in FAUXMOS:
if len(one_faux) == 2: if len(one_faux) == 2:
# a fixed port wasn't specified, use a dynamic one # a fixed port wasn't specified, use a dynamic one
one_faux.append(0) one_faux.append(0)
switch = fauxmo(one_faux[0], u, p, None, one_faux[2], action_handler = one_faux[1]) switch = fauxmo(one_faux[0], u, p, None, one_faux[2], action_handler=one_faux[1])
dbg("Entering main loop\n") dbg("Entering main loop\n")
@ -420,4 +417,3 @@ while True:
except Exception, e: except Exception, e:
dbg(e) dbg(e)
break break