inference_job.py
from typing import Any, Optional
import pandas as pd
import torch
import numpy as np
from transformers import pipeline


class InferenceJob:
    """Zero-shot classification job: read parquet files from a MinIO bucket,
    label one text column, and write the labeled frames to a second bucket."""

    def __init__(self, bucket: str, final_bucket: str, minio_client: Any, column: str, task: str):
        self.bucket = bucket
        self.final_bucket = final_bucket
        self.minio_client = minio_client
        self.column = column
        self.classifier = None
        self.date = None
        self.task = task
        # Candidate labels for the zero-shot classifier.
        self.labels = [
            "a real animal",
            "a toy",
            "a print of an animal",
            "an object",
            "a faux animal",
            "an animal body part",
            "a faux animal body part",
        ]
        self.hypothesis_template = 'This product advertisement is about {}.'
    def perform_clf(self):
        """Classify every parquet file in the source bucket and save the results."""
        files = self.minio_client.list_objects_names(self.bucket, None)
        for file in files:
            filename = file.split(".")[0]
            df = self.minio_client.read_df_parquet(bucket=self.bucket, file_name=file)
            # Skip empty files; everything else is labeled and written to the final bucket.
            if df.empty:
                continue
            df = self.get_inference(df=df)
            self.minio_client.save_df_parquet(self.final_bucket, filename, df)
    def get_inference(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run zero-shot classification on the text column and add 'label'/'score' columns."""
        classifier = self.maybe_load_classifier()
        # Only classify rows whose product column is non-empty.
        df[self.column] = df[self.column].fillna("")
        not_empty_filter = (df[self.column] != "")
        inputs = df[not_empty_filter][self.column].to_list()
        results = classifier(inputs, self.labels, hypothesis_template=self.hypothesis_template)
        # Keep the top-ranked label and its score for each classified row.
        labels = [result['labels'][0] if result is not None else np.nan for result in results]
        scores = [result['scores'][0] if result is not None else np.nan for result in results]
        df.loc[not_empty_filter, "label"] = labels
        df.loc[not_empty_filter, "score"] = scores
        return df
    def maybe_load_classifier(self, model: Optional[str] = 'facebook/bart-large-mnli'):
        """Lazily build the transformers pipeline, preferring GPU when available."""
        if not self.classifier:
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            self.classifier = pipeline(self.task,
                                       model=model,
                                       device=device)
            print(type(self.classifier))
        return self.classifier
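

# A minimal usage sketch, not part of the original file. It assumes a hypothetical
# in-memory stand-in for the project's MinIO wrapper; the real client must expose
# list_objects_names, read_df_parquet and save_df_parquet with the signatures used
# above. Bucket names and the "product_title" column are illustrative only.
if __name__ == "__main__":

    class DummyMinioClient:
        """In-memory stand-in for the MinIO wrapper, for local testing only (assumption)."""

        def __init__(self):
            self.store = {
                "raw-bucket": {
                    "products.parquet": pd.DataFrame(
                        {"product_title": ["Plush tiger toy", "Live Maine lobster", ""]}
                    )
                },
                "labeled-bucket": {},
            }

        def list_objects_names(self, bucket, prefix):
            return list(self.store[bucket].keys())

        def read_df_parquet(self, bucket, file_name):
            return self.store[bucket][file_name]

        def save_df_parquet(self, bucket, file_name, df):
            self.store[bucket][file_name] = df

    client = DummyMinioClient()
    job = InferenceJob(
        bucket="raw-bucket",
        final_bucket="labeled-bucket",
        minio_client=client,
        column="product_title",
        task="zero-shot-classification",
    )
    job.perform_clf()
    # The labeled frame is stored under the file's stem ("products").
    print(client.store["labeled-bucket"]["products"])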