我们从Python开源项目中，提取了以下19个代码示例，用于说明如何使用twitter.OAuth()。
def load(self):
    """Set up the Twitter client, the web routes and the plugin's state.

    Builds the OAuth credentials from the plugin configuration, creates the
    API client, registers the ``/twitter/*`` web handlers and initialises
    the bookkeeping containers kept on the instance.
    """
    config = self.config
    self.oauth = OAuth(
        config.get(c.ACCESS_TOKEN_KEY),
        config.get(c.ACCESS_TOKEN_SECRET_KEY),
        config.get(c.CONSUMER_KEY_KEY),
        config.get(c.CONSUMER_SECRET_KEY),
    )
    self.client = TwitterAPI(auth=self.oauth)

    # Every handler is registered with the same context mapping.
    context = {'plugin': self, 'bot': self.bot}
    routes = (
        (r'/twitter/oauth_token', OAuthTokenHandler, context),
        (r'/twitter/authorize', OAuthAuthorizeHandler, context),
        (r'/twitter/status', TwitterStatusHandler, context),
        (r'/twitter/disconnect', TwitterDisconnectHandler, context),
    )
    self.add_web_handlers(*routes)

    # ``monitored`` has to exist before ``restore_monitored`` runs
    # (presumably it repopulates the set -- confirm against its definition).
    self.monitored = set()
    self.restore_monitored()

    self.request_tokens = {}
    self.last_tweets = {}
    self.interactive_tweets = {}
def create_api():
    """Build a Twitter API client from the application configuration.

    Returns:
        A ``twitter.Twitter`` instance authenticated with the four
        ``api.twitter.*`` configuration values.
    """
    from twitter import OAuth, Twitter
    from app import APP_INSTANCE as app

    # Order matches the positional arguments passed to ``OAuth`` below.
    config_keys = (
        'api.twitter.access_token',
        'api.twitter.access_secret',
        'api.twitter.consumer_key',
        'api.twitter.consumer_secret',
    )
    credentials = [app.get_config(key) for key in config_keys]

    # Temporary fix for certificate error.
    # NOTE(review): this swaps the process-wide default HTTPS context
    # factory for the unverified one, so certificate verification is
    # disabled for every later HTTPS request that relies on the default
    # context -- not only for Twitter. Remove once the underlying
    # certificate problem is fixed.
    ssl._create_default_https_context = ssl._create_unverified_context

    # Initiate the connection to Twitter API
    return Twitter(auth=OAuth(*credentials))
def __init__(self, consumer_key, consumer_secret, user_credentials):
    """Create an API client for a single user.

    Args:
        consumer_key: Consumer key passed to ``OAuth``.
        consumer_secret: Consumer secret passed to ``OAuth``.
        user_credentials: JSON text that decodes to a mapping with the
            keys ``oauth_token`` and ``oauth_token_secret``.
    """
    user = json.loads(user_credentials)
    token = user['oauth_token']
    token_secret = user['oauth_token_secret']
    self.api = Twitter(
        auth=OAuth(token, token_secret, consumer_key, consumer_secret))
def connect(self):
    """Create the Twitter client from the credentials held on the instance."""
    # Passed positionally, in the order the attributes were given originally.
    credentials = OAuth(
        self.__token,
        self.__token_key,
        self.__con_secret,
        self.__con_secret_key,
    )
    self.__client = Twitter(auth=credentials)
    # Send a start up tweet
