我们从Python开源项目中，提取了以下16个代码示例，用于说明如何使用twitter.TwitterError()。
def refresh_posts(posts):
    """Re-fetch the given posts from Twitter and sync them with the session.

    All posts are looked up in a single ``statuses.lookup`` call made with
    the client of the first post's author. Posts whose tweet is missing from
    the response are deleted from the session; the others are rebuilt from
    the API payload and merged.

    Args:
        posts: Sequence of post objects exposing ``author`` and
            ``twitter_id``.

    Returns:
        ``posts`` itself when it is empty, ``None`` when no Twitter client
        is available for the author, otherwise the list of merged posts
        that are still present on Twitter.

    Raises:
        Whatever ``handle_error`` raises for a ``URLError``/``TwitterError``;
        if it returns instead, the original error is re-raised.
    """
    if not posts:
        return posts

    client = get_twitter_for_acc(posts[0].author)
    if not client:
        return

    try:
        tweets = client.statuses.lookup(
            _id=",".join(post.twitter_id for post in posts),
            trim_user=True,
            tweet_mode='extended')
    except (URLError, TwitterError) as e:
        handle_error(e)
        # handle_error presumably always raises (TODO confirm). Should it
        # ever return, surface the real error instead of failing below with
        # an UnboundLocalError on `tweets`.
        raise

    # Index the response once instead of rescanning it for every post.
    # setdefault keeps the first tweet seen for an id, matching the
    # first-match semantics of the previous linear search.
    tweets_by_id = {}
    for tweet in tweets:
        tweets_by_id.setdefault(tweet['id_str'], tweet)

    refreshed_posts = []
    for post in posts:
        tweet = tweets_by_id.get(post.twitter_id)
        if not tweet:
            # The tweet is gone from the response: drop the local copy.
            db.session.delete(post)
        else:
            refreshed_posts.append(
                db.session.merge(post_from_api_tweet_object(tweet)))
    return refreshed_posts
def test_follow_congresspeople(self, logging_mock):
    """Each non-null profile is followed once; API errors only log warnings."""
    self.subject._profiles = pd.DataFrame(
        [
            ['DepEduardoCunha', 'DepEduardoCunha2'],
            ['DepRodrigomaia', None],
            [None, None],
        ],
        columns=['twitter_profile', 'secondary_twitter_profile'],
    )
    expected_names = ('DepEduardoCunha', 'DepEduardoCunha2', 'DepRodrigomaia')
    expected_calls = [
        mock.call.CreateFriendship(screen_name=name)
        for name in expected_names
    ]

    # Happy path: the three non-null screen names are followed, in any order.
    self.subject.follow_congresspeople()
    self.api.assert_has_calls(expected_calls, any_order=True)
    self.assertEqual(3, self.api.CreateFriendship.call_count)

    # Failure path: every call raises, so one warning per profile is logged.
    self.api.CreateFriendship.side_effect = TwitterError('Not found')
    self.subject.follow_congresspeople()
    logging_mock.warning.assert_called()
    self.assertEqual(3, logging_mock.warning.call_count)
def enf_type(field, _type, val):
    """Convert ``val`` to ``_type``, raising TwitterError if it cannot be.

    Args:
        field:
            Name of the field being checked; used only in the error message.
        _type:
            Callable type (e.g. ``int``) the value should be converted to.
        val:
            Value to convert to ``_type``.

    Returns:
        ``val`` converted to type ``_type``.

    Raises:
        TwitterError: If the conversion fails.
    """
    try:
        return _type(val)
    # Conversions fail with TypeError as well as ValueError (for example
    # int(None) or int([1])); both mean "wrong type" to the caller, so both
    # are reported as the TwitterError promised above.
    except (TypeError, ValueError):
        raise TwitterError({
            'message': '"{0}" must be type {1}'.format(field, _type.__name__)
        })
