我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用log.info()。
def main(): args, cmd, capturer = arg.parse_args() log.configure_logging(args.output_directory, args.log_to_stderr) log.log_header() result = cache.retrieve(cmd, args, capturer) if not result: print "DLJC: Build command failed." sys.exit(1) javac_commands, jars, stats = result log.info('Results: %s', pprint.pformat(javac_commands)) output_json(os.path.join(args.output_directory, 'javac.json'), javac_commands) output_json(os.path.join(args.output_directory, 'jars.json'), jars) output_json(os.path.join(args.output_directory, 'stats.json'), stats) tools.run(args, javac_commands, jars)
def self_correct(bot, event, irc, args): match = re.match(r"^s[/](.*)[/](.*)[/]?$", " ".join(args)) if match is not None: nick = event.source.nick channel = event.target for i in bot.userdb[channel][nick]['seen']: msg = i['message'] output = msg.replace(match.group(1), match.group(2)) if msg == output: pass else: break irc.reply(event, '<{0}> {1}'.format(nick, output)) log.info('Changing %s to %s', msg, output) else: pass
def user_correct(bot, event, irc, args): match = re.match(r"^u[/]([\w]+)[/](.*)[/](.*)[/]?$", " ".join(args)) if match is not None: nick = match.group(1) channel = event.target for i in bot.userdb[channel][nick]['seen']: msg = i['message'] output = msg.replace(match.group(2), match.group(3)) if msg == output: pass else: break irc.reply(event, '<{0}> {1}'.format(nick, output)) log.info('Changing %s to %s', args, output) else: pass
def prepare_xml_product(product_infos): xml_data="<customer_product>\ <is_active>1</is_active>\ <is_from_vendor>0</is_from_vendor>\ <currency_id>58</currency_id>\ <vat_id>607</vat_id>\ <activity_classification_choice>commerce</activity_classification_choice>\ <type_of_product_id>20004</type_of_product_id>" for tag, value in product_infos.iteritems(): if tag in INCWO_PARAMS: xml_data+="<"+tag+">"+str(value).replace("& ","& ")+"</"+tag+">" # log.debug("xml info of product : tag {}, value {} ".format(tag, value)) xml_data+="</customer_product>" return xml_data
def start(self, join=False): self._t = threading.Thread(target=self._server.serve_forever) self._t.setDaemon(True) # don't hang on exit self._t.start() log.info("Listening on %s", self.address) if join: self._t.join()
def stop(self): log.info("Closing server...") self._server.shutdown() self._server.server_close() # self._t.join()
def close(self): if self._com is not None: log.info("Closing connection to: %s", self.dev) self._com.close()
def _connect(self): try: if(self.dev == ""): SerialGamePad.findSerialDevices(self._hardwareID) if len(SerialGamePad.foundDevices) > 0: self.dev = SerialGamePad.foundDevices[0] log.info("Using COM Port: %s", self.dev) try: self._com = serial.Serial(self.dev, timeout=5) except serial.SerialException as e: ports = SerialGamePad.findSerialDevices(self._hardwareID) error = "Invalid port specified. No COM ports available." if len(ports) > 0: error = "Invalid port specified. Try using one of: \n" + \ "\n".join(ports) log.info(error) raise SerialPadError(error) packet = SerialGamePad._generateHeader(CMDTYPE.INIT, 0) self._com.write(packet) resp = self._com.read(1) if len(resp) == 0: SerialGamePad._comError() return ord(resp) except serial.SerialException as e: error = "Unable to connect to the device. Please check that it is connected and the correct port is selected." log.exception(e) log.error(error) raise e
def click_handler(channel, click_type, was_queued, time_diff): log.info(channel.bd_addr + " " + str(click_type)) if str(click_type) == 'ClickType.ButtonSingleClick': try: log.info("Switching on lights associated with button " + channel.bd_addr) for light in groups[channel.bd_addr]['group']: bridge.get(light).on() except KeyError: log.warning("Light not found for button " + str(channel.bd_addr)) elif str(click_type) == 'ClickType.ButtonHold': # turn off all lights log.info("Turning off all lights...") for light in bridge: light.off() return
def got_button(bd_addr): cc = fliclib.ButtonConnectionChannel(bd_addr) # Assign function to call when a button is clicked cc.on_button_single_or_double_click_or_hold = click_handler cc.on_connection_status_changed = \ lambda channel, connection_status, disconnect_reason: \ log.info(channel.bd_addr + " " + str(connection_status) + (" " + str(disconnect_reason) if connection_status == fliclib.ConnectionStatus.Disconnected else "")) client.add_connection_channel(cc)
def got_info(items): log.info('Checking verified flic buttons') for bd_addr in items["bd_addr_of_verified_buttons"]: log.success(bd_addr) got_button(bd_addr)
def call_command(bot, event, irc, arguments): command = ' '.join(arguments).split(' ') if not command[0].startswith("?"): del command[0] name = command[0] else: name = command[0][1:] if not name == '' and not name.find("?") != -1: privmsg = event.target == bot.config['nickname'] args = command[1:] if len(command) > 1 else '' host = event.source.host chan = event.target if not privmsg else False try: perms = commands[name]['perms'] min_args = commands[name]['minArgs'] if check_perms(host, chan, owner=perms[2], admin=perms[1], trusted=perms[0]): if len(args) < min_args: irc.reply(event, config.argsMissing) else: target = "a private message" if privmsg else event.target source = event.source log.info("%s called %s in %s", source, name, target) commands[name]['func'](bot, event, irc, args) else: if not event.source.host.find("/bot/"): irc.reply(event, config.noPerms) except KeyError: irc.notice(event.source.nick, config.invalidCmd.format(name)) except Exception: irc.reply(event, 'Oops, an error occured!') print_error(irc, event)
def add_ignore(irc, event, args): host = args[0] base_message = "Ignoring %s for %s seconds" indefinite = "Ignoring %s indefinately" if len(args) > 1: if args[1] == 'random': duration = random.randrange(100, 10000) expires = duration + int(time.time()) else: duration = int(args[1]) expires = duration + int(time.time()) else: expires = None channel = args[2] if len(args) > 2 else None if channel is not None: try: i = config.ignores['channels'][channel] except KeyError: i = config.ignores['channels'][channel] = [] i.append([host, expires]) else: i = config.ignores['global'] i.append([host, expires]) if expires is not None: if channel is not None: logging.info(base_message + " in %s", host, duration, channel) else: logging.info(base_message, host, duration) else: if channel is not None: logging.info(indefinite + " in %s", host, channel) else: logging.info(indefinite, host)
def on_endofmotd(event, irc): log.info("Received MOTD from network")
def on_ctcp(irc, event, raw): log.info("Received CTCP reply " + raw)
def on_join(self, event, irc): if event.source.nick == self.config['nickname']: log.info("Joining %s", event.target) if event.target not in self.userdb: self.userdb[event.target] = {} irc.send("WHO {0} nuhs%nhuac".format(event.target)) irc.send("NAMES {0}".format(event.target)) else: irc.send("WHO {0} nuhs%nhuac".format(event.source.nick))
def on_invite(event, irc): hostmask = event.source.host channel = event.arguments[0] if util.check_perms(hostmask, channel, trusted=True): log.info("Invited to %s by %s", channel, hostmask) irc.join(channel)
def on_notice(self, event, irc): source = event.source.host if not event.target == "*": if not event.target == self.config['nickname']: channel = event.target log.info("Received channel notice from %s in %s", source, channel) else: log.info("Received private notice from %s", source)
def on_endofnames(event, irc): log.info('Received end of NAMES reply.')
def run_test(self, suite): if not os.path.exists('report'): # ?????????????????? os.makedirs('report') report_name = "report\{}-{}.html".format("report", time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(time.time()))) with open(report_name, "wb") as f: runner = BSTestRunner(stream=f, title='{}???????'.format(self.name)) runner.run(suite) log.info("{}?????????????".format(self.name)) os.system("start {}".format(report_name))
def run(self, test): "Run the given test case or test suite." log.info("??????...") result = _TestResult(self.verbosity) test(result) self.stopTime = datetime.datetime.now() self.generateReport(test, result) print('\nTime Elapsed: %s' % (self.stopTime - self.startTime), file=sys.stderr) return result
def main(): db.init_db_engine(config.SQLALCHEMY_DATABASE_URI) total = db.data.get_count_pending_songs() done = 0 starttime = time.time() thisdone, rem = lookup() done += thisdone while rem > 0: thisdone, rem = lookup() done += thisdone durdelta, remdelta = util.stats(done, total, starttime) log.info("Done %s/%s in %s; %s remaining", done, total, str(durdelta), str(remdelta))
def get(self): ticket = DataCenter().get_jsapi_ticket() sign = Sign(ticket, 'http://www.test.com/wx/getjsapiticket') sign_str = sign.sign() #print 'weixin_JSAPI_ticket: ' #print sign_str log.info('weixin_JSAPI_ticket: %s'%(sign_str)) self.write(sign_str)
def post(self): body = json.loads(self.request.body) response = {'code': 0} if body.has_key('songId'): search_result = BaiduMusicSearch().search_song_byid(body['songId']) response['searchResult'] = search_result else: response['code'] = -1 response['error'] = "not found song id." json_data = json.dumps(response, ensure_ascii=False) log.info(json_data) self.write(json_data)
def get_access_token(self): appId = "appId" appSecret = "appSecret" postUrl = ("https://api.weixin.qq.com/cgi-bin/token?grant_type=" "client_credential&appid=%s&secret=%s" % (appId, appSecret)) urlResp = urllib.urlopen(postUrl) urlResp = json.loads(urlResp.read()) if urlResp.has_key('access_token'): self.__accessToken = urlResp['access_token'] self.__leftTime = urlResp['expires_in'] #get jsapi_ticket postUrl = ("https://api.weixin.qq.com/cgi-bin/ticket/getticket?access_token=%s&type=jsapi" % (self.__accessToken)) urlResp = urllib.urlopen(postUrl) urlResp = json.loads(urlResp.read()) if urlResp.has_key('ticket') and urlResp['errcode']==0: self.__jsapi_ticket = urlResp['ticket'] # restore in datacenter self.__data_global_obj.set_access_token(self.__accessToken) self.__data_global_obj.set_jsapi_ticket(self.__jsapi_ticket) #print "access_token: %s" % self.__accessToken #print "saved access_token: %s" % DataCenter().get_access_token() #print "expires_in: %s" % self.__leftTime log.info("access_token: %s"%(self.__accessToken)) log.info("saved access_token: %s"%(DataCenter().get_access_token())) log.info("expires_in: %s"%(self.__leftTime))
def query_menu(self): postUrl = "https://api.weixin.qq.com/cgi-bin/menu/get?access_token=%s" % self.__data_global_obj.get_access_token() urlResp = urllib.urlopen(url=postUrl) #print urlResp.read() log.info(urlResp.read())
def delete_menu(self): postUrl = "https://api.weixin.qq.com/cgi-bin/menu/delete?access_token=%s" % self.__data_global_obj.get_access_token() urlResp = urllib.urlopen(url=postUrl) #print urlResp.read() log.info(urlResp.read()) #???????????
def get_current_selfmenu_info(self): postUrl = "https://api.weixin.qq.com/cgi-bin/get_current_selfmenu_info?access_token=%s" % self.__data_global_obj.get_access_token() urlResp = urllib.urlopen(url=postUrl) #print urlResp.read() log.info(urlResp.read()) #### user operation ####
def get_user_info(self, openid): postUrl = "https://api.weixin.qq.com/cgi-bin/user/info?access_token=%s&openid=%s&lang=zh_CN " \ % (self.__data_global_obj.get_access_token(), openid) urlResp = urllib.urlopen(postUrl) recJson = urlResp.read() #read?????????????? #print recJson log.info(recJson) urlResp = json.loads(recJson) #return _decode_dict(urlResp) return urlResp
def get_access_token(self): grant_type = 'client_credentials' client_id = '' client_secret = '' #scope = '301 302 303 304 305 306 307' #scope = 'music_media_basic music_musicdata_basic music_userdata_basic music_search_basic music_media_premium music_audio_premium music_audio_hq' postUrl = ("https://openapi.baidu.com/oauth/2.0/token?" "grant_type=%s&client_id=%s&client_secret=%s" % (grant_type, client_id, client_secret)) postUrl = urllib.quote(postUrl) #URL????? urlResp = urllib.urlopen(postUrl) urlResp = urlResp.read() #print urlResp urlResp = json.loads(urlResp) #print urlResp DebugPrint(urlResp) if urlResp.has_key('access_token'): self.__accessToken = urlResp['access_token'] self.__session_key = urlResp['session_key'] #print "baidu music access_token: %s" % self.__accessToken #print "baidu music session_key: %s" % self.__session_key log.info("baidu music access_token: %s" % self.__accessToken) log.info("baidu music session_key: %s" % self.__session_key)
def create(self, postData, accessToken): postUrl = "https://api.weixin.qq.com/cgi-bin/menu/create?access_token=%s" % accessToken if isinstance(postData, unicode): postData = postData.encode('utf-8') urlResp = urllib.urlopen(url=postUrl, data=postData) #print urlResp.read() log.info(urlResp.read())
def query(self, accessToken): postUrl = "https://api.weixin.qq.com/cgi-bin/menu/get?access_token=%s" % accessToken urlResp = urllib.urlopen(url=postUrl) #print urlResp.read() log.info(urlResp.read())
def delete(self, accessToken): postUrl = "https://api.weixin.qq.com/cgi-bin/menu/delete?access_token=%s" % accessToken urlResp = urllib.urlopen(url=postUrl) #print urlResp.read() log.info(urlResp.read()) #???????????
def get_current_selfmenu_info(self, accessToken): postUrl = "https://api.weixin.qq.com/cgi-bin/get_current_selfmenu_info?access_token=%s" % accessToken urlResp = urllib.urlopen(url=postUrl) #print urlResp.read() log.info(urlResp.read())
def update_product(fournisseur_product_infos, incwo_product_infos): update_infos = {} try: PRODUCT_ID = incwo_product_infos["id"] PRODUCT_REF = fournisseur_product_infos["reference"] except KeyError: log.error("Incwo product with no ID or ref associated") raise ValueError() try: # Si produit considere comme vitrine : on annule la comparaison prix et category en mettant les champs aux memes valeurs if not compareValues(incwo_product_infos["product_category_id"],VITRINE_CATEGORY_ID): log.warning("Pas de mise a jour du prix du produit {} (Produit categorisé comme en vitrine)".format(PRODUCT_REF)) incwo_product_infos["product_category_id"] = fournisseur_product_infos["product_category_id"] fournisseur_product_infos["price"] = incwo_product_infos["price"] except KeyError: log.error("Incwo product with no category_ID associated") raise ValueError() for key in INCWO_PARAMS: if not key in fournisseur_product_infos: log.error("Product "+fournisseur_product_infos["name"]+" : fournisseur info incomplete! Missing "+key) raise ValueError() elif not key in incwo_product_infos: if key != 'barcode': log.debug("incwo info incomplete, updating "+key) update_infos[key]=fournisseur_product_infos[key] elif (compareValues(fournisseur_product_infos[key],incwo_product_infos[key])): log.debug("incwo info outdated, updating {}".format(key)) log.debug("Picata {} ; incwo_product_infos {}".format(fournisseur_product_infos[key], incwo_product_infos[key])) update_infos[key]=fournisseur_product_infos[key] if len(update_infos) > 0 : log.debug("Update needed for product "+str(PRODUCT_ID)) xml = prepare_xml_product(update_infos) url = "https://www.incwo.com/"+str(ID_USER)+"/customer_products/"+str(PRODUCT_ID)+".xml"; send_request('put', url, xml) # else : # log.debug("Product {} (id {}) infos up to date".format(fournisseur_product_infos["name"],PRODUCT_ID)) manage_stock_movement(fournisseur_product_infos, PRODUCT_ID, incwo_product_infos["product_category_id"])
def get(self): ''' the import handle ''' try: webData = self.request.body #print "WxShowHandler Get webdata is ", webData log.info("WxShowHandler Get webdata is %s" % (webData)) id = self.get_argument('id', '') showidstr = self.get_argument('showid', '') if len(id) == 0 or len(showidstr) == 0: self.write('parameter error!') # get sign ticket for weixin jsapisdk ticket = DataCenter().get_jsapi_ticket() urlall = self.request.uri #print self.request.path /wx/show #print self.request.uri /wx/show?id=oLN9QxI-YpdNJkSIXQkppJDHuvZM&showid=15 sign = Sign(ticket, test_urlhead + urlall) sign_data = sign.sign() #print 'weixin_JSAPI_ticket: ' #print sign_data log.info('weixin_JSAPI_ticket: %s'%(sign_data)) timestamp = sign_data['timestamp'] nonceStr = sign_data['nonceStr'] signature = sign_data['signature'] # get_param id showid = long(showidstr) userdata = DataCenter().get_data_by_id(showid) if len(userdata) == 0: self.write("no data") return data_dict = userdata[0] #print data_dict log.debug(data_dict) title_info = data_dict['title'] sub_info = data_dict['aidata'].split(test_split_str) all_info = data_dict['originaldata'].split(test_split_str) createtime = data_dict['createtime'].strftime('%Y-%m-%d %H:%M:%S') author = '' authorinfo = data_dict['author'] datasource = data_dict['datasource'] _userid = data_dict['userid'] if authorinfo == '': author = datasource elif datasource == '': author = authorinfo else : author = authorinfo + ' | ' + datasource self.render("index.html", title=title_info, allinfo=all_info, subjects=sub_info, author=author, \ createtime=createtime, appid=test_weixin_appid, timestamp=timestamp, nonceStr=nonceStr, \ userid=_userid, signature=signature) except Exception, Argument: log.error(Argument) self.write(Argument)
def main(): # deal Ctrl-C signal.signal(signal.SIGINT, signal_handler) tornado.options.parse_command_line() http_server = tornado.httpserver.HTTPServer(application) #????? http_server.listen(options.port) #???????(???????????) #http_server.bind(options.port) #http_server.start(0) #print 'Development server is running at http://127.0.0.1:%s/' % options.port #print 'Quit the server with Control-C' log.info('Development server is running at http://127.0.0.1:%s/' % options.port) log.info('Quit the server with Control-C') # nlpproc init nlp_global_obj = NlpProc() nlp_global_obj.init_nlpta() # datacenter init data_global_obj = DataCenter() data_global_obj.connect_db("127.0.0.1:3306",'testdb', 'root', 'password') # log in iflytek ifly_tek = IflyTek() # start get access token timer get_access_token_instance = WeixinClient() get_access_token_instance.get_access_token() #get first time tornado.ioloop.PeriodicCallback(get_access_token_instance.get_access_token, period).start() # start scheduler get_himalaya_access_token_ins = Himalaya() tornado.ioloop.PeriodicCallback(get_himalaya_access_token_ins.get_access_token, period).start() # test #tornado.ioloop.PeriodicCallback(like_cron, 10000).start() # start scheduler # start tornado ioloop tornado.ioloop.IOLoop.instance().start()
def get_recommendation_tmp(self, os_type, pack_id): recommendation = {'code': 0} #?????? res = self.categories_human_recommend(os_type, pack_id) for item in res: #print "===>> category:", item['category_name'].encode("utf-8"), item['id'] if item['id'] != 8: continue category_id = item['id'] category_name = item['category_name'] print "====>>>> categor name, id:", category_name, category_id #log.info("category_name:%s, category_id:%d"%(category_name, category_id)) #????tag res = self.tags_list(os_type, pack_id, 0, category_id) tag_names = [] for item in res: tag_names.append(item['tag_name']) # category + tag ???? recommendation['recommendation'] = [] recommendation['code'] = 0 cnt = 0 for item in tag_names: """ if cmp(item, u"????"): #print "===>>> tag_name:", item.encode('utf-8') print "===>>> tag_name:", item continue """ print "===>>> tag_name:", item res = self.albums_list(os_type, pack_id, category_id, item, 3) if res.has_key('albums'): tmp_list = res['albums'] for track in tmp_list: print "==>>>album: ", track['album_title'] else: pass else: recommendation = {'code': 0} recommendation['error'] = 'no data' return json.dumps(recommendation, ensure_ascii = False).encode('utf-8')