我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用xbmc.LOGDEBUG。
def log(msg, level=xbmc.LOGNOTICE): """ Outputs message to log file :param msg: message to output :param level: debug levelxbmc. Values: xbmc.LOGDEBUG = 0 xbmc.LOGERROR = 4 xbmc.LOGFATAL = 6 xbmc.LOGINFO = 1 xbmc.LOGNONE = 7 xbmc.LOGNOTICE = 2 xbmc.LOGSEVERE = 5 xbmc.LOGWARNING = 3 """ log_message = u'{0}: {1}'.format(addonID, msg) xbmc.log(log_message.encode("utf-8"), level)
def run(self): self.daemon_active = True while not self.__exit: log_msg("Start Spotify Connect Daemon") #spotty_args = ["-v"] spotty_args = ["--onstart", "curl -s -f -m 2 http://localhost:%s/playercmd/start" % PROXY_PORT, "--onstop", "curl -s -f -m 2 http://localhost:%s/playercmd/stop" % PROXY_PORT] self.__spotty_proc = self.__spotty.run_spotty(arguments=spotty_args) thread.start_new_thread(self.fill_fake_buffer, ()) while not self.__exit: line = self.__spotty_proc.stderr.readline().strip() if line: log_msg(line, xbmc.LOGDEBUG) if self.__spotty_proc.returncode and self.__spotty_proc.returncode > 0 and not self.__exit: # daemon crashed ? restart ? break self.daemon_active = False log_msg("Stopped Spotify Connect Daemon")
def __next__(self): # For the most part, the buffer-filling thread should prevent the need for waiting here, # but wait exponentially (up to about 32 seconds) for it to fill before giving up. log_msg("Spotify radio track buffer asked for next item", xbmc.LOGDEBUG) attempts = 0 while attempts <= 5: self._buffer_lock.acquire() if len(self._buffer) <= self.MIN_BUFFER_SIZE: self._buffer_lock.release() sleep_time = pow(2, attempts) log_msg("Spotify radio track buffer empty, sleeping for %d seconds" % sleep_time, xbmc.LOGDEBUG) time.sleep(sleep_time) attempts += 1 else: track = self._buffer.pop(0) self._buffer_lock.release() log_msg("Got track '%s' from Spotify radio track buffer" % track["id"], xbmc.LOGDEBUG) return track raise StopIteration # Support both Python 2.7 & Python 3.0
def onInit(self): xbmc.log(msg="[Match Center] Script started", level=xbmc.LOGDEBUG) if os.path.exists(ignored_league_list_file): self.ignored_leagues = [league.lower() for league in eval(FileIO.fileread(ignored_league_list_file)) if league] else: self.ignored_leagues = [] xbmc.executebuiltin("ClearProperty(no-games,Home)") self.getControl(32540).setImage(os.path.join(addon_path,"resources","img","goal.png")) xbmc.executebuiltin("SetProperty(loading-script-matchcenter-livescores,1,home)") self.livescoresThread() xbmc.executebuiltin("ClearProperty(loading-script-matchcenter-livescores,Home)") i = 0 while self.isRunning: if (float(i*200)/(livescores_update_time*60*1000)).is_integer() and ((i*200)/(3*60*1000)) != 0: self.livescoresThread() xbmc.sleep(200) i += 1 xbmc.log(msg="[Match Center] Script stopped", level=xbmc.LOGDEBUG)
def trace(method): # @trace decorator def method_trace_on(*args, **kwargs): start = time.time() result = method(*args, **kwargs) end = time.time() log('{name!r} time: {time:2.4f}s args: |{args!r}| kwargs: |{kwargs!r}|' .format(name=method.__name__,time=end - start, args=args, kwargs=kwargs), LOGDEBUG) return result def method_trace_off(*args, **kwargs): return method(*args, **kwargs) if __is_debugging(): return method_trace_on else: return method_trace_off
def log(self, txt = '', level=xbmc.LOGDEBUG): ''' Log a text into the Kodi-Logfile ''' try: if self.detailLevel > 0 or level == xbmc.LOGERROR: if self.detailLevel == 2 and level == xbmc.LOGDEBUG: # More Logging level = xbmc.LOGNOTICE elif self.detailLevel == 3 and (level == xbmc.LOGDEBUG or level == xbmc.LOGSEVERE): # Complex Logging level = xbmc.LOGNOTICE if level != xbmc.LOGSEVERE: if isinstance(txt, unicode): txt = unidecode(txt) xbmc.log(b"[%s] %s" % (self.pluginName, txt), level) except: xbmc.log(b"[%s] Unicode Error in message text" % self.pluginName, xbmc.LOGERROR)
def __init__(self): # Class initialisation. Fails with a RuntimeError exception if VPN Manager add-on is not available, or too old self.filtered_addons = [] self.filtered_windows = [] self.primary_vpns = [] self.last_updated = 0 self.refreshLists() self.default = self.getConnected() xbmc.log("VPN Mgr API : Default is " + self.default, level=xbmc.LOGDEBUG) self.timeout = 30 if not xbmc.getCondVisibility("System.HasAddon(service.vpn.manager)"): xbmc.log("VPN Mgr API : VPN Manager is not installed, cannot use filtering", level=xbmc.LOGERROR) raise RuntimeError("VPN Manager is not installed") else: v = int((xbmcaddon.Addon("service.vpn.manager").getAddonInfo("version").strip()).replace(".","")) if v < 310: raise RuntimeError("VPN Manager " + str(v) + " installed, but needs to be v3.1.0 or later")
def connectToValidated(self, connection, wait): # Given the number of a validated connection, connect to it. Return True when the connection has happened # or False if there's a problem connecting (or there's not a VPN connection available for this connection # number). Return True without messing with the connection if the current VPN is the same as the VPN being # requested. The wait parameter will determine if the function returns once the connection has been made, # or if it's fire and forget (in which case True will be returned regardless) if not self.isVPNSetUp(): return False if connection < 1 or connection > 10: return False connection = connection - 1 self.refreshLists() if self.primary_vpns[connection] == "": return False if self.getConnected() == self.primary_vpns[connection]: return True xbmc.log(msg="VPN Mgr API : Connecting to " + self.primary_vpns[connection], level=xbmc.LOGDEBUG) self.setAPICommand(self.primary_vpns[connection]) if wait: return self.waitForConnection(self.primary_vpns[connection]) return True
def __init__(self, addon, force): self.needReset = False self.fetchError = False self.start = True self.force = force ''' self.xmltvInterval = int(addon.getSetting('sd.interval')) self.logoSource = int(addon.getSetting('logos.source')) self.addonsType = int(addon.getSetting('addons.ini.type')) # make sure the folder in the user's profile exists or create it! if not os.path.exists(self.PLUGIN_DATA): os.makedirs(self.PLUGIN_DATA) # make sure the ini file is fetched as well if necessary if self.addonsType != self.INI_TYPE_DEFAULT: customFile = str(addon.getSetting('addons.ini.file')) if os.path.exists(customFile): # uses local file provided by user! xbmc.log('[%s] Use local file: %s' % (ADDON.getAddonInfo('id'), customFile), xbmc.LOGDEBUG) else: # Probably a remote file xbmc.log('[%s] Use remote file: %s' % (ADDON.getAddonInfo('id'), customFile), xbmc.LOGDEBUG) self.updateLocalFile(customFile, addon, True, force=force) '''
def scan_empty(self, datas): cats = [] def addCat(name, img, **kargs): cats.append(kargs) vids = [] def addVid(title, vurl, img): vids.append(1) self_module = sys.modules[__name__] self_module.addDir = addCat self_module.addLink = addVid self.get_categories(False) new_id2skip = [] i = 0 nb = len(cats) cat_done = [] for cat in cats: xbmc.log(str(i) + '/' + str(nb),xbmc.LOGDEBUG) i += 1 vids = [] self.get_videos(cat) if not len(vids): new_id2skip.append(cat['id']) cat_done.append(cat['id']) xbmc.log('done: ' + ',' .join(cat_done),xbmc.LOGDEBUG) xbmc.log('id2skip: ' + ','.join(new_id2skip),xbmc.LOGDEBUG)
def enableService(self, serviceId, addonFile): import xbmc try: srvThread = execfile(addonFile, sys.modules['__main__'].__dict__) except Exception as e: srvThread = None msg, loglevel = str(e), xbmc.LOGERROR else: msg = 'Service %s, succesfully loaded from %s' msg, loglevel = msg % (serviceId, addonFile), xbmc.LOGDEBUG finally: xbmc.log(msg, loglevel) if loglevel == xbmc.LOGERROR: msg = traceback.format_exc() xbmc.log(msg, xbmc.LOGERROR) return srvThread
def trace(method): # @debug decorator def method_trace_on(*args, **kwargs): start = time.time() result = method(*args, **kwargs) end = time.time() log('{name!r} time: {time:2.4f}s args: |{args!r}| kwargs: |{kwargs!r}|'.format(name=method.__name__, time=end - start, args=args, kwargs=kwargs), LOGDEBUG) return result def method_trace_off(*args, **kwargs): return method(*args, **kwargs) if __is_debugging(): return method_trace_on else: return method_trace_off
def addon_enabled(self, addon_id): rpc_request = json.dumps({"jsonrpc": "2.0", "method": "Addons.GetAddonDetails", "id": 1, "params": {"addonid": "%s" % addon_id, "properties": ["enabled"]} }) response = json.loads(xbmc.executeJSONRPC(rpc_request)) try: return response['result']['addon']['enabled'] is True except KeyError: message = response['error']['message'] code = response['error']['code'] error = 'Requested |%s| received error |%s| and code: |%s|' % (rpc_request, message, code) xbmc.log(error, xbmc.LOGDEBUG) return False
def set_addon_enabled(self, addon_id, enabled=True): rpc_request = json.dumps({"jsonrpc": "2.0", "method": "Addons.SetAddonEnabled", "id": 1, "params": {"addonid": "%s" % addon_id, "enabled": enabled} }) response = json.loads(xbmc.executeJSONRPC(rpc_request)) try: return response['result'] == 'OK' except KeyError: message = response['error']['message'] code = response['error']['code'] error = 'Requested |%s| received error |%s| and code: |%s|' % (rpc_request, message, code) xbmc.log(error, xbmc.LOGDEBUG) return False
def log(txt, log_level=xbmc.LOGDEBUG): """ Log something to the kodi.log file :param txt: Text to write to the log :param log_level: Severity of the log text :return: None """ if (_addon.getSetting("debug") == "true") or (log_level != xbmc.LOGDEBUG): if isinstance(txt, str): try: txt = txt.decode("utf-8") except UnicodeDecodeError: xbmc.log('Could not decode to Unicode: {0}'.format(txt), level=xbmc.LOGWARNING) return message = u'%s: %s' % (_addonid, txt) xbmc.log(msg=message.encode("utf-8"), level=log_level)
def AddAudioEntry(self, a): title = a.get("artist") if title: title += u" : " title += a.get("title") d = unicode(datetime.timedelta(seconds=int(a["duration"]))) listTitle = d + u" - " + title listitem = xbmcgui.ListItem(PrepareString(listTitle)) xbmc.log(str(a),xbmc.LOGDEBUG) listitem.setInfo(type='Music', infoLabels={'title': a.get("title") or "", 'artist': a.get("artist") or "", 'album': a.get("artist") or "", 'duration': a.get('duration') or 0}) listitem.setProperty('mimetype', 'audio/mpeg') xbmcplugin.addDirectoryItem(self.handle, a["url"], listitem, False)
def GetAlbums(self, uid=None): albums=None if uid: albums=self.api.call("photos.getAlbums", need_covers=1, uid=uid) else: albums=self.api.call("photos.getAlbums", need_covers=1) items = [] for a in albums: xbmc.log(a.get('thumb_src'),xbmc.LOGDEBUG) e = ( a["title"] + unicode(" (%s photo)" % a["size"]), a["description"], a.get('thumb_src'), str(a["aid"]), a["owner_id"] ) items.append(e) return items
def log_msg(msg, loglevel=xbmc.LOGDEBUG): '''log to kodi logfile''' if isinstance(msg, unicode): msg = msg.encode('utf-8') xbmc.log("Skin Helper Backup --> %s" % msg, level=loglevel)
def log(module, msg): xbmc.log((u"### [%s] - %s" % (module,msg,)).encode('utf-8'), level=xbmc.LOGDEBUG)
def log(message,loglevel=xbmc.LOGNOTICE): """"save message to kodi.log. Args: message: has to be unicode, http://wiki.xbmc.org/index.php?title=Add-on_unicode_paths#Logging loglevel: xbmc.LOGDEBUG, xbmc.LOGINFO, xbmc.LOGNOTICE, xbmc.LOGWARNING, xbmc.LOGERROR, xbmc.LOGFATAL """ xbmc.log(encode(__addon_id__ + u": " + message), level=loglevel)
def debug(content): """ Outputs content to log file :param content: content which should be output :return: None """ if type(content) is str: message = unicode(content, "utf-8") else: message = content log(message, xbmc.LOGDEBUG)
def _fill_buffer(self): while self._running: self._buffer_lock.acquire() if len(self._buffer) <= self.MIN_BUFFER_SIZE: log_msg("Spotify radio track buffer was %d, below minimum size of %d - filling" % (len(self._buffer), self.MIN_BUFFER_SIZE), xbmc.LOGDEBUG) self._buffer += self._fetch() self._buffer_lock.release() else: self._buffer_lock.release() time.sleep(self.CHECK_BUFFER_PERIOD)
def _fetch(self): log_msg("Spotify radio track buffer invoking recommendations() via spotipy", xbmc.LOGDEBUG) try: auth_token = xbmc.getInfoLabel("Window(Home).Property(spotify-token)").decode("utf-8") client = spotipy.Spotify(auth_token) tracks = client.recommendations( seed_tracks=[t["id"] for t in self._buffer[0: 5]], limit=self.FETCH_SIZE)["tracks"] log_msg("Spotify radio track buffer got %d results back" % len(tracks)) return tracks except Exception: log_exception("SpotifyRadioTrackBuffer", "Failed to fetch recommendations, returning empty result") return []
def log_exception(modulename, exceptiondetails): '''helper to properly log an exception''' log_msg(format_exc(sys.exc_info()), xbmc.LOGDEBUG) log_msg("Exception in %s ! --> %s" % (modulename, exceptiondetails), xbmc.LOGWARNING)
def request_token_spotty(spotty): '''request token by using the spotty binary''' token_info = None if spotty.playback_supported: try: args = ["-t", "--client-id", CLIENTID, "--scope", ",".join(SCOPE), "-n", "temp"] spotty = spotty.run_spotty(arguments=args) stdout, stderr = spotty.communicate() result = None log_msg(stdout, xbmc.LOGDEBUG) for line in stdout.split(): line = line.strip() if line.startswith("{\"accessToken\""): result = eval(line) # transform token info to spotipy compatible format if result: token_info = {} token_info["access_token"] = result["accessToken"] token_info["expires_in"] = result["expiresIn"] token_info["token_type"] = result["tokenType"] token_info["scope"] = ' '.join(result["scope"]) token_info['expires_at'] = int(time.time()) + token_info['expires_in'] token_info['refresh_token'] = result["accessToken"] log_msg("Token from spotty: %s" % token_info, xbmc.LOGDEBUG) except Exception as exc: log_exception(__name__, exc) return token_info
def log(self, message, level=xbmc.LOGDEBUG): """ Add message to Kodi log starting with Addon ID :param message: message to be written into the Kodi log :type message: str :param level: log level. :mod:`xbmc` module provides the necessary symbolic constants. Default: ``xbmc.LOGDEBUG`` :type level: int """ if isinstance(message, unicode): message = message.encode('utf-8') xbmc.log('{0} [v.{1}]: {2}'.format(self.id, self.version, message), level)
def log_debug(self, message): """ Add debug message to the Kodi log :param message: message to write to the Kodi log :type message: str """ self.log(message, xbmc.LOGDEBUG)
def onInit(self): xbmc.log(msg="[Match Center] Twitter cycle started", level=xbmc.LOGDEBUG) self.getControl(32540).setImage(os.path.join(addon_path,"resources","img","goal.png")) xbmc.executebuiltin("SetProperty(loading-script-matchcenter-twitter,1,home)") self.getTweets() xbmc.executebuiltin("ClearProperty(loading-script-matchcenter-twitter,Home)") i=0 while self.isRunning: if (float(i*200)/(twitter_update_time*60*1000)).is_integer() and ((i*200)/(3*60*1000)) != 0: self.getTweets() xbmc.sleep(200) i += 1 xbmc.log(msg="[Match Center] Twitter cycle stopped", level=xbmc.LOGDEBUG)
def log(msg, level=LOGDEBUG): try: if isinstance(msg, unicode): msg = '%s (ENCODED)' % msg.encode('utf-8') kodi.__log('%s: %s' % (name, msg), level) except Exception as e: try: kodi.__log('Logging Failure: %s' % (e), level) except: pass
def debug(self, message): message = prep_log_message(message) if xbmc: self._log(message, xbmc.LOGDEBUG) else: self._log.debug(message)
def _log_msg(msg, loglevel=xbmc.LOGDEBUG): '''helper to send a message to the kodi log''' if isinstance(msg, unicode): msg = msg.encode('utf-8') xbmc.log("Skin Helper Simplecache --> %s" % msg, level=loglevel)
def _log_msg(msg, level=xbmc.LOGDEBUG): '''logger to kodi log''' if isinstance(msg, unicode): msg = msg.encode("utf-8") xbmc.log('{0} --> {1}'.format(ADDON_ID, msg), level=level)
def emit(self, record): levels = { logging.CRITICAL: xbmc.LOGFATAL, logging.ERROR: xbmc.LOGERROR, logging.WARNING: xbmc.LOGWARNING, logging.INFO: xbmc.LOGINFO, logging.DEBUG: xbmc.LOGDEBUG, logging.NOTSET: xbmc.LOGNONE, } if get_setting_as_bool('debug'): try: xbmc.log(self.format(record), levels[record.levelno]) except UnicodeEncodeError: xbmc.log(self.format(record).encode( 'utf-8', 'ignore'), levels[record.levelno])
def log(self, msg, level=xbmc.LOGDEBUG): """Adds a log entry to the Kodi log Parameters ---------- msg : :obj:`str` Entry that should be turned into a list item level : :obj:`int` Kodi log level """ if isinstance(msg, unicode): msg = msg.encode('utf-8') xbmc.log('[%s] %s' % (self.plugin, msg.__str__()), level)
def log(text): xbmc.log(str([text]), xbmc.LOGDEBUG)
def __init__(self, pluginName, detailLevel=0, enableTidalApiLog=False): ''' Initialize Error Logging with a given Log Level detailLevel = 0 : xbmc.LOGERROR and xbmc.LOGNOTICE detailLevel = 1 : as level 0 plus xbmc.LOGWARNING detailLevel = 2 : as level 1 plus xbmc.LOGDEBUG detailLevel = 3 : as level 2 plus xbmc.LOGSEVERE ''' self.pluginName = pluginName self.detailLevel = detailLevel self.debugServer = 'localhost' # Set Log Handler for tidalapi self.addTidalapiLogger(pluginName, enableDebug=enableTidalApiLog)
def log(txt): if isinstance (txt,str): txt = txt.decode('utf-8') message = u'%s: %s' % (ADDONID, txt) xbmc.log(msg=message.encode('utf-8'), level=xbmc.LOGDEBUG) # Custom urlopener to set user-agent
def connectTo(self, connection_name, wait): # Given the ovpn filename of a connection, connect to it. Return True when the connection has happened # or False if there's a problem connecting (or there's not a VPN connection available for this connection # number). Return True without messing with the connection if the current VPN is the same as the VPN being # requested. The wait parameter will determine if the function returns once the connection has been made, # or if it's fire and forget (in which case True will be returned regardless) if not self.isVPNSetUp(): return False if connection_name == "": return False if self.getConnected() == connection_name: return True xbmc.log(msg="VPN Mgr API : Connecting to " + connection_name, level=xbmc.LOGDEBUG) self.setAPICommand(connection_name) if wait: return self.waitForConnection(connection_name) return True
def disconnect(self, wait): # Disconnect any active VPN connection. Return True when the connection has disconnected. If there's # not an active connection, return True anyway. The wait parameter will determine if the function returns # once the connection has been made, or if it's fire and forget (in which case True will be returned regardless) if not self.isVPNSetUp(): return False if self.getConnected() == "": return True xbmc.log(msg="VPN Mgr API : Disconnecting", level=xbmc.LOGDEBUG) self.setAPICommand("Disconnect") if wait: return self.waitForConnection("") return True
def filterAndSwitch(self, path, windowid, default, wait): # Given a path to an addon, and/or a window ID, determine if it's associated with a particular VPN and # switch to that VPN. Return True when the switch has happened or False if there's a problem switching # (or there's no VPN that's been set up). If the connected VPN is the VPN that's been identifed as being # required, or no filter is found, just return True without messing with the connection. The default # parameter is a boolean indicating if the default VPN should be connected to if no filter is found. # The wait parameter will determine if the function returns once the connection has been made, or if it's # fire and forget (in which case True will be returned regardless). if not self.isVPNSetUp(): return False connection = self.isFiltered(path, windowid) # Switch to the default connection if there's no filter if connection == -1 and default : xbmc.log(msg="VPN Mgr API : Reconnecting to the default", level=xbmc.LOGDEBUG) return self.defaultVPN(wait) if connection == 0: if self.getConnected() == "": return True xbmc.log(msg="VPN Mgr API : Disconnecting due to filter " + path + " or window ID " + str(windowid), level=xbmc.LOGDEBUG) self.setAPICommand("Disconnect") if wait: return self.waitForConnection("") if connection > 0: connection = connection - 1 if self.primary_vpns[connection] == "": return False if self.getConnected() == self.primary_vpns[connection]: return True xbmc.log(msg="VPN Mgr API : Connecting to " + self.primary_vpns[connection] + " due to filter " + path + " or window ID " + str(windowid), level=xbmc.LOGDEBUG) self.setAPICommand(self.primary_vpns[connection]) if wait: return self.waitForConnection(self.primary_vpns[connection]) return True
def save_setting(key, value, is_list=False): xbmc.log('[%s] Tyring to save setting: key "%s" / value "%s"' % (ADDON.getAddonInfo('id'), key, str(value)), xbmc.LOGDEBUG) file_path = xbmc.translatePath( os.path.join(ADDON.getAddonInfo('profile'), 'settings.xml')) if not os.path.exists(file_path): generate_settings_file(file_path) tree = eT.parse(file_path) root = tree.getroot() updated = False for item in root.findall('setting'): if item.attrib['id'] == key: if is_list: cur_values = item.attrib['value'] if not cur_values: cur_values = [] else: cur_values = json.loads(cur_values) if isinstance(value, list): for val in value: if val not in cur_values: cur_values.append(val) else: if value not in cur_values: cur_values.append(value) item.attrib['value'] = json.dumps(cur_values) ADDON.setSetting(key, cur_values) else: item.attrib['value'] = value ADDON.setSetting(key, value) updated = True if updated: tree.write(file_path) return True
def debug(s): if DEBUG: xbmc.log(str(s), xbmc.LOGDEBUG)
def _deleteLineup(self, lineup): c = self.conn.cursor() # delete channels associated with the lineup xbmc.log('[%s] Removing Channels for lineup: %s' % ( ADDON.getAddonInfo('id'), str(lineup)), xbmc.LOGDEBUG) c.execute('DELETE FROM channels WHERE source=? AND lineup=?', [self.source.KEY, lineup]) c.execute("UPDATE sources SET channels_updated=? WHERE id=?", [datetime.datetime.now(), self.source.KEY]) self.conn.commit()
def _saveLineup(self, channelList, lineup): c = self.conn.cursor() # delete removed channels c.execute('SELECT * FROM channels WHERE source=? AND lineup=?', [self.source.KEY, lineup]) to_delete = [] for row in c: station_id = row['id'] found = False for channel in channelList: if channel.id == station_id: found = True break if not found: xbmc.log('[%s] Removing Channel: %s from lineup: %s' % ( ADDON.getAddonInfo('id'), str(station_id), str(lineup)), xbmc.LOGDEBUG) to_delete.append(station_id) if to_delete: c.execute('DELETE FROM channels WHERE id IN (%s)' % ','.join('?' * len(to_delete)), to_delete) # Add new channels for channel in channelList: xbmc.log('[%s] Adding Channel: %s from lineup: %s' % ( ADDON.getAddonInfo('id'), str(channel.id), str(lineup)), xbmc.LOGDEBUG) logo = get_logo(channel) c.execute( 'INSERT OR IGNORE INTO channels(id, title, logo, stream_url, visible, weight, source, lineup) VALUES(?, ?, ?, ?, ?, (CASE ? WHEN -1 THEN (SELECT COALESCE(MAX(weight)+1, 0) FROM channels WHERE source=?) ELSE ? END), ?, ?)', [channel.id, channel.title, logo, '', True, -1, self.source.KEY, -1, self.source.KEY, lineup]) c.execute("UPDATE sources SET channels_updated=? WHERE id=?", [datetime.datetime.now(), self.source.KEY]) self.conn.commit()
def updateSchedules(self, ch_list, progress_callback): sd = SdAPI() station_ids = [] for ch in ch_list: station_ids.append(ch.id) # make sure date is in UTC! date_local = datetime.datetime.now() is_dst = time.daylight and time.localtime().tm_isdst > 0 utc_offset = time.altzone if is_dst else time.timezone td_utc = datetime.timedelta(seconds=utc_offset) date = date_local + td_utc xbmc.log("[%s] Local date '%s' converted to UTC '%s'" % (ADDON.getAddonInfo('id'), str(date_local), str(date)), xbmc.LOGDEBUG) # [{'station_id': station_id, 'p_id': p_id, 'start': start, # 'dur': dur, 'title': 'abc', 'desc': 'abc', 'logo': ''}, ... ] elements_parsed = 0 schedules = sd.get_schedules(station_ids, date, progress_callback) for prg in schedules: start = self.to_local(prg['start']) end = start + datetime.timedelta(seconds=int(prg['dur'])) result = Program(prg['station_id'], prg['title'], '', start, end, prg['desc'],'', imageSmall=prg['logo']) elements_parsed += 1 if result: if progress_callback and elements_parsed % 100 == 0: percent = 100.0 / len(schedules) * elements_parsed if not progress_callback(percent): raise SourceUpdateCanceledException() yield result