def twitter_login_step1():
    """Redirect the visitor to the Twitter login URL.

    The login URL is requested with the external URL of
    ``twitter_login_step2`` as callback and the ``TWITTER_`` namespace of the
    app config as extra keyword arguments. If that fails with a
    ``TwitterError`` or ``URLError``, the exception is reported to sentry
    (when sentry is set) and the visitor is redirected to the ``about`` page
    with an empty ``twitter_login_error`` parameter and the ``log_in`` anchor.
    """
    try:
        callback_url = url_for('twitter_login_step2', _external=True)
        twitter_settings = app.config.get_namespace("TWITTER_")
        login_url = libforget.twitter.get_login_url(
            callback=callback_url, **twitter_settings)
        return redirect(login_url)
    except (TwitterError, URLError):
        if sentry:
            sentry.captureException()

    # Only reached when the login URL could not be obtained.
    return redirect(
        url_for('about', twitter_login_error='', _anchor='log_in'))
def autopost_twitter(self):
    """Send pending Twitter feed posts and mark the successful ones as posted.

    At most ``MAX_POSTS_PER_CALL`` posts for the Twitter network are
    processed per call. A post that fails with ``twitter.TwitterError`` is
    logged and left untouched.
    """
    pending = FeedPost.objects.filter(
        for_network=conf.NETWORK_TWITTER
    )[:MAX_POSTS_PER_CALL]

    for post in pending:
        try:
            api.twitter.post(post.text, url=post.url)
        except twitter.TwitterError as e:
            logger.error("Twitter Autopost: error on #{0.pk}: {1.args}".format(post, e))
            continue

        # NOTE(review): the source's indentation was lost; the bookkeeping
        # below is assumed to run only after a successful post - confirm.
        logger.info("Twitter Autopost: posted #{0.pk} ('{0}')".format(post))
        post.scheduled = False
        post.posted = now()
        post.save()
def tweet_incident(sender, instance, created=False, **kwargs):
    """Tweet a newly created incident.

    Args:
        sender: Unused; accepted to fit the caller's signature.
        instance: Object exposing a ``tweet()`` method.
        created: ``tweet()`` is only called when this is truthy.
        **kwargs: Unused extra keyword arguments.

    A ``TwitterError`` raised by ``tweet()`` is reported on stdout and
    swallowed, so a tweeting failure never propagates to the caller.
    """
    if not created:
        return
    try:
        instance.tweet()
    except TwitterError as exc:
        # print() with a single argument behaves the same on Python 2 and 3;
        # the former print statement was a SyntaxError on Python 3.
        print('Exception while tweeting incident: {}'.format(str(exc)))
def follow_congresspeople(self):
    """
    Friend all congresspeople accounts on Twitter.

    Screen names are taken from the ``twitter_profile`` and
    ``secondary_twitter_profile`` columns of ``self.profiles()``; null
    entries are skipped. A ``twitter.TwitterError`` for one profile is
    logged as a warning and does not stop the remaining profiles.
    """
    # Fetch the frame once; it was previously requested twice for the two
    # columns of the same data.
    frame = self.profiles()
    screen_names = pd.concat([
        frame['twitter_profile'],
        frame['secondary_twitter_profile'],
    ])
    for screen_name in screen_names[screen_names.notnull()].values:
        try:
            self.api.CreateFriendship(screen_name=screen_name)
        except twitter.TwitterError:
            logging.warning('{} profile not found'.format(screen_name))