def save_user(self, user, oauth_token, oauth_verifier):
    """Exchange a verified request token for an access token and store it.

    Args:
        user: Object whose ``id`` is stored together with the new token.
        oauth_token: Request token; its secret is looked up in
            ``self.request_tokens``.
        oauth_verifier: Verifier forwarded to the access-token endpoint.

    If the access-token request raises, the exception is handed to
    ``self.error`` and nothing is stored.
    """
    request_auth = OAuth(
        oauth_token,
        self.request_tokens[oauth_token],
        self.config.get(c.CONSUMER_KEY_KEY),
        self.config.get(c.CONSUMER_SECRET_KEY),
    )
    # Empty format and no API version, as in the original call
    # (presumably required to reach the raw ``oauth/`` endpoint -- confirm).
    oauth_client = TwitterAPI(auth=request_auth, format='', api_version=None)

    try:
        raw_response = oauth_client.oauth.access_token(
            oauth_verifier=oauth_verifier)
    except Exception as e:
        self.error(e)
        return

    # The response is handled as ``key=value`` pairs joined by ``&``.
    params = {}
    for component in raw_response.split('&'):
        key, value = component.split('=')
        params[key] = value

    row = {
        'id': user.id,
        'token': params['oauth_token'],
        'secret': params['oauth_token_secret'],
    }
    with self.transaction() as trans:
        trans.execute(q.add_user, row)
def get_user_api(self, oauth_user):
    """Return an API client authenticated as *oauth_user*.

    Args:
        oauth_user: Object exposing ``token`` and ``secret`` attributes.

    The consumer key and secret are read from the plugin configuration.
    """
    consumer_key = self.config.get(c.CONSUMER_KEY_KEY)
    consumer_secret = self.config.get(c.CONSUMER_SECRET_KEY)
    user_auth = OAuth(
        oauth_user.token, oauth_user.secret, consumer_key, consumer_secret)
    return TwitterAPI(auth=user_auth)
def get_twitter_client():
    """Return a Twitter client configured from environment variables.

    The credentials are read with ``os.environ.get``, so a variable that is
    not set is passed on as ``None`` instead of raising here.
    """
    # Order matches the positional arguments passed to ``twitter.OAuth``.
    env_names = (
        'ACCESS_TOKEN',
        'ACCESS_TOKEN_SECRET',
        'CONSUMER_KEY',
        'CONSUMER_SECRET',
    )
    credentials = [os.environ.get(name) for name in env_names]
    return twitter.Twitter(auth=twitter.OAuth(*credentials))
def get_login_url(callback='oob', consumer_key=None, consumer_secret=None):
    """Request an OAuth request token, store it and return the login URL.

    Args:
        callback: Value sent as ``oauth_callback`` (``'oob'`` by default).
        consumer_key: Consumer key passed to ``OAuth``.
        consumer_secret: Consumer secret passed to ``OAuth``.

    Returns:
        The ``https://api.twitter.com/oauth/authenticate`` URL carrying the
        freshly issued request token.
    """
    # No user token exists yet, so both user fields are empty strings.
    anonymous_auth = OAuth('', '', consumer_key, consumer_secret)
    client = Twitter(auth=anonymous_auth, format='', api_version=None)

    raw_response = client.oauth.request_token(oauth_callback=callback)
    fields = url_decode(raw_response)
    oauth_token = fields['oauth_token']
    oauth_token_secret = fields['oauth_token_secret']

    # Persist the request token so it can be looked up again later.
    db.session.merge(
        OAuthToken(token=oauth_token, token_secret=oauth_token_secret))
    db.session.commit()

    login_url = (
        "https://api.twitter.com/oauth/authenticate?oauth_token=%s"
        % (oauth_token,))
    return login_url
def receive_verifier(oauth_token, oauth_verifier,
                     consumer_key=None, consumer_secret=None):
    """Trade a stored request token for an access token bound to an account.

    Args:
        oauth_token: Key of the stored request token.
        oauth_verifier: Verifier forwarded to the access-token endpoint.
        consumer_key: Consumer key passed to ``OAuth``.
        consumer_secret: Consumer secret passed to ``OAuth``.

    Returns:
        The merged ``OAuthToken`` holding the access token, with its
        ``account`` attribute set.

    Raises:
        Exception: If no stored token matches *oauth_token*.
    """
    request_token = OAuthToken.query.get(oauth_token)
    if not request_token:
        raise Exception("OAuth token has expired")

    exchange_auth = OAuth(
        request_token.token, request_token.token_secret,
        consumer_key, consumer_secret)
    exchange_client = Twitter(
        auth=exchange_auth, format='', api_version=None)
    fields = url_decode(
        exchange_client.oauth.access_token(oauth_verifier=oauth_verifier))

    # The request token has been exchanged; remove it from the session.
    db.session.delete(request_token)

    access_token = db.session.merge(
        OAuthToken(token=fields['oauth_token'],
                   token_secret=fields['oauth_token_secret']))

    # Use the new access token to fetch the remote account it belongs to.
    user_client = Twitter(
        auth=OAuth(access_token.token, access_token.token_secret,
                   consumer_key, consumer_secret))
    remote_user = user_client.account.verify_credentials()
    account = db.session.merge(account_from_api_user_object(remote_user))

    access_token.account = account
    db.session.commit()
    return access_token
