-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path1_phrase_sampling.py
executable file
·129 lines (100 loc) · 3.98 KB
/
1_phrase_sampling.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python
# coding: utf-8
""" Sample tweets containing specific phrases
@inputs: list of keywords to search (required argument)
@outputs: list of tweets containing the keywords (.csv file)
@usage:
-----
```bash
python -m jobs.sampling.sample_phrases \
--input "data/PROJECT/FILEPATH.json" \
--output output/ \
--phrases \
"love" \
--limit 5 \
--lang "en" \
--preprocessing_choices 'stopwords' 'lowercase' 'stemmed'
```
"""
###############################################
# Import packages
###############################################
import pyspark.sql.functions as f
from shared.base_job import BaseJob
from shared.job_helpers import construct_output_filename
###############################################
# Define functions
###############################################
class SamplePhrasesJob(BaseJob):
    """Sample tweets whose target text attribute contains given phrases.

    The job filters `self.df` down to rows whose (lower-cased) target
    attribute matches a regular expression built from `--phrases`,
    optionally keeps a random subset of `--limit` rows, and finally
    projects the result onto `id_str`, `date`, the target attribute and any
    requested additional columns.

    NOTE(review): `self.parser`, `self.args`, `self.df` and `run()` are not
    defined here; presumably they are provided by `BaseJob` - confirm there.
    """

    def __init__(self):
        """Register the command-line arguments specific to this job."""
        # invoke `BaseJob` constructor
        super().__init__(name="Sample Tweets by Phrase")

        # support additional parameters
        self.parser.add_argument(
            "--phrases",
            required=True,
            nargs="+",
            help="one or more words or phrases to search in `full_text`"
        )
        self.parser.add_argument(
            "--limit",
            type=int,
            help="number of tweets to return"
        )
        self.parser.add_argument(
            "--phrase_conditional",
            choices=['AND', 'OR'],
            default='OR',
            help="conditional expressions for phrases"
        )

        # support for additional attributes
        self.parser.add_argument(
            "--additional_col_attributes", nargs="+",
            help="Additional attributes to include in the output "
        )

        # Default pre-processing options
        self.parser.set_defaults(preprocessing_choices=['lowercase', 'contractions', 'punctuation'])

    @staticmethod
    def _build_phrase_regex(phrases, conditional):
        """Return the regular expression used to filter the text column.

        Args:
            phrases: words or phrases to look for. They are lower-cased and
                interpolated into the pattern WITHOUT escaping, so regex
                metacharacters in a phrase are interpreted as regex syntax.
            conditional: 'OR' (any phrase must occur) or 'AND' (every phrase
                must occur).

        Returns:
            The pattern string to pass to `Column.rlike`.

        Raises:
            ValueError: if `conditional` is neither 'OR' nor 'AND'.
        """
        lowered = [phrase.lower() for phrase in phrases]

        # OR conditional: one of the phrases must be present in the text,
        # delimited by whitespace or the start/end of the text
        if conditional == 'OR':
            return r'(^|\s)(' + '|'.join(lowered) + r')(\s|$)'

        # AND conditional - all of the phrases must be present in the text
        # - https://regex101.com/r/i2jy0J/4
        # - https://www.ocpsoft.org/tutorials/regular-expressions/and-in-regex/
        if conditional == 'AND':
            # `(?s)` lets `.` match line terminators as well; without it the
            # lookaheads cannot see past a newline, so phrases located on
            # different lines of the same text would never all match.
            return "(?s)" + "".join(r"(?=.*\b" + phrase + r"\b)" for phrase in lowered)

        raise ValueError("unsupported phrase conditional: " + repr(conditional))

    def process(self):
        """Filter, optionally sample, and project `self.df`.

        Side effects: rewrites `self.args.output` to the constructed ".csv"
        path, prints that path, and replaces `self.df` with the result.
        """
        # construct output filename from parameters
        # NOTE(review): `self.args.limit` is None when `--limit` is omitted;
        # confirm `construct_output_filename` copes with that component.
        filename_components = [self.args.start_date, self.args.end_date,
                               self.args.dataset, "Sample", self.args.limit,
                               self.args.target_attr]
        output = construct_output_filename(None, self.args.output, filename_components)
        self.args.output = output + ".csv"
        print("output file: ", self.args.output)

        # Finding entries matching regular expression
        phrase_regex = self._build_phrase_regex(self.args.phrases,
                                                self.args.phrase_conditional)
        self.df = self.df.where(f.lower(f.col(self.args.target_attr)).rlike(phrase_regex))

        # Limit number of tweets returned if optional argument is provided;
        # ordering by a random value first makes the kept rows a random sample
        if self.args.limit:
            self.df = self.df.orderBy(f.rand()).limit(self.args.limit)

        # default columns, always written out
        cols = ["id_str", "date", self.args.target_attr]

        # append any additional attributes requested for the output
        for attr in self.args.additional_col_attributes or []:
            if attr not in self.df.columns:
                # report instead of silently dropping the request
                print("skipping unknown attribute: ", attr)
            elif attr not in cols:
                # skip repeats so no column is selected twice, which would
                # produce duplicate column names in the output
                cols.append(attr)

        self.df = self.df.select(cols)
###############################################
# Execute functions
###############################################
if __name__ == "__main__":
    # Script entry point: build the job and invoke its `run` method
    # (`run` is not defined in this file; presumably inherited from `BaseJob`).
    SamplePhrasesJob().run()