Skip to content

Commit

Permalink
Merge branch 'HLA-810-remove-data' into HLA-812-testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jacob720 committed Jan 6, 2025
2 parents b0a9094 + 01ae6a4 commit 9950edb
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 1 deletion.
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ dev = [
]

[project.scripts]
aa-remove-data-after = "aa_remove_data.remove_data:aa_remove_data_after"
aa-remove-data-before = "aa_remove_data.remove_data:aa_remove_data_before"
aa-reduce-data-by-factor = "aa_remove_data.remove_data:aa_reduce_by_factor"
aa-reduce-data-freq = "aa_remove_data.remove_data:aa_reduce_freq"
aa-remove-data = "aa_remove_data.__main__:main"
aa-remove-data-every-nth = "aa_remove_data.remove_data:aa_remove_every_nth"
aa-print-header = "aa_remove_data.pb_utils:print_header"
PB_2_TXT = "aa_remove_data.pb_utils:pb_2_txt"

Expand Down
203 changes: 202 additions & 1 deletion src/aa_remove_data/remove_data.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import argparse
import time
from collections.abc import Callable
from datetime import datetime
from pathlib import Path
from typing import Any

from aa_remove_data.pb_utils import PBUtils


def get_nano_diff(sample1: type, sample2: type) -> int:
"""Get the difference in nano seconds between two samples.
Expand Down Expand Up @@ -163,7 +168,7 @@ def keep_every_nth(samples: list, n: int, block_size: int = 1) -> list:
list: Reduced list of samples.
"""
if block_size == 1:
return samples[::n]
return samples[n - 1 :: n]
else:
return [
item
Expand Down Expand Up @@ -193,3 +198,199 @@ def remove_every_nth(samples: list, n: int, block_size: int = 1) -> list:
for i, item in enumerate(samples)
if (i + block_size) // block_size % n != 0
]


def aa_reduce_freq():
parser = argparse.ArgumentParser()
parser.add_argument("filename", type=str)
parser.add_argument("period", type=float)
parser.add_argument("--new_filename", type=str, default=None)
parser.add_argument("--backup_filename", type=str, default=None)
parser.add_argument("--write_txt", action="store_true")
args = parser.parse_args()

assert args.filename.endswith(".pb")

if args.new_filename is None:
new_pb = Path(args.filename)
else:
assert args.new_filename.endswith(".pb")
new_pb = Path(args.new_filename)

if args.backup_filename is None:
backup_pb = Path(args.filename.strip(".pb") + "_backup.pb")
else:
assert args.backup_filename.endswith(".pb")
backup_pb = Path(args.backup_filename)

pb = PBUtils(Path(args.filename))
pb.write_pb(backup_pb)
pb.samples = reduce_freq(pb.samples, period=args.period)
pb.write_pb(new_pb)
if args.write_txt:
txt_filepath = Path(str(new_pb).strip(".pb") + ".txt")
pb.write_to_txt(txt_filepath)


def aa_reduce_by_factor():
parser = argparse.ArgumentParser()
parser.add_argument("filename", type=str)
parser.add_argument("factor", type=int)
parser.add_argument("--new_filename", type=str, default=None)
parser.add_argument("--backup_filename", type=str, default=None)
parser.add_argument("--write_txt", action="store_true")
parser.add_argument("--block", type=int, default=1)
args = parser.parse_args()

assert args.filename.endswith(".pb")

if args.new_filename is None:
new_pb = Path(args.filename)
else:
assert args.new_filename.endswith(".pb")
new_pb = Path(args.new_filename)

if args.backup_filename is None:
backup_pb = Path(args.filename.strip(".pb") + "_backup.pb")
else:
assert args.backup_filename.endswith(".pb")
backup_pb = Path(args.backup_filename)

pb = PBUtils(Path(args.filename))
pb.write_pb(backup_pb)
pb.samples = keep_every_nth(pb.samples, args.factor, block_size=args.block)
pb.write_pb(new_pb)
if args.write_txt:
txt_filepath = Path(str(new_pb).strip(".pb") + ".txt")
pb.write_to_txt(txt_filepath)


def aa_remove_every_nth():
parser = argparse.ArgumentParser()
parser.add_argument("filename", type=str)
parser.add_argument("n", type=int)
parser.add_argument("--new_filename", type=str, default=None)
parser.add_argument("--backup_filename", type=str, default=None)
parser.add_argument("--write_txt", action="store_true")
parser.add_argument("--block", type=int, default=1)
args = parser.parse_args()

