-
Notifications
You must be signed in to change notification settings - Fork 220
/
Copy patherpnext_sync.py
332 lines (299 loc) · 16.3 KB
/
erpnext_sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
import local_config as config
import requests
import datetime
import json
import os
import sys
import time
import logging
from logging.handlers import RotatingFileHandler
import pickledb
from zk import ZK, const
# ERPNext error messages that are tolerated: when a failed push contains one
# of these, pull_process_and_push_data logs it and carries on instead of raising.
EMPLOYEE_NOT_FOUND_ERROR_MESSAGE = "No Employee found for the given employee field value"
EMPLOYEE_INACTIVE_ERROR_MESSAGE = "Transactions cannot be created for an Inactive Employee"
DUPLICATE_EMPLOYEE_CHECKIN_ERROR_MESSAGE = "This employee already has a log with the same timestamp"
allowlisted_errors = [
    EMPLOYEE_NOT_FOUND_ERROR_MESSAGE,
    EMPLOYEE_INACTIVE_ERROR_MESSAGE,
    DUPLICATE_EMPLOYEE_CHECKIN_ERROR_MESSAGE,
]

# config.allowed_exceptions, when present, lists 1-based positions into the
# list above; only the selected messages stay allowlisted.
if hasattr(config, 'allowed_exceptions'):
    allowlisted_errors = [allowlisted_errors[error_number - 1] for error_number in config.allowed_exceptions]

# Optional overrides from local_config, with the defaults used when absent.
device_punch_values_IN = getattr(config, 'device_punch_values_IN', [0, 4])
device_punch_values_OUT = getattr(config, 'device_punch_values_OUT', [1, 5])
ERPNEXT_VERSION = getattr(config, 'ERPNEXT_VERSION', 14)
# possible area of further development
# Real-time events - set up getting events pushed from the machine rather than polling.
#- this is documented as 'Real-time events' in the ZKProtocol manual.
# Notes:
# Status Keys in status.json
# - lift_off_timestamp
# - mission_accomplished_timestamp
# - <device_id>_pull_timestamp
# - <device_id>_push_timestamp
# - <shift_type>_sync_timestamp
def main():
    """Run one sync cycle if it is time to pull data based on config.

    A cycle starts when no valid 'lift_off_timestamp' is stored in status, or
    when the stored one is more than config.PULL_FREQUENCY minutes old. For
    every device in config.devices the relevant functions are called to pull
    data and push it to ERPNext; a dump file left behind by an earlier run is
    retried instead of fetching from the device again.

    Errors are logged and never propagate: a failing device does not stop the
    remaining devices, and any other failure ends only the current cycle.
    KeyboardInterrupt and SystemExit are deliberately not caught.
    """
    try:
        last_lift_off_timestamp = _safe_convert_date(status.get('lift_off_timestamp'), "%Y-%m-%d %H:%M:%S.%f")
        if last_lift_off_timestamp:
            next_lift_off = last_lift_off_timestamp + datetime.timedelta(minutes=config.PULL_FREQUENCY)
            if datetime.datetime.now() <= next_lift_off:
                # Not yet time for the next pull.
                return

        status.set('lift_off_timestamp', str(datetime.datetime.now()))
        info_logger.info("Cleared for lift off!")
        for device in config.devices:
            device_attendance_logs = None
            info_logger.info("Processing Device: "+ device['device_id'])
            dump_file = get_dump_file_name_and_directory(device['device_id'], device['ip'])
            if os.path.exists(dump_file):
                info_logger.error('Device Attendance Dump Found in Log Directory. This can mean the program crashed unexpectedly. Retrying with dumped data.')
                with open(dump_file, 'r') as f:
                    file_contents = f.read()
                if file_contents:
                    # The dump stores timestamps as epoch floats; turn them
                    # back into datetime objects.
                    device_attendance_logs = [
                        _apply_function_to_key(record, 'timestamp', datetime.datetime.fromtimestamp)
                        for record in json.loads(file_contents)
                    ]
            try:
                pull_process_and_push_data(device, device_attendance_logs)
                status.set(f'{device["device_id"]}_push_timestamp', str(datetime.datetime.now()))
                # Everything was pushed, so the backup dump is no longer needed.
                if os.path.exists(dump_file):
                    os.remove(dump_file)
                info_logger.info("Successfully processed Device: "+ device['device_id'])
            except Exception:
                # One failing device must not prevent the others from syncing.
                error_logger.exception('exception when calling pull_process_and_push_data function for device'+json.dumps(device, default=str))
        if hasattr(config, 'shift_type_device_mapping'):
            update_shift_last_sync_timestamp(config.shift_type_device_mapping)
        status.set('mission_accomplished_timestamp', str(datetime.datetime.now()))
        info_logger.info("Mission Accomplished!")
    except Exception:
        error_logger.exception('exception has occurred in the main function...')