def get_twitter_for_acc(account):
    """Return a working Twitter client for *account*, or ``None``.

    The account's stored OAuth tokens are tried in descending
    ``created_at`` order; the first one whose credentials verify is used.
    A token rejected with HTTP 401 is deleted and the next one is tried.

    Args:
        account: Parent object of the ``OAuthToken`` rows to try.

    Returns:
        A ``Twitter`` client whose credentials verified, or ``None`` when
        no stored token works.

    Raises:
        TemporaryError: On any HTTP error other than 401, or on ``URLError``.
    """
    consumer_key = app.config['TWITTER_CONSUMER_KEY']
    consumer_secret = app.config['TWITTER_CONSUMER_SECRET']

    tokens = (OAuthToken.query.with_parent(account)
              .order_by(db.desc(OAuthToken.created_at)).all())

    for token in tokens:
        t = Twitter(
            auth=OAuth(token.token, token.token_secret,
                       consumer_key, consumer_secret))
        try:
            t.account.verify_credentials()
        except TwitterHTTPError as e:
            if e.e.code != 401:
                raise TemporaryError(e)
            # token revoked
            if sentry:
                # Send an explicit payload rather than ``locals()``: the
                # local scope holds the consumer secret and the token
                # objects, which must not be shipped to the error tracker.
                sentry.captureMessage(
                    'Twitter auth revoked',
                    extra={'account': account, 'status_code': e.e.code})
            db.session.delete(token)
            db.session.commit()
        except URLError as e:
            raise TemporaryError(e)
        else:
            return t

    return None
def twitter_login(self, cons_key, cons_secret, access_token,
                  access_token_secret):
    """Log in to Twitter with the provided access keys.

    Keys for your own Twitter account are available at apps.twitter.com.

    Arguments

    cons_key            -   String of your Consumer Key (API Key)

    cons_secret         -   String of your Consumer Secret (API Secret)

    access_token        -   String of your Access Token

    access_token_secret -   String of your Access Token Secret
    """
    # Report an error if the twitter library wasn't imported
    if not IMPTWITTER:
        self._error(
            u'twitter_login',
            u"The 'twitter' library could not be imported. Check whether it is installed correctly.")

    # Keep the OAuth object on the instance; both clients below share it.
    self._oauth = twitter.OAuth(
        access_token, access_token_secret, cons_key, cons_secret)
    self._t = twitter.Twitter(auth=self._oauth)
    self._ts = twitter.TwitterStream(auth=self._oauth)
    self._loggedin = True

    # Get the bot's own user credentials
    self._credentials = self._t.account.verify_credentials()
def _twitter_reconnect(self):
    """Rebuild the Twitter clients from the OAuth object kept on the instance.

    Intended for internal use only. Call it ONLY after twitter_login has
    been called, because it reuses ``self._oauth``.
    """
    origin = u'_twitter_reconnect'

    # Report the reconnection attempt.
    self._message(origin, u"Attempting to reconnect to Twitter.")

    # Report an error if the twitter library wasn't imported
    if not IMPTWITTER:
        self._error(
            origin,
            u"The 'twitter' library could not be imported. Check whether it is installed correctly.")

    # Recreate both clients from the stored OAuth object.
    self._t = twitter.Twitter(auth=self._oauth)
    self._ts = twitter.TwitterStream(auth=self._oauth)
    self._loggedin = True

    # Get the bot's own user credentials
    self._credentials = self._t.account.verify_credentials()

    # Report the reconnection success.
    self._message(origin, u"Successfully reconnected to Twitter!")