# TwitterPost(video_info): tweet a "Watch again" status for one video.
#
# What the code below does, in order:
#   * builds a twitter.Api client from the module-level cons_key, cons_sec,
#     acc_key and acc_sec values;
#   * takes the first key of video_info as the title and reads the 'thumb'
#     and 'url' entries stored under it;
#   * creates a temporary file with tempfile.mkstemp() and passes the
#     thumbnail value and the file descriptor to GetImage (presumably this
#     downloads the thumbnail into the temp file - TODO confirm);
#   * posts the status with the temp file as media via PostMedia and removes
#     the temp file afterwards;
#   * on twitter.TwitterError prints the first error message, removes the
#     temp file if it still exists, fetches another video with GetVideo()
#     and calls TwitterPost again - there is no retry limit;
#   * a second try block posts a "Don't forget to send you talk proposal"
#     reminder carrying a random key=NNNN value via PostUpdate.
#
# NOTE(review): this snippet lost its line breaks and indentation. It cannot
# be told from here whether the bare `return` belongs to the except body
# (reminder is posted after a successful media post) or to the function
# level (reminder block is unreachable). Confirm against the original file
# before reformatting or changing this function.
# NOTE(review): the print statements and `video_info.keys()[0]` are
# Python 2 constructs.
def TwitterPost(video_info): # App python-tweeter # https://dev.twitter.com/apps/815176 tw = twitter.Api( consumer_key = cons_key, consumer_secret = cons_sec, access_token_key = acc_key, access_token_secret = acc_sec ) title = video_info.keys()[0] print "Title: %s" % title image = video_info[title]['thumb'] print "Image: %s" % image url = video_info[title]['url'] print "Video: %s" % url fd, file_image = tempfile.mkstemp() GetImage(image, fd) msg = "Watch again: %s\n%s #pyconse" % (title, url) print "Posting: %s" % msg print "Image: %s" % file_image try: tw.PostMedia(status = msg,media = file_image) os.unlink(file_image) except twitter.TwitterError as err: # twitter.TwitterError: [{u'message': u'Status is a duplicate.', u'code': 187}] print "ERROR: %s" % err.args[0][0]['message'] print "Post on Twitter failed. Trying again..." if os.path.exists(file_image): os.unlink(file_image) video = GetVideo() TwitterPost(video) return try: msg = "Don't forget to send you talk proposal for #pyconse (September 6th in Stockholm). key=%d " % random.randint(1000,5000) + \ "https://goo.gl/RjHpoS" print "Posting: %s" % msg tw.PostUpdate(msg) except twitter.TwitterError as err: error = err.args[0][0] print "ERROR: %s" % error['message']
def SendingList(api, tag, list_name):
    """Tweet the names of FFS[list_name] as @mentions prefixed by ``tag``.

    Names are packed into messages no longer than SIZE characters. Every
    name is recorded in CONTROLE so it is mentioned only once across calls,
    and the module-level ``counter`` is advanced for each name handled.
    When ``dryrun`` is set the messages are only printed: nothing is posted
    and no pause is made.

    Args:
        api: Client exposing ``PostUpdate(message)``.
        tag: Text placed at the start of every message.
        list_name: Key into the module-level FFS mapping of name lists.

    NOTE(review): the indentation of this snippet was lost; the nesting
    below is the reading under which a dry run never sleeps - confirm
    against the original file.
    """
    global FFS, CONTROLE, counter, dryrun
    print("Running for %s: %d" % (tag, len(FFS[list_name])))
    MSG = tag + " "
    for name in FFS[list_name]:
        # `in` replaces dict.has_key(), which no longer exists on Python 3.
        if name in CONTROLE:
            continue
        CONTROLE[name] = 1
        print("%s %s" % (counter, name))
        name = '@' + name
        if len(MSG + " " + name) > SIZE:
            # The message is full: send it and start a new one.
            print(MSG)
            if not dryrun:
                api.PostUpdate(MSG)
                # Random pause of 1 to 29 minutes between posts.
                sleep(randrange(1, 30) * 60)
            MSG = tag + " " + name
        else:
            MSG += " " + name
        counter += 1

    print(MSG)
    if dryrun:
        return
    if "@" not in MSG:
        # No mention was added: nothing left to send.
        return
    try:
        api.PostUpdate(MSG)
    except twitter.TwitterError as e:
        # The error payload may be a list of dicts rather than a string, so
        # match on its text form; re.search on e.message would raise
        # TypeError for such payloads. Duplicates are expected and ignored;
        # anything else is reported instead of being silently dropped.
        if 'Status is a duplicate.' not in str(e):
            print("ERROR: %s" % e)
    sleep(randrange(1, 30) * 60)
def testTwitterError(self):
    '''Test that twitter responses containing an error message are wrapped.'''
    self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json',
                     curry(self._OpenTestData, 'public_timeline_error.json'))
    # Manually try/catch so we can check the exception's value.
    # `except ... as` is accepted by Python 2.6+ and Python 3, unlike the
    # comma form, which is a SyntaxError on Python 3.
    try:
        self._api.GetUserTimeline()
    except twitter.TwitterError as error:
        # If the error message matches, the test passes
        self.assertEqual('test error', error.message)
    else:
        self.fail('TwitterError expected')
