Python twitter 模块,TwitterError() 实例源码

我们从 Python 开源项目中提取了以下 16 个代码示例,用于说明如何使用 twitter.TwitterError()。

项目:forget    作者:codl    | 项目源码 | 文件源码
def refresh_posts(posts):
    """Re-fetch the given posts from Twitter and sync them into the session.

    All posts are looked up in one ``statuses.lookup`` call made through the
    client obtained for the author of the first post. Posts for which the
    lookup returns no tweet are deleted from ``db.session``; the others are
    rebuilt from the returned tweet and merged.

    Args:
        posts: posts to refresh; each must expose ``twitter_id`` and the
            first one ``author``.

    Returns:
        ``posts`` itself when it is empty, ``None`` when
        ``get_twitter_for_acc`` returns a falsy value, otherwise the list
        of merged posts.

    Raises:
        Whatever ``handle_error`` raises for a ``URLError``/``TwitterError``;
        if it returns instead, the original error is re-raised.
    """
    if not posts:
        return posts

    t = get_twitter_for_acc(posts[0].author)
    if not t:
        return
    try:
        tweets = t.statuses.lookup(
            _id=",".join(post.twitter_id for post in posts),
            trim_user=True, tweet_mode='extended')
    except (URLError, TwitterError) as e:
        handle_error(e)
        # NOTE(review): handle_error presumably raises; if it ever returns,
        # `tweets` would be unbound below, so propagate the real error
        # instead of failing later with an UnboundLocalError.
        raise

    # Index the response once instead of rescanning it for every post.
    # setdefault keeps the first tweet seen for an id, like a linear scan.
    tweets_by_id = {}
    for tweet in tweets:
        tweets_by_id.setdefault(tweet['id_str'], tweet)

    refreshed_posts = []
    for post in posts:
        tweet = tweets_by_id.get(post.twitter_id)
        if not tweet:
            # Twitter no longer returns this tweet: drop the local copy.
            db.session.delete(post)
        else:
            refreshed_posts.append(
                db.session.merge(post_from_api_tweet_object(tweet)))

    return refreshed_posts
项目:whistleblower    作者:datasciencebr    | 项目源码 | 文件源码
def test_follow_congresspeople(self, logging_mock):
        """Every non-null profile is followed; API failures are logged as warnings."""
        rows = [
            ['DepEduardoCunha', 'DepEduardoCunha2'],
            ['DepRodrigomaia', None],
            [None, None],
        ]
        self.subject._profiles = pd.DataFrame(
            rows, columns=['twitter_profile', 'secondary_twitter_profile'])

        expected_names = (
            'DepEduardoCunha', 'DepEduardoCunha2', 'DepRodrigomaia')
        expected_calls = [
            mock.call.CreateFriendship(screen_name=screen_name)
            for screen_name in expected_names
        ]

        # First run: the API accepts every request, one call per profile.
        self.subject.follow_congresspeople()
        self.api.assert_has_calls(expected_calls, any_order=True)
        self.assertEqual(3, self.api.CreateFriendship.call_count)

        # Second run: every request fails, so each profile yields a warning.
        self.api.CreateFriendship.side_effect = TwitterError('Not found')
        self.subject.follow_congresspeople()
        logging_mock.warning.assert_called()
        self.assertEqual(3, logging_mock.warning.call_count)
项目:StockRecommendSystem    作者:doncat99    | 项目源码 | 文件源码
def enf_type(field, _type, val):
    """ Checks to see if a given val for a field (i.e., the name of the field)
    can be converted to the proper _type. If it cannot, raises a TwitterError
    with a brief explanation.

    Args:
        field:
            Name of the field you are checking (only used in the error
            message).
        _type:
            Callable type that the value should be converted to.
        val:
            Value to convert to _type.

    Returns:
        val converted to type _type.

    Raises:
        TwitterError: if ``_type(val)`` fails with a ValueError or a
            TypeError.

    """
    try:
        return _type(val)
    except (TypeError, ValueError):
        # TypeError covers inputs such as int(None) or int([]), which would
        # otherwise bypass the documented TwitterError contract.
        raise TwitterError({
            'message': '"{0}" must be type {1}'.format(field, _type.__name__)
        })
