The first code: import settings import tweepy import dataset from textblob import TextBlob from sqlalchemy.exc import ProgrammingError import json
# Database handle shared by every on_status call.
db = dataset.connect(settings.CONNECTION_STRING)


class StreamListener(tweepy.StreamListener):
    """Tweepy 3.x listener that writes each received status to the database.

    Every stored row also carries the TextBlob polarity and subjectivity
    of the tweet text.
    """

    def on_status(self, status):
        """Insert one row for ``status``; do nothing if ``status.retweeted``."""
        if status.retweeted:
            return

        author = status.user
        sentiment = TextBlob(status.text).sentiment

        # geo / coordinates are JSON-encoded when present; None is stored as-is.
        geo_value = status.geo
        if geo_value is not None:
            geo_value = json.dumps(geo_value)
        coords_value = status.coordinates
        if coords_value is not None:
            coords_value = json.dumps(coords_value)

        row = {
            "user_description": author.description,
            "user_location": author.location,
            "coordinates": coords_value,
            "text": status.text,
            "geo": geo_value,
            "user_name": author.screen_name,
            "user_created": author.created_at,
            "user_followers": author.followers_count,
            "id_str": status.id_str,
            "created": status.created_at,
            "retweet_count": status.retweet_count,
            "user_bg_color": author.profile_background_color,
            "polarity": sentiment.polarity,
            "subjectivity": sentiment.subjectivity,
        }

        target = db[settings.TABLE_NAME]
        try:
            target.insert(row)
        except ProgrammingError as exc:
            # Report the failed insert and keep the stream running.
            print(exc)

    def on_error(self, status_code):
        """Return False (which disconnects the stream) on HTTP 420, else None."""
        return False if status_code == 420 else None


auth = tweepy.OAuthHandler(settings.TWITTER_APP_KEY, settings.TWITTER_APP_SECRET)
auth.set_access_token(settings.TWITTER_KEY, settings.TWITTER_SECRET)
api = tweepy.API(auth)

stream_listener = StreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
stream.filter(track=settings.TRACK_TERMS)
The second code: import os import tweepy import settings import tweepy import dataset from textblob import TextBlob from sqlalchemy.exc import ProgrammingError import json
# Credentials are masked; substitute the real values before running.
TWITTER_APP_KEY = '******'
TWITTER_APP_SECRET = '******'
TWITTER_KEY = '******'
TWITTER_SECRET = '******'


class IDPrinter(tweepy.StreamListener):
    """Minimal Tweepy 3.x listener used to test the streaming connection."""

    def on_status(self, status):
        """Print the id of every status received."""
        print(status.id)

    def on_error(self, status_code):
        """Return False (which disconnects the stream) on HTTP 420, else None."""
        return False if status_code == 420 else None


auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET)
auth.set_access_token(TWITTER_KEY, TWITTER_SECRET)
api = tweepy.API(auth)

printer_listener = IDPrinter()
printer = tweepy.Stream(auth=api.auth, listener=printer_listener)
printer.sample()
Of these two pieces of code, the first is the code I actually need, and the second is a test script I wrote after the first one produced an error. The error is that I cannot connect to the Twitter API. Taking the second script as an example, the lines "auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET) auth.set_access_token(TWITTER_KEY,TWITTER_SECRET) api = tweepy.API(auth) printer_listener = IDPrinter() printer = tweepy.Stream(auth=api.auth, listener=printer_listener)" run without problems. However, when I step into printer.sample(), something goes wrong and status_code becomes 404. I have changed my API keys and access token, but nothing changed. The version I use is an old one, tweepy-3.7.0, which targets Twitter API v1.1; the latest one, tweepy-4.14.0, supports v2. Is this what causes the error? If so, could anyone help me change the two scripts so that they work with tweepy-4.14.0? Thank you!
Yes, that is a likely cause: Tweepy 3.7.0 targets Twitter API v1.1 and is not compatible with Twitter API v2. Twitter regularly updates its API, and Tweepy releases new versions to accommodate these changes. The latest version of Tweepy I know of (as of my knowledge cutoff in January 2022) is 4.2.0, so the 4.14.0 release you mention is newer than that.
To upgrade your Tweepy version, you can use the following command:
pip install tweepy --upgrade
After upgrading Tweepy, you may need to adjust your code to work with the changes introduced in the newer versions. Here’s an updated version of your code that should work with Tweepy 4.14.0:
import json

import dataset
import tweepy
from sqlalchemy.exc import ProgrammingError
from textblob import TextBlob

import settings

# Database handle shared by every on_status call.
db = dataset.connect(settings.CONNECTION_STRING)


class StreamListener(tweepy.Stream):
    """Tweepy 4.x stream that stores each received tweet in the database.

    Tweepy 4.0 merged ``StreamListener`` into ``Stream``: the callbacks are
    now methods of a ``tweepy.Stream`` subclass, and the credentials are
    passed straight to its constructor instead of ``auth=``/``listener=``.

    NOTE(review): ``tweepy.Stream`` uses the Twitter API v1.1 streaming
    endpoints. For Twitter API v2, Tweepy provides ``tweepy.StreamingClient``
    (bearer-token authentication) - confirm which one your account can use.
    """

    def on_status(self, status):
        """Insert one row for ``status``, skipping retweets.

        The row holds the tweet and author fields read below plus the
        TextBlob polarity and subjectivity of the tweet text.
        """
        # ``status.retweeted`` alone does not identify retweets in a stream;
        # a retweet carries the original tweet as ``retweeted_status``.
        if status.retweeted or hasattr(status, "retweeted_status"):
            return

        author = status.user
        sentiment = TextBlob(status.text).sentiment

        # geo / coordinates are JSON-encoded when present; None is stored as-is.
        geo = status.geo
        if geo is not None:
            geo = json.dumps(geo)
        coords = status.coordinates
        if coords is not None:
            coords = json.dumps(coords)

        table = db[settings.TABLE_NAME]
        try:
            table.insert(dict(
                user_description=author.description,
                user_location=author.location,
                coordinates=coords,
                text=status.text,
                geo=geo,
                user_name=author.screen_name,
                user_created=author.created_at,
                user_followers=author.followers_count,
                id_str=status.id_str,
                created=status.created_at,
                retweet_count=status.retweet_count,
                user_bg_color=author.profile_background_color,
                polarity=sentiment.polarity,
                subjectivity=sentiment.subjectivity,
            ))
        except ProgrammingError as err:
            # Report the failed insert and keep the stream running.
            print(err)

    def on_request_error(self, status_code):
        """Disconnect on HTTP 420 (replaces ``on_error`` from Tweepy 3.x).

        In Tweepy 4.x the return value of this callback is ignored, so the
        stream is stopped explicitly with ``disconnect()``.
        """
        if status_code == 420:
            self.disconnect()


auth = tweepy.OAuthHandler(settings.TWITTER_APP_KEY, settings.TWITTER_APP_SECRET)
auth.set_access_token(settings.TWITTER_KEY, settings.TWITTER_SECRET)
# REST client; wait_on_rate_limit applies to REST calls made through ``api``,
# not to the streaming connection below.
api = tweepy.API(auth, wait_on_rate_limit=True)

stream_listener = StreamListener(
    settings.TWITTER_APP_KEY,
    settings.TWITTER_APP_SECRET,
    settings.TWITTER_KEY,
    settings.TWITTER_SECRET,
)
# The stream and its callbacks are one object in Tweepy 4.x; both names are kept.
stream = stream_listener
stream.filter(track=settings.TRACK_TERMS)
import json
import os

import dataset
import tweepy
from sqlalchemy.exc import ProgrammingError
from textblob import TextBlob

import settings

# Credentials are masked; substitute the real values before running.
TWITTER_APP_KEY = '******'
TWITTER_APP_SECRET = '******'
TWITTER_KEY = '******'
TWITTER_SECRET = '******'


class IDPrinter(tweepy.Stream):
    """Minimal Tweepy 4.x stream used to test the streaming connection.

    Tweepy 4.0 merged ``StreamListener`` into ``Stream``: the callbacks are
    now methods of a ``tweepy.Stream`` subclass, and the credentials are
    passed straight to its constructor instead of ``auth=``/``listener=``.

    NOTE(review): ``tweepy.Stream`` uses the Twitter API v1.1 streaming
    endpoints. For Twitter API v2, Tweepy provides ``tweepy.StreamingClient``
    (bearer-token authentication) - confirm which one your account can use.
    """

    def on_status(self, status):
        """Print the id of every status received."""
        print(status.id)

    def on_request_error(self, status_code):
        """Disconnect on HTTP 420 (replaces ``on_error`` from Tweepy 3.x).

        In Tweepy 4.x the return value of this callback is ignored, so the
        stream is stopped explicitly with ``disconnect()``.
        """
        if status_code == 420:
            self.disconnect()


auth = tweepy.OAuthHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET)
auth.set_access_token(TWITTER_KEY, TWITTER_SECRET)
# REST client; wait_on_rate_limit applies to REST calls made through ``api``,
# not to the streaming connection below.
api = tweepy.API(auth, wait_on_rate_limit=True)

printer_listener = IDPrinter(
    TWITTER_APP_KEY,
    TWITTER_APP_SECRET,
    TWITTER_KEY,
    TWITTER_SECRET,
)
# The stream and its callbacks are one object in Tweepy 4.x; both names are kept.
printer = printer_listener
printer.sample()
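The main changes include updating the Tweepy version and adding wait_on_rate_limit=True to the tweepy.API calls to handle rate-limit issues. Note that wait_on_rate_limit only affects REST requests made through tweepy.API; it does not change the streaming connection. Also be aware that Tweepy 4.0 merged tweepy.StreamListener into tweepy.Stream, so under Tweepy 4.x the stream callbacks must be defined on a subclass of tweepy.Stream (Twitter API v1.1) or of tweepy.StreamingClient (Twitter API v2, authenticated with a bearer token). Finally, ensure that your Twitter API keys and tokens are correct. If the issue persists, check the Twitter API documentation for any additional changes or requirements.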
wait_on_rate_limit=True
tweepy.API