def __init__(self, credentials):
    """Create the Twitter API client from a credentials mapping.

    Args:
        credentials: Mapping providing the 'consumer_key',
            'consumer_secret', 'access_token_key' and
            'access_token_secret' entries.

    Raises:
        twitter.TwitterError: If creating the client fails with a Twitter
            error.
        KeyError: If one of the credential entries is missing.
        Exception: Any other error raised while creating the client.

    Every failure is logged at ERROR level and then re-raised unchanged.
    """
    self._logger = logging.getLogger(__name__)
    try:
        self._twitter_api = twitter.Api(
            consumer_key=credentials['consumer_key'],
            consumer_secret=credentials['consumer_secret'],
            access_token_key=credentials['access_token_key'],
            access_token_secret=credentials['access_token_secret']
        )
    except twitter.TwitterError as exc:
        self._logger.error(
            'The following error: %s, occurred while connecting '
            'to the twitter API.', exc.message)
        # A bare raise keeps the original traceback.
        raise
    except KeyError as exc:
        # KeyError has no `.message` attribute on Python 3; the missing key
        # is its first argument.
        missing_key = exc.args[0] if exc.args else ''
        self._logger.error(
            'Malformed credentials dictionary: %s.', missing_key)
        raise
    except Exception as exc:
        self._logger.error(
            'An unhandled error: %s, occurred while connecting '
            'to the twitter API.', exc)
        raise
def _push_job_offers_to_twitter(self, num_tweets_to_push):
    """Tweet up to ``num_tweets_to_push`` job offers not yet on Twitter.

    The request is capped at ``self.MAX_TWEETS_TO_PUSH`` (with a warning)
    because the Twitter API won't allow pushing every offer at once. An
    offer is marked as pushed only when its tweet was posted; a failure is
    logged and the loop moves on to the next offer.
    """
    limit = self.MAX_TWEETS_TO_PUSH
    if num_tweets_to_push > limit:
        self._logging(
            logging.WARNING,
            'Cannot push %s tweets at once, pushing %s tweets '
            'instead.' % (num_tweets_to_push, limit))
        num_tweets_to_push = limit

    self._logging(logging.INFO, 'Acquiring unpublished job offers.')
    pending_offers = JobAlchemy.get_not_pushed_on_twitter(num_tweets_to_push)

    for job_offer in pending_offers:
        tweet = self._format_tweet(job_offer.id, job_offer.title)
        try:
            self._logging(logging.INFO, 'Publishing to Twitter.')
            self._twitter_api.PostUpdate(tweet)
        except twitter.TwitterError as exc:
            self._logging(
                logging.WARNING,
                '[Job offer id: %s] The following error: %s, '
                'occurred while pushing the following tweet: %s.'
                % (job_offer.id, exc.message, tweet))
            continue
        except Exception as exc:
            self._logging(
                logging.ERROR,
                '[Job offer id: %s] An unhandled error: %s, '
                'occurred while pushing the following tweet: %s.'
                % (job_offer.id, exc, tweet))
            continue

        # The tweet went out: flag the offer in the database so it is not
        # pushed to Twitter again later on.
        self._logging(logging.INFO, 'Marking as published on Twitter.')
        JobAlchemy.set_pushed_on_twitter(job_offer.id, True)
def get_statements(self):
    """
    Collect statements from random tweets that are replies.

    Searches for 50 tweets containing a random word and, for every tweet
    that replies to another status, fetches that status and attaches its
    text as a response. Tweets whose parent status cannot be fetched are
    skipped with a warning.

    Returns the list of statements that received a response.
    """
    from twitter import TwitterError

    search_term = self.random_word(self.random_seed_word)
    self.logger.info(u'Requesting 50 random tweets containing the word {}'.format(search_term))
    found_tweets = self.api.GetSearch(term=search_term, count=50)

    with_responses = []
    for tweet in found_tweets:
        statement = Statement(tweet.text)
        if not tweet.in_reply_to_status_id:
            # Not a reply: there is no parent status to pair it with.
            continue
        try:
            parent = self.api.GetStatus(tweet.in_reply_to_status_id)
            statement.add_response(Response(parent.text))
            with_responses.append(statement)
        except TwitterError as error:
            self.logger.warning(str(error))

    self.logger.info('Adding {} tweets with responses'.format(len(with_responses)))
    return with_responses
