def analyzetweets(self, access_token, access_token_secret, mytweets=False, q=None):
    """Average TextBlob polarity and subjectivity over a stream of tweets.

    Authenticates with the module-level consumer credentials plus the given
    access token pair, then walks either the authenticated user's timeline
    (``mytweets`` truthy) or the search results for ``q``. Progress is
    published through ``self.update_state`` after every analysed tweet.

    Returns a dict with the keys ``current``, ``total``, ``subjectivityavg``
    and ``sentimentavg``; both averages are 0.0 when no tweet was seen.
    """
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(access_token, access_token_secret)
    client = tweepy.API(handler)

    total = NUMBER_OF_TWEETS
    if mytweets:
        # NOTE(review): the timeline cursor is not capped at ``total``, so the
        # reported "current" may exceed "total" -- confirm this is intended.
        stream = tweepy.Cursor(client.user_timeline).items()
    else:
        stream = tweepy.Cursor(client.search, q=q).items(total)

    polarities = []
    subjectivities = []
    for seen, status in enumerate(stream, start=1):
        sentiment = TextBlob(status.text).sentiment
        polarities.append(sentiment.polarity)
        subjectivities.append(sentiment.subjectivity)
        self.update_state(state="RUNNING", meta={"current": seen, "total": total})

    # max(..., 1) keeps the division defined when the stream was empty.
    polarity_mean = float(sum(polarities) / max(len(polarities), 1))
    subjectivity_mean = float(sum(subjectivities) / max(len(subjectivities), 1))
    return {
        "current": total,
        "total": total,
        "subjectivityavg": subjectivity_mean,
        "sentimentavg": polarity_mean,
    }
def __init__(self):
    """Set up file logging, load ``configuration.txt`` and build the API client.

    The log file is named after the current UTC time. Settings are read from
    the ``settings`` and ``twitter`` sections of ``configuration.txt``, which
    is looked up next to this source file.
    """
    log_name = datetime.utcnow().strftime("%Y-%m-%d_%H-%M-%S") + '.log'
    logging.basicConfig(filename=log_name, level=logging.INFO)

    # Directory of this source file, so the lookup does not depend on the cwd.
    source_file = inspect.getfile(inspect.currentframe())
    here = os.path.dirname(os.path.abspath(source_file))

    self.config = configparser.ConfigParser()
    self.config.read(os.path.join(here, "configuration.txt"))

    self.sleep_time = int(self.config.get("settings", "time_between_retweets"))
    self.search_term = self.config.get("settings", "search_query")
    self.tweet_language = self.config.get("settings", "tweet_language")
    self.max_age_in_minutes = int(self.config.get("settings", "max_age_in_minutes"))

    # Save-point handling is delegated to helpers defined elsewhere on the class.
    self.last_id_file = self.build_save_point()
    self.savepoint = self.retrieve_save_point(self.last_id_file)

    consumer_key = self.config.get("twitter", "consumer_key")
    consumer_secret = self.config.get("twitter", "consumer_secret")
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)

    access_token = self.config.get("twitter", "access_token")
    access_token_secret = self.config.get("twitter", "access_token_secret")
    handler.set_access_token(access_token, access_token_secret)

    self.api = tweepy.API(handler)
def init_app(self, app):
    """Create a tweepy client from ``app.config`` and register it on the app.

    The client is stored under ``app.extensions['tweepy']``.

    Raises:
        RuntimeError: if any of the four Twitter settings is missing or empty.
    """
    setting_names = (
        'TWITTER_CONSUMER_KEY',
        'TWITTER_CONSUMER_SECRET',
        'TWITTER_ACCESS_TOKEN',
        'TWITTER_ACCESS_TOKEN_SECRET',
    )
    values = [app.config.get(name) for name in setting_names]

    if any(not value for value in values):
        raise RuntimeError(
            'At least one of the following configuration values was not '
            'set: TWITTER_CONSUMER_KEY, TWITTER_CONSUMER_SECRET, '
            'TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET.'
        )

    consumer_key, consumer_secret, access_token, access_token_secret = values
    handler = tweepy.auth.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    app.extensions['tweepy'] = tweepy.API(handler)
def send_notification(message):
    """Send ``message`` to Twitter as a direct message or a status update.

    The channel is chosen by ``config.TWITTER_USE_DM``: when truthy the text is
    sent as a direct message to ``config.TWITTER_DM_USER``, otherwise it is
    posted as a status.

    Returns:
        True when the API call returned a truthy status object, False when the
        credential check failed or the returned status was falsy.
    """
    # NOTE(review): the consumer key/secret are hard-coded in source; they
    # should be moved to configuration alongside the access token and rotated.
    auth = tweepy.OAuthHandler("T4NRPcEtUrCEU58FesRmRtkdW", "zmpbytgPpSbro6RZcXsKgYQoz24zLH3vYZHOHAAs5j33P4eoRg")
    auth.set_access_token(config.TWITTER_ACCESS_TOKEN, config.TWITTER_ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth)

    # Cheap credential probe before attempting to send anything.
    try:
        api.auth.get_username()
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit; only ordinary errors mean "bad credentials".
        logger.error(u"check your twitter credits!")
        return False

    logger.info(u"sending notification to twitter: %s" % message)
    if config.TWITTER_USE_DM:
        status = api.send_direct_message(user=config.TWITTER_DM_USER, text=message)
    else:
        status = api.update_status(status=message)

    if status:
        logger.info(u"Notification to twitter successfully send: %s" % status.text)
        return True

    logger.error(u"unable to send twitter notification: %s" % status)
    return False
