Skip to content

Commit

Permalink
Merge pull request #1012 from Johann-PLW/main
Browse files Browse the repository at this point in the history
Update does_column_exist_in_db function
  • Loading branch information
Johann-PLW authored Jan 9, 2025
2 parents 89fe120 + bbf1d4d commit dce59f3
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 86 deletions.
10 changes: 5 additions & 5 deletions scripts/artifacts/burner.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def get_burner_accounts(files_found, report_folder, seeker, wrap_text, timezone_
db = open_sqlite_db_readonly(file_found)
cursor = db.cursor()

if does_column_exist_in_db(db, 'ZBURNER', 'ZUSER'):
if does_column_exist_in_db(file_found, 'ZBURNER', 'ZUSER'):
cursor.execute('''
SELECT
U.Z_PK,
Expand Down Expand Up @@ -302,7 +302,7 @@ def get_burner_numbers(files_found, report_folder, seeker, wrap_text, timezone_o
report_file = file_found
db = open_sqlite_db_readonly(file_found)
cursor = db.cursor()
if does_column_exist_in_db(db, 'ZBURNER', 'ZUSER'):
if does_column_exist_in_db(file_found, 'ZBURNER', 'ZUSER'):
cursor.execute('''
SELECT
B.Z_PK,
Expand Down Expand Up @@ -428,17 +428,17 @@ def get_burner_messages(files_found, report_folder, seeker, wrap_text, timezone_
report_file = file_found if (file_found == 'Unknown') else report_file + ', ' + file_found
db = open_sqlite_db_readonly(file_found)
# 5.4.11
if does_column_exist_in_db(db, 'ZMESSAGE', 'ZCONVERSATIONID'):
if does_column_exist_in_db(file_found, 'ZMESSAGE', 'ZCONVERSATIONID'):
extra_join_thread = ' OR (M.ZMESSAGETHREAD IS NULL AND M.ZBURNERID = MT.ZBURNERID AND M.ZCONVERSATIONID = MT.ZCONVERSATIONID)'
# 5.3.8
elif does_column_exist_in_db(db, 'ZMESSAGE', 'M.ZCONTACTID'):
elif does_column_exist_in_db(file_found, 'ZMESSAGE', 'M.ZCONTACTID'):
extra_join_thread = ' OR (M.ZMESSAGETHREAD IS NULL AND M.ZBURNERID = MT.ZBURNERID AND coalesce(M.ZCONTACTID, M.ZCONTACTPHONENUMBER) = MT.ZCONTACTPHONENUMBER)'
# 4.0.18, 4.3.3
else:
extra_join_thread = '' # OR (M.ZMESSAGETHREAD IS NULL AND M.ZBURNERID = MT.ZBURNERID AND M.ZCONTACTPHONENUMBER = MT.ZCONTACTPHONENUMBER)'

# 5.3.8, 5.4.11
if does_column_exist_in_db(db, 'ZMESSAGE', 'ZCONTACTID'):
if does_column_exist_in_db(file_found, 'ZMESSAGE', 'ZCONTACTID'):
extra_join_contact = ' OR (MT.ZCONTACT IS NULL AND coalesce(M.ZCONTACTID, M.ZCONTACTPHONENUMBER) = C.ZPHONENUMBER)'
# 4.0.18 e 4.3.3
else:
Expand Down
2 changes: 1 addition & 1 deletion scripts/artifacts/calendarAll.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def calendarEvents(files_found, report_folder, seeker, wrap_text, timezone_offse
GROUP BY Attachment.owner_id) AS 'Attachments',
'''

conference_url_detected_exists = does_column_exist_in_db(db, 'CalendarItem', 'conference_url_detected')
conference_url_detected_exists = does_column_exist_in_db(file_found, 'CalendarItem', 'conference_url_detected')
conference_url = f"CalendarItem.conference_url{'_detected' if conference_url_detected_exists else ''} AS 'Conference URL',"


Expand Down
2 changes: 1 addition & 1 deletion scripts/artifacts/chrome.py
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ def chromeDownloads(files_found, report_folder, seeker, wrap_text, timezone_offs

#Downloads
# check for last_access_time column, an older version of chrome db (32) does not have it
if does_column_exist_in_db(db, 'downloads', 'last_access_time') == True:
if does_column_exist_in_db(file_found, 'downloads', 'last_access_time') == True:
last_access_time_query = '''
CASE last_access_time
WHEN "0"
Expand Down
4 changes: 2 additions & 2 deletions scripts/artifacts/knowledgeC.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def knowledgeC_DevicePluginStatus(files_found, report_folder, seeker, wrap_text,
cursor = db.cursor()

does_adapteriswireless_exist = does_column_exist_in_db(
db, 'ZSTRUCTUREDMETADATA', 'Z_DKDEVICEISPLUGGEDINMETADATAKEY__ADAPTERISWIRELESS')
db_file, 'ZSTRUCTUREDMETADATA', 'Z_DKDEVICEISPLUGGEDINMETADATAKEY__ADAPTERISWIRELESS')
if does_adapteriswireless_exist:
adapter_is_wireless = '''
CASE ZSTRUCTUREDMETADATA.Z_DKDEVICEISPLUGGEDINMETADATAKEY__ADAPTERISWIRELESS
Expand Down Expand Up @@ -173,7 +173,7 @@ def knowledgeC_MediaPlaying(files_found, report_folder, seeker, wrap_text, timez
cursor = db.cursor()

does_airplayvideo_exist = does_column_exist_in_db(
db, 'ZSTRUCTUREDMETADATA', 'Z_DKNOWPLAYINGMETADATAKEY__ISAIRPLAYVIDEO')
db_file, 'ZSTRUCTUREDMETADATA', 'Z_DKNOWPLAYINGMETADATAKEY__ISAIRPLAYVIDEO')
if does_airplayvideo_exist:
is_airplay_video = '''
CASE ZSTRUCTUREDMETADATA.Z_DKNOWPLAYINGMETADATAKEY__ISAIRPLAYVIDEO
Expand Down
6 changes: 3 additions & 3 deletions scripts/artifacts/notes.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def get_notes(files_found, report_folder, seeker, wrap_text, timezone_offset):
db = open_sqlite_db_readonly(file_found)
cursor = db.cursor()

if does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == True:
if does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == True:

cursor.execute('''
SELECT
Expand Down Expand Up @@ -59,7 +59,7 @@ def get_notes(files_found, report_folder, seeker, wrap_text, timezone_offset):
LEFT JOIN ZICNOTEDATA TabF on TabF.ZNOTE = TabA.Z_PK
''')

elif does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(db, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == False:
elif does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZACCOUNT4') == True and does_column_exist_in_db(file_found, 'ZICCLOUDSYNCINGOBJECT','ZCREATIONDATE3') == False:
cursor.execute('''
SELECT
DATETIME(TabA.ZCREATIONDATE1+978307200,'UNIXEPOCH'),
Expand Down Expand Up @@ -246,7 +246,7 @@ def ReadLengthField(blob):
data_length = int(blob[skip])
length = ((data_length & 0x7F) << (skip * 7)) + length
except (IndexError, ValueError):
log.exception('Error trying to read length field in note data blob')
logfunc('Error trying to read length field in note data blob')
skip += 1
return length, skip

Expand Down
4 changes: 2 additions & 2 deletions scripts/artifacts/reminders.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def get_reminders(files_found, report_folder, seeker, wrap_text, timezone_offset

if file_found.endswith('.sqlite'):
db = open_sqlite_db_readonly(file_found)
answer = does_column_exist_in_db(db, 'ZREMCDOBJECT', 'ZLASTMODIFIEDDATE')
answer = does_column_exist_in_db(file_found, 'ZREMCDOBJECT', 'ZLASTMODIFIEDDATE')

if answer:
#db = open_sqlite_db_readonly(file_found)
Expand All @@ -40,7 +40,7 @@ def get_reminders(files_found, report_folder, seeker, wrap_text, timezone_offset
createdate = convert_utc_human_to_timezone(createdate,timezone_offset)

moddate = convert_ts_human_to_utc(row[1])
moddate = convert_utc_human_to_timezone(moddate,timezone)
moddate = convert_utc_human_to_timezone(moddate,timezone_offset)
data_list.append((createdate, row[3], row[2], moddate, location_file_found))

dir_file_found = dirname(sqlite_file).split('Stores', 1)[0] + 'Stores'
Expand Down
6 changes: 3 additions & 3 deletions scripts/artifacts/tcc.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def get_tcc(files_found, report_folder, seeker, wrap_text, timezone_offset):
db = open_sqlite_db_readonly(file_found)
cursor = db.cursor()

if does_column_exist_in_db(db, 'access', 'last_modified'):
if does_column_exist_in_db(file_found, 'access', 'last_modified'):
last_modified_timestamp = "datetime(last_modified,'unixepoch') as 'Last Modified Timestamp'"
else:
last_modified_timestamp = ""

if does_column_exist_in_db(db, 'access', 'auth_value'):
if does_column_exist_in_db(file_found, 'access', 'auth_value'):
access = '''
case auth_value
when 0 then 'Not allowed'
Expand All @@ -53,7 +53,7 @@ def get_tcc(files_found, report_folder, seeker, wrap_text, timezone_offset):
end as "Access"
'''

prompt_count = does_column_exist_in_db(db, 'access', 'prompt_count')
prompt_count = does_column_exist_in_db(file_found, 'access', 'prompt_count')

cursor.execute(f'''
select {last_modified_timestamp if last_modified_timestamp else "''"},
Expand Down
23 changes: 3 additions & 20 deletions scripts/artifacts/vipps.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,11 @@
import io
import nska_deserialize as nd
import sqlite3
import plistlib
import json
import sys

from scripts.artifact_report import ArtifactHtmlReport
from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly

def does_column_exist_in_db(db, table_name, col_name):
'''Checks if a specific column exists in a table'''
col_name = col_name.lower()
try:
db.row_factory = sqlite3.Row # For fetching columns by name
query = f"pragma table_info('{table_name}');"
cursor = db.cursor()
cursor.execute(query)
all_rows = cursor.fetchall()
for row in all_rows:
if row['name'].lower() == col_name:
return True
except sqlite3.Error as ex:
logfunc(f"Query error, query={query} Error={str(ex)}")
pass
return False
from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, does_column_exist_in_db

def get_vipps(files_found, report_folder, seeker, wrap_text, time_offset):
for file_found in files_found:
Expand Down Expand Up @@ -90,7 +73,7 @@ def get_vipps(files_found, report_folder, seeker, wrap_text, time_offset):
name = row1[0]
else:
# Check if ZPHONENUMBERS exists, if so, use it instead
if does_column_exist_in_db(db, 'ZCONTACTMODEL', 'ZPHONENUMBERS'):
if does_column_exist_in_db(file_found, 'ZCONTACTMODEL', 'ZPHONENUMBERS'):
cursor1.execute(f'''
SELECT
ZNAME,
Expand Down
23 changes: 3 additions & 20 deletions scripts/artifacts/vippsContacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os

from scripts.artifact_report import ArtifactHtmlReport
from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, media_to_html
from scripts.ilapfuncs import logfunc, tsv, timeline, is_platform_windows, open_sqlite_db_readonly, does_column_exist_in_db

def relative_paths(source, splitter):
splitted_a = source.split(splitter)
Expand All @@ -16,23 +16,6 @@ def relative_paths(source, splitter):
splitted_b = source.split(report_folder)
return '.'+ splitted_b[1]

def does_column_exist_in_db(db, table_name, col_name):
'''Checks if a specific column exists in a table'''
col_name = col_name.lower()
try:
db.row_factory = sqlite3.Row # For fetching columns by name
query = f"pragma table_info('{table_name}');"
cursor = db.cursor()
cursor.execute(query)
all_rows = cursor.fetchall()
for row in all_rows:
if row['name'].lower() == col_name:
return True
except sqlite3.Error as ex:
logfunc(f"Query error, query={query} Error={str(ex)}")
pass
return False

def get_vippsContacts(files_found, report_folder, seeker, wrap_text, timezone_offset):
for file_found in files_found:
file_found = str(file_found)
Expand All @@ -46,9 +29,9 @@ def get_vippsContacts(files_found, report_folder, seeker, wrap_text, timezone_of
cursor = db.cursor()

# Check if ZRAWPHONENUMBERS exists, if not, check ZPHONENUMBERS
if does_column_exist_in_db(db, 'ZCONTACTMODEL', 'ZRAWPHONENUMBERS'):
if does_column_exist_in_db(file_found, 'ZCONTACTMODEL', 'ZRAWPHONENUMBERS'):
phone_column = 'ZRAWPHONENUMBERS'
elif does_column_exist_in_db(db, 'ZCONTACTMODEL', 'ZPHONENUMBERS'):
elif does_column_exist_in_db(file_found, 'ZCONTACTMODEL', 'ZPHONENUMBERS'):
phone_column = 'ZPHONENUMBERS'
else:
logfunc('Neither ZRAWPHONENUMBERS nor ZPHONENUMBERS exist in ZCONTACTMODEL table.')
Expand Down
79 changes: 51 additions & 28 deletions scripts/artifacts/wire.py
Original file line number Diff line number Diff line change
@@ -1,67 +1,90 @@
__artifacts_v2__ = {
"wireAccount": {
"name": "Wire Account",
"name": "Wire Secure Messenger Account",
"description": "Wire account details",
"author": "Elliot Glendye",
"creation_date": "2024-01-21",
"last_update_date": "2025-01-03",
"requirements": "",
"category": "Wire",
"category": "Business",
"notes": "",
"paths": ('*/mobile/Containers/Shared/AppGroup/*/AccountData/*/store/store.wiredatabase*'),
"output_types": "all",
"artifact_icon": "user"
},
"wireMessages": {
"name": "Wire Messages",
"name": "Wire Secure Messenger Messages",
"description": "Wire messages, including message sender, associated user identifiers and message type",
"author": "Elliot Glendye",
"creation_date": "2024-01-21",
"last_update_date": "2025-01-03",
"requirements": "",
"category": "Wire",
"category": "Business",
"notes": "",
"paths": ('*/mobile/Containers/Shared/AppGroup/*/AccountData/*/store/store.wiredatabase*'),
"output_types": "standard",
"artifact_icon": "message-circle"
}
}

from scripts.ilapfuncs import artifact_processor, get_file_path, get_sqlite_db_records, convert_cocoa_core_data_ts_to_utc
from scripts.ilapfuncs import artifact_processor, get_file_path, get_sqlite_db_records, does_column_exist_in_db, convert_cocoa_core_data_ts_to_utc

@artifact_processor
def wireAccount(files_found, report_folder, seeker, wrap_text, timezone_offset):
source_path = get_file_path(files_found, "store.wiredatabase")
data_list = []

query = '''
SELECT
DISTINCT ZUSER.ZHANDLE AS 'User ID',
ZUSER.ZNAME AS 'Display Name',
ZUSERCLIENT.ZACTIVATIONDATE AS 'Activation Date',
ZUSER.ZPHONENUMBER AS 'Phone Number',
ZUSER.ZEMAILADDRESS AS 'Email Address',
ZUSERCLIENT.ZACTIVATIONLOCATIONLATITUDE AS 'Activation Latitude',
ZUSERCLIENT.ZACTIVATIONLOCATIONLONGITUDE AS 'Activation Longitude'
FROM ZUSER
LEFT JOIN ZUSERCLIENT ON ZUSER.Z_PK = ZUSERCLIENT.ZUSER;
'''

data_headers = (
'User ID',
'Display Name',
('Activation Date', 'datetime'),
('Phone Number', 'phonenumber'),
'Email Address',
'Latitude',
'Longitude'
)
has_location_data = does_column_exist_in_db(source_path, 'ZUSER', 'ZUSERCLIENT.ZACTIVATIONLOCATIONLATITUDE')

if has_location_data:
query = '''
SELECT
DISTINCT ZUSER.ZHANDLE AS 'User ID',
ZUSER.ZNAME AS 'Display Name',
ZUSERCLIENT.ZACTIVATIONDATE AS 'Activation Date',
ZUSER.ZPHONENUMBER AS 'Phone Number',
ZUSER.ZEMAILADDRESS AS 'Email Address',
ZUSERCLIENT.ZACTIVATIONLOCATIONLATITUDE AS 'Activation Latitude',
ZUSERCLIENT.ZACTIVATIONLOCATIONLONGITUDE AS 'Activation Longitude'
FROM ZUSER
LEFT JOIN ZUSERCLIENT ON ZUSER.Z_PK = ZUSERCLIENT.ZUSER;
'''
data_headers = (
'User ID',
'Display Name',
('Activation Date', 'datetime'),
('Phone Number', 'phonenumber'),
'Email Address',
'Latitude',
'Longitude'
)
else:
query = '''
SELECT
DISTINCT ZUSER.ZHANDLE AS 'User ID',
ZUSER.ZNAME AS 'Display Name',
ZUSERCLIENT.ZACTIVATIONDATE AS 'Activation Date',
ZUSER.ZPHONENUMBER AS 'Phone Number',
ZUSER.ZEMAILADDRESS AS 'Email Address'
FROM ZUSER
LEFT JOIN ZUSERCLIENT ON ZUSER.Z_PK = ZUSERCLIENT.ZUSER;
'''
data_headers = (
'User ID',
'Display Name',
('Activation Date', 'datetime'),
('Phone Number', 'phonenumber'),
'Email Address'
)

db_records = get_sqlite_db_records(source_path, query)

for record in db_records:
activation_date = convert_cocoa_core_data_ts_to_utc(record[2])
data_list.append((record[0], record[1], activation_date, record[3], record[4], record[5], record[6]))
if has_location_data:
data_list.append((record[0], record[1], activation_date, record[3], record[4], record[5], record[6]))
else:
data_list.append((record[0], record[1], activation_date, record[3], record[4]))

return data_headers, data_list, source_path

Expand Down
3 changes: 2 additions & 1 deletion scripts/ilapfuncs.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,9 @@ def get_sqlite_db_records(path, query, attach_query=None):
logfunc(f" - {str(e)}")
return []

def does_column_exist_in_db(db, table_name, col_name):
def does_column_exist_in_db(path, table_name, col_name):
'''Checks if a specific col exists'''
db = open_sqlite_db_readonly(path)
col_name = col_name.lower()
try:
db.row_factory = sqlite3.Row # For fetching columns by name
Expand Down

0 comments on commit dce59f3

Please sign in to comment.