项目:forget    作者:codl    | 项目源码 | 文件源码
def twitter_login_step1():
    """Redirect the visitor to the Twitter login URL.

    The login URL is requested with ``twitter_login_step2`` as the external
    callback and with the ``TWITTER_`` namespace of the app config as extra
    keyword arguments. If that fails with a ``TwitterError`` or ``URLError``
    the exception is captured by ``sentry`` (when truthy) and the visitor is
    redirected to the ``about`` page with an empty ``twitter_login_error``
    parameter.
    """
    try:
        callback_url = url_for('twitter_login_step2', _external=True)
        twitter_settings = app.config.get_namespace("TWITTER_")
        login_url = libforget.twitter.get_login_url(
            callback=callback_url, **twitter_settings)
        return redirect(login_url)
    except (TwitterError, URLError):
        if sentry:
            sentry.captureException()

    error_page = url_for('about', twitter_login_error='', _anchor='log_in')
    return redirect(error_page)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def autopost_twitter(self):
        """Post up to MAX_POSTS_PER_CALL feed posts queued for the Twitter network.

        A post that Twitter rejects is logged as an error and left untouched;
        a post that goes through is logged, unscheduled, stamped with the
        current time and saved.
        """
        queued = FeedPost.objects.filter(for_network=conf.NETWORK_TWITTER)
        for post in queued[:MAX_POSTS_PER_CALL]:
            try:
                api.twitter.post(post.text, url=post.url)
            except twitter.TwitterError as e:
                logger.error("Twitter Autopost: error on #{0.pk}: {1.args}".format(post, e))
                continue

            # Only reached when the tweet was accepted.
            logger.info("Twitter Autopost: posted #{0.pk} ('{0}')".format(post))
            post.scheduled = False
            post.posted = now()
            post.save()
项目:bart-crime    作者:ben174    | 项目源码 | 文件源码
def tweet_incident(sender, instance, created=False, **kwargs):
    """Tweet an incident when it has just been created.

    NOTE(review): the (sender, instance, created, **kwargs) signature looks
    like a model-save signal receiver -- confirm against where it is
    connected.

    Args:
        sender: unused.
        instance: object exposing a ``tweet()`` method.
        created: only a truthy value triggers the tweet.
        **kwargs: ignored.
    """
    if not created:
        return
    try:
        instance.tweet()
    except TwitterError as exc:
        # Best effort: a failed tweet must not break the caller.
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('Exception while tweeting incident: {}'.format(str(exc)))
项目:whistleblower    作者:datasciencebr    | 项目源码 | 文件源码
def follow_congresspeople(self):
        """
        Friend all congresspeople accounts on Twitter.

        Both the primary and the secondary profile columns are used; null
        entries are skipped. A profile the API rejects with a TwitterError
        is reported through ``logging.warning`` and the loop carries on.
        """
        primary = self.profiles()['twitter_profile']
        secondary = self.profiles()['secondary_twitter_profile']
        screen_names = pd.concat([primary, secondary])
        known_names = screen_names[screen_names.notnull()]

        for screen_name in known_names.values:
            try:
                self.api.CreateFriendship(screen_name=screen_name)
            except twitter.TwitterError:
                logging.warning('{} profile not found'.format(screen_name))