def tweet_listener():
    """Run a user stream forever, restarting it whenever it fails.

    Credentials are taken from the environment variables ``consumer_key``,
    ``consumer_secret``, ``access_token`` and ``access_token_secret``. Any
    exception raised while streaming is printed (message and class docstring)
    and a new stream is started.
    """
    consumer_key, consumer_secret, access_token, access_token_secret = (
        os.getenv(name)
        for name in ("consumer_key", "consumer_secret", "access_token", "access_token_secret")
    )
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    api = tweepy.API(handler)

    # NOTE(review): there is no delay between restarts -- a persistent failure
    # will loop as fast as the stream can fail.
    while True:
        try:
            stream = tweepy.Stream(auth=api.auth, listener=StreamListener(api))
            print("listener starting...")
            stream.userstream()
        except Exception as error:
            print(error)
            print(error.__doc__)
def _initialize_api(self):
    """
    Build the tweepy client from the credentials stored on disk.

    Reads ``../config/twitter_creds.json`` (resolved through
    ``self.loc.format``), which must provide these attributes:
        "access_token":
        "access_secret":
        "consumer_token":
        "consumer_secret":
    See Twitter OAUTH docs + Tweepy docs for more details
    http://docs.tweepy.org/en/v3.5.0/auth_tutorial.html
    :return: None; the client is stored on ``self.api``.
    """
    creds_path = self.loc.format('../config/twitter_creds.json')
    with open(creds_path) as handle:
        creds = json.load(handle)

    handler = tweepy.OAuthHandler(creds['consumer_token'], creds['consumer_secret'])
    handler.set_access_token(creds['access_token'], creds['access_secret'])

    # NOTE(review): this is logged before the API client is even constructed;
    # no credential check is visible in this method.
    self.logger.info('Successfully Authenticated to Twitter API')
    self.api = tweepy.API(handler)
def main():
    """Post one Quijote tweet every 15 minutes, each replying to the last.

    The id of the most recent status is persisted in ``last_status_id.txt``
    so that every new tweet is threaded under the previous one. The loop
    never terminates on its own.
    """
    # Credentials are blank placeholders to be filled in before running.
    consumer_key = ""
    consumer_secret = ""
    access_token = ""
    access_token_secret = ""

    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)
    api = tweepy.API(handler, wait_on_rate_limit=True)

    text_source = QuijoteTweet("quijote.txt", "kek.txt")
    id_store = GettingId("last_status_id.txt")

    while True:
        previous_id = id_store.get_id_from_file()
        text = text_source.get_quijote_tweet()
        api.update_status(text, in_reply_to_status_id=previous_id)

        # Re-read our own timeline to learn the id of the tweet just posted.
        own_timeline = api.user_timeline(api.me().id)
        id_store.save_id_to_file(own_timeline[0].id)

        sleep(900)
def gymkhana_main():
    """Start a filtered stream that tracks mentions of the pytwe bot.

    Credentials are loaded from ``tokens.json`` in the current directory,
    which must contain ``consumer_key``, ``consumer_secret``, ``access_token``
    and ``access_token_secret``. ``stream.filter`` is then called with the
    bot's track terms.
    """
    # The context manager closes the file even if the JSON is malformed; the
    # previous open()/close() pair leaked the handle when json.load raised.
    with open('tokens.json', 'r') as json_config:
        tokens = json.load(json_config)

    consumer_key = tokens['consumer_key']
    consumer_secret = tokens['consumer_secret']
    access_token = tokens['access_token']
    access_token_secret = tokens['access_token_secret']

    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    api = tweepy.API(auth, wait_on_rate_limit=True)

    listener = GymkhanaListener(api)
    stream = tweepy.Stream(api.auth, listener)
    filtro = ['@pytwe_bot', 'pytwe_bot', 'pytwe']
    stream.filter(track=filtro)
def sendTweet(link, site, name):
    """Tweet an "IN STOCK" alert for a product and set the ``restock`` flag.

    The tweet holds an emoji banner, the product name, the site, the link and
    a UTC timestamp, one per line. The text is also printed to stdout. The
    module-level ``restock`` is set to 1 after the status update call.
    """
    global restock

    # setup twitter (placeholder credentials)
    C_KEY = "C_KEY"
    C_SECRET = "C_SECRET"
    A_TOKEN = "A_TOKEN"
    A_TOKEN_SECRET = "A_TOKEN_SECRET"
    handler = tweepy.OAuthHandler(C_KEY, C_SECRET)
    handler.set_access_token(A_TOKEN, A_TOKEN_SECRET)
    client = tweepy.API(handler)

    # send tweet
    alert = "\U0001f6a8 "
    sos = "\U0001f198 "
    flag = "\U0001f6a9 "
    banner = alert + sos + flag + " IN STOCK " + flag + sos + alert
    tweet = "".join([
        banner,
        "\n", name,
        "\nSite: ", site, "\n",
        link, "\n",
        strftime("%Y-%m-%d %H:%M:%S", gmtime()),
    ])

    print(tweet)
    client.update_status(tweet.encode('utf-8'))
    restock = 1