def pull_process_and_push_data(device, device_attendance_logs=None):
    """ Takes a single device config as param, pulls data from that device and pushes it to ERPNext.

    Pushing resumes after the last record written to the device's success log,
    or from config.IMPORT_START_DATE, whichever is later. Every record is
    written to the per-device success or failed log file.

    params:
    device: a single device config object from the local_config file
    device_attendance_logs: fetching from device is skipped if this param is passed. used to restart failed fetches from previous runs.

    raises:
    Exception: when ERPNext rejects a record with an error that is not in allowlisted_errors.
    """
    device_id = device['device_id']
    attendance_success_log_file = '_'.join(["attendance_success_log", device_id])
    attendance_failed_log_file = '_'.join(["attendance_failed_log", device_id])
    success_log_path = '/'.join([config.LOGS_DIRECTORY, attendance_success_log_file])+'.log'
    failed_log_path = '/'.join([config.LOGS_DIRECTORY, attendance_failed_log_file])+'.log'
    attendance_success_logger = setup_logger(attendance_success_log_file, success_log_path)
    attendance_failed_logger = setup_logger(attendance_failed_log_file, failed_log_path)

    if not device_attendance_logs:
        device_attendance_logs = get_all_attendance_from_device(device['ip'], device_id=device_id, clear_from_device_on_fetch=device['clear_from_device_on_fetch'])
        if not device_attendance_logs:
            return

    # find the last successful push and restart from that point (or) from a set
    # 'config.IMPORT_START_DATE' (whichever is later)
    last_line = get_last_line_from_file(success_log_path)
    import_start_date = _safe_convert_date(config.IMPORT_START_DATE, "%Y%m%d")
    start_index = _find_push_start_index(device_attendance_logs, last_line, import_start_date)

    for device_attendance_log in device_attendance_logs[start_index:]:
        punch_direction = _resolve_punch_direction(device['punch_direction'], device_attendance_log['punch'])
        erpnext_status_code, erpnext_message = send_to_erpnext(device_attendance_log['user_id'], device_attendance_log['timestamp'], device_id, punch_direction)
        # Tab-separated fields shared by both log files. The position of
        # user_id and timestamp matters: the success log is parsed again by
        # _find_push_start_index on the next run.
        record_fields = [
            str(device_attendance_log['uid']),
            str(device_attendance_log['user_id']),
            str(device_attendance_log['timestamp'].timestamp()),
            str(device_attendance_log['punch']),
            str(device_attendance_log['status']),
            json.dumps(device_attendance_log, default=str),
        ]
        if erpnext_status_code == 200:
            attendance_success_logger.info("\t".join([erpnext_message] + record_fields))
        else:
            attendance_failed_logger.error("\t".join([str(erpnext_status_code)] + record_fields))
            if not any(error in erpnext_message for error in allowlisted_errors):
                raise Exception('API Call to ERPNext Failed.')


def _find_push_start_index(device_attendance_logs, last_line, import_start_date):
    """Return the index of the first attendance record that still has to be pushed.

    params:
    device_attendance_logs: list of attendance dicts with 'user_id' and 'timestamp' keys
    last_line: last line of the success log (or None); fields 5 and 6 of the
        tab-separated line are the user id and the epoch timestamp
    import_start_date: datetime cutoff parsed from config, or None
    """
    last_user_id = None
    last_timestamp = None
    if last_line:
        last_user_id, last_timestamp = last_line.split("\t")[4:6]
        last_timestamp = datetime.datetime.fromtimestamp(float(last_timestamp))
    if import_start_date and (last_timestamp is None or last_timestamp < import_start_date):
        # The import start date is later than the last push: use it as a plain
        # timestamp cutoff.
        last_timestamp = import_start_date
        last_user_id = None

    if last_timestamp is None:
        return 0

    if last_user_id:
        # Resume right after the record that was pushed last. If it is no
        # longer present in the fetched logs, push everything.
        for index, record in enumerate(device_attendance_logs):
            if last_user_id == str(record['user_id']) and last_timestamp == record['timestamp']:
                return index + 1
        return 0

    # Timestamp cutoff: start AT the first record on or after the cutoff. The
    # previous code started one record later and so dropped that record.
    for index, record in enumerate(device_attendance_logs):
        if record['timestamp'] >= last_timestamp:
            return index
    # Every record is older than the cutoff, so nothing is pushed. Previously
    # the whole pre-cutoff history was pushed in this case.
    return len(device_attendance_logs)


