-
Notifications
You must be signed in to change notification settings - Fork 0
/
socket-integration-example.py
83 lines (75 loc) · 2.56 KB
/
socket-integration-example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import json
import logging
import os
from socketsync.core import Core
from socketsync.connectors.elastic import Elastic
from socketsync.connectors.bigquery import BigQuery
from socketsync.connectors.panther import Panther
from socketsync.connectors.csv import CSV
from socketsync.connectors.webhook import Webhook
from socketsync.connectors.slack import Slack
from datetime import datetime, timezone
start_date = "2024-09-10 10:00"
start_time = datetime.strptime(start_date, "%Y-%m-%d %H:%M").replace(tzinfo=timezone.utc)
end_time = datetime.now(timezone.utc)
from_time = int((end_time - start_time).total_seconds())
if __name__ == '__main__':
api_key = os.getenv("SOCKET_API_KEY") or exit(1)
# from_time = os.getenv("FROM_TIME") or 300
default_branches = [
"master",
"main"
]
core = Core(
api_key=api_key,
from_time=from_time,
request_timeout=300
)
# logging.basicConfig(level=logging.DEBUG)
# core.set_log_level(logging.DEBUG)
issue_data = core.get_issues()
# CSV Example
csv_file = "example.csv"
csv = CSV(
file=csv_file
)
csv.write_csv(issue_data)
# Elasticsearch Example
elastic_token = os.getenv('ELASTIC_TOKEN') or exit(1)
elastic_cloud_id = os.getenv('ELASTIC_CLOUD_ID') or exit(1)
elastic_index = os.getenv('ELASTIC_ID') or exit(1)
es = Elastic(
api_key=elastic_token,
cloud_id=elastic_cloud_id
)
for issue in issue_data:
es.add_document(issue, elastic_index)
# Big Query Example
bigquery_table = os.getenv('GOOGLE_TABLE') or exit(1)
bigquery = BigQuery(bigquery_table)
errors = bigquery.add_dataset(issue_data, streaming=True)
# Panther SIEM Integration
panther_url = os.getenv('PANTHER_URL') or exit(1)
panther_token = os.getenv('PANTHER_TOKEN') or exit(1)
panther = Panther(
token=panther_token,
url=panther_url
)
for issue in issue_data:
issue_json = json.loads(str(issue))
panther.send(str(issue))
print(f"Processed issue id: {issue.id}")
# Webhook Example
webhook_url = os.getenv("WEBHOOK_URL") or exit(1)
webhook_auth_headers = os.getenv("WEBHOOK_AUTH_HEADERS") or {
'Authorization': 'Bearer EXAMPLE'
}
webhook = Webhook(webhook_url)
for issue in issue_data:
issue_json = json.loads(str(issue))
webhook.send(issue_json)
slack_url = os.getenv("SLACK_WEBHOOK_URL") or exit(1)
slack = Slack(slack_url)
for issue in issue_data:
issue_json = json.loads(str(issue))
slack.send(issue_json)