def fetch_acc(account, cursor):
    """Fetch one page of an account's timeline and store it in the session.

    The account itself is refreshed from ``verify_credentials`` first. The
    timeline request asks for up to 200 tweets; entries of ``cursor`` are
    merged into the request arguments, and when no ``max_id`` is given the
    request is limited to tweets newer than the most recent stored post.

    Args:
        account: Account whose timeline is fetched.
        cursor: Optional mapping of extra request arguments (for example
            the value returned by a previous call); may be falsy.

    Returns:
        The request arguments for the next page, with ``max_id`` set just
        below the smallest tweet id seen, or ``None`` when the page was
        empty or no Twitter client is available.

    Raises:
        Whatever ``handle_error`` raises for a ``TwitterError``/``URLError``;
        if it returns instead, the original error is re-raised.
    """
    t = get_twitter_for_acc(account)
    if not t:
        print("no twitter access, aborting")
        return

    try:
        user = t.account.verify_credentials()
        db.session.merge(account_from_api_user_object(user))

        kwargs = {
            'user_id': account.twitter_id,
            'count': 200,
            'trim_user': True,
            'tweet_mode': 'extended',
        }
        if cursor:
            kwargs.update(cursor)

        if 'max_id' not in kwargs:
            most_recent_post = (
                Post.query.order_by(db.desc(Post.created_at))
                .filter(Post.author_id == account.id).first())
            if most_recent_post:
                kwargs['since_id'] = most_recent_post.twitter_id

        tweets = t.statuses.user_timeline(**kwargs)
    except (TwitterError, URLError) as e:
        handle_error(e)
        # handle_error presumably always raises (TODO confirm). Should it
        # ever return, surface the real error instead of failing below with
        # an UnboundLocalError on `tweets` or `kwargs`.
        raise

    print("processing {} tweets for {acc}".format(len(tweets), acc=account))

    if len(tweets) > 0:
        # Track the smallest id seen so the next page starts right below it.
        kwargs['max_id'] = +inf
        for tweet in tweets:
            db.session.merge(post_from_api_tweet_object(tweet))
            kwargs['max_id'] = min(tweet['id'] - 1, kwargs['max_id'])
    else:
        kwargs = None

    db.session.commit()
    return kwargs
def parse_media_file(passed_media):
    """ Parses a media file and attempts to return a file-like object and
    information about the media file.

    Args:
        passed_media: media file which to parse. Either an object with a
            ``read`` method opened in mode 'rb', 'rb+' or 'w+b', or a
            string: strings starting with 'http' are fetched with
            ``http_to_file``, any other string is opened as a local path.

    Returns:
        file-like object (positioned at its start when seeking back
        succeeds), the filename of the media file, the file size, and the
        type of media (``None`` when it cannot be guessed from the name).

    Raises:
        TwitterError: If a passed file object has an unsupported mode, an
            image is larger than 5MB, a video is larger than 15MB, or the
            guessed type is neither a supported image nor video format.
    """
    # Imported here so the block works on Python 2 and 3 without touching
    # the module's import section.
    try:
        from urllib.parse import urlparse
    except ImportError:  # Python 2
        from urlparse import urlparse

    img_formats = ['image/jpeg',
                   'image/png',
                   'image/gif',
                   'image/bmp',
                   'image/webp']
    video_formats = ['video/mp4',
                     'video/quicktime']

    # If passed_media is a string, check if it points to a URL, otherwise,
    # it should point to local file. Create a reference to a file obj for
    # each case such that data_file ends up with a read() method.
    opened_here = not hasattr(passed_media, 'read')
    if opened_here:
        if passed_media.startswith('http'):
            data_file = http_to_file(passed_media)
            # Use only the URL path so a query string or fragment does not
            # end up in the filename and defeat the MIME type guess.
            filename = os.path.basename(urlparse(passed_media).path)
        else:
            data_file = open(os.path.realpath(passed_media), 'rb')
            filename = os.path.basename(passed_media)
    # Otherwise, a file object was passed in the first place: validate its
    # mode and use it as is.
    else:
        if passed_media.mode not in ['rb', 'rb+', 'w+b']:
            raise TwitterError('File mode must be "rb" or "rb+"')
        filename = os.path.basename(passed_media.name)
        data_file = passed_media

    try:
        # Measure the size by seeking to the end.
        data_file.seek(0, 2)
        file_size = data_file.tell()

        try:
            data_file.seek(0)
        except Exception:
            # Best effort: a stream that cannot rewind is returned as is.
            pass

        media_type = mimetypes.guess_type(filename)[0]
        if media_type is not None:
            if media_type in img_formats and file_size > 5 * 1048576:
                raise TwitterError({'message': 'Images must be less than 5MB.'})
            elif media_type in video_formats and file_size > 15 * 1048576:
                raise TwitterError({'message': 'Videos must be less than 15MB.'})
            elif media_type not in img_formats and media_type not in video_formats:
                raise TwitterError({'message': 'Media type could not be determined.'})
    except Exception:
        # Do not leak a handle this function opened itself; a file object
        # supplied by the caller stays under the caller's control.
        if opened_here:
            data_file.close()
        raise

    return data_file, filename, file_size, media_type