项目:homemadescripts    作者:helioloureiro    | 项目源码 | 文件源码
def TwitterPost(video_info):
    """Tweet a video with its thumbnail, then tweet a call-for-proposals note.

    ``video_info`` is a mapping whose first key is the video title and whose
    value holds the ``'thumb'`` and ``'url'`` entries. If posting the media
    fails with a TwitterError, another video is fetched with ``GetVideo()``
    and the whole function is retried with it.

    NOTE(review): the retry is unbounded recursion -- a persistent API
    failure will recurse until the interpreter's limit is hit.
    """
    # App python-tweeter
    # https://dev.twitter.com/apps/815176
    tw = twitter.Api(
        consumer_key=cons_key,
        consumer_secret=cons_sec,
        access_token_key=acc_key,
        access_token_secret=acc_sec
    )
    # next(iter(...)) yields the same first key as keys()[0] on Python 2
    # and also works on Python 3, where keys() is not indexable.
    title = next(iter(video_info))
    print("Title: %s" % title)
    image = video_info[title]['thumb']
    print("Image: %s" % image)
    url = video_info[title]['url']
    print("Video: %s" % url)
    # NOTE(review): mkstemp returns an open descriptor; presumably GetImage
    # writes the thumbnail through it and closes it -- confirm.
    fd, file_image = tempfile.mkstemp()
    GetImage(image, fd)
    msg = "Watch again: %s\n%s #pyconse" % (title, url)
    print("Posting: %s" % msg)
    print("Image: %s" % file_image)

    posted = False
    try:
        tw.PostMedia(status=msg, media=file_image)
        posted = True
    except twitter.TwitterError as err:
        # twitter.TwitterError: [{u'message': u'Status is a duplicate.', u'code': 187}]
        print("ERROR: %s" % err.args[0][0]['message'])
        print("Post on Twitter failed. Trying again...")
    finally:
        # Remove the temporary thumbnail on every exit path, including
        # exceptions other than TwitterError.
        if os.path.exists(file_image):
            os.unlink(file_image)

    if not posted:
        video = GetVideo()
        TwitterPost(video)
        return

    try:
        # The random key makes each reminder unique so Twitter does not
        # reject it as a duplicate status.
        msg = "Don't forget to send you talk proposal for #pyconse (September 6th in Stockholm). key=%d " % random.randint(1000, 5000) + \
            "https://goo.gl/RjHpoS"
        print("Posting: %s" % msg)
        tw.PostUpdate(msg)
    except twitter.TwitterError as err:
        error = err.args[0][0]
        print("ERROR: %s" % error['message'])
项目:homemadescripts    作者:helioloureiro    | 项目源码 | 文件源码
def SendingList(api, tag, list_name):
    """
    Post the names in FFS[list_name] as @-mentions prefixed with ``tag``.

    Names are packed into messages no longer than SIZE; each full message is
    posted (unless ``dryrun`` is set) and followed by a pause of
    ``randrange(1, 30) * 60``. Names already present in CONTROLE are skipped,
    and every name handled is recorded there and counted in ``counter``.

    Args:
        api: client exposing ``PostUpdate(message)``.
        tag: text placed at the start of every message.
        list_name: key into the module-level FFS mapping.
    """
    global FFS, CONTROLE, counter, dryrun

    print("Running for %s: %d" % (tag, len(FFS[list_name])))
    MSG = tag + " "
    for name in FFS[list_name]:
        if name in CONTROLE:
            continue
        CONTROLE[name] = 1
        print("%s %s" % (counter, name))
        name = '@' + name
        if len(MSG + " " + name) > SIZE:
            # Message is full: send it and start a new one with this name.
            print(MSG)
            if not dryrun:
                api.PostUpdate(MSG)
                sleep(randrange(1, 30) * 60)
            MSG = tag + " " + name
        else:
            MSG += " " + name
        counter += 1
    print(MSG)

    if dryrun:
        return
    if "@" not in MSG:
        # empty: nothing but the tag is left to send
        return
    try:
        api.PostUpdate(MSG)
    except twitter.TwitterError as e:
        # str(e) is safe whatever payload the error carries; a duplicate
        # status is expected and ignored, anything else is at least reported.
        if 'Status is a duplicate' not in str(e):
            print("ERROR: %s" % e)
    sleep(randrange(1, 30) * 60)
项目:gif-disco    作者:futurice    | 项目源码 | 文件源码
def testTwitterError(self):
    '''Test that twitter responses containing an error message are wrapped.'''
    self._AddHandler('https://api.twitter.com/1.1/statuses/user_timeline.json',
                     curry(self._OpenTestData, 'public_timeline_error.json'))
    # Manually try/catch so we can check the exception's value
    try:
      # The return value is irrelevant; the call is expected to raise.
      self._api.GetUserTimeline()
    except twitter.TwitterError as error:
      # `except ... as` is valid on Python 2.6+ and Python 3.
      # If the error message matches, the test passes
      self.assertEqual('test error', error.message)
    else:
      self.fail('TwitterError expected')
