我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用itchat.content()。
def gpu_status(self,av_type_list): for t in av_type_list: cmd='nvidia-smi -q --display='+t #print('\nCMD:',cmd,'\n') r=os.popen(cmd) info=r.readlines() r.close() content = " ".join(info) #print('\ncontent:',content,'\n') index=content.find('Attached GPUs') s=content[index:].replace(' ','').rstrip('\n') self.t_send(s, toUserName='filehelper') time.sleep(.5) #th.exit() #============================================================================== # #==============================================================================
def WhatToPaiDui(self, groupName): msgCount = {} msgs = self.groupLastMsgsDict[groupName] for msg in msgs: if msg['Content'] not in msgCount: msgCount[msg['Content']] = 0 msgCount[msg['Content']] += 1 contentToPaiDui = [ x for x in msgCount if msgCount[x] > 1 ] if len(contentToPaiDui) == 0: # No dui to pai return # it's possible that two duis are formed at the same time, but only one can pass the TTL check for content in contentToPaiDui: if (groupName, content) not in self.selfPaiDuiTTL or self.selfPaiDuiTTL == 0: self.selfPaiDuiTTL[(groupName, content)] = self.maxSelfPaiDuiTTL yield content # We use yield here because we still need to conitnue managing the TTL else: self.selfPaiDuiTTL[(groupName, content)] -= 1
def note(): @itchat.msg_register(['Sharing'],isGroupChat=True) def collect_links(msg): try: received_date = datetime.datetime.now() title = msg['FileName'] url = msg['Url'] sharer = msg['ActualNickName'] text = find_text(content=msg['Content']) print(title, sharer, text, url, received_date) store(received_date=received_date, title=title, url=url, sharer=sharer, text=text) print('data stored done') except Exception as e: print(e)
def text_reply(msg): # cont = alice.respond(msg['Text']) cont = requests.get('http://www.tuling123.com/openapi/api?key=?????&info=%s' % msg['Content']).content m = json.loads(cont) itchat.send(m['text'], msg['FromUserName']) if m['code'] == 200000: itchat.send(m['url'], msg['FromUserName']) if m['code'] == 302000: itchat.send(m['list'], msg['FromUserName']) if m['code'] == 308000: itchat.send(m['list'], msg['FromUserName'])
def GetMiddleStr(self,content,startStr,endStr): #get the string between two specified strings #?????????????? try: startIndex = content.index(startStr) if startIndex>=0: startIndex += len(startStr) endIndex = content.index(endStr) return content[startIndex:endIndex] except: return '' #============================================================================== # #==============================================================================
def __init__(self, blacklist=[]): self.blacklist = blacklist self.groupLastMsgsDict = {} # A dictionary controlling not pai dui for more than one time # Key: (groupName, content), Value: TTL (0 or non-exist means OK to paidui) self.selfPaiDuiTTL = {} logging.info('PaiduiHook initialized.')
def process(self, msg, type): if not self.isInitialized: logging.error('The forwarder was not properly initialized. Please send a message in the groups you want to connect and try again.') return shallSendObj = self.shallSend(msg) if not shallSendObj['shallSend']: return if type == TEXT: fromText = '[{0}]'.format(self.chatroomDisplayNames[shallSendObj['fromChatroom']]) destinationChatroomId = self.chatroomIds[not shallSendObj['fromChatroom']] content = '{0} {1}: {2}'.format(fromText, msg['ActualNickName'], msg['Content']) logging.info(content) itchat.send(content, destinationChatroomId) elif type == PICTURE: fn = msg['FileName'] newfn = os.path.join(self.fileFolder, fn) msg['Text'](fn) os.rename(fn, newfn) type = {'Picture': 'img', 'Video': 'vid'}.get(msg['Type'], 'fil') typeText = {'Picture': '??', 'Video': '??'}.get(msg['Type'], '??') fromText = '[{0}]'.format(self.chatroomDisplayNames[shallSendObj['fromChatroom']]) destinationChatroomId = self.chatroomIds[not shallSendObj['fromChatroom']] content = '{0} {1} ???{2}:'.format(fromText, self.nickNameLookup.lookupNickName(msg), typeText) itchat.send(content, destinationChatroomId) logging.info(content) itchat.send('@{0}@{1}'.format(type, newfn), destinationChatroomId) elif type == SHARING: fromText = '[{0}]'.format(self.chatroomDisplayNames[shallSendObj['fromChatroom']]) destinationChatroomId = self.chatroomIds[not shallSendObj['fromChatroom']] content = '{0} {1} ?????: {2} {3}'.format(fromText, self.nickNameLookup.lookupNickName(msg), msg['Text'], msg['Url']) logging.info(content) itchat.send(content, destinationChatroomId) else: logging.info('Unknown type encoutered.') pass
def generateTagCloudForGroupV2(self, groupName, userName=None): records = None if userName is None: records = self.coll.find({ 'to': groupName }).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum) allRecords = self.coll.find({ 'to': { '$ne': groupName } }).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum * 5) allRecordsGroup = sorted(allRecords, key=lambda x: x['to']) else: records = self.coll.find({ 'from': userName, 'to': groupName }).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum) allRecords = self.coll.find({ 'from': { '$ne': userName }, 'to': groupName }).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum * 5) allRecordsGroup = sorted(allRecords, key=lambda x: x['from']) docThisGroup = list(jieba.cut(' '.join([ r['content'] for r in records if re.match('<<<IMG', r['content']) is None]))) # remove the image records allRecordsGroup = itertools.groupby(allRecordsGroup, lambda x: x['to']) docsOtherGroups = [ list(jieba.cut(' '.join([x['content'] for x in list(g) if re.match('<<<IMG', x['content']) is None]))) for k, g in allRecordsGroup ] docs = [ docThisGroup ] + docsOtherGroups dictionary = gensim.corpora.Dictionary(docs) docs = [ dictionary.doc2bow(doc) for doc in docs ] id2token = { v: k for k, v in dictionary.token2id.items() } tfidf = gensim.models.tfidfmodel.TfidfModel(corpus=docs) tagCloudFrequencies = { id2token[x[0]]: x[1] for x in tfidf[docs[0]] } img = self.wordCloud.generate_from_frequencies(tagCloudFrequencies).to_image() fn = self.generateTmpFileName() img.save(fn) return fn # Generate a tag cloud image from the latest self.recordMaxNum messages. Return the file name.
def generateTagCloudForGroup(self, groupName, userName=None): records = None if userName is None: records = self.coll.find({ 'to': groupName }).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum) else: records = self.coll.find({ 'from': userName, 'to': groupName }).sort([ ('timestamp', DESCENDING) ]).limit(self.recordMaxNum) texts = [ r['content'] for r in records ] frequencies = Counter([ w for text in texts for w in jieba.cut(text, cut_all=False) if len(w) > 1 ]) frequencies = { k: min(self.maxFrequency, frequencies[k]) for k in frequencies } img = self.wordCloud.generate_from_frequencies(frequencies).to_image() fn = self.generateTmpFileName() img.save(fn) return fn
def find_text(content): content = content.replace(' ','A').replace('\n', 'A').replace('\t','A') pattern = r"<des>(.*?)</des>" r = re.compile(pattern) result = r.findall(content)[0] return result
def note(): @itchat.msg_register(NOTE, isGroupChat=True) def deal_note(msg): try: pattern = r'??"(.*?)"?????' r = re.compile(pattern) result = r.findall(msg['Content'])[0] return(u""" @%s ?????? ?????? ?????????????, ????? ???? ?? ?? ????? 14? ?? ????? ps: ??????sharing.zhuojiayuan.com""" % result) except Exception as e: print(e) @itchat.msg_register(['Sharing'],isGroupChat=True) def collect_links(msg): try: received_date = datetime.datetime.now() title = msg['FileName'] url = msg['Url'] sharer = msg['ActualNickName'] text = find_text(content=msg['Content']) print(title, sharer, text, url, received_date) store(received_date=received_date, title=title, url=url, sharer=sharer, text=text) print('data stored done') #????????flask???????????????? #????controller/glinks.py?new?? r = requests.get('http://www.zhuojiayuan.com:66/glinks/new') print(r.status_code) except Exception as e: print(e)
def handle_text_msg(msg): global IN_ACTION # todo ?????? ??????? username = msg['ActualNickName'] # ???,?? content = msg['Text'] userlogo = msg["UserImg"] # ?? if username in setting.ACTION_ADMIN and setting.ACTION_KEYWORD in content: begin_action() response = "????! 2?????:)" return {'type': 'b', 'response': response} # ???? ?? if '[??]' in content and IN_ACTION: # ?? ???? clean_content = re.split(r'\[??\]', content)[-1] if clean_content: # todo :???????? ??????? clean_content = "<span class='api_icon'>![](" + userlogo + \ ")</span><span class='api_nickname'>" + username + "</span>" + clean_content try: # ????? timeout plugin.msg_input(msg=clean_content) except Exception as e: logger.error(str(e)) # ???????webhook # ?? forum_client.post_thread(username,clean_content) # ??username????????clean_content????? response = "@{} ??????".format(username) return {'type': 'q', 'response': response} # if '/bot/t' in content: ''' if content.startswith('[??]'): #?? #????????? thread_id,clean_content = re.split(r'\[??\].*?(?P<id>\d+)', content)[-2:] response = "????:)" return {'type':'t','response':response} ''' # if '/bot/h' in content: if '[??]' in content: # help #response='Hi @{} ???????\n??:[??]\n??:[??] ????\n??:[??](id) ????\n??:[??] ????'.format(msg['ActualNickName']) response = 'Hi @{} ???????\n??:[??]\n??:[??] ????'.format( msg['ActualNickName']) return {'type': 'h', 'response': response} return {'type': None, 'response': None} # ??? # ????