Introduction

In this notebook, we will query the Twitter API to stream every tweet containing a specific keyword. Using the model trained in the Model notebook, we will predict the sentiment of each tweet before storing it in a NoSQL database built with MongoDB.

As a trial, I'll record all tweets from the France vs Argentina match during the 2018 World Cup, for further exploration in a future notebook.

Creation of the Stream and Database

In [17]:
import os
import datetime

import tweepy  # library to query twitter
import text_prediction  # library to predict the sentiment (reuse model trained in Model's notebook)

import pymongo
from pymongo import MongoClient
In [18]:
# keys are requested on Twitter (they are not shared here as they are private to every app registered on Twitter)
with open("key.txt", "r", encoding="utf-8") as f:
    for line in f:
        if ":" not in line:
            continue
        key, value = line.strip().split(":", 1)
        if key == "ACCESS_TOKEN":
            ACCESS_TOKEN = value
        elif key == "ACCESS_SECRET":
            ACCESS_SECRET = value
        elif key == "CONSUMER_KEY":
            CONSUMER_KEY = value
        elif key == "CONSUMER_SECRET":
            CONSUMER_SECRET = value
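For reference, the parsing above assumes key.txt holds one KEY:VALUE pair per line; the values below are placeholders, not real credentials:

ACCESS_TOKEN:xxxxxxxxxxxxxxxx
ACCESS_SECRET:xxxxxxxxxxxxxxxx
CONSUMER_KEY:xxxxxxxxxxxxxxxx
CONSUMER_SECRET:xxxxxxxxxxxxxxxx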
In [19]:
# required due to model serialization
def dummy_fun(doc):
    return doc

# Class that loads the trained model into memory and converts a sentence into a sentiment score
p = text_prediction.Text_Processor()
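Why is dummy_fun needed? Most likely the vectorizer used during training was pickled with tokenizer=dummy_fun (an identity function applied to already-tokenized input), and unpickling looks that function up by name in the current namespace. A minimal sketch of the pattern, assuming scikit-learn and pickle were used (the file name and vectorizer settings are illustrative, not the actual training code):

from sklearn.feature_extraction.text import TfidfVectorizer
import pickle

# training side: documents are passed in pre-tokenized, so tokenizer and
# preprocessor are identity functions
vectorizer = TfidfVectorizer(analyzer="word", tokenizer=dummy_fun,
                             preprocessor=dummy_fun, token_pattern=None)
vectorizer.fit([["come", "on", "franc"]])
with open("vectorizer.pkl", "wb") as f:  # hypothetical file name
    pickle.dump(vectorizer, f)

# loading side: dummy_fun must be defined in the importing module,
# otherwise pickle.load raises an AttributeError
with open("vectorizer.pkl", "rb") as f:
    vectorizer = pickle.load(f)

Once loaded, p.predict(text) returns the token list and the sentiment score, e.g. tokens, pred = p.predict("Come on France!").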
In [20]:
# Connect to MongoDB hosted in localhost on port 27017
client = MongoClient('localhost', 27017)

db = client['Twitter_db']
collection_clean = db['tweets_clean']
# collection_raw = db['tweets_raw']
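A note on the schema: the listener below stores the tweet id as the document's _id, so MongoDB's built-in unique index on _id enforces deduplication; inserting the same tweet twice raises a DuplicateKeyError, which is what produces the "duplicated" messages printed while streaming. A standalone illustration, using a throwaway collection so the real data is untouched:

import pymongo

demo = db['tmp_duplicate_demo']  # hypothetical scratch collection
demo.insert_one({"_id": 42, "text": "first insert"})
try:
    demo.insert_one({"_id": 42, "text": "second insert with the same _id"})
except pymongo.errors.DuplicateKeyError:
    print(42, "duplicated")
demo.drop()  # clean up the scratch collection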
In [21]:
# log into twitter API
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth)
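Optionally, we can check that authentication succeeded before opening the stream; tweepy's API object exposes verify_credentials() for this (sketch, output omitted):

# returns the authenticated user when the four keys are valid
user = api.verify_credentials()
if user:
    print("Authenticated as", user.screen_name)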
In [22]:
# Class to handle the stream of tweets
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        tweet = status._json
        try:
            if "text" in tweet:
                obj = {}
                keep = True
                
                obj["_id"] = tweet["id"]
                
                if "extended_tweet" in tweet:
                    obj["text"] = tweet["extended_tweet"]["full_text"]
                elif "text" in tweet:
                    obj["text"] = tweet["text"]
                else:
                    keep = False
                
                if obj["text"][:4] == "RT @":
                    keep = False
                
                if keep:
                    obj["time"] = datetime.datetime.fromtimestamp(int(tweet["timestamp_ms"])//1000)
                    hashtags = []
                    for hashtag in tweet["entities"]["hashtags"]:
                        if len(hashtag) > 0:
                            hashtags.append(hashtag["text"])
                    obj["hashtags"] = hashtags
                    
                    tokens, pred = p.predict(obj["text"])
                    
                    obj["sentiment"] = pred
                    obj["tokens"] = tokens
                    
                    collection_clean.insert_one(obj)
#                     collection_raw.insert_one(tweet)
        except AttributeError as Attr_err:
            print(Attr_err)
            pass
        except pymongo.errors.DuplicateKeyError:
            print(obj["_id"], "duplicated")
            pass
    
    def on_error(self, status_code):
        # 420 means the client is being rate limited by Twitter
        if status_code == 420:
            # returning False in on_data disconnects the stream
            return False
        
myStreamListener = MyStreamListener()
In [23]:
# Start the stream and store processed tweets in MongoDB
try:
    myStream.disconnect()
except:
    pass

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
myStream.filter(track=['#FRAARG'], languages=["en"]) #, async=True
1013053797113978880 duplicated
1013058077929730048 duplicated
1013058077929730048 duplicated
1013058578645684224 duplicated
[... further "duplicated" messages omitted ...]
1013095069782593536 duplicated
1013095332249403392 duplicated
---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
<ipython-input-23-bba6dcfa7322> in <module>()
      7 myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
----> 8 myStream.filter(track=['#FRAARG'], languages=["en"]) #, async=True

[... urllib3 / OpenSSL / tweepy internals omitted; the stream was stopped manually with a keyboard interrupt ...]

KeyboardInterrupt: 
In [24]:
myStream.disconnect()

Results

In [25]:
import json
import datetime

from bson import json_util

import matplotlib.pyplot as plt

import pymongo
from pymongo import MongoClient
In [26]:
client = MongoClient('localhost', 27017)

db = client['Twitter_db']
collection_clean = db['tweets_clean']
In [33]:
# first tweet saved
print(json.dumps(collection_clean.find_one(), indent=4, default=json_util.default))
{
    "_id": 1013053260956098561,
    "text": "Come on France. #worldcup2018 #FRAARG",
    "time": {
        "$date": 1530372893000
    },
    "hashtags": [
        "worldcup2018",
        "FRAARG"
    ],
    "sentiment": 0.6849161215932269,
    "tokens": [
        "come",
        "on",
        "franc"
    ]
}
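As a quick sanity check, we can count how many tweets were stored (count_documents requires pymongo 3.7 or later; older versions expose collection.count() instead):

print(collection_clean.count_documents({}))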
In [39]:
pipeline_sort = {
                    "$sort": {
                        "_id" : 1
                    }
                }

pipeline_group = { 
                    "$group" : 
                        { 
                            "_id" : { 
                                "year": { "$year": "$time" },
                                "month": { "$month": "$time" },
                                "day": { "$dayOfMonth": "$time" },
                                "hour": { "$hour": "$time" },
                                "minute": { "$minute": "$time" },
                            },
                            "count" : { 
                                "$sum" : 1 
                            },
                            "sentiment_global" : { 
                                "$avg" : "$sentiment" 
                            },
#                             "res" : { 
#                                 "$push" : { "$divide": [ {"$trunc": { "$multiply": [ "$sentiment" , 1000 ] } }, 1000 ] }
#                             },
#                             "key" : {
#                                 "$push" : "$tokens" 
#                             },
                            "maxi" : { 
                                "$max" : "$sentiment" 
                            },
                            "mini" : { 
                                "$min" : "$sentiment" 
                            },
                        }
                    }


pipeline = [
    pipeline_group,
    pipeline_sort
]

cursor = collection_clean.aggregate(pipeline)
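For reference, on MongoDB 3.6+ the grouped date parts can be turned back into a proper date inside the pipeline with $dateFromParts, which avoids rebuilding datetimes in Python later on. This variant is a sketch and was not used in this run:

pipeline_project = {
    "$project": {
        "count": 1,
        "sentiment_global": 1,
        "maxi": 1,
        "mini": 1,
        "time": {
            "$dateFromParts": {
                "year": "$_id.year",
                "month": "$_id.month",
                "day": "$_id.day",
                "hour": "$_id.hour",
                "minute": "$_id.minute",
            }
        },
    }
}

# cursor = collection_clean.aggregate([pipeline_group, pipeline_project, {"$sort": {"time": 1}}])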
In [38]:
for i, each in enumerate(cursor):
    print(json.dumps(each, indent=4, default=json_util.default))
    break
{
    "_id": {
        "year": 2018,
        "month": 6,
        "day": 30,
        "hour": 15,
        "minute": 34
    },
    "count": 8,
    "sentiment_global": 0.7055819751784014,
    "maxi": 0.8557995837767274,
    "mini": 0.4663488250645606
}
In [40]:
x = []
y_max = []
y_min = []
y_avg = []
nb = []
for i, each in enumerate(cursor):
    x.append(datetime.datetime(year = each["_id"]["year"], 
                          month = each["_id"]["month"], 
                          day = each["_id"]["day"], 
                          hour = each["_id"]["hour"], 
                          minute = each["_id"]["minute"]))
    y_max.append(each["maxi"])
    y_min.append(each["mini"])
    y_avg.append(each["sentiment_global"])
    nb.append(each["count"])
In [41]:
fig, (ax1, ax2) = plt.subplots(2, 1, figsize = (20, 20))
ax1.plot(x, y_max, label="sentiment max")
ax1.plot(x, y_min, label="sentiment min")
ax1.plot(x, y_avg, label="sentiment avg")
ax1.set_xlabel("Date", fontsize=20)
ax1.set_ylabel("Sentiment", fontsize=20)
ax1.set_title("Evolution of Sentiment over time", fontsize=30)

ax2.plot(x, nb)
ax2.set_title("Evolution of Number of Tweets over time", fontsize=30)
ax2.set_xlabel("Date", fontsize=20)
ax2.set_ylabel("Number Tweets", fontsize=20)
plt.show()

We can see some peaks (9 in total), most probably related to the 7 goals and 2 other key actions. This is what we will dig into in more detail in a future notebook. In terms of sentiment, our model is not perfect: it only evaluates individual words, and tweets often contain few of them, so the trend is not really visible. The average also shows only a few peaks, most probably related to goals as well.

Conclusion

In this notebook, we took a look at the Twitter API to stream tweets. A NoSQL database (MongoDB) was used to store the content, and we applied the sentiment analysis model trained previously to look at the trend.

As the sentiment analysis is not good enough to highlight trends, we will explore the content of the tweets in more depth later.