SDKでTwitterデータを収集するLogService
本記事では、LogService SDKでTwitterデータをLogServiceへ収集する方法を記載します。
前書き
LogService は、リアルタイムデータロギングサービスです。 ログの収集、消費、出荷、検索、および分析をサポートし、大量のログを処理および分析する能力を向上させます。
少し前になりますが、LogServiceについての資料をSlideShareへアップロードしていますので、こちらも参考になればと思います。
https://www2.slideshare.net/sbcloud/alibaba-cloud-log-service
今回はPython SDKを使ってTwitterデータをAlibaba Cloud LogServiceへ収集、蓄積、可視化してみましょう。構成図で、こんな感じです。
プロジェクト作成(LogService全体で共通事項)
まずはプロジェクトを作成します。LogServiceコンソールから 「Create Project」を選択し、起動します。
Project Nameをここでは「techblog」にし、プロジェクトを作成します。
その直後に "Do you want to create a Logstore for log data storage immediately?"、「Log Storeを作成しますか?」とポップアップが出ます。 Log StoreはLog Serviceでデータを蓄積するものなので、「OK」を選定します。
LogStore Nameをここでは「twitter_logstore」と入力し、LogStoreを作成します。
その後、「LogStoreが作成されました。今すぐデータアクセスしますか?」とポップアップが出ますが、これは必要に応じて選定すると良いです。 ちなみに「Yes」を選択した場合、50を超える様々なデータアクセス手法のコンソールが表示されます。
データ格納について
STEP1: Twitter APIの準備
Twitter APIを利用するために、Twitter Developer Platformサービスから、Twitter API利用申請をします。
(ここでの説明は省略します)
申請後、Twitter APIとしてCONSUMER_KEY、CONSUMER_SECRET 、ACCESS_KEY 、ACCESS_SECRET を取得します。
STEP2: LoggService 環境の準備
SDKでTwitterデータを収集する前に、LogService側でいろいろ準備する必要があります。
ENDPOINT 、ACCESSKEYID 、ACCESSKEY 、PROJECT 、LOGSTORE 、TOKEN 、topic 、source のパラメータ値を用意しなければなりません。
ENDPOINTはプロダクトサービスが利用している国を指します。
今回は日本リージョンを選定しているので、 ap-northeast-1.log.aliyuncs.com
を選定します。
ACCESSKEYID 、ACCESSKEY はAlibaba Cloudユーザーの認証情報です。これがあれば、様々なサービス・リソースへのプログラムによるアクセスを許可します。
これはコンソールから確認できます。RAMユーザー(子アカウント)の場合は、RAMユーザとしてのACCESSKEYID 、ACCESSKEYを入手する必要があります。
PROJECT はLogServiceのProjectのことを指します。上記「techblog」と作成したので、Projectはtechblog
にします。
LOGSTORE は 「techblog」らProject配下に作成したLogStoreを指定する必要があります。上記「twitter_logstore」を作成したので、ここはtwitter_logstore
にします。
TOKEN 、topic 、source は任意です。これはLogServiceでデータを出力するときに参照として残るパラメータ値です。
STEP3: ECSにて以下Pythonファイルを作成
上記のTwitter API CONSUMER_KEY、CONSUMER_SECRET 、ACCESS_KEY 、ACCESS_SECRET、
およびNDPOINT 、ACCESSKEYID 、ACCESSKEY 、PROJECT 、LOGSTORE 、TOKEN 、topic 、source のパラメータ値の準備ができたら、
ECSなどにて以下Pythonコードを記載します。 著者の Python version は Python 3.6.8
です。
#!/usr/bin/env python# -*- coding: utf-8 -*-# pip3 install -U aliyun-log-python-sdk# pip3 install tweepyfrom tweepy import StreamListenerfrom tweepy import Streamimport tweepy, json, timefrom datetime import datetimefrom aliyun.log.logitem import LogItemfrom aliyun.log.logclient import LogClientfrom aliyun.log.putlogsrequest import PutLogsRequestCONSUMER_KEY = "<your CONSUMER_KEY>"CONSUMER_SECRET ="<your CONSUMER_SECRET >"ACCESS_KEY = "<your ACCESS_KEY >"ACCESS_SECRET = "<your ACCESS_SECRET >"ENDPOINT = 'ap-northeast-1.log.aliyuncs.com'ACCESSKEYID = "<your ACCESSKEYID >"ACCESSKEY ="<your ACCESSKEY >"PROJECT = 'techblog'LOGSTORE = 'twitter_logstore'TOKEN = ""topic = 'twitter_log_demo'source = 'twitter'auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)auth.set_access_token(ACCESS_KEY, ACCESS_SECRET)api = tweepy.API(auth)client = LogClient(ENDPOINT, ACCESSKEYID, ACCESSKEY, TOKEN)class StdOutListener(StreamListener):def on_data(self, data):# print(data)tweet = json.loads(data)if not tweet['retweeted'] and 'RT @' not in tweet['text']: # retweetは取得しないnew_datetime = datetime.strftime(datetime.strptime(tweet["created_at"],"%a %b %d %H:%M:%S +0000 %Y"), "%Y-%m-%d %H:%M:%S")utc_datetime = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'if 'Web' in tweet['source']:isWeb = 1isAndroid = 0isiPhone = 0elif 'Android' in tweet['source']:isWeb = 0isAndroid = 1isiPhone = 0elif 'iPhone' in tweet['source']:isWeb = 0isAndroid = 0isiPhone = 1else:isWeb = 0isAndroid = 0isiPhone = 0contents = [( 'created_at', new_datetime ),( 'created_at_utc', utc_datetime ),( 'id_str', tweet['id_str'] ),( 'text', tweet['text'] ),( 'source', tweet['source'] ),( 'Web', str(isWeb) ),( 'Android', str(isAndroid) ),( 'iPhone', str(isiPhone) ),( 'tweet_size', str(len( tweet['text'] )) )]print(contents)# put LogServicelogitemList = []logItem = LogItem()logItem.set_time(int(time.time()))logItem.set_contents(contents)logitemList.append(logItem)request = PutLogsRequest(PROJECT, LOGSTORE, topic, source, logitemList)response = client.put_logs(request)response.log_print()def on_error(self, status):print(status)if __name__ == '__main__':query = "#鬼滅の刃" # 取得したい特定のキーワードやハッシュタグlistener = StdOutListener()twitterStream = Stream(auth, listener)twitterStream.filter(track = [query])
注意として、このコードはそのままでは実行できないので、以下コマンドでTweepy、logservice-sdkをインストール実行する必要があります。
pip3 install -U aliyun-log-python-sdkpip3 install tweepy
STEP4: 実行
ECSが閉じても恒久的に動くよう、nohup コマンドで 実行します。
[root@sts ~]# nohup python3 tweet.py &
これで以上です。取得したデータはこのような感じになります。
__source__: twitter__tag__:__client_ip__: 47.74.18.54__tag__:__receive_time__: 1607694838__topic__: twitter_log_democreated_at: 2020-12-1 13:53:53created_at_utc: 2020-12-1T13:53:58.171Zid_str: 1337395179389657091iPhone: 0Android: 1Web: 0source: <a href="http://twitter.com/download/android" rel="nofollow">Twitter for Android</a>text: これぞ至高の領域に近き剣士…#グラブル #鬼滅の刃 #日輪刀 https://t.co/IcW3ndTI4mtweet_size: 30
最後に
LogService Python SDKを使って、Twitterデータを収集する方法を簡単に説明しました。
この作業自体、5分もかからないです(Twitter API申請は除く、、)
ちなみにSDKは単にデータを収集するだけでなく、整形、ETL、格納処理もあります。
LogServiceはフルマネージド環境でありながら、様々なデータを収集し蓄積・可視化する事が可能です。
加えて、データ量や使い方に応じた課金なので、使い方次第ではコスト削減や、運用負荷の改善に効果があるのでは無いでしょうか。