def _resolve_punch_direction(configured_direction, punch_value):
    """Return the log type for a record: the configured direction, or for 'AUTO' the one derived from the punch value."""
    if configured_direction != 'AUTO':
        return configured_direction
    if punch_value in device_punch_values_OUT:
        return 'OUT'
    if punch_value in device_punch_values_IN:
        return 'IN'
    return None
def get_all_attendance_from_device(ip, port=4370, timeout=30, device_id=None, clear_from_device_on_fetch=False):
    """Fetch all attendance records from a device and return them as a list of dicts.

    The device is disabled while fetching and enabled again afterwards. The
    fetched records are written to a dump file before they are (optionally)
    cleared from the device, so a crashed run can be retried from the dump.
    The '<device_id>_push_timestamp' and '<device_id>_pull_timestamp' status
    keys are updated.

    params:
    ip, port, timeout: passed to the ZK connection
    device_id: used for the status keys and the dump file name
    clear_from_device_on_fetch: clear the attendance records on the device after a non-empty fetch

    raises:
    Exception: 'Device fetch failed.' when anything goes wrong while talking to the device.
    """
    # Sample Attendance Logs [{'punch': 255, 'user_id': '22', 'uid': 12349, 'status': 1, 'timestamp': datetime.datetime(2019, 2, 26, 20, 31, 29)},{'punch': 255, 'user_id': '7', 'uid': 7, 'status': 1, 'timestamp': datetime.datetime(2019, 2, 26, 20, 31, 36)}]
    zk = ZK(ip, port=port, timeout=timeout)
    conn = None
    device_disabled = False
    records = []
    try:
        conn = zk.connect()
        result = conn.disable_device()
        device_disabled = True
        # device is disabled when fetching data
        info_logger.info("\t".join((ip, "Device Disable Attempted. Result:", str(result))))
        attendances = conn.get_attendance()
        info_logger.info("\t".join((ip, "Attendances Fetched:", str(len(attendances)))))
        records = [attendance.__dict__ for attendance in attendances]
        status.set(f'{device_id}_push_timestamp', None)
        status.set(f'{device_id}_pull_timestamp', str(datetime.datetime.now()))
        if records:
            # keeping a backup before clearing data in case the program fails.
            # if everything goes well then this file is removed automatically at the end.
            dump_file_name = get_dump_file_name_and_directory(device_id, ip)
            with open(dump_file_name, 'w+') as f:
                # datetime values are stored as epoch floats
                f.write(json.dumps(records, default=datetime.datetime.timestamp))
            if clear_from_device_on_fetch:
                result = conn.clear_attendance()
                info_logger.info("\t".join((ip, "Attendance Clear Attempted. Result:", str(result))))
        result = conn.enable_device()
        device_disabled = False
        info_logger.info("\t".join((ip, "Device Enable Attempted. Result:", str(result))))
    except Exception as err:
        error_logger.exception(str(ip)+' exception when fetching from device...')
        if conn and device_disabled:
            # Best effort: do not leave the device disabled after a failed fetch.
            try:
                conn.enable_device()
            except Exception:
                error_logger.exception(str(ip)+' exception when re-enabling device after a failed fetch...')
        raise Exception('Device fetch failed.') from err
    finally:
        if conn:
            conn.disconnect()
    return records
def send_to_erpnext(employee_field_value, timestamp, device_id=None, log_type=None, timeout=60):
    """
    Push a single employee checkin to ERPNext.

    Example: send_to_erpnext('12349',datetime.datetime.now(),'HO1','IN')

    params:
    employee_field_value: value identifying the employee, sent as 'employee_field_value'
    timestamp: datetime of the punch
    device_id: sent as 'device_id'
    log_type: sent as 'log_type' ('IN', 'OUT' or None)
    timeout: seconds passed to requests so that an unresponsive server cannot block the sync forever

    returns:
    (200, <name from the response message>) on success, otherwise (<HTTP status code>, <error string>).
    Exceptions raised by requests (connection errors, timeouts) are not caught here.
    """
    endpoint_app = "hrms" if ERPNEXT_VERSION > 13 else "erpnext"
    url = f"{config.ERPNEXT_URL}/api/method/{endpoint_app}.hr.doctype.employee_checkin.employee_checkin.add_log_based_on_employee_field"
    headers = {
        'Authorization': "token "+ config.ERPNEXT_API_KEY + ":" + config.ERPNEXT_API_SECRET,
        'Accept': 'application/json'
    }
    data = {
        'employee_field_value' : employee_field_value,
        'timestamp' : str(timestamp),
        'device_id' : device_id,
        'log_type' : log_type
    }
    response = requests.request("POST", url, headers=headers, json=data, timeout=timeout)
    if response.status_code == 200:
        return 200, response.json()['message']['name']

    error_str = _safe_get_error_str(response)
    # TODO: send email when EMPLOYEE_NOT_FOUND_ERROR_MESSAGE is in error_str?
    error_logger.error('\t'.join(['Error during ERPNext API Call.', str(employee_field_value), str(timestamp.timestamp()), str(device_id), str(log_type), error_str]))
    return response.status_code, error_str
