def twittercallback(): verification = request.args["oauth_verifier"] auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) try: auth.request_token = session["request_token"] except KeyError: flash("Please login again", "danger") return redirect(url_for("bp.home")) try: auth.get_access_token(verification) except tweepy.TweepError: flash("Failed to get access token", "danger") return redirect(url_for("bp.home")) session["access_token"] = auth.access_token session["access_token_secret"] = auth.access_token_secret return render_template("twittercallback.html", form=HashtagForm())
def send(self, picture): "Send a tweet. `picture` is a `Result` object from `picdescbot.common`" retries = 0 status = None filename = picture.url.split('/')[-1] data = picture.download_picture() try: while retries < 3 and not status: if retries > 0: self.log.info('retrying...') data.seek(0) try: status = self.api.update_with_media(filename=filename, status=picture.caption, file=data) except tweepy.TweepError as e: self.log.error("Error when sending tweet: %s" % e) retries += 1 if retries >= 3: raise else: time.sleep(5) finally: data.close(really=True) return status.id
def log_tweep_error(logger, tweep_error): """Log a TweepError exception.""" if tweep_error.api_code: if tweep_error.api_code == 32: logger.error("invalid API authentication tokens") elif tweep_error.api_code == 34: logger.error("requested object (user, Tweet, etc) not found") elif tweep_error.api_code == 64: logger.error("your account is suspended and is not permitted") elif tweep_error.api_code == 130: logger.error("Twitter is currently in over capacity") elif tweep_error.api_code == 131: logger.error("internal Twitter error occurred") elif tweep_error.api_code == 135: logger.error("could not authenticate your API tokens") elif tweep_error.api_code == 136: logger.error("you have been blocked to perform this action") elif tweep_error.api_code == 179: logger.error("you are not authorized to see this Tweet") else: logger.error("error while using the REST API: %s", tweep_error) else: logger.error("error with Twitter: %s", tweep_error)
def get_hydrated(writer, user_ids=None, screen_names=None): """Get hydrated Twitter User-objects from a list of user ids and/or screen names.""" LOGGER.info("get_hydrated() starting") ensure_at_least_one(user_ids=user_ids, screen_names=screen_names) user_ids = user_ids if user_ids else [] screen_names = screen_names if screen_names else [] # initialize config and Twitter API config = read_config() api = get_oauth_api(config) # OAuth gives more capacity for the users/lookup API # process user ids and/or screen names, storing returned users in JSON format num_users = 0 for chunk in gen_chunks(user_ids, screen_names, size=LOOKUP_USERS_PER_REQUEST): try: num_users += write_objs(writer, api.lookup_users, {"user_ids": chunk[0], "screen_names": chunk[1]}) except TweepError as err: log_tweep_error(LOGGER, err) LOGGER.info("downloaded %d user(s)", num_users) # finished LOGGER.info("get_hydrated() finished")
def get_followers(writer, user_id=None, screen_name=None): """Get the ids of the followers for a Twitter user id or screen name.""" LOGGER.info("get_followers() starting") ensure_only_one(user_id=user_id, screen_name=screen_name) # initialize config and Twitter API config = read_config() api = get_app_auth_api(config) # process user id or screen name, storing returned ids in plain text args = {"count": FOLLOWERS_IDS_COUNT} if user_id is not None: args.update({"user_id": user_id}) if screen_name is not None: args.update({"screen_name": screen_name}) limit = config.getint("followers", "limit") try: num_ids = write_ids(writer, api.followers_ids, args, cursored=True, limit=limit) LOGGER.info("downloaded %d follower id(s)", num_ids) except TweepError as err: log_tweep_error(LOGGER, err) # finished LOGGER.info("get_followers() finished")
def get_friends(writer, user_id=None, screen_name=None): """Get the ids of the friends for a Twitter user id or screen name.""" LOGGER.info("get_friends() starting") ensure_only_one(user_id=user_id, screen_name=screen_name) # initialize config and Twitter API config = read_config() api = get_app_auth_api(config) # process user id or screen name, storing returned ids in plain text args = {"count": FRIENDS_IDS_COUNT} if user_id is not None: args.update({"user_id": user_id}) if screen_name is not None: args.update({"screen_name": screen_name}) limit = config.getint("friends", "limit") try: num_ids = write_ids(writer, api.friends_ids, args, cursored=True, limit=limit) LOGGER.info("downloaded %d friend id(s)", num_ids) except TweepError as err: log_tweep_error(LOGGER, err) # finished LOGGER.info("get_friends() finished")
def get_hydrated(writer, tweet_ids): """Get hydrated Tweet-objects from a list of Tweet ids.""" LOGGER.info("get_hydrated() starting") # initialize config and Twitter API config = read_config() api = get_oauth_api(config) # OAuth gives more capacity for the statuses/lookup API # process Tweet ids, storing returned Tweets in JSON format num_tweets = 0 for chunk in gen_chunks(tweet_ids, size=LOOKUP_STATUSES_PER_REQUEST): try: num_tweets = write_objs(writer, api.statuses_lookup, {"id_": chunk[0]}) except TweepError as err: log_tweep_error(LOGGER, err) LOGGER.info("downloaded %d Tweet(s)", num_tweets) # finished LOGGER.info("get_hydrated() finished")
def get_retweets(writer, tweet_id): """Get hydrated Retweet-objects for a given Tweet id.""" LOGGER.info("get_retweets() starting") # initialize config and Twitter API config = read_config() api = get_app_auth_api(config) # process Tweet id, storing returned Retweets in JSON format try: num_retweets = write_objs(writer, api.retweets, {"id": tweet_id, "count": RETWEETS_COUNT}) LOGGER.info("downloaded %d Retweet(s)", num_retweets) except TweepError as err: log_tweep_error(LOGGER, err) # finished LOGGER.info("get_retweets() finished")
def tweet_status_trends(self): """ Tweets the top trending places """ logging.info("Updating status with Trending places....") try: base_text = "Top trending places in #OSM " + DATE.strftime('%d/%m') + ': ' end_text = '' count_available = TWITTER_STATUS_LIMIT - len(base_text) - len(end_text) text = Ft().get_cities_from_file(str(DATE.date()), REGION, count_available) img = Ft.get_trending_graph(str(DATE.date()), REGION) if text: self.api.update_with_media(img, base_text + text + end_text) self.state['last_tweet'] = time.time() else: self.api.update_status(ERROR_MSG) logging.info("Could not update status. Rechecking in a while....") except tweepy.TweepError as e: self._log_tweepy_error('Can\'t update status because', e)
def manage_auth_handlers(auths): index = 0 while True: api = auths[index] try: limit = api.rate_limit_status() status_limit = limit['resources']['statuses']['/statuses/user_timeline']['remaining'] if status_limit > 180: return api except tweepy.TweepError as e: #print('manage_auth_handlers ' + str(e)) pass finally: if index == (len(auths) - 1): index = 0 else: index += 1
def get_latest_tweet(self): """Checks the twitter handle for new tweets. If there has been a new tweet, it will return the Tweet to be checked for companies""" try: latest_tweet = self.api.user_timeline(screen_name=self.handle, count=1)[0] tweet = latest_tweet.text.encode('ascii', 'ignore').decode('utf-8') # Removes emjois with open(f'{LATEST_TWEET}{self.handle}.txt', "r") as f: old_tweet = f.read() if tweet != old_tweet: with open(f'{LATEST_TWEET}{self.handle}.txt', 'w') as f: f.write(tweet) self.tweet_id = latest_tweet.id_str self.tweet = tweet return tweet except tweepy.TweepError as error: logging.debug(error)
def initial_tweet(self, matches): """Tweets when a company is mentioned, along with it's sentiment.""" sentiment = self.sentiment_analysis() sentiment_dict = {"positive": u"\U00002705", "negative": u"\U0000274E", "neutral": u"\U00002796" } for comp in matches: try: self.api.update_status(f'{self.handle} just mentioned {comp.upper()} {sentiment}ly ' f'in their latest tweet! ' f'https://twitter.com/{self.handle}/status/{self.tweet_id}') except tweepy.TweepError as error: logging.debug(error)
def share_output(self): """Calls difference_in_shares from the Companies module, Outputs the data to twitter.""" share_dict = company.get_company_dict() for comp in share_dict: try: self.api.update_status( f'Since {share_dict[comp]["handle"]} mentioned {comp.upper()}, {share_dict[comp]["day"]} days ago, ' f'their shares have changed from {share_dict[comp]["initialSharePrice"]:.2f} to ' f"{share_dict[comp]['currentSharePrice']:} that's a {share_dict[comp]['shareChange']:.3f}% change!" ) except tweepy.TweepError as error: logging.debug(error)
def get_friends(self, callback, pages_limit=0): api = self._api user = self._user if user.friends_count > _FRIENDS_COUNT_MAX_: logging.warning('The user [%d]-[%s] has too many [%d] friends!' % (user.id, user.screen_name, user.friends_count)) return cursor = tweepy.Cursor(api.friends_ids, user_id=user.id, screen_name=user.screen_name) friends = [] try: for friends_page in cursor.pages(pages_limit): friends.extend(friends_page) if callable(callback): callback(friends) except tweepy.TweepError as e: logging.warning([user.id, user.screen_name, e])
def __init__(self, twitter_username): # TODO: Login to twitter for corpus generation using end user's credentials auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET) # Connect to Twitter - raise TweepError if we brick out on this try: api = tweepy.API(auth) except tweepy.TweepError: # TODO: make sure this error bubbles up and gets handled gracefully raise PermissionError("Twitter Auth failed") usr = api.get_user(twitter_username) self.username = twitter_username self.image = usr.profile_image_url # Exposes entire api - for debugging only # self.api = usr self.description = usr.description self.screen_name = usr.screen_name self.name = usr.name
def tweet_status(status, image_path=None): try: auth = tweepy.OAuthHandler(settings.CONSUMER_KEY, settings.CONSUMER_SECRET) auth.set_access_token(settings.ACCESS_TOKEN, settings.ACCESS_TOKEN_SECRET) api = tweepy.API(auth) if image_path: new_tweet = api.update_with_media(image_path, status=status) else: new_tweet = api.update_status(status) except tweepy.TweepError as ex: raise MeteoSangueException(ex) try: mention = '{0} {1}'.format( ' '.join([ass['twitter_id'] for ass in settings.BLOOD_ASSOCIATIONS if 'twitter_id' in ass]), 'Nuovo bollettino meteo ?', ) api.update_status(mention, in_reply_to_status_id=new_tweet.id) except tweepy.TweepError as ex: pass #Mention is allowed to fail silently
def check_human_accounts(): api = get_api(key3[0], key3[1], key3[2], key3[3]) ids = [] try: with open('..//humans.txt', 'r') as f: for line in f: ids.append(line.rstrip()) ids.append('\n') os.remove('..//humans.txt') with open('..//existing_humans_copy.txt', 'r') as f_read: with open('..//humans.txt', 'w') as f_write: f_write.write(''.join(ids)) for line in f_read: user = api.get_user(line.rstrip()) print(line) if not user.protected: f_write.write(line) except tweepy.TweepError: f_read.close() f_write.close()
def check_bot_accounts(): api = get_api(key2[0], key2[1], key2[2], key2[3]) ids = [] try: with open('..//bots.txt', 'r') as f: for line in f: ids.append(line.rstrip()) ids.append('\n') os.remove('..//bots.txt') with open('..//existing_bots_copy.txt', 'r') as f_read: with open('..//bots.txt', 'w') as f_write: f_write.write(''.join(ids)) for line in f_read: user = api.get_user(line.rstrip()) print(line) if not user.protected: f_write.write(line) except tweepy.TweepError: f_read.close() f_write.close()
def get_data_from_humans(): try: x = [] with open('..//human_x.txt', 'r') as f: reader = csv.reader(f) for row in reader: x.append(row) os.remove('..//human_x.txt') with open('..//humans_copy.txt', 'r') as f_read: with open('..//human_x.txt', 'w') as f_write: writer = csv.writer(f_write, lineterminator='\n') for row in x: writer.writerow(row) for count, line in enumerate(f_read): row = get_data(line.rstrip(), api) writer.writerow(row) print(line) except tweepy.TweepError: f_write.close()
def tweet_search(api, query, max_tweets, max_id, since_id, geocode): ''' Function that takes in a search string 'query', the maximum number of tweets 'max_tweets', and the minimum (i.e., starting) tweet id. It returns a list of tweepy.models.Status objects. ''' searched_tweets = [] while len(searched_tweets) < max_tweets: remaining_tweets = max_tweets - len(searched_tweets) try: new_tweets = api.search(q=query, count=remaining_tweets, since_id=str(since_id), max_id=str(max_id-1)) # geocode=geocode) print('found',len(new_tweets),'tweets') if not new_tweets: print('no tweets found') break searched_tweets.extend(new_tweets) max_id = new_tweets[-1].id except tweepy.TweepError: print('exception raised, waiting 15 minutes') print('(until:', dt.datetime.now()+dt.timedelta(minutes=15), ')') time.sleep(15*60) break # stop the loop return searched_tweets, max_id
def process_request(self, obj): """Convert a ReservoirQuery into an API request and get the response. Parameters ---------- obj : |ReservoirQuery| Returns ------- |Cargo| """ try: # TODO(LH): handle rate limit cursor = self._get_statuses(obj) data = [status._json for status in cursor.items()] status_code = 200 except tweepy.TweepError as error: data = [] status_code = error.api_code return Cargo(status_code=status_code, data=data)
def extract_tweets_from_a_source(self, source): if '@' not in source: source = '@' + source extracted_tweets = [] try: print "Extracting %s..." % source max_twitter_id = None while True: tw = self.API.user_timeline(screen_name=source, count=self.N_TWEETS_PER_REQUEST, max_id=max_twitter_id) if not len(tw): break extracted_tweets += [self.get_filtered_tweet(t) for t in tw] earliest_tweet_date = extracted_tweets[-1]['created_at'] if earliest_tweet_date < self.from_time: break max_twitter_id = extracted_tweets[-1]['id'] - 1 except TweepError: print "Error processing", source print "\textracted %d tweets for %s" % (len(extracted_tweets), source) self.TWEET_STORAGE += extracted_tweets return extracted_tweets
def _handle_twitter_rate_limit(cursor): '''Handle twitter rate limits. If rate limit is reached, the next element will be accessed again after sleep time''' while True: try: yield cursor.next() except tweepy.RateLimitError: log.info('Twitter API rate limit error. Sleeping for {} secs.') \ .format(TWITTER_API_RATE_LIMIT_PERIOD) sleep_time = TWITTER_API_RATE_LIMIT_PERIOD time.sleep(sleep_time) except tweepy.TweepError as e: if str(e.api_code) == TWITTER_API_USER_NOT_FOUND_ERROR_CODE: raise ValueError( 'Requested user was not found. Check your configuration') raise e
def tweet_grab(self, query, count): tweets = [] try: tweets_fetched = self.api.search(q=query, count=count) for tweet in tweets_fetched: tweets_parsed = {} tweets_parsed['text'] = tweet.text tweets_parsed['sentiment'] = self.tweet_sentiment(tweet.text) if tweet.retweet_count > 0: if tweets_parsed not in tweets: tweets.append(tweets_parsed) else: tweets.append(tweets_parsed) return tweets except tweepy.TweepError: # pragma no cover print('Error : ' + str(tweepy.TweepError))
def twittersignin(): auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) try: redirect_url = auth.get_authorization_url() session["request_token"] = auth.request_token except tweepy.TweepError: flash("Failed to get request token", "danger") return redirect(url_for("bp.home")) return redirect(redirect_url)
def twitter(): auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg", "http://"+ request.environ["HTTP_HOST"] + "/auth/twitter") auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET) api = tweepy.API(auth) try: if api.me().name: return redirect(url_for('index')) except tweepy.TweepError: pass redirect_url = auth.get_authorization_url() session["request_token"] = auth.request_token return redirect(redirect_url)
def twitter_bot(): # Only allocate part of the gpu memory when predicting. gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.2) tf_config = tf.ConfigProto(gpu_options=gpu_options) consumer_key = os.getenv("consumer_key") consumer_secret = os.getenv("consumer_secret") access_token = os.getenv("access_token") access_token_secret = os.getenv("access_token_secret") auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) api = tweepy.API(auth) with tf.Session(config=tf_config) as sess: predictor = predict.EasyPredictor(sess) for tweet in tweets(): status_id, status, bot_flag = tweet print("Processing {0}...".format(status.text)) screen_name = status.author.screen_name replies = predictor.predict(status.text) if not replies: print("no reply") continue reply_body = replies[0] if reply_body is None: print("No reply predicted") else: try: post_reply(api, bot_flag, reply_body, screen_name, status_id) except tweepy.TweepError as e: # duplicate status if e.api_code == 187: pass else: raise mark_tweet_processed(status_id)
def bulk_process(logger, output_dir, filename_tmpl, function, func_input, var_arg, resume=False): # pylint: disable=too-many-arguments """Process a function in bulk using an iterable input and a variable argument.""" if not path.exists(output_dir): makedirs(output_dir) logger.info("created output directory: %s", output_dir) num_processed = 0 for basename, value in func_input: output_filename = path.join(output_dir, filename_tmpl % basename) # check if there is a previous processing and skip or resume it latest_id = None if path.exists(output_filename): if not resume: logger.warning("skipping existing file: %s", output_filename) continue latest_id = _get_latest_id(output_filename) # process the input element with the provided function try: logger.info("processing: %s", value) args = {var_arg: value} if latest_id is not None: args.update({"since_id": latest_id}) logger.info("latest id processed: %d", latest_id) with open(output_filename, "a" if resume else "w") as writer: function(writer, **args) num_processed += 1 except TweepError: logger.exception("exception while using the REST API") return num_processed
def get_timeline(writer, user_id=None, screen_name=None, since_id=0): """Get hydrated Tweet-objects from a user timeline.""" LOGGER.info("get_timeline() starting") ensure_only_one(user_id=user_id, screen_name=screen_name) # initialize config and Twitter API config = read_config() api = get_app_auth_api(config) # process user id or screen name, storing returned Tweets in JSON format num_tweets = 0 args = {"count": TIMELINE_COUNT} if user_id is not None: args.update({"user_id": user_id}) if screen_name is not None: args.update({"screen_name": screen_name}) if since_id > 0: args.update({"since_id": since_id}) limit = config.getint("timeline", "limit") try: num_tweets = write_objs(writer, api.user_timeline, args, cursored=True, limit=limit) LOGGER.info("downloaded %d Tweet(s)", num_tweets) except TweepError as err: log_tweep_error(LOGGER, err) # finished LOGGER.info("get_timeline() finished")
def search(writer, query, since_id=0): """Get hydrated Tweet-objects using the Search API.""" LOGGER.info("search() starting") # initialize config and Twitter API config = read_config() api = get_app_auth_api(config) # process the query, storing returned Tweets in JSON format num_tweets = 0 search_params = { "q": query, "count": SEARCH_COUNT, "result_type": "recent", } if since_id > 0: search_params.update({"since_id": since_id}) limit = config.getint("search", "limit") try: num_tweets = write_objs(writer, api.search, search_params, cursored=True, limit=limit) LOGGER.info("downloaded %d Tweet(s)", num_tweets) except TweepError as err: log_tweep_error(LOGGER, err) # finished LOGGER.info("search() finished")
def handle(self, *args, **options): api_keys = settings.TWITTER_USER_KEYS auth = tweepy.OAuthHandler(api_keys['consumer_key'], api_keys['consumer_secret']) auth.set_access_token(api_keys['access_token_key'], api_keys['access_token_secret']) new_portraits = Portrait.objects.filter(active=True, demo_portrait=False) print('scheduled portraits', new_portraits.count()) for portrait in new_portraits: is_new_portrait = portrait.portrait_content is None print('user', portrait.auth_screen_name) print('new', is_new_portrait) try: portrait_api(portrait) print('OK') except TweepError as err: print('ERROR', err) portrait.active = False portrait.save() continue except Exception as err: print('ERROR', err) continue # to avoid too many connections time.sleep(5)
def handle(self, *args, **options): api_keys = settings.TWITTER_USER_KEYS auth = tweepy.OAuthHandler(api_keys['consumer_key'], api_keys['consumer_secret']) try: redirect_url = auth.get_authorization_url() except tweepy.TweepError as e: print('Error! Failed to get request token.') raise e print(redirect_url) verifier = input('Verifier:') print(auth.get_access_token(verifier=verifier))
def random_tweet(): # build a twitter list of five possible tweets tweet_list = [ 'Charm was a scheme for making strangers like and trust a person immediately, no matter what the charmer had in mind.', "There is no good reason good can't triumph over evil, if only angels will get organized along the lines of the mafia.", "Shrapnel was invented by an Englishman of the same name. Don't you wish you could have something named after you?", "If your brains were dynamite there wouldn't be enough to blow your hat off.", "And so on." ] x = 0 # create a loop to tweet twice while x < 2: try: # generate a random variable between 0 and 4 rand = random.randint(0, 4) # use that random variable to grab an item from tweet_list api.update_status(tweet_list[rand]) # take a 30-second break so we don't overwhelm our followers sleep(30) #increment our counter x += 1 # if there's an error, catch it instead of crashing and print out the error message except tweepy.TweepError, e: print 'Error sending tweet:', e[0][0]['message'] # a function to tweet a gif
def gif_tweet(): try: # takes a file location and a text string to update status api.update_with_media('../img/pulp-fiction-search.gif','BEHOLD, a gif') # if there's an error, catch it instead of crashing and print out the error message except tweepy.TweepError, e: print 'Error sending tweet:', e[0][0]['message'] # utility function for retrieving secret keys # from our secret key file
def on_follow(self, f_id): """ Follow back on being followed """ try: self.api.create_friendship(f_id, follow=True) self.friends.append(f_id) logging.info('Followed user id {}'.format(f_id)) except tweepy.TweepError as e: self._log_tweepy_error('Unable to follow user', e)
def _follow_all(self): """ follows all followers on initialization """ logging.info("Following back all followers....") try: self.followers['new'] = list(set(self.followers['existing']) - set(self.friends)) self._handle_followers() except tweepy.TweepError as e: self._log_tweepy_error('Can\'t follow back existing followers', e)
def _check_followers(self): """ Checks followers. """ logging.info("Checking for new followers...") try: self.followers['new'] = [f_id for f_id in self.api.followers_ids(self.id) if f_id not in self.followers['existing']] self.state['last_follow_check'] = time.time() except tweepy.TweepError as e: self._log_tweepy_error('Can\'t update followers', e)
def twitter_bot(): tf_config = tf.ConfigProto(device_count = {"GPU":0}, log_device_placement = True) auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET) auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET) api = tweepy.API(auth) with tf.Session(config = tf_config) as sess: predictor = predict.EasyPredictor(sess) for tweet in tweets(): status_id, status, bot_flag = tweet print("Processing {0}...".format(status.text)) screen_name = status.author.screen_name replies = predictor.predict(status.text) if not replies: print("no reply") continue reply_body = replies[0] if reply_body is None: print("No reply predicted") else: try: if is_contain(status.text, '??????'): special_reply(api, bot_flag, screen_name, status_id, code = 1) elif is_contain(status.text, '????'): special_reply(api, bot_flag, screen_name, status_id, code = 2) elif is_contain(status.text, '?????'): special_reply(api, bot_flag, screen_name, status_id, code = 3) else: post_reply(api, bot_flag, reply_body, screen_name, status_id) except tweepy.TweepError as e: if e.api_code == 187: pass else: raise mark_tweet_processed(status_id)
def get_tweets(user_id, api): cursor = tweepy.Cursor(api.user_timeline, user_id).pages() while True: try: tweets = [page for page in cursor] except tweepy.TweepError as e: tweets = [] api_codes = [401, 404, 500] if not str(e): break if(int(filter(str.isdigit, str(e))) in api_codes): break print('get_tweets: ' + str(e)) return tweets
def user_status_count(user_id, api): try: user = api.get_user(user_id=user_id) if(user.statuses_count): count = user.statuses_count except tweepy.TweepError as e: count = 0 #print(e.message[0]['message']) finally: return count
def verify_working_credentials(api): verified = True try: api.verify_credentials() except tweepy.TweepError as e: verified = False finally: return verified
def send_token(): global auth auth = tweepy.OAuthHandler(consumer_key, consumer_secret, callback_url) try: redirect_url = auth.get_authorization_url() session['request_token'] = auth.request_token except tweepy.TweepError: print ('Error! Failed to get request token.') return redirect(redirect_url) # callback. once twitter authorizes user, it sends user back to this page
def get_verification(): global auth #get the verifier key from the request url verifier = request.args['oauth_verifier'] auth = tweepy.OAuthHandler(consumer_key, consumer_secret) token = session['request_token'] del session['request_token'] auth.request_token = token try: auth.get_access_token(verifier) except tweepy.TweepError: print 'Error! Failed to get access token.' api = tweepy.API(auth) # cache info to avoid rate limit access_info['api'] = api access_info['following'] = api.friends_ids(api.me().screen_name) access_info['followers'] = api.followers_ids(api.me().screen_name) access_info["access_token"] = auth.access_token access_info["access_token_secret"] = auth.access_token_secret return redirect(url_for('twitter_DL'))
def troll_bot_analyzer(user, api): try: user_data = data_user(user, api) except tweepy.TweepError: logging.error("This user is protected or does not exist. His information cannot be accessed") else: if len(user_data["tweets"]) == 0: logging.error("There is not enough information to classify this user") return False hashtags_per_tweet = float(user_data["number_hashtags"]) / len(user_data["tweets"]) mentions_per_tweet = float(user_data["number_mentions"]) / len(user_data["tweets"]) percentage_tweet_with_mention = float(user_data["tweet_with_mentions"]) / len(user_data["tweets"]) percentage_tweet_with_hashtag = float(user_data["tweets_with_hashtags"]) / len(user_data["tweets"]) signs_per_char, capitals_per_char, activity, percentage_tweet_with_omg = drama_queen(user_data) periodicity, answer = periodicity_answer(user_data) diversity_hashtags = tweet_iteration_hashtags(user_data) diversity_tweets = tweet_iteration_stemming(user_data) urls_percentage = tweet_iteration_urls(user_data) num_stalker, who_stalker = stalker(user_data) per_drama_queen = percentage_drama_queen(activity, percentage_tweet_with_omg, capitals_per_char, signs_per_char, percentage_tweet_with_hashtag, percentage_tweet_with_mention, mentions_per_tweet, hashtags_per_tweet) per_bot = percentage_bot(periodicity, answer, diversity_tweets) per_stalker, famous, non_famous = percentage_stalker(num_stalker, who_stalker, mentions_per_tweet, percentage_tweet_with_mention, api) if per_stalker == 0: per_stalker = num_stalker per_spammer = percentage_spammer(diversity_tweets, diversity_hashtags, urls_percentage) per_hater = (1 - sentiment(user_data)) * 100 max_value = [per_bot, per_drama_queen, per_stalker, per_hater, per_spammer] index = max_value.index(max(max_value)) labels = ["bot", "drama_queen", "stalker", "hater", "spammer"] final = labels[index] return {"user_id": user, "bot": per_bot, "drama_queen": per_drama_queen, "stalker": per_stalker, "hater": per_hater, "spammer": per_spammer, "famous": famous, "non_famous": non_famous, "stalked": who_stalker, "final": final}
def get_user(self, uid=None, name=None): if not self._api: raise tweepy.TweepError('Api NOT inited!') try: user = self._api.get_user(user_id=uid, screen_name=name) self._user = user except tweepy.TweepError as e: logging.error('Uid ({0}) and name ({1}) has error: {2}'.format(uid, name, e)) if e.api_code in self._IGNORE_ERROR_CODES: return None raise e return self
def authentication(method): def judge(self, *args, **kwargs): if not self._api: raise tweepy.TweepError('Api NOT inited!') if not self._user: raise tweepy.TweepError('User NOT inited!') method(self, *args, **kwargs) return self return judge
def get_user_init(): global status global path global rev try: path = 'D:/Twitter/Depth-%d/%s' % (depth_num,str(user_id)) try: os.makedirs(path) except FileExistsError: print("Folder already exist") status="skip" return data = api.get_user(user_id) if data['protected']==False: get_user(data) rev=1 else: status="skip" print("Protected") except tweepy.RateLimitError: countdown(960) get_user_init() except tweepy.TweepError as e: if tweepy.TweepError is "[{'message': 'Over capacity', 'code': 130}]" or e[0]['code'] is 130: countdown_te(600,e) get_user_init() elif tweepy.TweepError is "[{'message': 'User not found.', 'code':50}]" or e[0]['code'] is 50: status="skip" return else: print(e)
def get_id(sn): try: return(api.get_user(screen_name=sn)['id']) except tweepy.RateLimitError: countdown(960) get_id(sn) except tweepy.TweepError as e: if tweepy.TweepError is "[{u'message': u'Over capacity', u'code': 130}]" or e is "[{u'message': u'Over capacity', u'code': 130}]": countdown_te(600,e) get_id(sn) else: print(e)
def limit_handler(cursor): while True: try: yield cursor.next() except tweepy.RateLimitError: countdown(960) except tweepy.TweepError as e: if tweepy.TweepError is "[{u'message': u'Over capacity', u'code': 130}]" or e is "[{u'message': u'Over capacity', u'code': 130}]": countdown_te(600,e) else: print(e)