项目:web    作者:pyjobs    | 项目源码 | 文件源码
def __init__(self, credentials):
        """
        Create the Twitter API client from a credentials dictionary.

        :param credentials: mapping providing the 'consumer_key',
            'consumer_secret', 'access_token_key' and 'access_token_secret'
            entries passed to ``twitter.Api``.
        :raises twitter.TwitterError: logged, then re-raised.
        :raises KeyError: a credential entry is missing; logged, then
            re-raised.
        :raises Exception: any other failure is logged, then re-raised.
        """
        self._logger = logging.getLogger(__name__)

        # Each handler logs and re-raises with a bare `raise` from inside the
        # except block so the original traceback is preserved.
        try:
            self._twitter_api = twitter.Api(
                consumer_key=credentials['consumer_key'],
                consumer_secret=credentials['consumer_secret'],
                access_token_key=credentials['access_token_key'],
                access_token_secret=credentials['access_token_secret']
            )
        except twitter.TwitterError as exc:
            err_msg = 'The following error: %s, occurred while connecting ' \
                      'to the twitter API.' % exc.message
            self._logger.log(logging.ERROR, err_msg)
            raise
        except KeyError as exc:
            # args[0] is the missing key; unlike `.message` it exists on
            # both Python 2 and Python 3.
            err_msg = 'Malformed credentials dictionary: %s.' % (exc.args[0],)
            self._logger.log(logging.ERROR, err_msg)
            raise
        except Exception as exc:
            err_msg = 'An unhandled error: %s, occurred while connecting ' \
                      'to the twitter API.' % exc
            self._logger.log(logging.ERROR, err_msg)
            raise
项目:web    作者:pyjobs    | 项目源码 | 文件源码
def _push_job_offers_to_twitter(self, num_tweets_to_push):
        """
        Tweet up to ``num_tweets_to_push`` job offers not yet pushed on Twitter.

        The request is capped at ``self.MAX_TWEETS_TO_PUSH`` (with a warning)
        because the Twitter API won't accept every offer at once. An offer
        is marked as pushed only when its tweet went through; failures are
        logged and the loop moves on to the next offer.
        """
        if num_tweets_to_push > self.MAX_TWEETS_TO_PUSH:
            cap_notice = (
                'Cannot push %s tweets at once, pushing %s tweets instead.'
                % (num_tweets_to_push, self.MAX_TWEETS_TO_PUSH))
            self._logging(logging.WARNING, cap_notice)
            num_tweets_to_push = self.MAX_TWEETS_TO_PUSH

        self._logging(logging.INFO, 'Acquiring unpublished job offers.')
        pending_offers = JobAlchemy.get_not_pushed_on_twitter(num_tweets_to_push)

        for job_offer in pending_offers:
            tweet = self._format_tweet(job_offer.id, job_offer.title)

            try:
                self._logging(logging.INFO, 'Publishing to Twitter.')
                self._twitter_api.PostUpdate(tweet)
            except twitter.TwitterError as exc:
                failure = (
                    '[Job offer id: %s] The following error: %s, occurred '
                    'while pushing the following tweet: %s.'
                    % (job_offer.id, exc.message, tweet))
                self._logging(logging.WARNING, failure)
                continue
            except Exception as exc:
                failure = (
                    '[Job offer id: %s] An unhandled error: %s, occurred '
                    'while pushing the following tweet: %s.'
                    % (job_offer.id, exc, tweet))
                self._logging(logging.ERROR, failure)
                continue

            # The tweet went out: flag the offer as pushed so it is not
            # tweeted again on a later run.
            self._logging(logging.INFO, 'Marking as published on Twitter.')
            JobAlchemy.set_pushed_on_twitter(job_offer.id, True)
