-
Notifications
You must be signed in to change notification settings - Fork 0
/
csv_scheduler.py
88 lines (74 loc) · 2.79 KB
/
csv_scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import tweepy
import csv
import json
import time
from datetime import datetime
from pathlib import Path
from time import strftime, gmtime
def _load_consumer_credentials():
    """Return the ``(consumer_key, consumer_secret)`` pair for the app.

    The pair is read from ``authenticate.json`` when that file exists.
    Otherwise both values are requested on stdin and written to
    ``authenticate.json`` so later runs do not prompt again.
    """
    credentials_path = Path("authenticate.json")
    if credentials_path.exists():
        with open(credentials_path) as f:
            stored = json.load(f)
        return stored["consumer_key"], stored["consumer_secret"]

    consumer_key = input("consumer_key: ")
    consumer_secret = input("consumer_secret: ")
    with open(credentials_path, "w") as f:
        json.dump(
            {"consumer_key": consumer_key, "consumer_secret": consumer_secret},
            f,
        )
    return consumer_key, consumer_secret


def _authorize(auth):
    """Attach an access token to *auth* and report whether that worked.

    A token stored in ``access_token.json`` is reused when the file exists.
    Otherwise the authorization URL is printed, the verification code is
    read from stdin and the resulting token is saved to
    ``access_token.json``.

    Returns:
        ``True`` when *auth* holds an access token, ``False`` when either
        step of the interactive flow raised ``tweepy.TweepError`` (an error
        message has already been printed in that case).
    """
    token_path = Path("access_token.json")
    if token_path.exists():
        with open(token_path) as f:
            stored = json.load(f)
        auth.set_access_token(stored["token"], stored["secret"])
        return True

    try:
        redirect_url = auth.get_authorization_url()
    except tweepy.TweepError:
        print('Error! Failed to get request token.')
        # Without a request token there is no URL to show; stop here
        # instead of falling through to an unbound ``redirect_url``.
        return False

    print(redirect_url)
    verifier = input("verification code: ")
    try:
        auth.get_access_token(verifier)
    except tweepy.TweepError:
        print('Error! Failed to get access token.')
        # Do not persist a token that was never obtained: the file would
        # be picked up on the next run and used as if it were valid.
        return False

    with open(token_path, "w") as f:
        json.dump(
            {"token": auth.access_token, "secret": auth.access_token_secret},
            f,
        )
    return True


def _read_progress():
    """Return the ``last_line`` value saved in ``progress.json``, or 0."""
    progress_path = Path("progress.json")
    if not progress_path.exists():
        return 0
    with open(progress_path) as f:
        return json.load(f)["last_line"]


def _write_progress(last_line):
    """Save *last_line* to ``progress.json``."""
    with open("progress.json", "w") as f:
        json.dump({"last_line": last_line}, f)


def _post_thread(api, tweet):
    """Post one CSV row as a three-tweet reply chain.

    ``Tweet1`` is posted first, ``Tweet2`` is posted through
    ``update_with_media`` as a reply to it, and ``Tweet3`` is posted as a
    reply to the media tweet.
    """
    # Chain on the status object returned by each call rather than
    # re-reading the timeline: that costs no extra request and cannot pick
    # up an unrelated tweet posted in between.
    text_status = api.update_status(tweet["Tweet1"])
    # NOTE(review): "Tweet2" is passed as the first argument of
    # update_with_media, so it is presumably an image file path - confirm
    # against the CSV.
    media_status = api.update_with_media(
        tweet["Tweet2"], in_reply_to_status_id=text_status.id
    )
    api.update_status(tweet["Tweet3"], in_reply_to_status_id=media_status.id)


def execute():
    """Post the rows of ``example.csv`` to Twitter at their scheduled times.

    Steps:
      1. Load (or prompt for) the consumer key/secret and the access token.
      2. Resume from the row index saved in ``progress.json`` (0 if absent).
      3. For each remaining row, sleep until the moment given by its
         ``DiaPost`` + ``HoraPost`` columns (format ``%d/%m/%Y`` followed
         directly by ``%H:%M``, compared against the naive local
         ``datetime.now()``), post the row as a reply chain, then save the
         new row index.

    Returns early, after printing an error message, when the interactive
    authorization fails.
    """
    consumer_key, consumer_secret = _load_consumer_credentials()
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    if not _authorize(auth):
        return

    api = tweepy.API(auth)
    last_line = _read_progress()
    tweets = get_tweets("example.csv")
    for tweet in tweets[last_line:]:
        # Work out how long is left until the scheduled posting time.
        scheduled_at = datetime.strptime(
            tweet["DiaPost"] + tweet["HoraPost"], "%d/%m/%Y%H:%M"
        )
        delay = (scheduled_at - datetime.now()).total_seconds()
        if delay > 0:
            time.sleep(delay)

        _post_thread(api, tweet)

        # Save after every row so a restart resumes at the next unposted
        # row instead of re-posting earlier ones.
        last_line += 1
        _write_progress(last_line)
def get_tweets(csva):
    """Read the CSV file at *csva* and return its rows as a list of dicts.

    The first line of the file supplies the column names; every following
    line becomes one mapping from column name to cell text.

    Args:
        csva: Path of the CSV file to read.

    Returns:
        A list with one dict per data row, in file order (empty when the
        file has no data rows).
    """
    # ``newline=""`` lets the csv module handle line endings itself, as its
    # documentation requires; the ``with`` block closes the file, which the
    # bare ``open()`` call never did.
    with open(csva, newline="") as f:
        return list(csv.DictReader(f))
# Run the scheduler only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    execute()