def sendTweet(link, site, name, price):
    """Tweet an "IN STOCK" alert including the price and set ``restock``.

    The tweet holds an emoji banner, the product name, the price prefixed
    with ``$``, the site, the link and a UTC timestamp, one per line. The
    text is also printed to stdout. The module-level ``restock`` is set to 1
    after the status update call.
    """
    global restock

    # setup twitter (placeholder credentials)
    C_KEY = "KEYS"
    C_SECRET = "KEYS"
    A_TOKEN = "KEYS"
    A_TOKEN_SECRET = "KEYS"
    handler = tweepy.OAuthHandler(C_KEY, C_SECRET)
    handler.set_access_token(A_TOKEN, A_TOKEN_SECRET)
    client = tweepy.API(handler)

    # send tweet
    alert = "\U0001f6a8 "
    sos = "\U0001f198 "
    flag = "\U0001f6a9 "
    banner = alert + sos + flag + " IN STOCK " + flag + sos + alert
    tweet = "".join([
        banner,
        "\n", name,
        "\n$", price,
        "\nSite: ", site, "\n",
        link, "\n",
        strftime("%Y-%m-%d %H:%M:%S", gmtime()),
    ])

    print(tweet)
    client.update_status(tweet.encode('utf-8'))
    restock = 1
def __init__(self, CK=CK, CS=CS, AT=AT, AS=AS, byGetF=True):
    """Store tuning thresholds, build the API client and resolve the account.

    Args:
        CK, CS: consumer key and secret (default to the module-level values).
        AT, AS: access token and secret (default to the module-level values).
        byGetF: when truthy, the follower ids of the authenticated account
            are fetched immediately and ``self.friends`` is set to ``[]``.
    """
    handler = tweepy.OAuthHandler(CK, CS)
    handler.set_access_token(AT, AS)

    self.SAMPLE_NUM = 50

    # Ratio examples carried over from the original notes:
    #   1.0  -> e.g. follow=100 and follower=0
    #   0.5  -> e.g. follow=100 and follower=100
    #   0.25 -> e.g. follow=33  and follower=100
    self.MIN_FRIENDS_RATIO = 0.35

    # 1 month (four weeks, in days)
    self.MAX_DAY_SPAN = 7 * 4.0

    # 1.0 would mean every tweet has a url or hashtag
    self.MIN_HASHURL_RATIO = 0.55

    self.API = tweepy.API(handler, api_root='/1.1', wait_on_rate_limit=True)
    self.ME = self._getMe()

    if byGetF:
        # self.friends is not used now, so the friend ids are not fetched.
        self.friends = []
        self.followers = self.getFollowerIds(self.ME)
def getTimeline(self, limit=50000, resultType="recent"):
    """Collect up to ``limit`` tweets from the home timeline with a progress bar.

    Args:
        limit: maximum number of tweets to collect.
        resultType: forwarded to the API call as ``result_type``.

    Returns:
        list of the tweets gathered so far. Errors are printed rather than
        raised, so a partial (possibly empty) list is returned on failure.
    """
    # Initialised outside the try so the final return can never hit an
    # unbound name, whatever fails below.
    tweets = []
    try:
        tweetsObj = tweepy.Cursor(
            self.API.home_timeline,
            result_type=resultType,
            exclude_replies=False,
        ).items(limit)
        pBar = tqdm(tweetsObj, ascii=True, total=limit, desc="Getting Tweets!")
        # Iterating the tqdm wrapper already advances the bar by one per item;
        # the extra pBar.update(1) that used to be here double-counted it.
        for cnt, tweet in enumerate(pBar):
            if not cnt < limit:
                break
            tweets.append(tweet)
    except tweepy.error.TweepError as et:
        print(et)
    except Exception as e:
        print(e)
    return tweets
def getFriendIds(self, userId, limit=100000):
    """Return at most ``limit`` friend ids of ``userId``.

    Returns an empty list when ``self._byProtected(userId)`` is truthy or
    when the API raises a TweepError (the error is printed and any ids
    gathered before it are discarded).
    """
    if self._byProtected(userId):
        return []

    collected = []
    try:
        cursor = tweepy.Cursor(self.API.friends_ids, user_id=userId, cursor=-1)
        for position, friend_id in enumerate(cursor.items()):
            if position >= limit:
                break
            collected.append(friend_id)
    except tweepy.error.TweepError as et:
        print(et)
        return []
    return collected