项目:Tutorial-Chatterbot    作者:isipalma    | 项目源码 | 文件源码
def get_statements(self):
        """
        Return statements built from tweets found with a random search word.

        Only tweets that are replies are kept: each one becomes a Statement
        whose response is the text of the status it replies to. A reply
        whose parent status cannot be fetched is logged and skipped.
        """
        from twitter import TwitterError

        # Pick the word used as the search term.
        search_word = self.random_word(self.random_seed_word)
        self.logger.info(u'Requesting 50 random tweets containing the word {}'.format(search_word))

        collected = []
        for tweet in self.api.GetSearch(term=search_word, count=50):
            statement = Statement(tweet.text)

            parent_id = tweet.in_reply_to_status_id
            if not parent_id:
                continue

            try:
                parent = self.api.GetStatus(parent_id)
                statement.add_response(Response(parent.text))
                collected.append(statement)
            except TwitterError as error:
                self.logger.warning(str(error))

        self.logger.info('Adding {} tweets with responses'.format(len(collected)))

        return collected
项目:forget    作者:codl    | 项目源码 | 文件源码
def fetch_acc(account, cursor):
    """Fetch one page of an account's timeline and merge it into the session.

    The account record is refreshed from ``verify_credentials`` first. The
    timeline request asks for 200 tweets; ``cursor`` (a mapping or a falsy
    value) is merged into the request arguments. Without a ``max_id`` in
    the arguments, ``since_id`` is set to the most recent stored post.

    Args:
        account: account whose timeline is fetched.
        cursor: extra request arguments from a previous call, or falsy.

    Returns:
        The request arguments with ``max_id`` set just below the lowest
        tweet id received, ``None`` when the page was empty, or ``None``
        (without committing) when no Twitter client is available.

    Raises:
        Whatever ``handle_error`` raises for a ``TwitterError``/``URLError``;
        if it returns instead, the original error is re-raised.
    """
    t = get_twitter_for_acc(account)
    if not t:
        print("no twitter access, aborting")
        return

    try:
        user = t.account.verify_credentials()
        db.session.merge(account_from_api_user_object(user))

        kwargs = {
            'user_id': account.twitter_id,
            'count': 200,
            'trim_user': True,
            'tweet_mode': 'extended',
        }
        if cursor:
            kwargs.update(cursor)

        if 'max_id' not in kwargs:
            most_recent_post = (
                Post.query.order_by(db.desc(Post.created_at))
                .filter(Post.author_id == account.id).first())
            if most_recent_post:
                kwargs['since_id'] = most_recent_post.twitter_id

        tweets = t.statuses.user_timeline(**kwargs)
    except (TwitterError, URLError) as e:
        handle_error(e)
        # NOTE(review): handle_error presumably raises; if it ever returns,
        # `tweets` (and possibly `kwargs`) would be unbound below, so
        # propagate the real error.
        raise

    print("processing {} tweets for {acc}".format(len(tweets), acc=account))

    if tweets:
        for tweet in tweets:
            db.session.merge(post_from_api_tweet_object(tweet))
        # The next page must end strictly below the lowest id of this page.
        kwargs['max_id'] = min(tweet['id'] for tweet in tweets) - 1
    else:
        kwargs = None

    db.session.commit()

    return kwargs