def update_shift_last_sync_timestamp(shift_type_device_mapping):
    """
    Update the last sync timestamp of every shift type whose devices have all been pushed.

    ### algo for updating the sync_current_timestamp
    - get a list of devices to check
    - check if all the devices have a non 'None' push_timestamp
    - check if the earliest of the pull timestamp is greater than sync_current_timestamp for each shift name
    - then update this min of pull timestamp to the shift

    A mapping is skipped when it lists no devices or when a device has no
    parseable pull timestamp; min() would otherwise raise and abort the
    remaining mappings.

    params:
    shift_type_device_mapping: list of dicts with 'shift_type_name' (a string or a list of strings) and 'related_device_id' (list of device ids)
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S.%f"
    for shift_type_device_map in shift_type_device_mapping:
        all_devices_ready = True
        pull_timestamps = []
        for device_id in shift_type_device_map['related_device_id']:
            if not status.get(f'{device_id}_push_timestamp'):
                all_devices_ready = False
                break
            pull_timestamp = _safe_convert_date(status.get(f'{device_id}_pull_timestamp'), timestamp_format)
            if pull_timestamp is None:
                all_devices_ready = False
                break
            pull_timestamps.append(pull_timestamp)
        if not all_devices_ready or not pull_timestamps:
            continue

        min_pull_timestamp = min(pull_timestamps)
        shift_names = shift_type_device_map['shift_type_name']
        if isinstance(shift_names, str): # for backward compatibility of config file
            # Wrap in a local list; the caller's config dict is left untouched.
            shift_names = [shift_names]
        for shift in shift_names:
            try:
                sync_current_timestamp = _safe_convert_date(status.get(f'{shift}_sync_timestamp'), timestamp_format)
                if sync_current_timestamp is None or min_pull_timestamp > sync_current_timestamp:
                    response_code = send_shift_sync_to_erpnext(shift, min_pull_timestamp)
                    if response_code == 200:
                        status.set(f'{shift}_sync_timestamp', str(min_pull_timestamp))
            except Exception:
                error_logger.exception('Exception in update_shift_last_sync_timestamp, for shift:'+str(shift))
def send_shift_sync_to_erpnext(shift_type_name, sync_timestamp, timeout=60):
    """Set 'last_sync_of_checkin' of a Shift Type in ERPNext.

    params:
    shift_type_name: name of the Shift Type document to update
    sync_timestamp: datetime to store as last_sync_of_checkin
    timeout: seconds passed to requests so that an unresponsive server cannot block the sync forever

    returns:
    the HTTP status code of the response, or None when the request itself
    failed (the exception is logged, not raised).
    """
    url = config.ERPNEXT_URL + "/api/resource/Shift Type/" + shift_type_name
    headers = {
        'Authorization': "token "+ config.ERPNEXT_API_KEY + ":" + config.ERPNEXT_API_SECRET,
        'Accept': 'application/json'
    }
    data = {
        "last_sync_of_checkin" : str(sync_timestamp)
    }
    try:
        response = requests.request("PUT", url, headers=headers, data=json.dumps(data), timeout=timeout)
        if response.status_code == 200:
            info_logger.info("\t".join(['Shift Type last_sync_of_checkin Updated', str(shift_type_name), str(sync_timestamp.timestamp())]))
        else:
            error_str = _safe_get_error_str(response)
            error_logger.error('\t'.join(['Error during ERPNext Shift Type API Call.', str(shift_type_name), str(sync_timestamp.timestamp()), error_str]))
        return response.status_code
    except Exception:
        error_logger.exception("\t".join(['exception when updating last_sync_of_checkin in Shift Type', str(shift_type_name), str(sync_timestamp.timestamp())]))
        return None
def get_last_line_from_file(file):
    """Return the last line of a file (including its line ending), or None for an empty file.

    params:
    file: path of the file to read

    raises:
    OSError: when the file cannot be stat-ed or opened (e.g. it does not exist).
    """
    # concerns to address(may be much later):
    # how will last line lookup work with log rotation when a new file is created?
    #- will that new file be empty at any time? or will it have a partial line from the previous file?
    line = None
    if os.stat(file).st_size < 5000:
        # quick hack to handle files with one line
        with open(file, 'r') as f:
            for line in f:
                pass
    else:
        # optimized for large log files: scan backwards from the end for the
        # newline that precedes the last line
        with open(file, 'rb') as f:
            # start before the final byte so a trailing newline is skipped
            f.seek(-2, os.SEEK_END)
            while f.read(1) != b'\n':
                if f.tell() < 2:
                    # Reached the first byte without finding a newline: the
                    # whole file is one line. Seeking further back would raise.
                    f.seek(0)
                    break
                f.seek(-2, os.SEEK_CUR)
            line = f.readline().decode()
    return line
def setup_logger(name, log_file, level=logging.INFO, formatter=None):
    """Return the logger called `name`, attaching a rotating file handler on first use.

    The handler is only created when the logger has no handler of its own.
    Repeated calls for the same name therefore open no further files; they
    only update the level and return the same logger.

    params:
    name: logger name
    log_file: path of the log file used when the handler is created
    level: level set on the logger on every call
    formatter: logging.Formatter for the handler; defaults to tab-separated time, level and message
    """
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Check this logger's own handlers: hasHandlers() also reports handlers of
    # ancestor loggers, which would stop the file handler from ever being added.
    if not logger.handlers:
        if not formatter:
            formatter = logging.Formatter('%(asctime)s\t%(levelname)s\t%(message)s')
        handler = RotatingFileHandler(log_file, maxBytes=10000000, backupCount=50)
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def get_dump_file_name_and_directory(device_id, device_ip):
    """Build the path of a device's fetch dump file inside config.LOGS_DIRECTORY.

    The file name combines the device id with the device IP, dots replaced by
    underscores, followed by '_last_fetch_dump.json'.
    """
    ip_part = device_ip.replace('.', '_')
    file_name = ''.join([device_id, "_", ip_part, '_last_fetch_dump.json'])
    return config.LOGS_DIRECTORY + '/' + file_name
def _apply_function_to_key(obj, key, fn):
    """Replace obj[key] with fn(obj[key]) in place and return the same obj."""
    converted = fn(obj[key])
    obj[key] = converted
    return obj
def _safe_convert_date(datestring, pattern):
    """Parse datestring with the strptime pattern; return None when it cannot be parsed.

    params:
    datestring: the text to parse; non-string values (e.g. None) yield None
    pattern: a datetime.strptime format string
    """
    try:
        return datetime.datetime.strptime(datestring, pattern)
    except (TypeError, ValueError):
        # TypeError: the value is not a string; ValueError: it does not match the pattern.
        return None
def _safe_get_error_str(res):
    """Extract a readable error string from a failed response; never raises for a malformed body.

    params:
    res: response object whose raw body is read from res._content

    returns:
    the first entry of the JSON-encoded 'exc' list when the body has one,
    otherwise the body re-serialised as JSON; when the body cannot be
    processed as JSON, the string form of the response's __dict__.
    """
    try:
        error_json = json.loads(res._content)
        if 'exc' in error_json: # this means traceback is available
            error_str = json.loads(error_json['exc'])[0]
        else:
            error_str = json.dumps(error_json)
    except Exception:
        # Body is not JSON or not in the expected shape: fall back to a raw dump.
        error_str = str(res.__dict__)
    return error_str
# setup logger and status
# exist_ok=True creates the directory only when it is missing, without the
# race of a separate os.path.exists check.
os.makedirs(config.LOGS_DIRECTORY, exist_ok=True)
error_logger = setup_logger('error_logger', '/'.join([config.LOGS_DIRECTORY, 'error.log']), logging.ERROR)
info_logger = setup_logger('info_logger', '/'.join([config.LOGS_DIRECTORY, 'logs.log']))
# Key/value store for the timestamps described in the "Status Keys" notes near
# the top of this file, kept in status.json inside the logs directory.
status = pickledb.load('/'.join([config.LOGS_DIRECTORY, 'status.json']), True)
def infinite_loop(sleep_time=15):
    """Call main() forever, sleeping sleep_time seconds between calls.

    An exception escaping main() is printed and the loop continues after the
    usual sleep. KeyboardInterrupt and SystemExit are not caught, so the
    service can be stopped.

    params:
    sleep_time: seconds to wait between two calls of main()
    """
    print("Service Running...")
    while True:
        try:
            main()
        except Exception as e:
            print(e)
        # Sleep after failures too, so a persistently failing main() cannot spin.
        time.sleep(sleep_time)


if __name__ == "__main__":
    infinite_loop()