def getTweets(self, userId, limit=50):
    """Return up to ``limit`` tweets from ``userId``'s timeline, replies excluded.

    A TweepError is printed and whatever was collected before it is returned.
    """
    fetched = []
    try:
        cursor = tweepy.Cursor(
            self.API.user_timeline,
            user_id=userId,
            exclude_replies=True,
        )
        for position, status in enumerate(cursor.items(limit)):
            if position >= limit:
                break
            fetched.append(status)
    except tweepy.error.TweepError as et:
        print(et)
    return fetched
def favoriteTweet(self, tweetId=None, tweet=None):
    """Favorite a tweet given either its id or the tweet object.

    Args:
        tweetId: id of the tweet to favorite.
        tweet: tweet object; its id is resolved via ``self._getTweetId``.

    Returns:
        ``False`` when both arguments are supplied. Otherwise a
        ``(code, message)`` tuple: ``1`` on success, ``429`` or ``139`` when
        the error reason contains that code, ``-1`` for any other API error
        or when neither argument was supplied.
    """
    if tweetId is not None and tweet is not None:
        return False

    if tweet is not None:
        tId = self._getTweetId(tweet)
    elif tweetId is not None:
        tId = tweetId
    else:
        # Previously execution fell through with ``tId`` unbound and crashed
        # with UnboundLocalError; report the problem as a normal failure.
        print("please input a tweet id")
        return -1, "please input a tweet id"

    try:
        self.API.create_favorite(tId)
        return 1, "Succeed in favoritting this tweet! %d"%tId
    except tweepy.error.TweepError as tp:
        if "429" in tp.reason:
            return 429, "Favo restriction! %d"%tId
        if "139" in tp.reason:
            return 139, "You have already favorite it! %d"%tId
        return -1, "Exception! %s"%str(tp.reason)
def is_target(screen_name, disable_targeting, model_file='cluster.pkl'):
    """
    Decide whether a user should be selected, based on the cluster label
    predicted by a pretrained model loaded from ``model_file``.

    Returns ``True`` straight away when ``disable_targeting`` is truthy;
    otherwise returns the comparison of the predicted label against 1.
    """
    if disable_targeting:
        return True

    handler = tweepy.OAuthHandler(credentials.consumer_key, credentials.consumer_secret)
    handler.set_access_token(credentials.access_token, credentials.access_token_secret)
    client = tweepy.API(handler, parser=tweepy.parsers.JSONParser())

    profile = client.get_user(screen_name=screen_name)
    user_array = numpy.array([profile])

    model = joblib.load(model_file)
    # NOTE(review): ``predict`` presumably returns an array, so this would be
    # an element-wise comparison rather than a plain bool -- confirm.
    return model.predict(user_array) == 1
def log_tweep_error(logger, tweep_error):
    """Log a TweepError exception.

    Errors carrying a recognised ``api_code`` are logged with a fixed
    description; other coded errors and errors without a code are logged
    together with the exception itself.
    """
    code = tweep_error.api_code
    if not code:
        logger.error("error with Twitter: %s", tweep_error)
        return

    known_messages = {
        32: "invalid API authentication tokens",
        34: "requested object (user, Tweet, etc) not found",
        64: "your account is suspended and is not permitted",
        130: "Twitter is currently in over capacity",
        131: "internal Twitter error occurred",
        135: "could not authenticate your API tokens",
        136: "you have been blocked to perform this action",
        179: "you are not authorized to see this Tweet",
    }
    description = known_messages.get(code)
    if description is None:
        logger.error("error while using the REST API: %s", tweep_error)
    else:
        logger.error(description)
def twitter_url(match, bot=None):
    """Format the tweet referenced by a matched URL as a one-line summary.

    Returns ``None`` when no API client is available or the tweet lookup
    raises a TweepError; otherwise a string containing the author, the
    whitespace-collapsed text and the tweet's age.
    """
    # Find the tweet ID from the URL
    tweet_id = match.group(1)

    # Get the tweet using the tweepy API
    api = get_api(bot)
    if not api:
        return

    try:
        tweet = api.get_status(tweet_id)
        author = tweet.user
    except tweepy.error.TweepError:
        return

    # Collapse every run of whitespace (including newlines) to a single space.
    collapsed = " ".join(tweet.text.split())

    # Verified accounts get a check mark in front of the handle.
    prefix = u"\u2713" if author.verified else ""

    age = timesince.timesince(tweet.created_at, datetime.utcnow())
    unescaper = HTMLParser()

    return u"{}@\x02{}\x02 ({}): {} ({} ago)".format(
        prefix,
        author.screen_name,
        author.name,
        unescaper.unescape(collapsed),
        age,
    )
def twitter_poster(tcreds):
    """Build a tweet-posting function bound to one Twitter account.

    Args:
        tcreds: dict with ``consumer_key``, ``consumer_secret``,
            ``access_token`` and ``access_secret``.

    Returns:
        a function ``post_tweet(text)`` that posts ``text`` and returns a
        dict with the keys ``text``, ``id``, ``date``, ``account``, ``url``.
    """
    handler = tweepy.OAuthHandler(tcreds["consumer_key"], tcreds["consumer_secret"])
    handler.set_access_token(tcreds["access_token"], tcreds["access_secret"])
    client = tweepy.API(handler)
    print("created credentials")

    def post_tweet(text):
        posted = client.update_status(text)
        print("posted tweet")
        account = posted.user.screen_name
        link = "".join(["https://twitter.com/", account, "/status/", str(posted.id)])
        return {
            "text": posted.text,
            "id": posted.id,
            "date": posted.created_at.isoformat(),
            "account": posted.user.screen_name,
            "url": link,
        }

    return post_tweet
