-
Notifications
You must be signed in to change notification settings - Fork 0
/
instagram_bot_scraper.py
203 lines (168 loc) · 7.65 KB
/
instagram_bot_scraper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
from selenium.webdriver import Chrome, ChromeOptions
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.common.exceptions import StaleElementReferenceException, TimeoutException
from os import path, makedirs, listdir, getcwd
from time import sleep
from util import valid_input
class InstagramScrapper(object):
    """
    A bot that grabs Instagram data by driving a Chrome browser session.

    Constructing an instance starts Chrome (using the ``chromedriver``
    binary in the current working directory) and immediately logs in with
    the given credentials. The ``log_*`` methods then scrape follower /
    following lists out of the web UI and write them to text files, one
    username per line.
    """

    def __init__(self, username, password):
        """
        Start Chrome and log in to Instagram.

        username -- Instagram username (stored lower-cased).
        password -- Instagram password for that account.
        """
        opts = ChromeOptions()
        # Run chromedriver with the W3C dialect switched off; the rest of
        # this class uses the legacy find_element_by_* API.
        opts.add_experimental_option('w3c', False)
        self.driver = Chrome(
            chrome_options=opts,
            executable_path=path.join(getcwd(), 'chromedriver'))
        self.user = {'username': username.lower(), 'password': password}
        self.open_instagram_and_login()

    @staticmethod
    def _ensure_parent_dir(filepath):
        """
        Create the directory that will contain filepath, if it has one.
        """
        parent = path.dirname(filepath)
        if parent:
            makedirs(parent, exist_ok=True)

    def open_instagram_and_login(self):
        """
        Opens instagram.com in Chrome and logs you in using given credentials.

        Raises TimeoutException if the login form does not appear within
        10 seconds.
        """
        login_route = "https://www.instagram.com/accounts/login/?source=auth_switcher"

        # Open Instagram and wait for the login form to be rendered
        self.driver.get(login_route)
        wait = WebDriverWait(self.driver, 10)
        wait.until(EC.presence_of_element_located((By.NAME, 'username')))

        # Input Login Credentials
        username_input = self.driver.find_element_by_name("username")
        password_input = self.driver.find_element_by_name("password")
        username_input.send_keys(self.user['username'])
        password_input.send_keys(self.user['password'])

        # Login
        self.driver.find_element_by_css_selector("button[type=submit]").click()
        # NOTE(review): fixed pause to let the post-login navigation start;
        # nothing here verifies that the login actually succeeded.
        sleep(2)

    def _generate_accounts_from(self, container, expectation, max_stalls=100):
        """
        A generator that yields account usernames for every new scroll down
        the "followers/following" list container on instagram. This allows
        us to operate on the new results (extract their usernames from the
        html) while further results are still loading, instead of waiting
        for the whole list to load before processing it.

        container   -- element holding the <li> rows of the list.
        expectation -- number of usernames to yield before stopping.
        max_stalls  -- number of consecutive polls without any new row
                       after which the generator gives up, so a list that
                       never reaches `expectation` cannot loop forever.

        Yields at most `expectation` usernames, in list order.
        """
        count = 0
        stalls = 0
        while count < expectation:
            # Only the DOM access lives inside the try. The yield must stay
            # outside so that closing the generator (GeneratorExit) or a
            # Ctrl-C is never swallowed by the retry handler.
            try:
                list_items = container.find_elements_by_css_selector(
                    f'li:nth-child(n+{count+1})')
                if list_items:
                    # Scrolling to the last loaded row makes the page load
                    # the next batch of rows.
                    self.driver.execute_script(
                        'arguments[0].scrollIntoView()', list_items[-1])
                # The username is the text of the last link in each row.
                batch = [li.find_elements_by_tag_name('a')[-1].text
                         for li in list_items]
            except (StaleElementReferenceException, IndexError):
                # The list was re-rendered (or a row was still incomplete)
                # while we were reading it; drop this batch and poll again.
                batch = []

            if not batch:
                stalls += 1
                if stalls >= max_stalls:
                    return
                sleep(0.2)
                continue

            stalls = 0
            for account in batch[:expectation - count]:
                count += 1
                yield account
            if count < expectation:
                sleep(0.1)

    def _log(self, account_type: str, log_filepath: str, update=False,
             mutuals_only=False, username=None) -> list:
        """
        An internal method that does the heavy lifting when logging information.

        account_type -- "followers" or "following"; selects the profile link
                        that opens the list dialog.
        log_filepath -- file the usernames are written to, one per line.
                        Its parent directory is created when missing.
        update       -- accepted for interface compatibility; not read here.
        mutuals_only -- when True, open the "mutualOnly" link of the profile
                        and log only the accounts listed there.
        username     -- profile to inspect; defaults to the logged-in user.

        Returns the list of usernames written. In mutual mode an empty list
        is returned (and no file is written) when the profile shows no
        mutual-followers link.

        Raises TimeoutException when the page or the list dialog does not
        load in time.
        """
        target = self.user['username'] if username is None else username.lower()

        # Go to profile page
        desired_url = f"https://www.instagram.com/{target}/"
        self.driver.get(desired_url)
        wait = WebDriverWait(self.driver, 5)
        wait.until(lambda driver: driver.current_url == desired_url)

        anchor_href = f"/{target}/{account_type}/"
        if mutuals_only:
            anchor_href += "mutualOnly"
        locator = (By.CSS_SELECTOR, f'a[href="{anchor_href}"]')

        try:
            link = wait.until(EC.presence_of_element_located(locator))
        except TimeoutException:
            if not mutuals_only:
                # A profile without its followers/following link means the
                # page did not load properly: let the caller refresh/retry.
                raise
            # We will assume this user has no mutual followers and skip it
            print(f"> ! {target} has no mutual followers")
            return []

        link_text = link.find_element_by_tag_name('span').text
        try:
            # In mutual mode the count is the second-to-last word of the text.
            raw_count = link_text.split(' ')[-2] if mutuals_only else link_text
            num_following = int(raw_count.replace(',', ''))
        except (ValueError, IndexError):
            # No number in the text: we will assume this user has 1-3 mutual
            # followers, each rendered as its own nested span.
            num_following = len(link.find_elements_by_css_selector('span span'))

        link.click()
        if mutuals_only:
            btn = (By.CSS_SELECTOR,
                   f"a[href='/{target}/followers/mutualFirst']")
            wait.until(EC.presence_of_element_located(btn)).click()

        selector = 'div[role="dialog"] div.isgrP ul div.PZuss'
        list_container = wait.until(
            EC.presence_of_element_located((By.CSS_SELECTOR, selector)))

        # NOTE(review): one fewer than the displayed count is requested;
        # kept from the original implementation - confirm it is intended.
        accounts_gen = self._generate_accounts_from(
            list_container, num_following - 1)

        self._ensure_parent_dir(log_filepath)
        accounts = []
        with open(log_filepath, 'w', encoding='utf-8') as out:
            for account in accounts_gen:
                accounts.append(account)
                out.write(account + "\n")
        return accounts

    def _log_with_retry(self, account_type, log_filepath, update):
        """
        Run _log once and, if the page timed out, refresh and run it once more.
        """
        try:
            return self._log(account_type, log_filepath, update)
        except TimeoutException:
            self.driver.refresh()
            sleep(2)
            return self._log(account_type, log_filepath, update)

    def log_following(self, log_filepath="following.txt", update=False):
        """
        Navigates to /username/following and records the usernames of everyone
        you follow into the log_filepath.

        Returns the list of recorded usernames. Retries once after a page
        refresh when the first attempt times out.
        """
        return self._log_with_retry("following", log_filepath, update)

    def log_followers(self, log_filepath="followers.txt", update=False):
        """
        Navigates to /username/followers and records the usernames of everyone
        that follows you into the log_filepath.

        Returns the list of recorded usernames. Retries once after a page
        refresh when the first attempt times out.
        """
        return self._log_with_retry("followers", log_filepath, update)

    def log_connections(self, log_filepath="connections.txt"):
        """
        Logs the people following you, the people you follow, and the intersection
        of the two.

        The followers and following lists are written under
        data/instagram/<username>/; the intersection is written to
        log_filepath, one username per line in sorted order.

        Returns the intersection as a set.
        """
        # Get followers
        followers_path = f"data/instagram/{self.user['username']}/followers.txt"
        followers = self.log_followers(followers_path)

        # Get following
        following_path = f"data/instagram/{self.user['username']}/following.txt"
        following = self.log_following(following_path)

        # Get intersection
        connections = set(following).intersection(set(followers))
        print(
            f"Found {len(connections)} connections with {self.user['username']}")

        self._ensure_parent_dir(log_filepath)
        with open(log_filepath, 'w', encoding='utf-8') as out:
            print("Saving results...")
            # Sorted so that repeated runs produce identical files.
            out.write("\n".join(sorted(connections)))
            print("Done.")
        return connections

    def log_mutuals_with(self, username, max_retries=3):
        """
        Logs the followers of `username` that are mutual with the logged-in
        account into data/instagram/<your username>/mutuals_with_<username>.txt.

        Nothing is done when that file already exists.

        username    -- the account whose profile is inspected.
        max_retries -- how many times to refresh and try again after a
                       timeout before giving up.

        Raises TimeoutException when every attempt timed out.
        """
        log_dir = f"data/instagram/{self.user['username']}/"
        makedirs(log_dir, exist_ok=True)
        log_path = log_dir + f"mutuals_with_{username}.txt"

        if path.exists(log_path):
            print("> Skipping", username)
            return

        for attempt in range(max_retries + 1):
            try:
                self._log("followers", log_path, mutuals_only=True,
                          username=username)
            except TimeoutException:
                if attempt >= max_retries:
                    raise
                print("Stuff didn't load in time. Refreshing and trying again.")
                self.driver.refresh()
                sleep(2)
            else:
                # _log writes no file when the profile has no mutual followers.
                if path.exists(log_path):
                    print("> Saved", log_path)
                return