我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twitter.Api()。
def tweets(self): """ Generator yielding the last tweets (200 is the API limit) since the last tweet saved in Jarbas database. If no tweet is saved it yields the last 200 tweets. """ kwargs = dict( screen_name='RosieDaSerenata', count=200, # this is the maximum suported by Twitter API include_rts=False, exclude_replies=True ) latest_tweet = Tweet.objects.first() if latest_tweet: kwargs['since_id'] = latest_tweet.status api = twitter.Api(*self.credentials, sleep_on_rate_limit=True) yield from api.GetUserTimeline(**kwargs)
def tokenize_random_tweet(self): """ If the twitter library is installed and a twitter connection can be established, then tokenize a random tweet. """ try: import twitter except ImportError: print "Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/" from random import shuffle api = twitter.Api() tweets = api.GetPublicTimeline() if tweets: for tweet in tweets: if tweet.user.lang == 'en': return self.tokenize(tweet.text) else: raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
def get_random_tweet(session): """ Grab token from session and get a tweet! """ token = session['user']['accessToken'].split(',') key, secret = token[0], token[1] api = twitter.Api(consumer_key=TWITTER_CONSUMER_KEY, consumer_secret=TWITTER_CONSUMER_SECRET, access_token_key=key, access_token_secret=secret) timeline = api.GetUserTimeline(screen_name='realDonaldTrump', include_rts=False, trim_user=True, exclude_replies=True, count=100) tweet = timeline[random.randint(0, len(timeline))] return tweet.text
def post(message, url=None): """ ?????????? ? Twitter """ config = SocialConfig.get_solo() message = format_message(message, url=url) if not message: return import twitter twitter_api = twitter.Api( config.twitter_client_id, config.twitter_client_secret, config.twitter_access_token, config.twitter_access_token_secret, ) twitter_api.PostUpdate(message)
def __init__(self): ## Load the API keys try: stream = open("app/models/api_keys.yml", 'r') keys = yaml.load(stream) except: print('Failed to load Twitter API keys from file, falling back on environment variables') keys = { 'consumer_key': os.environ['consumer_key'], 'consumer_secret': os.environ['consumer_secret'], 'access_token_key': os.environ['access_token_key'], 'access_token_secret': os.environ['access_token_secret'], } self.api = twitter.Api(consumer_key=keys['consumer_key'], consumer_secret=keys['consumer_secret'], access_token_key=keys['access_token_key'], access_token_secret=keys['access_token_secret'], sleep_on_rate_limit=False) # NOTE: setting sleep_on_rate_limit to True here means the application will sleep when we hit the API rate limit. It will sleep until we can safely make another API call. Making this False will make the API throw a hard error when the rate limit is hit. ## Request tweets for a given movie
def testGetUserTimeline(self): '''Test the twitter.Api GetUserTimeline method''' self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json?count=1&screen_name=kesuke', curry(self._OpenTestData, 'user_timeline-kesuke.json')) statuses = self._api.GetUserTimeline(screen_name='kesuke', count=1) # This is rather arbitrary, but spot checking is better than nothing self.assertEqual(89512102, statuses[0].id) self.assertEqual(718443, statuses[0].user.id) #def testGetFriendsTimeline(self): # '''Test the twitter.Api GetFriendsTimeline method''' # self._AddHandler('https://api.twitter.com/1.1/statuses/friends_timeline/kesuke.json', # curry(self._OpenTestData, 'friends_timeline-kesuke.json')) # statuses = self._api.GetFriendsTimeline('kesuke') # # This is rather arbitrary, but spot checking is better than nothing # self.assertEqual(20, len(statuses)) # self.assertEqual(718443, statuses[0].user.id)
def get_tweets(request): datas = [] p = ttp.Parser() try: api = twitter.Api( consumer_key=settings.TWITTER_CONSUMER_KEY, consumer_secret=settings.TWITTER_CONSUMER_SECRET, access_token_key=settings.TWITTER_ACCESS_TOKEN, access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET ) tweets = api.GetUserTimeline(screen_name='kodlaco') for tweet in tweets: datas.append({ #'text': p.parse(tweet.text).html, 'text': tweet.text, 'id_str': tweet.id_str }) except: datas = [] return HttpResponse(json.dumps(datas), content_type="application/json")
def handle(self, *args, **options): api = twitter.Api(consumer_key=settings.TWITTER_CONSUMER_KEY, consumer_secret=settings.TWITTER_CONSUMER_SECRET, access_token_key=settings.TWITTER_ACCESS_TOKEN_KEY, access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET) for currency_symbol in settings.SOCIAL_NETWORK_SENTIMENT_CONFIG['twitter']: print(currency_symbol) results = api.GetSearch("$" + currency_symbol, count=200) for tweet in results: if SocialNetworkMention.objects.filter(network_name='twitter', network_id=tweet.id).count() == 0: snm = SocialNetworkMention.objects.create( network_name='twitter', network_id=tweet.id, network_username=tweet.user.screen_name, network_created_on=datetime.datetime.fromtimestamp(tweet.GetCreatedAtInSeconds()), text=tweet.text, symbol=currency_symbol, ) snm.set_sentiment() snm.save()
def tokenize_random_tweet(self): """ If the twitter library is installed and a twitter connection can be established, then tokenize a random tweet. """ try: import twitter except ImportError: print "Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/" from random import shuffle api = twitter.Api() tweets = api.GetPublicTimeline() if tweets: for tweet in tweets: if tweet.user.lang == 'en': return self.tokenize(tweet.text) else: raise Exception( "Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
def tokenize_random_tweet(self): """ If the twitter library is installed and a twitter connection can be established, then tokenize a random tweet. """ try: import twitter except ImportError: print("Apologies. The random tweet functionality requires the Python twitter library: http://code.google.com/p/python-twitter/") from random import shuffle api = twitter.Api() tweets = api.GetPublicTimeline() if tweets: for tweet in tweets: if tweet.user.lang == 'en': return self.tokenize(tweet.text) else: raise Exception("Apologies. I couldn't get Twitter to give me a public English-language tweet. Perhaps try again")
def respond(consumer_key, consumer_secret, token_key, token_secret): if consumer_key is None: print 'usage: python album_genre_bot.py <consumer_key> <consumer_secret> <token_key> <token_secret>' return api = twitter.Api(consumer_key=consumer_key, consumer_secret=consumer_secret, access_token_key=token_key, access_token_secret=token_secret) original_timestamp = get_time_stamp() new_timestamp = original_timestamp for m in api.GetMentions(): dt = parse_twitter_date(m.created_at) new_timestamp = max(dt, new_timestamp) if dt > original_timestamp and m.text.startswith(twitterId): genre = predict(clean_text(m.text).lower()) try: status = api.PostUpdate( in_reply_to_status_id=m.id, status='@{} {}'.format(m.user.screen_name, genre)) print m.text, print ' ==> ' + genre except Exception as e: print e save_time_stamp(new_timestamp)
def post_to_twitter(tw_post): tw_ck, tw_cs, tw_ak, tw_as = lamvery.secret.get('tw_keys').split(',') tw_api = twitter.Api(consumer_key=tw_ck, consumer_secret=tw_cs, access_token_key=tw_ak, access_token_secret=tw_as) try: tw_api.PostUpdate(tw_post) except: pass return True
def __init__(self, consumer_key, consumer_secret, access_token, access_token_secret, user): self.client = twitter.Api( consumer_key=consumer_key, consumer_secret=consumer_secret, access_token_key=access_token, access_token_secret=access_token_secret) if TwitterFeed.algorithm_choice != AlgorithmChoice.DUMMY: self.client.VerifyCredentials() self.user = user
def connect(): api = twitter.Api(consumer_key=settings.SOCIAL_AUTH_TWITTER_KEY, \ consumer_secret=settings.SOCIAL_AUTH_TWITTER_SECRET, \ access_token_key=settings.TWITTER_ACCESS_TOKEN_KEY, \ access_token_secret=settings.TWITTER_ACCESS_TOKEN_SECRET) return api
def init_twitter(): # load twitter API tokens with open(creds) as data_file: data = json.load(data_file) api = twitter.Api(consumer_key=data["consumer_key"], consumer_secret=data["consumer_secret"], access_token_key=data["access_token_key"], access_token_secret=data["access_token_secret"]) return api
def __init__(self, path): self.__twitter_keys = misc.GetKeys(path) self.__api = twitter.Api(consumer_key=str(self.__twitter_keys['consumer_key']), consumer_secret=str(self.__twitter_keys['consumer_secret']), access_token_key=str(self.__twitter_keys['access_token']), access_token_secret=str(self.__twitter_keys['access_secret']))
def connect(self): self.api = twitter.Api( consumer_key=settings.get_secret('TWITTER_CONSUMER_KEY'), consumer_secret=settings.get_secret('TWITTER_CONSUMER_SECRET'), access_token_key=settings.get_secret('TWITTER_ACCESS_TOKEN_KEY'), access_token_secret=settings.get_secret( 'TWITTER_ACCESS_TOKEN_SECRET'), )
def __init__(self, tokens): """ :param tokens: Dictionary of all tokens [consumer_key, consumer_secret, access_token_key, access_token_secret] required to initialize the Twitter Api """ self.api = twitter.Api( consumer_key=tokens['consumer_key'], consumer_secret=tokens['consumer_secret'], access_token_key=tokens['access_token_key'], access_token_secret=tokens['access_token_secret'] )
def __init__(self, raw_query): self.twitter_api = twitter.Api(consumer_key='fd5TwZfTmZR4Jaaaaaaaaa', consumer_secret='8aFf6eGf6kKEfrIQjsiQbYyveawaaaaaaaaa', access_token_key='14698049-rtGaaaaaaaaa', access_token_secret='Z2qkurxKaaaaaaaaa') self.raw_query = raw_query
def __init__(self, raw_query): self.twitter_api = twitter.Api(consumer_key='your own key', consumer_secret= 'your own key', access_token_key='your own key', access_token_secret='your own key') self.raw_query = raw_query # capture the tweets: # see http://python-twitter.readthedocs.io/en/latest/twitter.html
def __init__(self, stream=True): self.auth = helpers.twitter_auth() self.api = twitter.Api(*self.auth) self.stream = stream
def _get_twitter_client(self, name): """Get a twitter client.""" if name not in self._cached_twitter: conf = self.config['twitter'][name] self._cached_twitter[name] = twitter.Api( consumer_key=conf['consumer_key'], consumer_secret=conf['consumer_secret'], access_token_key=conf['access_token_key'], access_token_secret=conf['access_token_secret'] ) return self._cached_twitter[name]
def TwitterPost(video_info): # App python-tweeter # https://dev.twitter.com/apps/815176 tw = twitter.Api( consumer_key = cons_key, consumer_secret = cons_sec, access_token_key = acc_key, access_token_secret = acc_sec ) title = video_info.keys()[0] print "Title: %s" % title image = video_info[title]['thumb'] print "Image: %s" % image url = video_info[title]['url'] print "Video: %s" % url fd, file_image = tempfile.mkstemp() GetImage(image, fd) msg = "Watch again: %s\n%s #pyconse" % (title, url) print "Posting: %s" % msg print "Image: %s" % file_image try: tw.PostMedia(status = msg,media = file_image) os.unlink(file_image) except twitter.TwitterError as err: # twitter.TwitterError: [{u'message': u'Status is a duplicate.', u'code': 187}] print "ERROR: %s" % err.args[0][0]['message'] print "Post on Twitter failed. Trying again..." if os.path.exists(file_image): os.unlink(file_image) video = GetVideo() TwitterPost(video) return try: msg = "Don't forget to send you talk proposal for #pyconse (September 6th in Stockholm). key=%d " % random.randint(1000,5000) + \ "https://goo.gl/RjHpoS" print "Posting: %s" % msg tw.PostUpdate(msg) except twitter.TwitterError as err: error = err.args[0][0] print "ERROR: %s" % error['message']
def analyze(username): twitter_consumer_key = 'NXXzhmDN1kcu3N7YPxf8y7qvp' twitter_consumer_secret = 'vViSlyPlHkueZCrEPTnQJ5PP8ET8iVHNmFh6G4RRSkcpDiCFPT' twitter_access_token = '3260949332-GsQhU60z94XZA012BbcgsJmT9EOFZEfbVW6zboW' twitter_access_secret = 'NjxFN3dpdvmE4iZ0ci2wV5wfdeEqnmmMJssczSJv61fpf' twitter_api = twitter.Api(consumer_key=twitter_consumer_key, consumer_secret=twitter_consumer_secret, access_token_key=twitter_access_token, access_token_secret=twitter_access_secret) statuses = twitter_api.GetUserTimeline(screen_name=handle, count=200, include_rts=False) text = "" # Store the retrieved info for status in statuses: if (status.lang =='en'): # English tweets text += status.text.encode('utf-8') # The IBM Bluemix credentials for Personality Insights! pi_username = '91afa133-d7a9-469f-95e9-a38cb2c500f4' pi_password = 'EUiG6T4wGG7j' personality_insights = PersonalityInsights(username=pi_username, password=pi_password) result = personality_insights.profile(text) return result
def FetchTwitter(user, output): assert user statuses = twitter.Api().GetUserTimeline(id=user, count=1) s = statuses[0] xhtml = TEMPLATE % (s.user.screen_name, s.text, s.user.screen_name, s.id, s.relative_created_at) if output: Save(xhtml, output) else: print xhtml
def setUp(self): self._urllib = MockUrllib() api = twitter.Api(consumer_key='CONSUMER_KEY', consumer_secret='CONSUMER_SECRET', access_token_key='OAUTH_TOKEN', access_token_secret='OAUTH_SECRET', cache=None) api.SetUrllib(self._urllib) self._api = api
def testGetStatus(self): '''Test the twitter.Api GetStatus method''' self._AddHandler('https://api.twitter.com/1.1/statuses/show.json?include_my_retweet=1&id=89512102', curry(self._OpenTestData, 'show-89512102.json')) status = self._api.GetStatus(89512102) self.assertEqual(89512102, status.id) self.assertEqual(718443, status.user.id)
def testPostUpdate(self): '''Test the twitter.Api PostUpdate method''' self._AddHandler('https://api.twitter.com/1.1/statuses/update.json', curry(self._OpenTestData, 'update.json')) status = self._api.PostUpdate(u'??? ????? ?? ????????? ??????? ????? ?????'.encode('utf8')) # This is rather arbitrary, but spot checking is better than nothing self.assertEqual(u'??? ????? ?? ????????? ??????? ????? ?????', status.text)
def testPostRetweet(self): '''Test the twitter.Api PostRetweet method''' self._AddHandler('https://api.twitter.com/1.1/statuses/retweet/89512102.json', curry(self._OpenTestData, 'retweet.json')) status = self._api.PostRetweet(89512102) self.assertEqual(89512102, status.id)
def testPostUpdateLatLon(self): '''Test the twitter.Api PostUpdate method, when used in conjunction with latitude and longitude''' self._AddHandler('https://api.twitter.com/1.1/statuses/update.json', curry(self._OpenTestData, 'update_latlong.json')) #test another update with geo parameters, again test somewhat arbitrary status = self._api.PostUpdate(u'??? ????? ?? ????????? ??????? ????? ?????'.encode('utf8'), latitude=54.2, longitude=-2) self.assertEqual(u'??? ????? ?? ????????? ??????? ????? ?????', status.text) self.assertEqual(u'Point',status.GetGeo()['type']) self.assertEqual(26.2,status.GetGeo()['coordinates'][0]) self.assertEqual(127.5,status.GetGeo()['coordinates'][1])
def testGetReplies(self): '''Test the twitter.Api GetReplies method''' self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json', curry(self._OpenTestData, 'replies.json')) statuses = self._api.GetReplies() self.assertEqual(36657062, statuses[0].id)
def testGetFriends(self): '''Test the twitter.Api GetFriends method''' self._AddHandler('https://api.twitter.com/1.1/friends/list.json?cursor=123', curry(self._OpenTestData, 'friends.json')) users = self._api.GetFriends(cursor=123) buzz = [u.status for u in users if u.screen_name == 'buzz'] self.assertEqual(89543882, buzz[0].id)
def testGetDirectMessages(self): '''Test the twitter.Api GetDirectMessages method''' self._AddHandler('https://api.twitter.com/1.1/direct_messages.json', curry(self._OpenTestData, 'direct_messages.json')) statuses = self._api.GetDirectMessages() self.assertEqual(u'A légpárnás hajóm tele van angolnákkal.', statuses[0].text)
def testPostDirectMessage(self): '''Test the twitter.Api PostDirectMessage method''' self._AddHandler('https://api.twitter.com/1.1/direct_messages/new.json', curry(self._OpenTestData, 'direct_messages-new.json')) status = self._api.PostDirectMessage('test', u'??? ????? ?? ????????? ??????? ????? ?????'.encode('utf8')) # This is rather arbitrary, but spot checking is better than nothing self.assertEqual(u'??? ????? ?? ????????? ??????? ????? ?????', status.text)
def testDestroyDirectMessage(self): '''Test the twitter.Api DestroyDirectMessage method''' self._AddHandler('https://api.twitter.com/1.1/direct_messages/destroy.json', curry(self._OpenTestData, 'direct_message-destroy.json')) status = self._api.DestroyDirectMessage(3496342) # This is rather arbitrary, but spot checking is better than nothing self.assertEqual(673483, status.sender_id)
def testCreateFriendship(self): '''Test the twitter.Api CreateFriendship method''' self._AddHandler('https://api.twitter.com/1.1/friendships/create.json', curry(self._OpenTestData, 'friendship-create.json')) user = self._api.CreateFriendship('dewitt') # This is rather arbitrary, but spot checking is better than nothing self.assertEqual(673483, user.id)
def testDestroyFriendship(self): '''Test the twitter.Api DestroyFriendship method''' self._AddHandler('https://api.twitter.com/1.1/friendships/destroy.json', curry(self._OpenTestData, 'friendship-destroy.json')) user = self._api.DestroyFriendship('dewitt') # This is rather arbitrary, but spot checking is better than nothing self.assertEqual(673483, user.id)
def api_from_config(config): api = twitter.Api( consumer_key=config.get('twitter_api', 'consumer_key'), consumer_secret=config.get('twitter_api', 'consumer_secret'), access_token_key=config.get('twitter_api', 'access_token_key'), access_token_secret=config.get('twitter_api', 'access_token_secret'), tweet_mode='extended') return api
def getAPI(): api = twitter.Api( consumer_key = '', consumer_secret = '', access_token_key = '', access_token_secret = '' ) return api
def __init__(self, credentials): err_msg = '' exception = None self._logger = logging.getLogger(__name__) try: self._twitter_api = twitter.Api( consumer_key=credentials['consumer_key'], consumer_secret=credentials['consumer_secret'], access_token_key=credentials['access_token_key'], access_token_secret=credentials['access_token_secret'] ) except twitter.TwitterError as exc: err_msg = 'The following error: %s, occurred while connecting ' \ 'to the twitter API.' % exc.message exception = exc except KeyError as exc: err_msg = 'Malformed credentials dictionary: %s.' % exc.message exception = exc except Exception as exc: err_msg = 'An unhandled error: %s, occurred while connecting ' \ 'to the twitter API.' % exc exception = exc if err_msg: logging.getLogger(__name__).log(logging.ERROR, err_msg) raise exception
def getTwitterApi(root_path): global gloabl_api if gloabl_api is None: import twitter config = configparser.ConfigParser() config.read(root_path + "/" + "config.ini") api_key = config.get('Twitter', 'KEY') api_secret = config.get('Twitter', 'SECRET') access_token_key = config.get('Twitter', 'TOKEN_KEY') access_token_secret = config.get('Twitter', 'TOKEN_SECRET') http = config.get('Proxy', 'HTTP') https = config.get('Proxy', 'HTTPS') proxies = {'http': http, 'https': https} gloabl_api = twitter.Api(api_key, api_secret, access_token_key, access_token_secret, timeout = 15, proxies=proxies) return gloabl_api
def __init__(self, storage, **kwargs): super(TwitterTrainer, self).__init__(storage, **kwargs) from twitter import Api as TwitterApi # The word to be used as the first search term when searching for tweets self.random_seed_word = kwargs.get('random_seed_word', 'random') self.api = TwitterApi( consumer_key=kwargs.get('twitter_consumer_key'), consumer_secret=kwargs.get('twitter_consumer_secret'), access_token_key=kwargs.get('twitter_access_token_key'), access_token_secret=kwargs.get('twitter_access_token_secret') )
def __init__(self, config): auth_dict = config["auth"] self.api = twitter.Api( consumer_key=auth_dict["api_key"], consumer_secret=auth_dict["api_secret"], access_token_key=auth_dict["token"], access_token_secret=auth_dict["token_secret"], tweet_mode="extended", ) self.screen_name = config["user"]["screen_name"]
def __init__(self): self.client = twitter.Api(consumer_key=twitter_consumer_key, consumer_secret=twitter_consumer_secret, access_token_key=twitter_access_token, access_token_secret=twitter_access_secret) # setup
def get_api(consumer_key, consumer_secret, access_key, access_secret): api = twitter.Api(consumer_key=consumer_key, consumer_secret=consumer_secret, access_token_key=access_key, access_token_secret=access_secret, sleep_on_rate_limit=True) return api
def get_tweets(user_id): api = twitter.Api(consumer_key=c_key, consumer_secret=c_secret, access_token_key=a_key, access_token_secret=a_secret) # print api.VerifyCredentials() l = [] t = api.GetUserTimeline(user_id=user_id, count=25) # print t tweets = [i.AsDict() for i in t] for i in tweets : l.append(i['text']) print l
def get_posts(screenname): api = twitter.Api(consumer_key='ikJIFDBN0h1LZomIkCRKbQXUE', consumer_secret='NS411RIiYBO2Gzj67pqcgFnD0AY3mPV5hWjkujCklZjeLurwDa', access_token_key='280279077-y9OmVTnveqglTJTy2ND2uPCy9wLWj9PnHDA0Kl0j', access_token_secret='HYW79wDKilDY44QX5mGupWx0001V06jz8mWCLt9aYihXe') #TEMPORARILY SMALL FOR TESTING posts = api.GetUserTimeline(screen_name=screenname, count=15, exclude_replies=True) return posts
def tweet(self): if can_tweet(): account = twitter.Api( username=settings.TWITTER_USERNAME, password=settings.TWITTER_PASSWORD, ) account.PostUpdate(self.as_tweet()) else: raise ImproperlyConfigured( "Unable to send tweet due to either " "missing python-twitter or required settings." )
def __init__(self, **kwargs): super(TwitterAdapter, self).__init__(**kwargs) self.api = twitter.Api( consumer_key=kwargs["twitter_consumer_key"], consumer_secret=kwargs["twitter_consumer_secret"], access_token_key=kwargs["twitter_access_token_key"], access_token_secret=kwargs["twitter_access_token_secret"] )