def handle(self, *args, **options):
    """Look up a Twitter user by screen name and create a portrait model.

    Reads ``screen_name`` from ``options``; prints ``no user`` and returns
    when it is missing or empty. Otherwise the user's raw JSON is printed
    and passed to ``create_portrait_model`` with ``has_auth=False``.
    """
    # Computed but not used anywhere in this method.
    path = '{0}/users'.format(settings.PORTRAIT_FOLDER)

    screen_name = options.get('screen_name')
    if not screen_name:
        print('no user')
        return

    keys = settings.TWITTER_USER_KEYS
    handler = tweepy.OAuthHandler(keys['consumer_key'], keys['consumer_secret'])
    handler.set_access_token(keys['access_token_key'], keys['access_token_secret'])
    client = tweepy.API(handler)

    user_response = client.get_user(screen_name)._json
    print(user_response)
    create_portrait_model(user_response, has_auth=False)
def test_sending_images(self):
    """send_image downloads from S3 and tweets the file with the expected args."""
    # ensure there is an image as the mock object will not do anything
    shutil.copy('./image.jpg', '/tmp/image.jpg')

    s3_client = boto3.client('s3')
    s3_client.download_file = MagicMock(return_value=None)

    twitter_api = tweepy.API(tweepy.OAuthHandler('foo', 'bar'))
    twitter_api.update_with_media = MagicMock(return_value=Status())

    tweet_images = TweetS3Images(twitter_api, s3_client)
    tweet_images.send_image('test_bucket', 'image.jpg', cleanup=True)

    s3_client.download_file.assert_called_with(
        'test_bucket', 'image.jpg', '/tmp/image.jpg')
    twitter_api.update_with_media.assert_called_with(
        filename='image.jpg',
        status='New image image.jpg brought to you by lambda-tweet',
        file=tweet_images.get_file())

    # NOTE(review): the file staged above is /tmp/image.jpg, yet this checks
    # /tmp/image-test.jpg, which this test never creates -- confirm the path.
    leftover_exists = os.path.exists('/tmp/image-test.jpg')
    self.assertFalse(leftover_exists, 'The image was not cleaned up correctly.')
def get_tweets(self, since_id):
    """Looks up metadata for all Trump tweets since the specified ID.

    Returns a list with the raw JSON of every status yielded by the
    timeline cursor; the tweet with ``since_id`` itself is included.
    """
    # Ask for everything after (since_id - 1) so since_id is included.
    earlier_id = str(int(since_id) - 1)

    # tweet_mode=extended makes the API return the full text.
    timeline = Cursor(self.twitter_api.user_timeline,
                      user_id=TRUMP_USER_ID,
                      since_id=earlier_id,
                      tweet_mode="extended")

    # Keep the raw JSON, just like the streaming API delivers it.
    tweets = [status._json for status in timeline.items()]

    self.logs.debug("Got tweets: %s" % tweets)
    return tweets
def get_tweet_text(self, tweet):
    """Returns the full text of a tweet, or None if a needed key is missing."""
    # Where the full text lives depends on whether the tweet came through
    # the REST API or the Streaming API:
    # https://dev.twitter.com/overview/api/upcoming-changes-to-tweets
    try:
        if "extended_tweet" in tweet:
            self.logs.debug("Decoding extended tweet from Streaming API.")
            return tweet["extended_tweet"]["full_text"]

        if "full_text" in tweet:
            self.logs.debug("Decoding extended tweet from REST API.")
            return tweet["full_text"]

        self.logs.debug("Decoding short tweet.")
        return tweet["text"]
    except KeyError:
        self.logs.error("Malformed tweet: %s" % tweet)
        return None
def load_credentials(path=VAULT_PATH):
    ''' Load credentials from vault.

    The vault is a JSON file with a ``twitter`` object (``consumer-key``,
    ``consumer-secret``, ``access-token``, ``access-token-secret``) and a
    ``github`` entry.

    Returns a ``(gist, api)`` tuple. Problems are reported on stdout instead
    of being raised; whichever value could not be loaded is ``None``.
    '''
    gist, api = None, None
    try:
        # open() now sits inside the try: before, a missing or unreadable
        # vault raised IOError ahead of the handler meant to report it.
        with open(path, 'r') as vault_file:
            vault = json.load(vault_file)
        twitter = vault['twitter']
        auth = tweepy.OAuthHandler(twitter['consumer-key'],
                                   twitter['consumer-secret'])
        auth.set_access_token(twitter['access-token'],
                              twitter['access-token-secret'])
        api = tweepy.API(auth)
        gist = vault['github']
    except IOError:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Unable to read vault-file: {0}.'.format(path))
    except (KeyError, ValueError):
        print('Unable to parse the vault-file.')
    return gist, api