assert args.filename.endswith(".pb")

if args.new_filename is None:
new_pb = Path(args.filename)
else:
assert args.new_filename.endswith(".pb")
new_pb = Path(args.new_filename)

if args.backup_filename is None:
backup_pb = Path(args.filename.strip(".pb") + "_backup.pb")
else:
assert args.backup_filename.endswith(".pb")
backup_pb = Path(args.backup_filename)

pb = PBUtils(Path(args.filename))
pb.write_pb(backup_pb)
pb.samples = remove_every_nth(pb.samples, args.n, block_size=args.block)
pb.write_pb(new_pb)
if args.write_txt:
txt_filepath = Path(str(new_pb).strip(".pb") + ".txt")
pb.write_to_txt(txt_filepath)


def aa_remove_data_before():
parser = argparse.ArgumentParser()
parser.add_argument("filename", type=str)
parser.add_argument("--ts", nargs="+", type=int, required=True)
parser.add_argument("--new_filename", type=str, default=None)
parser.add_argument("--backup_filename", type=str, default=None)
parser.add_argument("--write_txt", action="store_true")
parser.add_argument("--block", type=int, default=1)
args = parser.parse_args()

assert args.filename.endswith(".pb")

if args.new_filename is None:
new_pb = Path(args.filename)
else:
assert args.new_filename.endswith(".pb")
new_pb = Path(args.new_filename)

if args.backup_filename is None:
backup_pb = Path(args.filename.strip(".pb") + "_backup.pb")
else:
assert args.backup_filename.endswith(".pb")
backup_pb = Path(args.backup_filename)

pb_header = PBUtils(Path(args.filename), chunk_size=0)
year = pb_header.header.year
timestamp = args.ts
assert len(timestamp) <= 6, (
"Give timestamp in the form 'month.day.hour.minute.second.nanosecond'. "
+ "Month is required. All must be integers."
)
if len(timestamp) == 6:
nano = timestamp.pop(5)
else:
nano = 0
if len(timestamp) == 1:
timestamp.append(1)

diff = datetime(*([year] + timestamp)) - datetime(year, 1, 1)
seconds = int(diff.total_seconds())
pb = PBUtils(Path(args.filename))
pb.write_pb(backup_pb)
pb.samples = remove_before_ts(pb.samples, seconds, nano=nano)
pb.write_pb(new_pb)
if args.write_txt:
txt_filepath = Path(str(new_pb).strip(".pb") + ".txt")
pb.write_to_txt(txt_filepath)


def aa_remove_data_after():
parser = argparse.ArgumentParser()
parser.add_argument("filename", type=str)
parser.add_argument("--ts", nargs="+", type=int, required=True)
parser.add_argument("--new_filename", type=str, default=None)
parser.add_argument("--backup_filename", type=str, default=None)
parser.add_argument("--write_txt", action="store_true")
parser.add_argument("--block", type=int, default=1)
args = parser.parse_args()

assert args.filename.endswith(".pb")

if args.new_filename is None:
new_pb = Path(args.filename)
else:
assert args.new_filename.endswith(".pb")
new_pb = Path(args.new_filename)

if args.backup_filename is None:
backup_pb = Path(args.filename.strip(".pb") + "_backup.pb")
else:
assert args.backup_filename.endswith(".pb")
backup_pb = Path(args.backup_filename)

pb_header = PBUtils(Path(args.filename), chunk_size=0)
year = pb_header.header.year
timestamp = args.ts
assert len(timestamp) <= 6, (
"Give timestamp in the form 'month.day.hour.minute.second.nanosecond'. "
+ "Month is required. All must be integers."
)
if len(timestamp) == 6:
nano = timestamp.pop(5)
else:
nano = 0
if len(timestamp) == 1:
timestamp.append(1)

diff = datetime(*([year] + timestamp)) - datetime(year, 1, 1)
seconds = int(diff.total_seconds())
pb = PBUtils(Path(args.filename))
pb.write_pb(backup_pb)
pb.samples = remove_after_ts(pb.samples, seconds, nano=nano)
pb.write_pb(new_pb)
if args.write_txt:
txt_filepath = Path(str(new_pb).strip(".pb") + ".txt")
pb.write_to_txt(txt_filepath)

0 comments on commit 9950edb

Please sign in to comment.