项目:StockRecommendSystem    作者:doncat99    | 项目源码 | 文件源码
def parse_media_file(passed_media):
    """ Resolves a media reference into a readable file object plus metadata.

    Args:
        passed_media:
            Either an object with a read() method, or a string. A string
            starting with 'http' is fetched with http_to_file; any other
            string is opened as a local path in binary mode.

    Returns:
        A tuple of the file-like object (positioned at its start when it can
        be rewound), the base filename, the size in bytes, and the media
        type guessed from the filename (None when it cannot be guessed).

    Raises:
        TwitterError: if a passed file object has an unsupported mode, if an
            image is over 5MB or a video over 15MB, or if the guessed type
            is neither a known image nor a known video format.
    """
    image_types = ('image/jpeg',
                   'image/png',
                   'image/gif',
                   'image/bmp',
                   'image/webp')
    video_types = ('video/mp4',
                   'video/quicktime')

    if hasattr(passed_media, 'read'):
        # A file object was handed in: validate its mode and use it as is.
        if passed_media.mode not in ('rb', 'rb+', 'w+b'):
            raise TwitterError('File mode must be "rb" or "rb+"')
        filename = os.path.basename(passed_media.name)
        data_file = passed_media
    else:
        # A string was handed in: a URL to download or a local path to open.
        if passed_media.startswith('http'):
            data_file = http_to_file(passed_media)
        else:
            data_file = open(os.path.realpath(passed_media), 'rb')
        filename = os.path.basename(passed_media)

    # Measure the size by jumping to the end, then try to rewind.
    data_file.seek(0, 2)
    file_size = data_file.tell()

    try:
        data_file.seek(0)
    except Exception:
        pass

    media_type, _ = mimetypes.guess_type(filename)
    if media_type is not None:
        is_image = media_type in image_types
        is_video = media_type in video_types
        if is_image and file_size > 5 * 1048576:
            raise TwitterError({'message': 'Images must be less than 5MB.'})
        elif is_video and file_size > 15 * 1048576:
            raise TwitterError({'message': 'Videos must be less than 15MB.'})
        elif not (is_image or is_video):
            raise TwitterError({'message': 'Media type could not be determined.'})

    return data_file, filename, file_size, media_type
项目:Chirps    作者:vered1986    | 项目源码 | 文件源码
def get_tweets(tweet_ids, consumer_key, consumer_secret, access_token, access_token_secret, nlp):
    """
    Expands tweets from Twitter

    Downloaded tweets are appended to the 'tweet_temp' file (one
    ``id<TAB>text`` line each) so an interrupted run can resume; lines
    already in that file are loaded first and not downloaded again.

    :param tweet_ids: the list of tweet IDs to expand
    :param consumer_key: Twitter API consumer key
    :param consumer_secret: Twitter API consumer secret
    :param access_token: Twitter API access token key
    :param access_token_secret: Twitter API access token secret
    :param nlp: callable applied to the tweet text; must yield tokens
        exposing ``lower_``
    :return: a dictionary of tweet ID to tweet text
    :raises twitter.TwitterError: re-raised when the rate limit is exceeded
    """

    # Save tweets in a temporary file, in case the script stops working and re-starts
    tweets = {}
    if os.path.exists('tweet_temp'):
        with codecs.open('tweet_temp', 'r', 'utf-8') as f_in:
            for line in f_in:
                # partition tolerates tabs inside the text; a line with no
                # tab (blank or truncated by a crash) is skipped and that
                # tweet is simply fetched again.
                tweet_id, sep, tweet = line.strip().partition('\t')
                if sep:
                    tweets[tweet_id] = tweet

    api = twitter.Api(consumer_key=consumer_key, consumer_secret=consumer_secret, access_token_key=access_token,
                      access_token_secret=access_token_secret)

    sleeptime, resettime = reset_sleep_time(api)

    with codecs.open('tweet_temp', 'a', 'utf-8') as f_out:

        for tweet_id in tweet_ids:

            # Already downloaded (in this run or a previous one)
            if tweet_id in tweets:
                continue

            try:
                curr_tweet = api.GetStatus(tweet_id, include_entities=False)
                tweets[tweet_id] = clean_tweet(' '.join([t.lower_ for t in nlp(curr_tweet.text)]))

            except twitter.TwitterError as err:
                error = str(err)

                # If the rate limit exceeded, this script should be stopped and resumed the next day
                if 'Rate limit exceeded' in error:
                    raise

                # Other error - the tweet is not available :(
                print('Error reading tweet id: %s : %s' % (tweet_id, error))
                tweets[tweet_id] = 'TWEET IS NOT AVAILABLE'

            f_out.write('\t'.join((tweet_id, tweets[tweet_id])) + '\n')
            # Flush so the cache really survives a crash or an interrupt.
            f_out.flush()

            time.sleep(sleeptime)
            if time.time() >= resettime:
                sleeptime, resettime = reset_sleep_time(api)

    return tweets