def load_credentials(path=VAULT_PATH):
    ''' Load credentials from vault.

    The vault is a JSON file with a ``twitter`` object holding
    ``consumer-key``, ``consumer-secret``, ``access-token`` and
    ``access-token-secret``.

    Returns the tweepy API client, or ``None`` when the vault could not be
    read or parsed (the problem is reported on stdout, not raised).
    '''
    api = None
    try:
        # open() now sits inside the try: before, a missing or unreadable
        # vault raised IOError ahead of the handler meant to report it.
        with open(path, 'r') as vault_file:
            vault = json.load(vault_file)
        twitter = vault['twitter']
        auth = tweepy.OAuthHandler(twitter['consumer-key'],
                                   twitter['consumer-secret'])
        auth.set_access_token(twitter['access-token'],
                              twitter['access-token-secret'])
        api = tweepy.API(auth)
    except IOError:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Unable to read vault-file: {0}.'.format(path))
    except (KeyError, ValueError):
        print('Unable to parse the vault-file.')
    return api
def authenticate():
    """Create the module-level ``api`` client from the loaded secrets.

    Calls ``_get_secrets()`` first, then reads the first four entries of the
    module-level ``secret_keys`` in the order consumer key, consumer secret,
    access token, access token secret.
    """
    global api

    # generate auth details and assign appropriately
    _get_secrets()
    consumer_key, consumer_secret, access_token, access_token_secret = (
        secret_keys[position] for position in range(4)
    )

    # access our twitter app with the appropriate credentials
    handler = tweepy.OAuthHandler(consumer_key, consumer_secret)
    handler.set_access_token(access_token, access_token_secret)

    # create tweepy object
    api = tweepy.API(handler)

# a test function to see if all the authentication is working
def get_new_tweets(wfm, querytype, query, old_tweets):
    """Fetch one page of tweets for ``query`` and return them as a DataFrame.

    For ``Twitter.QUERY_TYPE_USER`` the user's timeline is requested
    (count=200); any other ``querytype`` runs a search (count=100).
    ``wfm`` and ``old_tweets`` are accepted but not used here.
    """
    # Authenticate with "app authentication" mode (high rate limit, read only)
    consumer_key = os.environ['CJW_TWITTER_CONSUMER_KEY']
    consumer_secret = os.environ['CJW_TWITTER_CONSUMER_SECRET']
    api = tweepy.API(tweepy.AppAuthHandler(consumer_key, consumer_secret))

    # A single call only: up to 200 timeline statuses or 100 search results
    # are requested, with no paging.
    if querytype == Twitter.QUERY_TYPE_USER:
        statuses = api.user_timeline(query, count=200)
    else:
        statuses = api.search(q=query, count=100)

    # Columns to retrieve and store from Twitter.
    # Also used to find the index of the id field when merging old and new tweets.
    cols = ['id', 'created_at', 'text', 'in_reply_to_screen_name',
            'in_reply_to_status_id', 'retweeted', 'retweet_count',
            'favorited', 'favorite_count', 'source']

    rows = []
    for status in statuses:
        rows.append([getattr(status, col) for col in cols])

    return pd.DataFrame(rows, columns=cols)

# Combine this set of tweets with previous set of tweets
def sendTweet(item, color, link, size):
    """Tweet a restock notice for an item; failures are reported, not raised.

    The tweet lists the item, color, title-cased size, link, the word
    "Restock!" and the current UTC time with millisecond precision, one per
    line. On success the tweet text is printed; on failure an error line is
    printed instead.
    """
    # line 102
    auth = tweepy.OAuthHandler(C_KEY, C_SECRET)
    auth.set_access_token(A_TOKEN, A_TOKEN_SECRET)
    api = tweepy.API(auth)

    tweet = item+"\n"
    tweet += color+'\n'
    tweet += size.title()+'\n'
    tweet += link+'\n'
    tweet += "Restock!"+'\n'
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    tweet += str(datetime.utcnow().strftime('%H:%M:%S.%f')[:-3])

    try:
        api.update_status(tweet)
        print(tweet)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit. Sending stays best-effort for ordinary errors.
        print("Error sending tweet!")
