The following 50 code examples, extracted from open-source Python projects, illustrate how to use tweepy.Cursor().
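Before the examples, here is a minimal sketch of the pattern they all share: tweepy.Cursor wraps a paginated API method, and .items(n) or .pages(n) turns Twitter's pagination into plain iteration. This assumes tweepy 3.x (as the examples below do), that `auth` is an already-configured tweepy.OAuthHandler, and the screen name is a placeholder.

import tweepy

# A minimal sketch; assumes `auth` is an already-configured tweepy.OAuthHandler.
api = tweepy.API(auth)

# .items(n) yields up to n individual results across pages...
for tweet in tweepy.Cursor(api.user_timeline, screen_name="example").items(10):
    print(tweet.text)

# ...while .pages(n) yields up to n whole result pages.
for page in tweepy.Cursor(api.user_timeline, screen_name="example").pages(2):
    print(len(page))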
def searchTweets(self, tag, limit=50000, tfilter=" -filter:retweets", resultType="recent"):
    # If tfilter is appended to tag, tqdm misbehaves for reasons unknown,
    # so the filter is left disabled here:
    # tag += tfilter
    try:
        tweets = []
        tweetsObj = tweepy.Cursor(self.API.search, q=tag, result_type=resultType,
                                  exclude_replies=True).items(limit)
        pBar = tqdm(tweetsObj, ascii=True, total=limit, desc="Getting Tweets!")
        for cnt, tweet in enumerate(pBar):
            pBar.update(1)
            if not cnt < limit:
                break
            tweets.append(tweet)
    except tweepy.error.TweepError as et:
        print(et)
    except Exception as e:
        print(e)
    return tweets

# If the collected tweets contain two or more tweets from the same user,
# the older ones are deleted.
def rest_tweets(self, query, lang="pt", limit=None):
    """
    Returns all tweets from (at most) the last 7 days that match the query
    received by this method; the complete tweet objects are returned.
    :param query: should contain all the words and can include logic operators,
        e.g. rock OR axe; it can also specify the period of time for the search
        (visit https://dev.twitter.com/rest/public/search to see how to create a query)
    :param lang: the language of the tweets
    :param limit: the maximum number of tweets to fetch
    :return: tweets: a list of all tweets obtained after the request
    """
    tweets = []
    for tweet in tw.Cursor(self.api.search, q=query, lang=lang).items(limit):
        tweets.append(tweet._json)
    return tweets
def analyzetweets(self, access_token, access_token_secret, mytweets=False, q=None):
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth)
    sentimentlist = []
    subjectivitylist = []
    number = NUMBER_OF_TWEETS
    tweets = (tweepy.Cursor(api.user_timeline).items()
              if mytweets else tweepy.Cursor(api.search, q=q).items(number))
    for index, tweet in enumerate(tweets):
        analysis = TextBlob(tweet.text).sentiment
        sentimentlist.append(analysis.polarity)
        subjectivitylist.append(analysis.subjectivity)
        self.update_state(state="RUNNING",
                          meta={"current": index + 1, "total": number})
    sentimentavg = float(sum(sentimentlist) / max(len(sentimentlist), 1))
    subjectivityavg = float(sum(subjectivitylist) / max(len(subjectivitylist), 1))
    return {"current": number, "total": number,
            "subjectivityavg": subjectivityavg, "sentimentavg": sentimentavg}
def save_hashtag(hashtag):
    for status in tweepy.Cursor(api_twitter.search, q=hashtag).items(1000):
        try:
            for media in status.extended_entities['media']:
                print(media['media_url'])
                # media['media_url'] is a plain URL string; use its last path
                # segment as the local filename.
                filename = media['media_url'].split('/')[-1]
                urllib.request.urlretrieve(
                    media['media_url'],
                    os.path.join(os.getcwd(), 'files', 'riko_meme', filename))
        except AttributeError:
            pass
def getTimeline(self, limit=50000, resultType="recent"):
    try:
        tweets = []
        tweetsObj = tweepy.Cursor(self.API.home_timeline, result_type=resultType,
                                  exclude_replies=False).items(limit)
        pBar = tqdm(tweetsObj, ascii=True, total=limit, desc="Getting Tweets!")
        for cnt, tweet in enumerate(pBar):
            pBar.update(1)
            if not cnt < limit:
                break
            tweets.append(tweet)
    except tweepy.error.TweepError as et:
        print(et)
    except Exception as e:
        print(e)
    return tweets
def getFriendIds(self, userId, limit=100000):
    if self._byProtected(userId):
        return []
    friendIds = []
    try:
        friends = tweepy.Cursor(self.API.friends_ids,
                                user_id=userId,
                                cursor=-1).items()
        for cnt, friend in enumerate(friends):
            if not cnt < limit:
                break
            friendIds.append(friend)
        return friendIds
    except tweepy.error.TweepError as et:
        print(et)
        return []
def getTweets(self, userId, limit=50):
    tweets = []
    try:
        tweetsObj = tweepy.Cursor(self.API.user_timeline,
                                  user_id=userId,
                                  exclude_replies=True).items(limit)
        for cnt, tweet in enumerate(tweetsObj):
            if not cnt < limit:
                break
            # print(tweet.text.replace("\n", ""))
            tweets.append(tweet)
    except tweepy.error.TweepError as et:
        print(et)
    return tweets
def crawl_user_data(portrait, path):
    api = portrait_api(portrait)
    now = datetime.datetime.now().strftime("%Y%m%d%H%M")
    timeline = [t._json for t in tweepy.Cursor(api.user_timeline,
                                               user_id=portrait.auth_id_str,
                                               count=200,
                                               since_id=portrait.last_tweet_id).items()]
    if timeline:
        with gzip.open('{0}/{1}_{2}.data.gz'.format(path, portrait.auth_id_str, now), 'wt') as f:
            f.write(json.dumps(timeline))
        print('loaded tweets', len(timeline))
    if not portrait.demo_portrait:
        print(portrait.auth_screen_name, 'not a demo portrait. downloading connectivity')
        connectivity = [t for t in tweepy.Cursor(api.friends_ids,
                                                 user_id=portrait.auth_id_str,
                                                 cursor=-1).items()]
        print('loaded friends', len(connectivity))
        with gzip.open('{0}/{1}_{2}.friends.gz'.format(path, portrait.auth_id_str, now), 'wt') as f:
            f.write(json.dumps(connectivity))
    return True
def getFollower(profile):
    l = []
    printColour("\n[*] ", BLUE)
    print("Follower list:\n")
    for user in tweepy.Cursor(api.followers, screen_name=profile, count=200).items():
        try:
            l.append(user.screen_name)
        except tweepy.TweepError:
            print("[-] Timeout, sleeping for 15 minutes...")
            time.sleep(15 * 60)
    for user in l:
        printColour("[+] @" + user, GREEN)
        print(" (https://www.twitter.com/" + user + ")\n")
    printColour("\n[*] ", CYAN)
    print("Total followers: " + str(len(l)) + "\n")
def getFollowing(profile):
    l = []
    printColour("\n[*] ", BLUE)
    print("Following list:\n")
    for user in tweepy.Cursor(api.friends, screen_name=profile, count=200).items():
        try:
            l.append(user.screen_name)
        except tweepy.TweepError:
            print("[-] Timeout, sleeping for 15 minutes...")
            time.sleep(15 * 60)
    for user in l:
        printColour("[+] @" + user, GREEN)
        print(" (https://www.twitter.com/" + user + ")\n")
    printColour("\n[*] ", CYAN)
    print("Total following: " + str(len(l)) + "\n")
def rogue(s):
    printColour("\n[*] ", BLUE)
    c = 0
    print("Potential rogue profiles:\n")
    tmp = []
    i = 0
    for page in tweepy.Cursor(api.search_users, q=s, include_entities=False, count=20).pages():
        if c > 30:  # counter to limit the number of requests
            break
        c += 1
        for result in page:
            if result.screen_name not in tmp:
                i += 1
                tmp.append(result.screen_name)
                printColour("[+] " + result.name + " (@" + result.screen_name + ")", GREEN)
                print("\n")
    printColour("\n[*] ", CYAN)
    print("Total potential rogue profiles: " + str(i) + "\n")
def get_tweets(self, since_id):
    """Looks up metadata for all Trump tweets since the specified ID."""
    tweets = []

    # Include the first ID by passing along an earlier one.
    since_id = str(int(since_id) - 1)

    # Use tweet_mode=extended so we get the full text.
    for status in Cursor(self.twitter_api.user_timeline, user_id=TRUMP_USER_ID,
                         since_id=since_id, tweet_mode="extended").items():
        # Use the raw JSON, just like the streaming API.
        tweets.append(status._json)

    self.logs.debug("Got tweets: %s" % tweets)
    return tweets
def save_tweets_with_retweets(screen_name):
    timestamp = time.strftime("%d.%m.%Y %H:%M:%S", time.localtime())
    print(timestamp)
    for tweet in limit_handled(tweepy.Cursor(TWITTER_API.user_timeline,
                                             id=screen_name, count=200).items()):
        retweets = get_retweets(tweet.id)
        db.saveRetweets(tweet, retweets)
def search(self, target, date, maxnum=10):
    '''
    Collect all the tweets containing the keyword self.target,
    in the range self.date[0] - self.date[1].
    '''
    self.target = target
    self.date = date
    cursor = tweepy.Cursor(self.api.search,
                           q=self.target,
                           since=self.date[0],
                           until=self.date[1],
                           show_user=True)
    return cursor.items(maxnum)
def search_on_user(api, user_name, search_term):
    """Searches for a term in a user's Twitter feed and returns the first link found."""
    limit.check_remaining_calls(api)
    # ' -RT' removes retweets from the results
    c = tweepy.Cursor(api.search, q=search_term + ' -RT' + ' from:' + user_name, lang="en")
    limit.check_remaining_calls(api)
    list_of_tweets = []
    for tweet in c.items():
        limit.check_remaining_calls(api)
        regex = r'https?://[^\s<>"]+|www\.[^\s<>"]+'
        match = re.search(regex, tweet.text)
        if match:
            list_of_tweets.append(match.group())
    # No links found in the user's feed
    if not list_of_tweets:
        return 'null'
    return list_of_tweets[0]
def get_user_tweets(self, username, since_id=None):
    """
    Download all tweets for a user.
    The maximum is around 3200 tweets.
    """
    if self.api is None:
        self._authenticate()
    tweets = []
    if since_id:
        cursor = tweepy.Cursor(self.api.user_timeline, screen_name=username, since_id=since_id)
    else:
        cursor = tweepy.Cursor(self.api.user_timeline, screen_name=username)
    for item in cursor.items():
        tweets.append(item)
    return tweets
def get_searched_tweets(self, hashtag, since_id=None):
    """Search all tweets for a hashtag."""
    if self.api is None:
        self._authenticate()
    tweets = []
    if since_id:
        cursor = tweepy.Cursor(self.api.search, q=hashtag, count=100, since_id=since_id)
    else:
        cursor = tweepy.Cursor(self.api.search, q=hashtag, count=100)
    try:
        for item in cursor.items():
            tweets.append(item)
    except tweepy.error.TweepError:
        print("Reached Twitter rate limit")
    return tweets
def save(self):
    try:
        print("[search] [search_term: {}]".format(self.screen_name))
        i = 0
        for page in tweepy.Cursor(self.client.user_timeline,
                                  screen_name=self.screen_name, count=200).pages(100):
            print("{}.".format(i))
            i = i + 1
            sleep(config.TWITTER_API_DELAY)
            self.process_page(page)
    except tweepy.error.RateLimitError:
        print("[search] [error: rate limit] [{}]".format(self))
        sleep(60)
    except tweepy.error.TweepError as e:
        print("[search] [error: tweepy] [{}]".format(e))
        sleep(60)
    except:
        print("[search] [error: unknown] [{}]".format(sys.exc_info()[0]))
        sleep(60)
def limit_handled(cursor: tweepy.Cursor):
    """Wrap cursor access with rate limiting.

    :param cursor: the cursor to siphon
    :returns: cursor items
    """
    while True:
        try:
            yield cursor.next()
        except tweepy.RateLimitError:
            time.sleep(15 * 60)
        except StopIteration:
            # The cursor is drained; end the generator cleanly (under PEP 479
            # an escaping StopIteration would become a RuntimeError).
            return
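Several of the other examples here (save_tweets_with_retweets, crawl_target, get_followers) pass a cursor through this wrapper. A minimal usage sketch, assuming `api` is an authenticated tweepy.API instance:

# A minimal sketch, assuming `api` is an authenticated tweepy.API instance.
for follower in limit_handled(tweepy.Cursor(api.followers).items()):
    print(follower.screen_name)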
def get_friends(self, callback, pages_limit=0):
    api = self._api
    user = self._user
    if user.friends_count > _FRIENDS_COUNT_MAX_:
        logging.warning('The user [%d]-[%s] has too many [%d] friends!'
                        % (user.id, user.screen_name, user.friends_count))
        return
    cursor = tweepy.Cursor(api.friends_ids, user_id=user.id, screen_name=user.screen_name)
    friends = []
    try:
        for friends_page in cursor.pages(pages_limit):
            friends.extend(friends_page)
        if callable(callback):
            callback(friends)
    except tweepy.TweepError as e:
        logging.warning([user.id, user.screen_name, e])
def get_tweets(listOfTweets, keyword, numOfTweets):
    # Iterate through all tweets containing the given word (API search mode)
    for tweet in tweepy.Cursor(api.search, q=keyword).items(numOfTweets):
        # Add tweets in this format
        dict_ = {'Screen Name': tweet.user.screen_name,
                 'User Name': tweet.user.name,
                 'Tweet Created At': str(tweet.created_at),
                 'Tweet Text': tweet.text,
                 'User Location': str(tweet.user.location),
                 'Tweet Coordinates': str(tweet.coordinates),
                 'Retweet Count': str(tweet.retweet_count),
                 'Retweeted': str(tweet.retweeted),
                 'Phone Type': str(tweet.source),
                 'Favorite Count': str(tweet.favorite_count),
                 'Favorited': str(tweet.favorited),
                 'Replied': str(tweet.in_reply_to_status_id_str)}
        listOfTweets.append(dict_)
    return listOfTweets

# Connect to DB
def crawl_target(api, target_type, target_list):
    for target in target_list:
        if target_type == 'user':
            statuses = limit_handled(tweepy.Cursor(api.user_timeline, id=target).items())
        elif target_type == 'hashtag':
            statuses = limit_handled(tweepy.Cursor(api.search, target).items())
        print('Crawling %s' % target)
        for status in statuses:
            if status.created_at.timestamp() > catastrophe_period_start:
                if not tweet_db.get(bytes(status.id_str, 'utf-8')):
                    print('Saving tweet: %s' % status.id_str)
                    write_to_tweet_db(status)
                if not user_db.get(bytes(status.author.id_str, 'utf-8')):
                    print('Saving user: %s' % status.author.id_str)
                    write_to_user_db(status.author)
            else:
                print('Reached {time}, on to the next {ttype}'.format(
                    time=status.created_at.strftime('%Y %h %d %H:%M:%S'),
                    ttype=target_type))
                break
def _build_file(self):
    self.total_rows = 0

    # Get recent tweets from dril and add them to the new file
    for status in tweepy.Cursor(api.user_timeline, 'dril', since_id=self.since).items():
        self.total_rows += self._process_status(status)

    # Append the content of the old file to the new file
    try:
        # Open the files for reading and writing
        readFile = open('data/dril.csv', 'rt', encoding='utf-8')
        writeFile = open('data/new.csv', 'at', encoding='utf-8')
        read = reader(readFile)
        write = writer(writeFile, delimiter=',', quoting=QUOTE_NONNUMERIC)
        for row in read:
            write.writerow([int(row[0]), row[1]])
            self.total_rows += 1
        readFile.close()
        writeFile.close()
    except IOError:
        print('Failed to open file (1) [okay if this is the first time running]')

    # Rename the new file to be the old file
    os.rename('data/new.csv', 'data/dril.csv')
def past(name, d):
    # Global variable count, initialized to 0
    global count
    count = 0
    # u stores today's date
    u = datetime.date.today()
    # The Cursor searches for tweets matching the query 'q=name';
    # 'since' is the starting date and 'until' is today's date.
    # count is incremented once for each matching tweet fetched.
    for tweet in tweepy.Cursor(api.search, q=name, since=u - datetime.timedelta(d),
                               until=u, lang='en').items():
        count += 1

# The REST API usage ends here.
# Flask routing: local host with template index.html (found in the template folder).
def get_followers_id(user_keys, api, user, f_log, flag_fast):
    dict_followers = {}
    try:
        print('get %s ids followers' % user)
        for page in tweepy.Cursor(api.followers_ids, screen_name=user, count=5000,
                                  monitor_rate_limit=True,
                                  wait_on_rate_limit=True,
                                  wait_on_rate_limit_notify=True,
                                  retry_count=5,
                                  retry_delay=5).pages():
            for follower_id in page:
                dict_followers[follower_id] = 1
            if flag_fast:
                return dict_followers
    except tweepy.TweepError as e:
        f_log.write('%s, %s error en tweepy, method followers/id, user %s\n'
                    % (time.asctime(), e, user))
    return dict_followers
def get_following_id(user_keys, api, user, f_log, flag_fast):
    dict_following = {}
    try:
        print('get %s ids following' % user)
        for page in tweepy.Cursor(api.friends_ids, screen_name=user,
                                  monitor_rate_limit=True,
                                  wait_on_rate_limit=True,
                                  wait_on_rate_limit_notify=True,
                                  retry_count=5,
                                  retry_delay=5).pages():
            for following_id in page:
                dict_following[following_id] = 1
            if flag_fast:
                return dict_following
    except tweepy.TweepError as e:
        f_log.write('%s, %s error en tweepy, method friends/id, user %s\n'
                    % (time.asctime(), e, user))
    return dict_following
def get_tweets(username: str, num=1):
    statuses = list(tweepy.Cursor(api_twitter.user_timeline, id=username).items(num))
    return statuses
def archive(userid, filename='saved.txt'):
    with open(filename, 'a') as save:
        for status in tweepy.Cursor(api_twitter.user_timeline, id=userid).items(200):
            save.write(html.unescape(encode_tweet(status)))
def get_tweets(api_twitter, username: str, num=1):
    statuses = list(tweepy.Cursor(api_twitter.user_timeline, id=username).items(num))
    return statuses
def del_yesterday_info():
    # Build yesterday's date string
    d = datetime.now() + timedelta(days=-1)
    yesterday = "%s/%s/%s" % (d.year, d.month, d.day)
    api = get_api()
    # Fetch the most recent tweets from my own timeline
    myinfo = api.me()
    try:
        tweets = tweepy.Cursor(api.user_timeline, id=myinfo.id).items(100)
    except Exception as e:
        log.exception(e)
    # Delete every tweet whose text starts with yesterday's date
    for t in tweets:
        r = re.compile(yesterday)
        s = re.match(r, t.text)
        if s is not None:
            try:
                api.destroy_status(t.id)
            except Exception as e:
                log.exception(e)
def Cursor(self, *args, **kwargs):
    return tweepy.Cursor(*args, **kwargs)
def get_and_save_friends(user_id):
    global n_calls
    for friend in tweepy.Cursor(api.friends, user_id=user_id, count=200).items():
        save_user(friend)

# The Twitter access keys are defined as environment variables
def get_followers(self, account_name):
    """Return a list of all the followers of an account."""
    followers = []
    for page in tweepy.Cursor(self.api.followers_ids, screen_name=str(account_name)).pages():
        followers.extend(page)
    return followers
def get_tweets_from_timeline(self):
    """Return a list of all the tweets from the home timeline."""
    tweets = []
    for status in tweepy.Cursor(self.api.home_timeline).items(200):
        tweets.append(status)
    return tweets
def get_user_tweets(self):
    """Return a list of all tweets from the authenticated API user."""
    tweets = []
    for status in tweepy.Cursor(self.api.user_timeline).items():
        tweets.append(status)
    return tweets
def get_mentions_from_timeline(self):
    """Return a list of all the mention tweets from the home timeline."""
    tweets = []
    for status in tweepy.Cursor(self.api.home_timeline, include_entities=True).items(200):
        if 'user_mentions' in status.entities:
            tweets.append(str(status.user.screen_name) + " " +
                          str(status.created_at) + "\n" + status.text)
    mentions = []
    for item in tweets:
        if 'pytwe_bot' in item:
            mentions.append(item)
    return mentions
def getFollowerIds(self, userId, limit=5000):
    if self._byProtected(userId):
        return []
    followerIds = []
    try:
        followers = tweepy.Cursor(self.API.followers_ids,
                                  user_id=userId,
                                  cursor=-1).items()
        for cnt, follower in enumerate(followers):
            if not cnt < limit:
                break
            followerIds.append(follower)
    except tweepy.error.TweepError as et:
        print(et)
        return []
    return followerIds
def get_nphs_tweets(since=datetime.utcnow() - timedelta(hours=24)):
    """Get the most recent tweets from the Twitter list of NPHS students."""
    statuses = []
    # Find all tweets since the provided datetime
    for status in Cursor(api.list_timeline, "1Defenestrator", "NPHS").items():
        if status.created_at < since:
            break
        else:
            statuses.append(status)
    # statuses = api.list_timeline("1Defenestrator", "NPHS")
    # Filter out retweets and return
    return [s for s in statuses if not s.text.startswith("RT @")]
def get_friends(api, username, limit):
    """Download friends and process them."""
    for friend in tqdm(tweepy.Cursor(api.friends, screen_name=username).items(limit),
                       unit="friends", total=limit):
        process_friend(friend)
def get_tweets(api, username, limit):
    """Download tweets from the username's account."""
    for status in tqdm(tweepy.Cursor(api.user_timeline, screen_name=username).items(limit),
                       unit="tw", total=limit):
        process_tweet(status)
def twitterdetails(username):
    # Prepare authentication
    auth = tweepy.OAuthHandler(cfg.twitter_consumer_key, cfg.twitter_consumer_secret)
    auth.set_access_token(cfg.twitter_access_token, cfg.twitter_access_token_secret)
    api = tweepy.API(auth)
    # Write the last 1000 tweets to a temp file
    f = open("temptweets.txt", "w+", encoding="utf-8")
    for tweet in tweepy.Cursor(api.user_timeline, id=username).items(1000):
        f.write(tweet.text)
        f.write("\n")
    f.close()
    # Extract hashtags
    f = open('temptweets.txt', 'r', encoding="utf-8")
    q = f.read()
    strings = re.findall(r'(?:\#+[\w_]+[\w\'_\-]*[\w_]+)', q)
    # Regex source: https://marcobonzanini.com/2015/03/09/mining-twitter-data-with-python-part-2/
    # Extract users
    tusers = re.findall(r'(?:@[\w_]+)', q)
    f.close()
    hashlist = []
    userlist = []
    for item in strings:
        hashlist.append(item.strip('#').lower())
    hashlist = hashlist[:10]
    for itm in tusers:
        userlist.append(itm.strip('@').lower())
    userlist = userlist[:10]
    return hashlist, userlist
def getConnection(profile1, profile2):
    # Collect both profiles' followers
    followerProfile1 = []
    for user in tweepy.Cursor(api.followers, screen_name=profile1).items():
        followerProfile1.append(user.screen_name)
    followerProfile2 = []
    for user in tweepy.Cursor(api.followers, screen_name=profile2).items():
        followerProfile2.append(user.screen_name)

    sharedFollower = []
    for follower in followerProfile1:
        if follower in followerProfile2:
            sharedFollower.append(follower)
            print("[*] " + follower)
    print("\n[+] Total shared followers: " + str(len(sharedFollower)) + "\n")

    # Collect both profiles' friends (accounts they follow)
    followingProfile1 = []
    for user in tweepy.Cursor(api.friends, screen_name=profile1).items():
        followingProfile1.append(user.screen_name)
    followingProfile2 = []
    for user in tweepy.Cursor(api.friends, screen_name=profile2).items():
        followingProfile2.append(user.screen_name)

    sharedFollowing = []
    for following in followingProfile1:
        if following in followingProfile2:
            sharedFollowing.append(following)
            print("[*] " + following)
    print("\n[+] Total shared following: " + str(len(sharedFollowing)) + "\n")
def get_followers(screen_name):
    timestamp = datetime.now()
    log_doc = {'accounts': {screen_name: {'started_at': timestamp.timestamp()}}}
    db.saveToImportLog(IMPORT_KEY, log_doc)
    if FOLLOWER_LIMIT == 0:
        print("Get all followers for @" + screen_name)
    else:
        print("Get %d followers for @%s" % (FOLLOWER_LIMIT, screen_name))
    print(timestamp.strftime("%d.%m.%Y %H:%M:%S"))
    followers = []
    for user in limit_handled(tweepy.Cursor(TWITTER_API.followers,
                                            screen_name="@" + screen_name,
                                            count=200).items(FOLLOWER_LIMIT)):
        followers.append(user)
    return followers

# def get_all_retweeters(screen_name):
#     timestamp = time.strftime("%d.%m.%Y %H:%M:%S", time.localtime())
#     print(timestamp)
#     all_retweeters = []
#     for tweet in limit_handled(tweepy.Cursor(api.user_timeline, id=screen_name, count=200).items()):
#         print(tweet.id)
#         retweeters = get_retweets(tweet.id)
#         # somehow get to retweeters
#         # all_retweeters.append(retweeters_per_tweet)
#     return all_retweeters
def get_tweets(pages=1):
    """Return up to (200 * pages) of Trump's tweets."""
    tweets = []
    for page in t.Cursor(api.user_timeline,
                         screen_name="realDonaldTrump",
                         count=200).pages(pages):
        for tweet in page:
            tweets.append(_process_text(tweet.text))
    return [i for i in tweets if i]
def tw_search_json(query, cnt=5):
    authfile = './auth.k'
    api = tw_oauth(authfile)
    results = {}
    meta = {
        'username': 'text',
        'usersince': 'date',
        'followers': 'numeric',
        'friends': 'numeric',
        'authorid': 'text',
        'authorloc': 'geo',
        'geoenable': 'boolean',
        'source': 'text'
    }
    data = []
    for tweet in tweepy.Cursor(api.search, q=query, count=cnt).items():
        dTwt = {}
        dTwt['username'] = tweet.author.name
        dTwt['usersince'] = tweet.author.created_at        # author profile creation date
        dTwt['followers'] = tweet.author.followers_count   # number of author followers (inlinks)
        dTwt['friends'] = tweet.author.friends_count       # number of author friends (outlinks)
        dTwt['authorid'] = tweet.author.id                 # author/user ID
        dTwt['authorloc'] = tweet.author.location          # author/user location
        dTwt['geoenable'] = tweet.author.geo_enabled       # is the account geo-enabled?
        dTwt['source'] = tweet.source                      # platform source for the tweet
        data.append(dTwt)
    results['meta'] = meta
    results['data'] = data
    return results

# TWEEPY SEARCH FUNCTION
def read_messages(self, to):
    exit = []
    searched_tweets = [status for status in tweepy.Cursor(self.api.search, q=to,
                                                          lang=self.language).items(self.max_tweets)]
    for elem in searched_tweets:
        exit.append({'user_tweets': elem.user.id,
                     'screen_name': elem.user.screen_name,
                     'description': elem.user.description,
                     'tweet_message': elem.text,
                     'created_date': str(elem.created_at)})
    return exit
def handle_mentions(api, responses):
    for status in tweepy.Cursor(api.mentions_timeline).items():
        process_status(status, responses)
def get_tweets(user_id, api):
    cursor = tweepy.Cursor(api.user_timeline, user_id).pages()
    tweets = []
    while True:
        try:
            tweets = [page for page in cursor]
            break
        except tweepy.TweepError as e:
            tweets = []
            api_codes = [401, 404, 500]
            if not str(e):
                break
            # Stop retrying on unrecoverable HTTP codes
            digits = ''.join(filter(str.isdigit, str(e)))
            if digits and int(digits) in api_codes:
                break
            print('get_tweets: ' + str(e))
    return tweets
def get_and_process_tweets(user="realdonaldtrump"):
    """
    A function that uses tweepy to download all the tweets by a given `user`,
    processes the tweets for stopwords & weird internet formatting, tokenizes
    the tweets using the NLTK, and then uses markovify to output a reusable
    JSON file for use in generating future tweets.
    """
    all_tweets = []  # a list in which to store the tweets

    # Get the tweets, keeping only those sent from the Android phone.
    for tweet in tweepy.Cursor(api.user_timeline, id=user).items():
        if tweet.source == 'Twitter for Android':
            fishy_tweet = clean_tweet(tweet.text)
            all_tweets.append(fishy_tweet)

    # Write the tweets to a text file.
    with open('djt_tweets.txt', 'w') as f:
        for tweet in all_tweets:
            f.write(tweet + ' ')  # the space keeps tweets from sticking together

    # Open the file to POS-tag it and process the results into JSON.
    with open("djt_tweets.txt") as t:
        text = t.read()

    text_model = POSifiedText(input_text=text, state_size=3)
    model_json = text_model.to_json()

    # Save the JSON to disk for future use.
    with open('djt_tweets.json', 'w', encoding='utf-8') as j:
        json.dump(model_json, j, ensure_ascii=False)
def collect(self, since_id: str = None) -> Iterable[Dict[str, Any]]:
    """Collect tweets.

    :param since_id: TODO
    :returns: TODO
    """
    logger.debug("Collecting tweets")
    data = json.load(open("tweets-5.json", "r"))
    yield from data
    # for page in limit_handled(tweepy.Cursor(self._api.list_timeline, self.account_name,
    #                                         self.source_list).pages(1)):
    #     yield from page