def get_tweets(tweet_ids, consumer_key, consumer_secret, access_token, access_token_secret, nlp):
    """
    Expands tweets from Twitter

    Downloaded tweets are appended to the local file 'tweet_temp' (one
    "<id><TAB><text>" line per tweet) so an interrupted run can resume
    without downloading them again.

    :param tweet_ids: the list of tweet IDs to expand
    :param consumer_key: Twitter API consumer key
    :param consumer_secret: Twitter API consumer secret
    :param access_token: Twitter API access token key
    :param access_token_secret: Twitter API access token secret
    :param nlp: callable applied to the tweet text; the ``lower_`` attribute
        of each item it yields is joined with spaces before cleaning
    :return: a dictionary of tweet ID to tweet text
    :raises twitter.TwitterError: when the rate limit is exceeded, so the
        script can be stopped and resumed later
    """
    # Save tweets in a temporary file, in case the script stops working and re-starts
    tweets = {}
    if os.path.exists('tweet_temp'):
        with codecs.open('tweet_temp', 'r', 'utf-8') as f_in:
            for line in f_in:
                # Split on the first tab only and skip lines without one, so
                # an empty text, a text containing a tab or a stray blank
                # line no longer aborts the resume with an unpacking error.
                tweet_id, sep, text = line.rstrip('\r\n').partition('\t')
                tweet_id = tweet_id.lstrip()
                if sep and tweet_id:
                    tweets[tweet_id] = text.rstrip()

    api = twitter.Api(consumer_key=consumer_key,
                      consumer_secret=consumer_secret,
                      access_token_key=access_token,
                      access_token_secret=access_token_secret)
    [sleeptime, resettime] = reset_sleep_time(api)

    with codecs.open('tweet_temp', 'a', 'utf-8') as f_out:
        for tweet_id in tweet_ids:
            # Already downloaded in a previous run
            if tweet_id in tweets:
                continue

            try:
                curr_tweet = api.GetStatus(tweet_id, include_entities=False)
                tweets[tweet_id] = clean_tweet(
                    ' '.join([t.lower_ for t in nlp(curr_tweet.text)]))
            except twitter.TwitterError as err:
                error = str(err)

                # If the rate limit exceeded, this script should be stopped
                # and resumed the next day
                if 'Rate limit exceeded' in error:
                    raise

                # Other error - the tweet is not available :(
                print('Error reading tweet id: %s : %s' % (tweet_id, error))
                tweets[tweet_id] = 'TWEET IS NOT AVAILABLE'

            # Explicit write instead of the Python 2-only `print >> file`.
            f_out.write('\t'.join((tweet_id, tweets[tweet_id])) + '\n')

            time.sleep(sleeptime)
            if time.time() >= resettime:
                [sleeptime, resettime] = reset_sleep_time(api)

    return tweets