def main():
    """Sample the public stream forever, appending data to the given files.

    Command line:
        source_file, target_file: opened in append mode and handed to
            ``ReplyStreamListener``.
        --languages: one or more language codes for the sample stream
            (default ``['ja']``).

    Whenever streaming raises an ordinary exception, the traceback is written
    to stderr, the loop sleeps 10 seconds and reconnects.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('source_file', type=argparse.FileType('a'))
    parser.add_argument('target_file', type=argparse.FileType('a'))
    parser.add_argument('--languages', nargs='+', default=['ja'])
    args = parser.parse_args()

    while True:
        try:
            auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
            auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
            api = tweepy.API(auth)
            reply_stream_listener = ReplyStreamListener(api, args.target_file, args.source_file)
            reply_stream = tweepy.Stream(auth=api.auth, listener=reply_stream_listener)
            reply_stream.sample(languages=args.languages)
        except Exception:
            # Was a bare ``except:``: Ctrl-C (KeyboardInterrupt) and SystemExit
            # were swallowed and the loop simply reconnected. Those now
            # propagate so the process can be stopped.
            traceback.print_exc(limit=10, file=sys.stderr, chain=False)
            time.sleep(10)
def tweet_listener():
    """Run a user stream forever, restarting it whenever it fails.

    Uses the module-level credential constants. Any exception raised while
    streaming is printed (message and class docstring) and a new stream is
    started.
    """
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    api = tweepy.API(handler)

    # NOTE(review): there is no delay between restarts -- a persistent failure
    # will loop as fast as the stream can fail.
    while True:
        try:
            stream = tweepy.Stream(auth=api.auth, listener=StreamListener(api))
            print("listener starting...")
            stream.userstream()
        except Exception as error:
            print(error)
            print(error.__doc__)
def authenticate(self):
    ''' Hand the keys and tokens from ``self.appinfo`` to OAuth and build the
    API client.

    Sets ``self.auth`` and ``self.api``; the client's rate-limit waiting is
    controlled by ``self.wait_ratelimit``.
    '''
    info = self.appinfo

    # set OAuth
    self.auth = tweepy.OAuthHandler(info['consumer'], info['consumer_secret'])
    self.auth.set_access_token(info['token'], info['token_secret'])
    # TODO : Encrypt the contents of appinfo.json

    # API access
    self.api = tweepy.API(self.auth, wait_on_rate_limit=self.wait_ratelimit)
    # TODO : Bypass the rate limit of Twitter API
def upload_images(twitter_api, images, image_format='jpeg', quality=90):
    '''Upload Image(s) to twitter using the given settings

    Each image is encoded into an in-memory buffer and uploaded under the
    name ``temp.<image_format>``.

    Args:
        twitter_api (:class:`tweepy.API`): the Twitter API instance to use
        images (list of :class:`PIL.Image.Image`): images to upload
        image_format (string): file format to upload as (e.g. 'jpeg', 'png')
        quality (int): quality percentage used for (Jpeg) compression

    Returns:
        list of media_ids
    '''
    def _upload_one(image):
        buffer = BytesIO()
        image.save(buffer, image_format, quality=quality)
        # Rewind so the upload reads the encoded bytes from the start.
        buffer.seek(0)
        response = twitter_api.media_upload("temp." + image_format, file=buffer)
        return response.media_id

    return [_upload_one(image) for image in images]
def post_tweet(twitter_api, tweet_text, reply_id=None, media_ids=None):
    '''Post a tweet to the timeline, trimming the text first

    The text is passed through ``trim_tweet_text`` before posting. A
    TweepError is logged through ``cozmo.logger`` and reported as ``False``.

    Args:
        twitter_api (:class:`tweepy.API`): the Twitter API instance to use
        tweet_text (string): the status text to tweet
        reply_id (int): optional, nests the tweet as reply to that tweet
            (use id_str element from a tweet)
        media_ids (list of media_ids): optional, media to attach to the tweet

    Returns:
        bool: True if posted successfully, False otherwise
    '''
    trimmed = trim_tweet_text(tweet_text)
    try:
        twitter_api.update_status(trimmed, reply_id, media_ids=media_ids)
    except tweepy.error.TweepError as e:
        cozmo.logger.error("post_tweet Error: " + str(e))
        return False
    else:
        return True
def get_access_creds():
    '''
    Build API clients for every working credential set in
    ``twitter_dev_accounts``.

    The file has no header and stores one account per line as:
    consumer_key \t consumer_secret \t access_token \t access_secret

    Returns the list of clients for which ``verify_working_credentials``
    was truthy.
    '''
    print('Building list of developer access credentials...')

    column_names = ['consumer_key', 'consumer_secret', 'access_token', 'access_secret']
    credentials = pd.read_csv('twitter_dev_accounts', sep='\t', header=None,
                              names=column_names)

    oauths = []
    for _, row in credentials.iterrows():
        handler = tweepy.auth.OAuthHandler(str(row['consumer_key']),
                                           str(row['consumer_secret']))
        handler.set_access_token(str(row['access_token']), str(row['access_secret']))
        client = tweepy.API(handler, wait_on_rate_limit=True,
                            wait_on_rate_limit_notify=True)
        if verify_working_credentials(client):
            oauths.append(client)
    return oauths
def run(self):
    """Claim uncollected screen names one at a time and store their user data.

    Loops while ``screen_names`` holds more documents than ``users``. Each
    pass marks one document as ``collected``, stores the result of
    ``data_user`` in ``users`` and then marks the document ``completed``.
    A TweepError is logged and followed by a 15-minute sleep; any other
    error is logged and the loop moves on.
    """
    while screen_names.count() > users.count():
        s_name = screen_names.find_one_and_update({"collected": False},
                                                  {"$set": {"collected": True}})
        if s_name is None:
            # No unclaimed document matched the filter (e.g. every one was
            # already claimed); subscripting None used to crash here.
            break

        logging.info("User {} in thread {}.".format(s_name["_id"], self.getName()))
        try:
            users.insert_one(data_user(s_name["_id"], self.api))
            screen_names.update_one({"_id": s_name["_id"]}, {"$set": {"completed": True}})
        except TweepError as e:
            logging.error("Ups, Arrived to API limit in thread {}. Exception: {}".format(self.getName(), e))
            # NOTE(review): the claimed name stays "collected" but not
            # "completed" and is not retried -- confirm that is acceptable.
            sleep(60 * 15)
        except Exception as e:
            # Uses "_id" like the rest of the method; the former s_name["user"]
            # lookup could itself raise inside this handler.
            logging.error("Could not store user {}, the Exceception was {}.".format(s_name["_id"], e))

# Create a new process for each twitter authorization
def auth(config):
    """
    Authenticate against Twitter and wrap the resulting API in a client.

    Credentials are verified with one ``verify_credentials`` call before the
    client is returned; the authenticated screen name is logged on success.

    :param config: ResponseBot config
    :type config: :class:`~responsebot.utils.config_utils.ResponseBotConfig`
    :return: client instance to execute twitter action
    :rtype: :class:`~responsebot.responsebot_client.ResponseBotClient`
    :raises: :class:`~responsebot.common.exceptions.AuthenticationError`: If failed to authenticate
    :raises: :class:`~responsebot.common.exceptions.APIQuotaError`: If API call rate reached limit
    """
    handler = tweepy.OAuthHandler(config.get('consumer_key'), config.get('consumer_secret'))
    handler.set_access_token(config.get('token_key'), config.get('token_secret'))
    api = tweepy.API(handler)

    try:
        api.verify_credentials()
    except RateLimitError as e:
        # Listed before TweepError so that rate limiting maps to a quota error.
        raise APIQuotaError(e.args[0][0]['message'])
    except TweepError as e:
        raise AuthenticationError(e.args[0][0]['message'])

    logging.info('Successfully authenticated as %s' % api.me().screen_name)
    return ResponseBotClient(config=config, client=api)
def __init__(self, twitter_username):
    """Look up ``twitter_username`` and copy its public profile fields.

    Sets ``username``, ``image``, ``description``, ``screen_name`` and
    ``name`` on the instance.

    Raises:
        PermissionError: if constructing the API client raises a TweepError.
    """
    # TODO: Login to twitter for corpus generation using end user's credentials
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    # Connect to Twitter - raise TweepError if we brick out on this
    # NOTE(review): only client construction is guarded; the get_user call
    # below is outside the try -- confirm that is the intended scope.
    try:
        client = tweepy.API(handler)
    except tweepy.TweepError:
        # TODO: make sure this error bubbles up and gets handled gracefully
        raise PermissionError("Twitter Auth failed")

    profile = client.get_user(twitter_username)

    self.username = twitter_username
    self.image = profile.profile_image_url
    # Exposes entire api - for debugging only
    # self.api = usr
    self.description = profile.description
    self.screen_name = profile.screen_name
    self.name = profile.name
def __init__(self, twitter_username):
    """Look up ``twitter_username`` and copy its public profile fields.

    Sets ``username``, ``image``, ``description``, ``screen_name`` and
    ``name`` on the instance.

    Raises:
        PermissionError: if constructing the API client raises a TweepError.
    """
    # TODO: Login to twitter for corpus generation using end user's credentials
    handler = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    handler.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    # Connect to Twitter - raise TweepError if we brick out on this
    # NOTE(review): only client construction is guarded; the get_user call
    # below is outside the try -- confirm that is the intended scope.
    try:
        client = tweepy.API(handler)
    except tweepy.TweepError:
        # TODO: make sure this error bubbles up and gets handled gracefully
        raise PermissionError("Twitter Auth failed")

    profile = client.get_user(twitter_username)

    self.username = twitter_username
    self.image = profile.profile_image_url
    # Exposes entire api - for debugging only
    # self.api = usr
    self.description = profile.description
    self.screen_name = profile.screen_name
    self.name = profile.name
def __init__(self):
    ''' Class constructor or initialization method.

    Builds the OAuth handler and the tweepy API client and stores them on
    ``self.auth`` and ``self.api``. If that raises, an error line is printed
    and the attributes that were not yet assigned stay unset.
    '''
    # NOTE(review): these credentials are hard-coded in source; they should
    # be loaded from configuration or the environment, and rotated.
    consumer_key = '18QHFMz0zvycM2KLrTMfrafI1'
    consumer_secret = 'WNwYGKBXmbfsY7ysZXxPJ4Voa7rgtLxGocuDHbIJ1TZLShtBVF'
    access_token = '843094924299976704-GNJyLjovEGFAiOWLswFBagKxlebRQUq'
    access_token_secret = 'L39Wz6lXKSavutPqhopmNwK7egJiSrwRxVohjbqVqbQvM'

    try:
        self.auth = OAuthHandler(consumer_key, consumer_secret)
        self.auth.set_access_token(access_token, access_token_secret)
        self.api = tweepy.API(self.auth)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit.
        print("Error: Authentication Failed")