diff --git a/README.md b/README.md
index 7c5af83..ea9395a 100644
--- a/README.md
+++ b/README.md
@@ -227,6 +227,11 @@ Output:
String |
If output_format parameter is set to CSV, then it is valid for directory parameter to be passed. If not passed then CSV file will be saved in current working directory. |
+
+ headless |
+ Boolean |
+ Whether to run the crawler in headless mode. Default is True
+
@@ -486,6 +491,26 @@ Output:
String |
If output parameter is set to CSV, then it is valid for directory parameter to be passed. If not passed then CSV file will be saved in current working directory. |
+
+ since_id |
+ Integer |
+ After (NOT inclusive) a specified Snowflake ID. Example here |
+
+
+ max_id |
+ Integer |
+ At or before (inclusive) a specified Snowflake ID. Example here |
+
+
+ within_time |
+ String |
+ Search within the last number of days, hours, minutes, or seconds. Example: 2d, 3h, 5m, 30s.
+
+
+ headless |
+ Boolean |
+ Whether to run the crawler in headless mode. Default is True
+
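Note: a minimal usage sketch of the parameters documented above (assuming scrap_keyword is importable from the package root, as in the README's existing examples; all values are illustrative):

# Search tweets from the last 2 days with a visible browser window.
from twitter_scraper_selenium import scrap_keyword

tweets_json = scrap_keyword(
    keyword="python",
    browser="chrome",
    within_time="2d",   # last 2 days; 3h, 5m, 30s also work
    headless=False,     # show the browser instead of running headlessly
    tweets_count=15,
    output_format="json",
)
print(tweets_json)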
diff --git a/requirements.txt b/requirements.txt
deleted file mode 100644
index 090c302..0000000
--- a/requirements.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-python-dateutil==2.8.1
-selenium==3.141.0
-selenium-wire==4.3.1
-webdriver-manager==3.2.2
-fake-headers==1.0.2
\ No newline at end of file
diff --git a/setup.py b/setup.py
index b43b443..ca3981a 100644
--- a/setup.py
+++ b/setup.py
@@ -3,14 +3,10 @@
with open("README.md", "r", encoding="utf-8") as file:
long_description = file.read()
-requirements = []
-
-for line in open("requirements.txt", 'r', encoding="utf-8").readlines():
- requirements.append(line.replace("\n", ""))
setuptools.setup(
name="twitter_scraper_selenium",
- version="0.1.7",
+ version="2.0.0",
author="Sajid Shaikh",
author_email="shaikhsajid3732@gmail.com",
description="Python package to scrap twitter's front-end easily with selenium",
@@ -41,5 +37,11 @@
],
python_requires=">=3.6",
- install_requires=requirements
+ install_requires=[
+ 'python-dateutil==2.8.2',
+ 'selenium==4.3.0',
+ 'selenium-wire==4.6.4',
+ 'webdriver-manager==3.2.2',
+ 'fake-headers==1.0.2'
+ ]
)
diff --git a/twitter_scraper_selenium/driver_initialization.py b/twitter_scraper_selenium/driver_initialization.py
index 1478a40..da80bbf 100644
--- a/twitter_scraper_selenium/driver_initialization.py
+++ b/twitter_scraper_selenium/driver_initialization.py
@@ -2,27 +2,32 @@
try:
from seleniumwire import webdriver
# to add capabilities for chrome and firefox, import their Options with different aliases
- from selenium.webdriver.chrome.options import Options as ChromeOptions
- from selenium.webdriver.firefox.options import Options as FirefoxOptions
+ from selenium.webdriver.chrome.options import Options as CustomChromeOptions
+ from selenium.webdriver.firefox.options import Options as CustomFireFoxOptions
# import webdriver for downloading respective driver for the browser
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from fake_headers import Headers
+ from selenium.webdriver.chrome.service import Service as ChromeService
+ from selenium.webdriver.firefox.service import Service as FirefoxService
+
except Exception as ex:
print(ex)
class Initializer:
- def __init__(self, browser_name, proxy=None):
+ def __init__(self, browser_name, headless, proxy=None):
self.browser_name = browser_name
self.proxy = proxy
+ self.headless = headless
def set_properties(self, browser_option):
"""adds capabilities to the driver"""
header = Headers().generate()['User-Agent']
- browser_option.add_argument(
- '--headless') # runs browser in headless mode
+ if self.headless:
+ browser_option.add_argument(
+ '--headless') # runs browser in headless mode
browser_option.add_argument('--no-sandbox')
browser_option.add_argument("--disable-dev-shm-usage")
browser_option.add_argument('--ignore-certificate-errors')
@@ -37,7 +42,7 @@ def set_driver_for_browser(self, browser_name):
"""expects browser name and returns a driver instance"""
# if browser is suppose to be chrome
if browser_name.lower() == "chrome":
- browser_option = ChromeOptions()
+ browser_option = CustomChromeOptions()
# automatically installs chromedriver and initialize it and returns the instance
if self.proxy is not None:
options = {
@@ -46,12 +51,13 @@ def set_driver_for_browser(self, browser_name):
'no_proxy': 'localhost, 127.0.0.1'
}
print("Using: {}".format(self.proxy))
- return webdriver.Chrome(executable_path=ChromeDriverManager().install(),
+
+ return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()),
options=self.set_properties(browser_option), seleniumwire_options=options)
- return webdriver.Chrome(executable_path=ChromeDriverManager().install(), options=self.set_properties(browser_option))
+ return webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=self.set_properties(browser_option))
elif browser_name.lower() == "firefox":
- browser_option = FirefoxOptions()
+ browser_option = CustomFireFoxOptions()
if self.proxy is not None:
options = {
'https': 'https://{}'.format(self.proxy.replace(" ", "")),
@@ -59,11 +65,12 @@ def set_driver_for_browser(self, browser_name):
'no_proxy': 'localhost, 127.0.0.1'
}
print("Using: {}".format(self.proxy))
- return webdriver.Firefox(executable_path=GeckoDriverManager().install(),
+
+ return webdriver.Firefox(service=FirefoxService(executable_path=GeckoDriverManager().install()),
options=self.set_properties(browser_option), seleniumwire_options=options)
# automatically installs geckodriver and initialize it and returns the instance
- return webdriver.Firefox(executable_path=GeckoDriverManager().install(), options=self.set_properties(browser_option))
+ return webdriver.Firefox(service=FirefoxService(executable_path=GeckoDriverManager().install()), options=self.set_properties(browser_option))
else:
# if browser_name is not chrome neither firefox than raise an exception
raise Exception("Browser not supported!")
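Note: the hunk above adopts the Selenium 4 Service-based initialization (the executable_path argument is deprecated in Selenium 4). A standalone sketch of the same pattern, assuming selenium>=4 and webdriver-manager are installed:

# Selenium 4: wrap the driver-manager path in a Service object.
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

options = Options()
options.add_argument("--headless")  # drop this line to watch the browser
service = Service(ChromeDriverManager().install())  # downloads a matching chromedriver
driver = webdriver.Chrome(service=service, options=options)
driver.get("https://twitter.com/explore")
print(driver.title)
driver.quit()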
diff --git a/twitter_scraper_selenium/driver_utils.py b/twitter_scraper_selenium/driver_utils.py
index bebc2e6..cdf2a0a 100644
--- a/twitter_scraper_selenium/driver_utils.py
+++ b/twitter_scraper_selenium/driver_utils.py
@@ -13,39 +13,41 @@
from random import randint
except Exception as ex:
frameinfo = currentframe()
- print("Error on line no. {} : {}".format(frameinfo.f_lineno,ex))
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
frameinfo = currentframe()
+
class Utilities:
- """this class contains all the method related to driver behaviour,
- like scrolling, waiting for element to appear, it contains all static
- method, which accepts driver instance as a argument"""
+ """this class contains all the method related to driver behaviour,
+ like scrolling, waiting for element to appear, it contains all static
+ method, which accepts driver instance as a argument"""
- @staticmethod
- def __wait_until_tweets_appear(driver):
- try:
- WebDriverWait(driver, 10).until(EC.presence_of_element_located(
- (By.CSS_SELECTOR, '[data-testid="tweet"]')))
- except WebDriverException:
- print("Tweets did not appear!")
+ @staticmethod
+ def __wait_until_tweets_appear(driver):
+ try:
+ WebDriverWait(driver, 10).until(EC.presence_of_element_located(
+ (By.CSS_SELECTOR, '[data-testid="tweet"]')))
+ except WebDriverException:
+ print(
+ "Tweets did not appear!, Try setting headless=False to see what is happening")
- @staticmethod
- def __scroll_down(driver):
- try:
- body = driver.find_element_by_css_selector('body')
- for _ in range(3):
- body.send_keys(Keys.PAGE_DOWN)
- except Exception as ex:
- print("Error on line no. {} : {}".format(frameinfo.f_lineno,ex))
+ @staticmethod
+ def __scroll_down(driver):
+ try:
+ body = driver.find_element(By.CSS_SELECTOR, 'body')
+ for _ in range(randint(1,3)):
+ body.send_keys(Keys.PAGE_DOWN)
+ except Exception as ex:
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
- @staticmethod
- def __wait_until_completion(driver):
- """waits until the page have completed loading"""
- try:
- state = ""
- while state != "complete":
- time.sleep(randint(3, 5))
- state = driver.execute_script("return document.readyState")
- except Exception as ex:
- print(ex)
+ @staticmethod
+ def __wait_until_completion(driver):
+ """waits until the page have completed loading"""
+ try:
+ state = ""
+ while state != "complete":
+ time.sleep(randint(3, 5))
+ state = driver.execute_script("return document.readyState")
+ except Exception as ex:
+ print(ex)
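Note: this file (and element_finder.py below) also migrates locators from the Selenium 3 find_element_by_* helpers, which are deprecated and later removed in Selenium 4, to the By-based API. A minimal sketch of the change, assuming selenium>=4:

# By-based locators replace the find_element_by_* helpers.
from selenium import webdriver
from selenium.webdriver.common.by import By

driver = webdriver.Firefox()
driver.get("https://twitter.com/explore")
# Selenium 3 style (no longer available in recent Selenium 4 releases):
#   body = driver.find_element_by_css_selector("body")
# Selenium 4 style, as used in this patch:
body = driver.find_element(By.CSS_SELECTOR, "body")
tweets = driver.find_elements(By.CSS_SELECTOR, '[data-testid="tweet"]')
print(len(tweets))
driver.quit()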
diff --git a/twitter_scraper_selenium/element_finder.py b/twitter_scraper_selenium/element_finder.py
index 54b54ec..4addc12 100644
--- a/twitter_scraper_selenium/element_finder.py
+++ b/twitter_scraper_selenium/element_finder.py
@@ -4,164 +4,178 @@
from .scraping_utilities import Scraping_utilities
from inspect import currentframe
from dateutil.parser import parse
+ from selenium.webdriver.common.by import By
except Exception as ex:
frameinfo = currentframe()
- print("Error on line no. {} : {}".format(frameinfo.f_lineno,ex))
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
frameinfo = currentframe()
+
class Finder:
- """
- this class should contain all the static method to find that accept
- webdriver instance and perform operation to find elements and return the
- found element.
- method should follow convention like so:
-
- @staticmethod
- def __method_name(parameters):
- """
-
- @staticmethod
- def __fetch_all_tweets(driver):
- try:
- return driver.find_elements_by_css_selector('[data-testid="tweet"]')
- except:
- print("Error at method fetch_all_tweets on line no. {} : {}".format(frameinfo.f_lineno, ex))
-
- @staticmethod
- def __find_replies(tweet):
- try:
- replies_element = tweet.find_element_by_css_selector('[data-testid="reply"]')
- replies = replies_element.get_attribute("aria-label")
- return Scraping_utilities._Scraping_utilities__extract_digits(replies)
- except Exception as ex:
- print("Error at method find_replies on line no. {} : {}".format(frameinfo.f_lineno, ex))
- return ""
-
- @staticmethod
- def __find_shares(tweet):
- try:
- shares_element = tweet.find_element_by_css_selector('[data-testid="retweet"]')
- shares = shares_element.get_attribute("aria-label")
- return Scraping_utilities._Scraping_utilities__extract_digits(shares)
- except Exception as ex:
- print("Error at method find_shares on line no. {} : {}".format(frameinfo.f_lineno, ex))
- return ""
-
- @staticmethod
- def __find_status(tweet):
- try:
- anchor = tweet.find_element_by_css_selector("a.r-bcqeeo.r-3s2u2q.r-qvutc0")
- return (anchor.get_attribute("href").split("/"), anchor.get_attribute("href"))
- except Exception as ex:
- print("Error at method find_status on line no. {} : {}".format(frameinfo.f_lineno, ex))
- return []
-
- @staticmethod
- def __find_all_anchor_tags(tweet):
- try:
- return tweet.find_elements_by_tag_name('a')
- except Exception as ex:
- print("Error at method find_all_anchor_tags on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
- @staticmethod
- def __find_timestamp(tweet):
- try:
- timestamp = tweet.find_element_by_tag_name(
- "time").get_attribute("datetime")
- posted_time = parse(timestamp).isoformat()
- return posted_time
- except Exception as ex:
- print("Error at method find_timestamp on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
-
- @staticmethod
- def __find_content(tweet):
- try:
- #content_element = tweet.find_element_by_css_selector('.//*[@dir="auto"]')[4]
- content_element = tweet.find_element_by_css_selector('div[lang]')
- return content_element.text
- except NoSuchElementException:
- return ""
- except Exception as ex:
- print("Error at method find_content on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
- @staticmethod
- def __find_like(tweet):
- try:
- like_element = tweet.find_element_by_css_selector('[data-testid="like"]')
- likes = like_element.get_attribute("aria-label")
- return Scraping_utilities._Scraping_utilities__extract_digits(likes)
- except Exception as ex:
- print("Error at method find_like on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
- @staticmethod
- def __find_images(tweet):
- try:
- image_element = tweet.find_elements_by_css_selector(
- 'div[data-testid="tweetPhoto"]')
- images = []
- for image_div in image_element:
- href = image_div.find_element_by_tag_name("img").get_attribute("src")
- images.append(href)
- return images
- except Exception as ex:
- print("Error at method __find_images on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
- @staticmethod
- def __find_videos(tweet):
- try:
- image_element = tweet.find_elements_by_css_selector(
- 'div[data-testid="videoPlayer"]')
- videos = []
- for video_div in image_element:
- href = video_div.find_element_by_tag_name("video").get_attribute("src")
- videos.append(href)
- return videos
- except Exception as ex:
- print("Error at method find_videos on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
- @staticmethod
- def __is_retweet(tweet):
- try:
- tweet.find_element_by_css_selector('div.r-92ng3h.r-qvutc0')
- return True
- except NoSuchElementException:
- return False
- except Exception as ex:
- print("Error at method is_retweet on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
- return False
-
- @staticmethod
- def __find_name_from_post(tweet,is_retweet=False):
- try:
- name = "NA"
- anchors = Finder.__find_all_anchor_tags(tweet)
- if len(anchors) > 2:
- if is_retweet:
- name = anchors[2].text.strip()
- else:
- name = anchors[1].text.split("\n")[0]
- return name
- except Exception as ex:
- print("Error at method __find_name_from_post on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
- @staticmethod
- def __find_external_link(tweet):
- try:
- card = tweet.find_element_by_css_selector('[data-testid="card.wrapper"]')
- href = card.find_element_by_tag_name('a')
- return href.get_attribute("href")
-
- except NoSuchElementException:
- return ""
- except Exception as ex:
- print("Error at method __find_external_link on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
+ """
+ this class should contain all the static method to find that accept
+ webdriver instance and perform operation to find elements and return the
+ found element.
+ method should follow convention like so:
+
+ @staticmethod
+ def __method_name(parameters):
+ """
+
+ @staticmethod
+ def __fetch_all_tweets(driver):
+ try:
+ return driver.find_elements(By.CSS_SELECTOR, '[data-testid="tweet"]')
+        except Exception as ex:
+ print("Error at method fetch_all_tweets on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+ return []
+
+ @staticmethod
+ def __find_replies(tweet):
+ try:
+ replies_element = tweet.find_element(
+ By.CSS_SELECTOR, '[data-testid="reply"]')
+ replies = replies_element.get_attribute("aria-label")
+ return Scraping_utilities._Scraping_utilities__extract_digits(replies)
+ except Exception as ex:
+ print("Error at method find_replies on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+ return ""
+
+ @staticmethod
+ def __find_shares(tweet):
+ try:
+ shares_element = tweet.find_element(
+ By.CSS_SELECTOR, '[data-testid="retweet"]')
+ shares = shares_element.get_attribute("aria-label")
+ return Scraping_utilities._Scraping_utilities__extract_digits(shares)
+ except Exception as ex:
+ print("Error at method find_shares on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+ return ""
+
+ @staticmethod
+ def __find_status(tweet):
+ try:
+ anchor = tweet.find_element(
+ By.CSS_SELECTOR, "a.r-bcqeeo.r-3s2u2q.r-qvutc0")
+ return (anchor.get_attribute("href").split("/"), anchor.get_attribute("href"))
+ except Exception as ex:
+ print("Error at method find_status on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+ return []
+
+ @staticmethod
+ def __find_all_anchor_tags(tweet):
+ try:
+ return tweet.find_elements(By.TAG_NAME, 'a')
+ except Exception as ex:
+ print("Error at method find_all_anchor_tags on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __find_timestamp(tweet):
+ try:
+ timestamp = tweet.find_element(By.TAG_NAME,
+ "time").get_attribute("datetime")
+ posted_time = parse(timestamp).isoformat()
+ return posted_time
+ except Exception as ex:
+ print("Error at method find_timestamp on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __find_content(tweet):
+ try:
+ #content_element = tweet.find_element('.//*[@dir="auto"]')[4]
+ content_element = tweet.find_element(By.CSS_SELECTOR, 'div[lang]')
+ return content_element.text
+ except NoSuchElementException:
+ return ""
+ except Exception as ex:
+ print("Error at method find_content on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __find_like(tweet):
+ try:
+ like_element = tweet.find_element(
+ By.CSS_SELECTOR, '[data-testid="like"]')
+ likes = like_element.get_attribute("aria-label")
+ return Scraping_utilities._Scraping_utilities__extract_digits(likes)
+ except Exception as ex:
+ print("Error at method find_like on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __find_images(tweet):
+ try:
+ image_element = tweet.find_elements(By.CSS_SELECTOR,
+ 'div[data-testid="tweetPhoto"]')
+ images = []
+ for image_div in image_element:
+ href = image_div.find_element(By.TAG_NAME,
+ "img").get_attribute("src")
+ images.append(href)
+ return images
+ except Exception as ex:
+ print("Error at method __find_images on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __find_videos(tweet):
+ try:
+ image_element = tweet.find_elements(By.CSS_SELECTOR,
+ 'div[data-testid="videoPlayer"]')
+ videos = []
+ for video_div in image_element:
+ href = video_div.find_element(
+ By.TAG_NAME, "video").get_attribute("src")
+ videos.append(href)
+ return videos
+ except Exception as ex:
+ print("Error at method find_videos on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __is_retweet(tweet):
+ try:
+ tweet.find_element(By.CSS_SELECTOR, 'div.r-92ng3h.r-qvutc0')
+ return True
+ except NoSuchElementException:
+ return False
+ except Exception as ex:
+ print("Error at method is_retweet on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+ return False
+
+ @staticmethod
+ def __find_name_from_post(tweet, is_retweet=False):
+ try:
+ name = "NA"
+ anchors = Finder.__find_all_anchor_tags(tweet)
+ if len(anchors) > 2:
+ if is_retweet:
+ name = anchors[2].text.strip()
+ else:
+ name = anchors[1].text.split("\n")[0]
+ return name
+ except Exception as ex:
+ print("Error at method __find_name_from_post on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __find_external_link(tweet):
+ try:
+ card = tweet.find_element(
+ By.CSS_SELECTOR, '[data-testid="card.wrapper"]')
+ href = card.find_element(By.TAG_NAME, 'a')
+ return href.get_attribute("href")
+
+ except NoSuchElementException:
+ return ""
+ except Exception as ex:
+ print("Error at method __find_external_link on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
diff --git a/twitter_scraper_selenium/keyword.py b/twitter_scraper_selenium/keyword.py
index 0059b9b..84879eb 100644
--- a/twitter_scraper_selenium/keyword.py
+++ b/twitter_scraper_selenium/keyword.py
@@ -1,192 +1,211 @@
#!/usr/bin/env python3
try:
- from datetime import datetime,timedelta
- from .driver_initialization import Initializer
- from .driver_utils import Utilities
- from inspect import currentframe
- from .element_finder import Finder
- import re,json,os,csv
- from urllib.parse import quote
+ from datetime import datetime, timedelta
+ from .driver_initialization import Initializer
+ from .driver_utils import Utilities
+ from inspect import currentframe
+ from .element_finder import Finder
+ import re
+ import json
+ import os
+ import csv
+    from .scraping_utilities import Scraping_utilities
except Exception as ex:
- print(ex)
+ print(ex)
frameinfo = currentframe()
+
class Keyword:
- """this class needs to be instantiated in order to find something
- on twitter related to keywords"""
-
- def __init__(self, keyword,browser,until,
- since, proxy, tweets_count):
- self.keyword = keyword
- self.URL = "https://twitter.com/search?q={}%20until%3A{}%20since%3A{}&src=typed_query&f=live".format(
- quote(keyword), until, since)
- self.__driver = ""
- self.browser= browser
- self.proxy = proxy
- self.tweets_count = tweets_count
- self.posts_data = {}
- self.retry = 10
- def __start_driver(self):
- """changes the class member __driver value to driver on call"""
- self.__driver = Initializer(self.browser, self.proxy).init()
-
- def __close_driver(self):
- self.__driver.close()
- self.__driver.quit()
-
- def __check_tweets_presence(self, tweet_list):
- if len(tweet_list) <= 0:
- self.retry -= 1
-
- def __check_retry(self):
- return self.retry <= 0
-
- def __fetch_and_store_data(self):
- try:
- all_ready_fetched_posts = []
- present_tweets = Finder._Finder__fetch_all_tweets(self.__driver)
- self.__check_tweets_presence(present_tweets)
- all_ready_fetched_posts.extend(present_tweets)
-
- while len(self.posts_data) < self.tweets_count:
- for tweet in present_tweets:
- name = Finder._Finder__find_name_from_post(tweet)
- status,tweet_url = Finder._Finder__find_status(tweet)
- replies = Finder._Finder__find_replies(tweet)
- retweets = Finder._Finder__find_shares(tweet)
- username = tweet_url.split("/")[3]
- status = status[-1]
- is_retweet = Finder._Finder__is_retweet(tweet)
- posted_time = Finder._Finder__find_timestamp(tweet)
- content = Finder._Finder__find_content(tweet)
- likes = Finder._Finder__find_like(tweet)
- images = Finder._Finder__find_images(tweet)
- videos = Finder._Finder__find_videos(tweet)
- hashtags = re.findall(r"#(\w+)", content)
- mentions = re.findall(r"@(\w+)", content)
- profile_picture = "https://twitter.com/{}/photo".format(username)
- link = Finder._Finder__find_external_link(tweet)
-
- self.posts_data[status] = {
- "tweet_id" : status,
- "username" : username,
- "name" : name,
- "profile_picture" : profile_picture,
- "replies" : replies,
- "retweets" : retweets,
- "likes":likes,
- "is_retweet" : is_retweet,
- "posted_time" : posted_time,
- "content" : content,
- "hashtags" : hashtags,
- "mentions" : mentions,
- "images" : images,
- "videos" : videos,
- "tweet_url" : tweet_url,
- "link" : link
- }
-
- Utilities._Utilities__scroll_down(self.__driver)
- Utilities._Utilities__wait_until_completion(self.__driver)
- Utilities._Utilities__wait_until_tweets_appear(self.__driver)
- present_tweets = Finder._Finder__fetch_all_tweets(self.__driver)
- present_tweets = [post for post in present_tweets if post not in all_ready_fetched_posts]
- self.__check_tweets_presence(present_tweets)
- all_ready_fetched_posts.extend(present_tweets)
- if self.__check_retry() is True:
- break
-
- except Exception as ex:
- print("Error at method scrap on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
- def scrap(self):
- try:
- self.__start_driver()
- self.__driver.get(self.URL)
- Utilities._Utilities__wait_until_completion(self.__driver)
- Utilities._Utilities__wait_until_tweets_appear(self.__driver)
- self.__fetch_and_store_data()
-
- self.__close_driver()
- data = dict(list(self.posts_data.items())[0:int(self.tweets_count)])
- return json.dumps(data)
-
- except Exception as ex:
- self.__close_driver()
- print("Error at method scrap on line no. {} : {}".format(frameinfo.f_lineno,ex))
-
-
-def json_to_csv(filename,json_data,directory):
- os.chdir(directory) #change working directory to given directory
- #headers of the CSV file
- fieldnames = ['tweet_id','username','name','profile_picture','replies',
- 'retweets','likes','is_retweet'
- ,'posted_time','content','hashtags','mentions',
- 'images', 'videos', 'tweet_url', 'link']
- #open and start writing to CSV files
- with open("{}.csv".format(filename),'w',newline='',encoding="utf-8") as data_file:
- writer = csv.DictWriter(data_file,fieldnames=fieldnames) #instantiate DictWriter for writing CSV fi
- writer.writeheader() #write headers to CSV file
- #iterate over entire dictionary, write each posts as a row to CSV file
- for key in json_data:
- #parse post in a dictionary and write it as a single row
- row = {
- "tweet_id" : key,
- "username" : json_data[key]['username'],
- "name" : json_data[key]['name'],
- "profile_picture" : json_data[key]['profile_picture'],
- "replies" : json_data[key]['replies'],
- "retweets" : json_data[key]['retweets'],
- "likes":json_data[key]['likes'],
- "is_retweet" : json_data[key]['is_retweet'],
- "posted_time" : json_data[key]['posted_time'],
- "content" : json_data[key]['content'],
- "hashtags" : json_data[key]['hashtags'],
- "mentions" : json_data[key]['mentions'],
- "images" : json_data[key]['images'],
- "videos" : json_data[key]['videos'],
- "tweet_url" : json_data[key]['tweet_url'],
- "link": json_data[key]['link']
-
- }
- writer.writerow(row) #write row to CSV fi
- data_file.close() #after writing close the file
-
-
-
-
-def scrap_keyword(keyword,browser="firefox",until=datetime.today().strftime('%Y-%m-%d'),
- since=(datetime.today() - timedelta(days=1)).strftime("%Y-%m-%d"), proxy=None, tweets_count=10, output_format="json", filename="", directory=os.getcwd()):
- """
- Returns tweets data in CSV or JSON.
-
- Parameters:
- keyword(string): Keyword to search on twitter.
-
- browser(string): Which browser to use for scraping?, Only 2 are supported Chrome and Firefox,default is set to Firefox.
-
- until(string): Optional parameter,Until date for scraping,a end date from where search ends. Format for date is YYYY-MM-DD.
-
- since(string): Optional parameter,Since date for scraping,a past date from where to search from. Format for date is YYYY-MM-DD.
-
- proxy(string): Optional parameter, if user wants to use proxy for scraping. If the proxy is authenticated proxy then the proxy format is username:password@host:port
-
- tweets_count(int): Number of posts to scrap. Default is 10.
-
- output_format(string): The output format, whether JSON or CSV. Default is JSON.
-
- filename(string): If output parameter is set to CSV, then it is necessary for filename parameter to passed. If not passed then the filename will be same as keyword passed.
-
- directory(string): If output parameter is set to CSV, then it is valid for directory parameter to be passed. If not passed then CSV file will be saved in current working directory.
-
- """
- keyword_bot = Keyword(keyword,browser=browser,until=until,since=since,proxy=proxy,tweets_count=tweets_count)
- data = keyword_bot.scrap()
- if output_format == "json":
- return data
- elif output_format.lower() == "csv":
- if filename == "":
- filename = keyword
- json_to_csv(filename=filename, json_data=json.loads(data), directory=directory)
+ """this class needs to be instantiated in order to find something
+ on twitter related to keywords"""
+
+ def __init__(self, keyword, browser, proxy, tweets_count, url, headless):
+ self.keyword = keyword
+ self.URL = url
+        self.__driver = ""
+ self.browser = browser
+ self.proxy = proxy
+ self.tweets_count = tweets_count
+ self.posts_data = {}
+ self.retry = 10
+ self.headless = headless
+
+ def __start_driver(self):
+ """changes the class member __driver value to driver on call"""
+ self.__driver = Initializer(
+ self.browser, self.headless, self.proxy).init()
+
+ def __close_driver(self):
+ self.__driver.close()
+ self.__driver.quit()
+
+ def __check_tweets_presence(self, tweet_list):
+ if len(tweet_list) <= 0:
+ self.retry -= 1
+
+ def __check_retry(self):
+ return self.retry <= 0
+
+ def __fetch_and_store_data(self):
+ try:
+ all_ready_fetched_posts = []
+ present_tweets = Finder._Finder__fetch_all_tweets(self.__driver)
+ self.__check_tweets_presence(present_tweets)
+ all_ready_fetched_posts.extend(present_tweets)
+
+ while len(self.posts_data) < self.tweets_count:
+ for tweet in present_tweets:
+ name = Finder._Finder__find_name_from_post(tweet)
+ status, tweet_url = Finder._Finder__find_status(tweet)
+ replies = Finder._Finder__find_replies(tweet)
+ retweets = Finder._Finder__find_shares(tweet)
+ username = tweet_url.split("/")[3]
+ status = status[-1]
+ is_retweet = Finder._Finder__is_retweet(tweet)
+ posted_time = Finder._Finder__find_timestamp(tweet)
+ content = Finder._Finder__find_content(tweet)
+ likes = Finder._Finder__find_like(tweet)
+ images = Finder._Finder__find_images(tweet)
+ videos = Finder._Finder__find_videos(tweet)
+ hashtags = re.findall(r"#(\w+)", content)
+ mentions = re.findall(r"@(\w+)", content)
+ profile_picture = "https://twitter.com/{}/photo".format(
+ username)
+ link = Finder._Finder__find_external_link(tweet)
+
+ self.posts_data[status] = {
+ "tweet_id": status,
+ "username": username,
+ "name": name,
+ "profile_picture": profile_picture,
+ "replies": replies,
+ "retweets": retweets,
+ "likes": likes,
+ "is_retweet": is_retweet,
+ "posted_time": posted_time,
+ "content": content,
+ "hashtags": hashtags,
+ "mentions": mentions,
+ "images": images,
+ "videos": videos,
+ "tweet_url": tweet_url,
+ "link": link
+ }
+
+ Utilities._Utilities__scroll_down(self.__driver)
+ Utilities._Utilities__wait_until_completion(self.__driver)
+ Utilities._Utilities__wait_until_tweets_appear(self.__driver)
+ present_tweets = Finder._Finder__fetch_all_tweets(
+ self.__driver)
+ present_tweets = [
+ post for post in present_tweets if post not in all_ready_fetched_posts]
+ self.__check_tweets_presence(present_tweets)
+ all_ready_fetched_posts.extend(present_tweets)
+ if self.__check_retry() is True:
+ break
+
+ except Exception as ex:
+ print("Error at method scrap on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ def scrap(self):
+ try:
+ self.__start_driver()
+ self.__driver.get(self.URL)
+ Utilities._Utilities__wait_until_completion(self.__driver)
+ Utilities._Utilities__wait_until_tweets_appear(self.__driver)
+ self.__fetch_and_store_data()
+
+ self.__close_driver()
+ data = dict(list(self.posts_data.items())
+ [0:int(self.tweets_count)])
+ return json.dumps(data)
+
+ except Exception as ex:
+ self.__close_driver()
+ print(ex)
+ #print("Error at method scrap on line no. {} : {}".format(frameinfo.f_lineno,ex))
+
+
+def json_to_csv(filename, json_data, directory):
+ os.chdir(directory) # change working directory to given directory
+ # headers of the CSV file
+ fieldnames = ['tweet_id', 'username', 'name', 'profile_picture', 'replies',
+ 'retweets', 'likes', 'is_retweet', 'posted_time', 'content', 'hashtags', 'mentions',
+ 'images', 'videos', 'tweet_url', 'link']
+ # open and start writing to CSV files
+ with open("{}.csv".format(filename), 'w', newline='', encoding="utf-8") as data_file:
+        # instantiate DictWriter for writing the CSV file
+ writer = csv.DictWriter(data_file, fieldnames=fieldnames)
+ writer.writeheader() # write headers to CSV file
+ # iterate over entire dictionary, write each posts as a row to CSV file
+ for key in json_data:
+ # parse post in a dictionary and write it as a single row
+ row = {
+ "tweet_id": key,
+ "username": json_data[key]['username'],
+ "name": json_data[key]['name'],
+ "profile_picture": json_data[key]['profile_picture'],
+ "replies": json_data[key]['replies'],
+ "retweets": json_data[key]['retweets'],
+ "likes": json_data[key]['likes'],
+ "is_retweet": json_data[key]['is_retweet'],
+ "posted_time": json_data[key]['posted_time'],
+ "content": json_data[key]['content'],
+ "hashtags": json_data[key]['hashtags'],
+ "mentions": json_data[key]['mentions'],
+ "images": json_data[key]['images'],
+ "videos": json_data[key]['videos'],
+ "tweet_url": json_data[key]['tweet_url'],
+ "link": json_data[key]['link']
+
+ }
+            writer.writerow(row)  # write row to the CSV file
+ data_file.close() # after writing close the file
+
+
+def scrap_keyword(keyword, browser="firefox", until=None,
+ since=None, since_id=None, max_id=None, within_time=None,
+ proxy=None, tweets_count=10, output_format="json",
+ filename="", directory=os.getcwd(), headless=True):
+ """
+ Returns tweets data in CSV or JSON.
+
+ Parameters:
+ keyword(string): Keyword to search on twitter.
+
+    browser(string): Which browser to use for scraping. Only Chrome and Firefox are supported; default is Firefox.
+
+    until(string): Optional parameter. End date for the search. Format is YYYY-MM-DD or a unix timestamp in seconds.
+
+    since(string): Optional parameter. Start date for the search. Format is YYYY-MM-DD or a unix timestamp in seconds.
+
+    proxy(string): Optional parameter. Proxy to use for scraping. For an authenticated proxy the format is username:password@host:port.
+
+    tweets_count(int): Number of posts to scrape. Default is 10.
+
+    output_format(string): The output format, whether JSON or CSV. Default is JSON.
+
+    filename(string): If output_format is set to CSV, then the filename parameter must be passed. If not passed, the filename will be the same as the keyword.
+
+    directory(string): If output_format is set to CSV, the directory parameter may be passed. If not passed, the CSV file will be saved in the current working directory.
+
+    since_id(integer): After (NOT inclusive) a specified Snowflake ID.
+
+    max_id(integer): At or before (inclusive) a specified Snowflake ID.
+
+    within_time(string): Search within the last number of days, hours, minutes, or seconds, e.g. 2d, 3h, 5m, 30s.
+
+    headless(boolean): Whether to run the browser in headless mode. Default is True.
+ """
+ URL = Scraping_utilities._Scraping_utilities__url_generator(keyword, since=since, until=until,
+ since_id=since_id, max_id=max_id, within_time=within_time)
+ keyword_bot = Keyword(keyword, browser=browser, url=URL,
+ proxy=proxy, tweets_count=tweets_count, headless=headless)
+ data = keyword_bot.scrap()
+ if output_format == "json":
+ return data
+ elif output_format.lower() == "csv":
+ if filename == "":
+ filename = keyword
+ json_to_csv(filename=filename, json_data=json.loads(
+ data), directory=directory)
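Note: a hedged sketch of the CSV output path added above (scrap_keyword as defined in this module; dates and filename are illustrative):

# Writes python_tweets.csv into the current working directory.
import os
from twitter_scraper_selenium.keyword import scrap_keyword

scrap_keyword(
    keyword="python",
    browser="firefox",
    since="2022-07-01",     # YYYY-MM-DD or a unix timestamp in seconds
    until="2022-07-05",
    tweets_count=20,
    output_format="csv",
    filename="python_tweets",
    directory=os.getcwd(),
)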
diff --git a/twitter_scraper_selenium/profile.py b/twitter_scraper_selenium/profile.py
index d5b14e4..a7be932 100644
--- a/twitter_scraper_selenium/profile.py
+++ b/twitter_scraper_selenium/profile.py
@@ -1,188 +1,198 @@
#!/usr/bin/env python3
try:
- from .driver_initialization import Initializer
- from .driver_utils import Utilities
- from inspect import currentframe
- from .element_finder import Finder
- from .driver_utils import Utilities
- import re,json,csv,os
+ from .driver_initialization import Initializer
+ from .driver_utils import Utilities
+ from inspect import currentframe
+ from .element_finder import Finder
+ from .driver_utils import Utilities
+ import re
+ import json
+ import csv
+ import os
except Exception as ex:
- frameinfo = currentframe()
- print("Error on line no. {} : {}".format(frameinfo.f_lineno,ex))
+ frameinfo = currentframe()
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
frameinfo = currentframe()
-class Profile:
- """this class needs to be instantiated in orer to scrape post of some
- twitter profile"""
-
- def __init__(self, twitter_username, browser, proxy, tweets_count):
- self.twitter_username = twitter_username
- self.URL = "https://twitter.com/{}".format(twitter_username.lower())
- self.__driver = ""
- self.browser = browser
- self.proxy = proxy
- self.tweets_count = tweets_count
- self.posts_data = {}
- self.retry = 10
-
- def __start_driver(self):
- """changes the class member __driver value to driver on call"""
- self.__driver = Initializer(self.browser,self.proxy).init()
-
- def __close_driver(self):
- self.__driver.close()
- self.__driver.quit()
-
- def __check_tweets_presence(self,tweet_list):
- if len(tweet_list) <= 0:
- self.retry -= 1
-
- def __check_retry(self):
- return self.retry <= 0
-
- def __fetch_and_store_data(self):
- try:
- all_ready_fetched_posts = []
- present_tweets = Finder._Finder__fetch_all_tweets(self.__driver)
- self.__check_tweets_presence(present_tweets)
- all_ready_fetched_posts.extend(present_tweets)
-
- while len(self.posts_data) < self.tweets_count:
- for tweet in present_tweets:
- status,tweet_url = Finder._Finder__find_status(tweet)
- replies = Finder._Finder__find_replies(tweet)
- retweets = Finder._Finder__find_shares(tweet)
- status = status[-1]
- username = tweet_url.split("/")[3]
- is_retweet = True if self.twitter_username.lower() != username.lower() else False
- name = Finder._Finder__find_name_from_post(tweet,is_retweet)
- retweet_link = tweet_url if is_retweet is True else ""
- posted_time = Finder._Finder__find_timestamp(tweet)
- content = Finder._Finder__find_content(tweet)
- likes = Finder._Finder__find_like(tweet)
- images = Finder._Finder__find_images(tweet)
- videos = Finder._Finder__find_videos(tweet)
- hashtags = re.findall(r"#(\w+)", content)
- mentions = re.findall(r"@(\w+)", content)
- profile_picture = "https://twitter.com/{}/photo".format(username)
- link = Finder._Finder__find_external_link(tweet)
- self.posts_data[status] = {
- "tweet_id" : status,
- "username" : username,
- "name" : name,
- "profile_picture" : profile_picture,
- "replies" : replies,
- "retweets" : retweets,
- "likes":likes,
- "is_retweet" : is_retweet,
- "retweet_link" : retweet_link,
- "posted_time" : posted_time,
- "content" : content,
- "hashtags" : hashtags,
- "mentions" : mentions,
- "images" : images,
- "videos" : videos,
- "tweet_url" : tweet_url,
- "link" : link
- }
-
- Utilities._Utilities__scroll_down(self.__driver)
- Utilities._Utilities__wait_until_completion(self.__driver)
- Utilities._Utilities__wait_until_tweets_appear(self.__driver)
- present_tweets = Finder._Finder__fetch_all_tweets(self.__driver)
- present_tweets = [post for post in present_tweets if post not in all_ready_fetched_posts]
- self.__check_tweets_presence(present_tweets)
- all_ready_fetched_posts.extend(present_tweets)
- if self.__check_retry() is True:
- break
-
- except Exception as ex:
- print("Error at method scrap on line no. {} : {}".format(
- frameinfo.f_lineno, ex))
-
-
- def scrap(self):
- try:
- self.__start_driver()
- self.__driver.get(self.URL)
- Utilities._Utilities__wait_until_completion(self.__driver)
- Utilities._Utilities__wait_until_tweets_appear(self.__driver)
- self.__fetch_and_store_data()
- self.__close_driver()
- data = dict(list(self.posts_data.items())[0:int(self.tweets_count)])
- return json.dumps(data)
- except Exception as ex:
- self.__close_driver()
- print("Error at method scrap on line no. {} : {}".format(frameinfo.f_lineno,ex))
-
-
-
-
-def json_to_csv(filename,json_data,directory):
- os.chdir(directory) #change working directory to given directory
- #headers of the CSV file
- fieldnames = ['tweet_id','username','name','profile_picture','replies',
- 'retweets','likes','is_retweet'
- ,'retweet_link','posted_time','content','hashtags','mentions',
- 'images', 'videos', 'tweet_url', 'link']
- #open and start writing to CSV files
- with open("{}.csv".format(filename),'w',newline='',encoding="utf-8") as data_file:
- writer = csv.DictWriter(data_file,fieldnames=fieldnames) #instantiate DictWriter for writing CSV fi
- writer.writeheader() #write headers to CSV file
- #iterate over entire dictionary, write each posts as a row to CSV file
- for key in json_data:
- #parse post in a dictionary and write it as a single row
- row = {
- "tweet_id" : key,
- "username" : json_data[key]['username'],
- "name" : json_data[key]['name'],
- "profile_picture" : json_data[key]['profile_picture'],
- "replies" : json_data[key]['replies'],
- "retweets" : json_data[key]['retweets'],
- "likes":json_data[key]['likes'],
- "is_retweet" : json_data[key]['is_retweet'],
- "retweet_link" : json_data[key]['retweet_link'],
- "posted_time" : json_data[key]['posted_time'],
- "content" : json_data[key]['content'],
- "hashtags" : json_data[key]['hashtags'],
- "mentions" : json_data[key]['mentions'],
- "images" : json_data[key]['images'],
- "videos" : json_data[key]['videos'],
- "tweet_url" : json_data[key]['tweet_url'],
- "link": json_data[key]['link']
- }
- writer.writerow(row) #write row to CSV fi
- data_file.close() #after writing close the file
-
-
-def scrap_profile(twitter_username,browser="firefox",proxy=None, tweets_count=10, output_format="json",filename="",directory=os.getcwd()):
- """
- Returns tweets data in CSV or JSON.
-
- Parameters:
- twitter_username(string): twitter username of the account.
-
- browser(string): Which browser to use for scraping?, Only 2 are supported Chrome and Firefox. Default is set to Firefox
-
- proxy(string): Optional parameter, if user wants to use proxy for scraping. If the proxy is authenticated proxy then the proxy format is username:password@host:port
-
- tweets_count(int): Number of posts to scrap. Default is 10.
-
- output_format(string): The output format, whether JSON or CSV. Default is JSON.
-
- filename(string): If output_format parameter is set to CSV, then it is necessary for filename parameter to passed. If not passed then the filename will be same as keyword passed.
-
- directory(string): If output_format parameter is set to CSV, then it is valid for directory parameter to be passed. If not passed then CSV file will be saved in current working directory.
-
-
- """
- profile_bot = Profile(twitter_username, browser, proxy, tweets_count)
- data = profile_bot.scrap()
- if output_format == "json":
- return data
- elif output_format.lower() == "csv":
- if filename == "":
- filename = twitter_username
- json_to_csv(filename=filename, json_data=json.loads(data), directory=directory)
+class Profile:
+ """this class needs to be instantiated in orer to scrape post of some
+ twitter profile"""
+
+ def __init__(self, twitter_username, browser, proxy, tweets_count, headless):
+ self.twitter_username = twitter_username
+ self.URL = "https://twitter.com/{}".format(twitter_username.lower())
+ self.__driver = ""
+ self.browser = browser
+ self.proxy = proxy
+ self.tweets_count = tweets_count
+ self.posts_data = {}
+ self.retry = 10
+ self.headless = headless
+
+ def __start_driver(self):
+ """changes the class member __driver value to driver on call"""
+ self.__driver = Initializer(
+ self.browser, self.headless, self.proxy).init()
+
+ def __close_driver(self):
+ self.__driver.close()
+ self.__driver.quit()
+
+ def __check_tweets_presence(self, tweet_list):
+ if len(tweet_list) <= 0:
+ self.retry -= 1
+
+ def __check_retry(self):
+ return self.retry <= 0
+
+ def __fetch_and_store_data(self):
+ try:
+ all_ready_fetched_posts = []
+ present_tweets = Finder._Finder__fetch_all_tweets(self.__driver)
+ self.__check_tweets_presence(present_tweets)
+ all_ready_fetched_posts.extend(present_tweets)
+
+ while len(self.posts_data) < self.tweets_count:
+ for tweet in present_tweets:
+ status, tweet_url = Finder._Finder__find_status(tweet)
+ replies = Finder._Finder__find_replies(tweet)
+ retweets = Finder._Finder__find_shares(tweet)
+ status = status[-1]
+ username = tweet_url.split("/")[3]
+ is_retweet = True if self.twitter_username.lower() != username.lower() else False
+ name = Finder._Finder__find_name_from_post(
+ tweet, is_retweet)
+ retweet_link = tweet_url if is_retweet is True else ""
+ posted_time = Finder._Finder__find_timestamp(tweet)
+ content = Finder._Finder__find_content(tweet)
+ likes = Finder._Finder__find_like(tweet)
+ images = Finder._Finder__find_images(tweet)
+ videos = Finder._Finder__find_videos(tweet)
+ hashtags = re.findall(r"#(\w+)", content)
+ mentions = re.findall(r"@(\w+)", content)
+ profile_picture = "https://twitter.com/{}/photo".format(
+ username)
+ link = Finder._Finder__find_external_link(tweet)
+ self.posts_data[status] = {
+ "tweet_id": status,
+ "username": username,
+ "name": name,
+ "profile_picture": profile_picture,
+ "replies": replies,
+ "retweets": retweets,
+ "likes": likes,
+ "is_retweet": is_retweet,
+ "retweet_link": retweet_link,
+ "posted_time": posted_time,
+ "content": content,
+ "hashtags": hashtags,
+ "mentions": mentions,
+ "images": images,
+ "videos": videos,
+ "tweet_url": tweet_url,
+ "link": link
+ }
+
+ Utilities._Utilities__scroll_down(self.__driver)
+ Utilities._Utilities__wait_until_completion(self.__driver)
+ Utilities._Utilities__wait_until_tweets_appear(self.__driver)
+ present_tweets = Finder._Finder__fetch_all_tweets(
+ self.__driver)
+ present_tweets = [
+ post for post in present_tweets if post not in all_ready_fetched_posts]
+ self.__check_tweets_presence(present_tweets)
+ all_ready_fetched_posts.extend(present_tweets)
+ if self.__check_retry() is True:
+ break
+
+ except Exception as ex:
+ print("Error at method scrap on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+ def scrap(self):
+ try:
+ self.__start_driver()
+ self.__driver.get(self.URL)
+ Utilities._Utilities__wait_until_completion(self.__driver)
+ Utilities._Utilities__wait_until_tweets_appear(self.__driver)
+ self.__fetch_and_store_data()
+ self.__close_driver()
+ data = dict(list(self.posts_data.items())
+ [0:int(self.tweets_count)])
+ return json.dumps(data)
+ except Exception as ex:
+ self.__close_driver()
+ print("Error at method scrap on line no. {} : {}".format(
+ frameinfo.f_lineno, ex))
+
+
+def json_to_csv(filename, json_data, directory):
+ os.chdir(directory) # change working directory to given directory
+ # headers of the CSV file
+ fieldnames = ['tweet_id', 'username', 'name', 'profile_picture', 'replies',
+ 'retweets', 'likes', 'is_retweet', 'retweet_link', 'posted_time', 'content', 'hashtags', 'mentions',
+ 'images', 'videos', 'tweet_url', 'link']
+ # open and start writing to CSV files
+ with open("{}.csv".format(filename), 'w', newline='', encoding="utf-8") as data_file:
+        # instantiate DictWriter for writing the CSV file
+ writer = csv.DictWriter(data_file, fieldnames=fieldnames)
+ writer.writeheader() # write headers to CSV file
+ # iterate over entire dictionary, write each posts as a row to CSV file
+ for key in json_data:
+ # parse post in a dictionary and write it as a single row
+ row = {
+ "tweet_id": key,
+ "username": json_data[key]['username'],
+ "name": json_data[key]['name'],
+ "profile_picture": json_data[key]['profile_picture'],
+ "replies": json_data[key]['replies'],
+ "retweets": json_data[key]['retweets'],
+ "likes": json_data[key]['likes'],
+ "is_retweet": json_data[key]['is_retweet'],
+ "retweet_link": json_data[key]['retweet_link'],
+ "posted_time": json_data[key]['posted_time'],
+ "content": json_data[key]['content'],
+ "hashtags": json_data[key]['hashtags'],
+ "mentions": json_data[key]['mentions'],
+ "images": json_data[key]['images'],
+ "videos": json_data[key]['videos'],
+ "tweet_url": json_data[key]['tweet_url'],
+ "link": json_data[key]['link']
+ }
+            writer.writerow(row)  # write row to the CSV file
+ data_file.close() # after writing close the file
+
+
+def scrap_profile(twitter_username, browser="firefox", proxy=None, tweets_count=10, output_format="json", filename="", directory=os.getcwd(), headless=True):
+ """
+ Returns tweets data in CSV or JSON.
+
+ Parameters:
+ twitter_username(string): twitter username of the account.
+
+    browser(string): Which browser to use for scraping. Only Chrome and Firefox are supported; default is Firefox.
+
+    proxy(string): Optional parameter. Proxy to use for scraping. For an authenticated proxy the format is username:password@host:port.
+
+    tweets_count(int): Number of posts to scrape. Default is 10.
+
+    output_format(string): The output format, whether JSON or CSV. Default is JSON.
+
+    filename(string): If output_format is set to CSV, then the filename parameter must be passed. If not passed, the filename will be the same as the twitter username.
+
+    directory(string): If output_format is set to CSV, the directory parameter may be passed. If not passed, the CSV file will be saved in the current working directory.
+
+    headless(boolean): Whether to run the browser in headless mode. Default is True.
+
+ """
+ profile_bot = Profile(twitter_username, browser,
+ proxy, tweets_count, headless)
+ data = profile_bot.scrap()
+ if output_format == "json":
+ return data
+ elif output_format.lower() == "csv":
+ if filename == "":
+ filename = twitter_username
+ json_to_csv(filename=filename, json_data=json.loads(
+ data), directory=directory)
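Note: an illustrative call to the updated scrap_profile (the username is only an example):

# Fetch 10 recent tweets from a public profile with the browser window visible.
from twitter_scraper_selenium.profile import scrap_profile

data = scrap_profile(
    twitter_username="TwitterDev",
    browser="chrome",
    tweets_count=10,
    output_format="json",
    headless=False,
)
print(data)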
diff --git a/twitter_scraper_selenium/scraping_utilities.py b/twitter_scraper_selenium/scraping_utilities.py
index e2fd5e0..1e0f3d9 100644
--- a/twitter_scraper_selenium/scraping_utilities.py
+++ b/twitter_scraper_selenium/scraping_utilities.py
@@ -1,26 +1,56 @@
#!/usr/bin/env python3
try:
- from inspect import currentframe
- import re
+ from inspect import currentframe
+ import re
+ from urllib.parse import quote
except Exception as ex:
- frameinfo = currentframe()
- print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
+ frameinfo = currentframe()
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
frameinfo = currentframe()
+
class Scraping_utilities:
- @staticmethod
- def __parse_name(string):
- try:
- return string.split("(")[0].strip()
- except:
- print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
-
- @staticmethod
- def __extract_digits(string):
- try:
- return int(re.search(r'\d+', string).group(0))
- except Exception as ex:
- print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
+ @staticmethod
+ def __parse_name(string):
+ try:
+ return string.split("(")[0].strip()
+        except Exception as ex:
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __extract_digits(string):
+ try:
+ return int(re.search(r'\d+', string).group(0))
+ except Exception as ex:
+ print("Error on line no. {} : {}".format(frameinfo.f_lineno, ex))
+
+ @staticmethod
+ def __set_value_or_none(value, string):
+ return string+str(value)+" " if value is not None else None
+
+ @staticmethod
+ def __url_generator(keyword, since=None, until=None,
+ since_id=None, max_id=None, within_time=None):
+ base_url = "https://twitter.com/search?q="
+ if within_time is None:
+ words = [Scraping_utilities.__set_value_or_none(since, "since:"),
+ Scraping_utilities.__set_value_or_none(
+ until, "until:"),
+ Scraping_utilities.__set_value_or_none(
+ since_id, "since_id:"), Scraping_utilities.__set_value_or_none(max_id, "max_id:")]
+ query = ""
+ for word in words:
+ if word is not None:
+ query += word
+ query += keyword
+ query = quote(query)
+ base_url = base_url + query + "&src=typed_query&f=live"
+ else:
+ word = Scraping_utilities.__set_value_or_none(
+ within_time, "within_time:")
+ query = keyword + " " + word
+ base_url = base_url + quote(query) + "&src=typed_query&f=live"
+ return base_url
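Note: a worked example of the URLs the generator above builds (accessed via name mangling, the same way keyword.py calls it; expected output shown as comments):

from twitter_scraper_selenium.scraping_utilities import Scraping_utilities

# Date-bounded search:
url = Scraping_utilities._Scraping_utilities__url_generator(
    "python", since="2022-07-01", until="2022-07-05")
# -> https://twitter.com/search?q=since%3A2022-07-01%20until%3A2022-07-05%20python&src=typed_query&f=live

# within_time takes precedence; the other operators are ignored:
url = Scraping_utilities._Scraping_utilities__url_generator("python", within_time="2d")
# -> https://twitter.com/search?q=python%20within_time%3A2d%20&src=typed_query&f=live
#    (the trailing %20 comes from __set_value_or_none appending a space)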