def __init__(self):
    """Create the Twitter client from the credential attributes on ``self``.

    Reads ``access_token``, ``access_secret``, ``consumer_key`` and
    ``consumer_secret`` from the instance and stores the resulting client
    in ``self.twitter``.
    """
    # The credentials are deliberately not printed or logged: the previous
    # debug ``print`` wrote all four secrets to standard output.
    oauth = OAuth(
        self.access_token,
        self.access_secret,
        self.consumer_key,
        self.consumer_secret,
    )
    self.twitter = Twitter(auth=oauth)
def post_tweet(text, image_path=None, user_token=None, user_secret=None,
               consumer_key=None, consumer_secret=None):
    """Post a tweet, optionally with an image attached.

    Args:
        text: Status text; at most 140 characters.
        image_path: Optional path of an image file to attach.
        user_token: User access token; falls back to the
            ``TWITTER_USER_TOKEN`` environment variable.
        user_secret: User access token secret; falls back to
            ``TWITTER_USER_SECRET``.
        consumer_key: Consumer key; falls back to ``TWITTER_CONSUMER_KEY``.
        consumer_secret: Consumer secret; falls back to
            ``TWITTER_CONSUMER_SECRET``.

    Returns:
        The response of the ``statuses.update`` call.

    Raises:
        ValueError: If *text* is longer than 140 characters, or if there is
            neither non-blank text nor an image to post.
        KeyError: If a credential is neither passed in nor present in the
            environment.
    """
    if len(text) > 140:
        raise ValueError('tweet is too long')
    # Fail fast, before any credential lookup or network traffic.
    if not (text.strip() or image_path):
        raise ValueError('no text or images to tweet')

    auth = twitter.OAuth(
        user_token or os.environ['TWITTER_USER_TOKEN'],
        user_secret or os.environ['TWITTER_USER_SECRET'],
        consumer_key or os.environ['TWITTER_CONSUMER_KEY'],
        consumer_secret or os.environ['TWITTER_CONSUMER_SECRET'])

    image_id = None
    if image_path:
        with open(image_path, 'rb') as image_file:
            image_data = image_file.read()
        t_up = twitter.Twitter(domain='upload.twitter.com', auth=auth)
        image_id = t_up.media.upload(media=image_data)['media_id_string']

    # Still guards the case where the upload produced no usable media id.
    if not (text.strip() or image_id):
        raise ValueError('no text or images to tweet')

    params = {
        'status': text,
        'trim_user': True,
    }
    if image_id:
        # Attach the image that was already uploaded by its id, instead of
        # sending the bytes a second time through the deprecated
        # ``statuses/update_with_media`` endpoint.
        params['media_ids'] = image_id

    t_api = twitter.Twitter(auth=auth)
    return t_api.statuses.update(**params)
def __init__(self, bot):
    """Initialise the plugin and build its Twitter client.

    Args:
        bot: Forwarded unchanged to the parent class initialiser.

    The four OAuth values are read from the ``PLUGINS`` section of the
    module-level ``config``.
    """
    super(TweetPlugin, self).__init__(bot)
    self.command = "!tweet"

    plugin_settings = config["PLUGINS"]
    credentials = twitter.OAuth(
        plugin_settings["TWITTER_ATOKEN"],
        plugin_settings["TWITTER_ASECRET"],
        plugin_settings["TWITTER_CKEY"],
        plugin_settings["TWITTER_CSECRET"],
    )
    self.twitter = twitter.Twitter(auth=credentials)
def connect(self):
    """Create the Twitter client from the credentials held on the instance."""
    # Passed positionally, in the order the attributes were given originally.
    credentials = OAuth(
        self.token,
        self.token_key,
        self.con_secret,
        self.con_secret_key,
    )
    self.client = Twitter(auth=credentials)
    # Set the appropriate settings for each alert
def __init__(self):
    """Create the Twitter client from the values in the ``config`` module."""
    credentials = OAuth(
        config.TOKEN,
        config.TOKEN_KEY,
        config.CON_SECRET,
        config.CON_SECRET_KEY,
    )
    self.t = Twitter(auth=credentials)