From a9ddc41a7230e7ad19639c24738fbc4300031a43 Mon Sep 17 00:00:00 2001 From: davidh Date: Sun, 23 Jun 2024 07:55:35 +0000 Subject: [PATCH 1/3] loaders --- benchmarking/PQ_Arrow_Loaders.ipynb | 845 ++++++++++++++++++++++++++++ 1 file changed, 845 insertions(+) create mode 100644 benchmarking/PQ_Arrow_Loaders.ipynb diff --git a/benchmarking/PQ_Arrow_Loaders.ipynb b/benchmarking/PQ_Arrow_Loaders.ipynb new file mode 100644 index 0000000..22ce85b --- /dev/null +++ b/benchmarking/PQ_Arrow_Loaders.ipynb @@ -0,0 +1,845 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 7, + "id": "608ae1bd-0b68-4e0e-b748-e7f89807724e", + "metadata": {}, + "outputs": [], + "source": [ + "import fog_x \n", + "import os\n", + "from logging import getLogger\n", + "import numpy as np\n", + "import os\n", + "os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # or any {'0', '1', '2'}\n", + "import tensorflow as tf\n", + "\n", + "class BaseLoader():\n", + " def __init__(self, data_path):\n", + " super(BaseLoader, self).__init__()\n", + " self.data_dir = data_path\n", + " self.logger = getLogger(__name__)\n", + " self.index = 0\n", + "\n", + "\n", + " def __len__(self):\n", + " raise NotImplementedError\n", + "\n", + " def __iter___(self):\n", + " raise NotImplementedError\n", + "\n", + "class BaseExporter():\n", + " def __init__(self):\n", + " super(BaseExporter, self).__init__()\n", + " self.logger = getLogger(__name__)\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " raise NotImplementedError\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "fe6f0124-e66c-4529-8354-12fbc0769de5", + "metadata": {}, + "outputs": [], + "source": [ + "class RTXLoader(BaseLoader):\n", + " def __init__(self, data_path, split):\n", + " super(RTXLoader, self).__init__(data_path)\n", + " import tensorflow_datasets as tfds\n", + "\n", + " builder = tfds.builder_from_directory(data_path)\n", + "\n", + " self.ds = builder.as_dataset(split=split)\n", + " # https://www.determined.ai/blog/tf-dataset-the-bad-parts\n", + "\n", + " def __len__(self):\n", + " return len(self.ds)\n", + " \n", + " def __next__(self):\n", + " if self.index < len(self.ds):\n", + " self.index += 1\n", + " nested_dataset = self.ds.__iter__()\n", + " trajectory = list(nested_dataset)[0][\"steps\"]\n", + " ret = []\n", + " # Iterate through the outer dataset\n", + " for step_data in trajectory:\n", + " step = {}\n", + " for dataset_key, element in step_data.items():\n", + " # print(np.array(element))\n", + " if dataset_key == \"observation\":\n", + " step[\"observation\"] = {}\n", + " for obs_key, obs_element in element.items():\n", + " step[\"observation\"][obs_key] = np.array(obs_element)\n", + " elif dataset_key == \"action\":\n", + " step[\"action\"] = {}\n", + " for action_key, action_element in element.items():\n", + " step[\"action\"][action_key] = np.array(action_element)\n", + " else:\n", + " step[dataset_key] = np.array(element)\n", + " ret.append(step)\n", + " return ret\n", + " else:\n", + " raise StopIteration\n", + " \n", + "\n", + " \n", + " def __iter__(self):\n", + " return self\n", + "\n", + "\n", + "class RTXExporter(BaseExporter):\n", + " def __init__(self):\n", + " super(RTXExporter, self).__init__()\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " raise NotImplementedError" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "9920acb4", + "metadata": {}, + "outputs": [], + "source": [ + "import av\n", + "import pickle \n", + "\n", + "class MKVLoader(BaseLoader):\n", + " def __init__(self, data_path):\n", + " super(MKVLoader, self).__init__(data_path)\n", + " self.files = [data_path + f for f in os.listdir(data_path) if f.endswith('.mkv')]\n", + " self.index = 0\n", + "\n", + " def __len__(self):\n", + " return len(self.files)\n", + " \n", + " def __iter__(self):\n", + " return self\n", + "\n", + " def __next__(self):\n", + " if self.index < len(self):\n", + " result = self.files[self.index]\n", + " self.index += 1\n", + " return self._parse_mkv_file(result)\n", + " else:\n", + " raise StopIteration\n", + " \n", + " def _parse_mkv_file(self, filename):\n", + " print(filename)\n", + " input_container = av.open(filename)\n", + " video_stream1 = input_container.streams.video[0] \n", + " video_stream1.thread_type = 'AUTO'\n", + " video_stream2 = input_container.streams.video[1] \n", + " video_stream2.thread_type = 'AUTO'\n", + " depth_stream = input_container.streams.video[2] \n", + " depth_stream.thread_type = 'AUTO'\n", + " data_stream = input_container.streams[3] \n", + "\n", + " decoded_stream_1 = []\n", + " decoded_stream_2 = []\n", + " decoded_stream_depth = []\n", + " decoded_stream_data = []\n", + "\n", + " pkt_counter = 0\n", + " for packet in input_container.demux(video_stream1, video_stream2, depth_stream, data_stream):\n", + " pkt_counter += 1\n", + " if packet.stream.index == video_stream1.index: \n", + " frame = packet.decode()\n", + " if frame:\n", + " for f in frame:\n", + " image = f.to_ndarray(format='rgb24')\n", + " decoded_stream_1.append(image)\n", + " elif packet.stream.index == video_stream2.index:\n", + " frame = packet.decode()\n", + " if frame:\n", + " for f in frame:\n", + " image = f.to_ndarray(format='rgb24')\n", + " decoded_stream_2.append(image)\n", + " elif packet.stream.index == depth_stream.index:\n", + " frame = packet.decode()\n", + " if frame:\n", + " for f in frame:\n", + " image = f.to_ndarray(format='gray')\n", + " decoded_stream_depth.append(image)\n", + " elif packet.stream.index == data_stream.index:\n", + " packet_in_bytes = bytes(packet)\n", + " if packet_in_bytes:\n", + " non_dict = pickle.loads(packet_in_bytes)\n", + " decoded_stream_data.append(non_dict)\n", + " else:\n", + " print(\"Unknown stream\")\n", + " print(pkt_counter, len(decoded_stream_1), len(decoded_stream_2), len(decoded_stream_depth), len(decoded_stream_data))\n", + " input_container.close()\n", + "\n", + " \n", + "\n", + "class MKVExporter(BaseExporter):\n", + " def __init__(self):\n", + " super(MKVExporter, self).__init__()\n", + "\n", + " # Function to create a frame from numpy array\n", + " def create_frame(self, image_array, stream):\n", + " frame = av.VideoFrame.from_ndarray(np.array(image_array), format='rgb24')\n", + " frame.pict_type = 'NONE'\n", + " frame.time_base = stream.time_base\n", + " return frame\n", + " \n", + " # Function to create a frame from numpy array\n", + " def create_frame_depth(self, image_array, stream):\n", + " image_array = np.array(image_array)\n", + " # if float, convert to uint8\n", + " if image_array.dtype == np.float32:\n", + " image_array = (image_array * 255).astype(np.uint8)\n", + " # if 3 dim, convert to 2 dim\n", + " if len(image_array.shape) == 3:\n", + " image_array = image_array[:,:,0]\n", + " frame = av.VideoFrame.from_ndarray(image_array, format='gray')\n", + " frame.pict_type = 'NONE'\n", + " frame.time_base = stream.time_base\n", + " return frame\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " # Create an output container\n", + " i = -1\n", + " for trajectory in loader:\n", + " i += 1\n", + " output = av.open(f'{output_path}/output_{i}.mkv', mode='w')\n", + " print(f'{output_path}/output_{i}.mkv')\n", + " # Define video streams (assuming images are 640x480 RGB)\n", + " video_stream_1 = output.add_stream('libx264', rate=1)\n", + " video_stream_1.width = 640\n", + " video_stream_1.height = 480\n", + " video_stream_1.pix_fmt = 'yuv420p'\n", + "\n", + " video_stream_2 = output.add_stream('libx264', rate=1)\n", + " video_stream_2.width = 640\n", + " video_stream_2.height = 480\n", + " video_stream_2.pix_fmt = 'yuv420p'\n", + "\n", + " # Define custom data stream for vectors\n", + " depth_stream = output.add_stream('libx264', rate=1)\n", + "\n", + " data_stream = output.add_stream('rawvideo', rate=1)\n", + "\n", + " ts = 0\n", + " # convert step data to stream\n", + " for step in trajectory:\n", + " obesrvation = step[\"observation\"].copy()\n", + " obesrvation.pop(\"image\")\n", + " obesrvation.pop(\"hand_image\")\n", + " obesrvation.pop(\"image_with_depth\")\n", + " non_image_data_step = step.copy()\n", + " non_image_data_step[\"observation\"] = obesrvation\n", + "\n", + " non_image_data_bytes = pickle.dumps(non_image_data_step)\n", + " packet = av.Packet(non_image_data_bytes)\n", + " packet.stream = data_stream\n", + " packet.pts = ts\n", + " output.mux(packet)\n", + "\n", + " image =np.array(step[\"observation\"][\"image\"])\n", + " # Create a frame from the numpy array\n", + " frame = self.create_frame(image, video_stream_1)\n", + " frame.pts = ts\n", + " packet = video_stream_1.encode(frame)\n", + " \n", + " output.mux(packet)\n", + "\n", + " hand_image =np.array(step[\"observation\"][\"hand_image\"])\n", + " # Create a frame from the numpy array\n", + " frame = self.create_frame(hand_image, video_stream_2)\n", + " frame.pts = ts\n", + " packet = video_stream_2.encode(frame)\n", + " output.mux(packet)\n", + "\n", + " # # Create a frame from the numpy array\n", + " frame = self.create_frame_depth(step[\"observation\"][\"image_with_depth\"], depth_stream)\n", + " # frame.pts = ts\n", + " # Encode the frame\n", + " packet = depth_stream.encode(frame)\n", + " # Write the packet to the output file\n", + " output.mux(packet)\n", + "\n", + " ts += 1\n", + "\n", + " \n", + " # Flush the remaining frames\n", + " for packet in video_stream_1.encode():\n", + " output.mux(packet)\n", + "\n", + " for packet in video_stream_2.encode():\n", + " output.mux(packet)\n", + "\n", + " for packet in depth_stream.encode():\n", + " output.mux(packet)\n", + " output.close()\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "89164adb", + "metadata": {}, + "outputs": [], + "source": [ + "number_of_samples = 1\n", + "def iterate_dataset(loader: BaseLoader, number_of_samples):\n", + " for i, data in enumerate(loader): \n", + " list(dict(data)[\"steps\"])\n", + " if i == number_of_samples:\n", + " break" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "1f55f705", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "I 2024-06-23 06:05:40,410 dataset_info.py:595] Load dataset info from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "I 2024-06-23 06:05:40,525 logging_logger.py:49] Constructing tf.data.Dataset berkeley_autolab_ur5 for split train[:1], from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Exported /home/davidh/fog_x_fork/examples/dataloader/pq_output/output_0.parquet\n" + ] + } + ], + "source": [ + "import os\n", + "import pickle\n", + "import pandas as pd\n", + "import fastparquet as fp\n", + "import numpy as np\n", + "from logging import getLogger\n", + "import fog_x\n", + "\n", + "\n", + "class ParquetExporter(BaseExporter):\n", + " def __init__(self):\n", + " super(ParquetExporter, self).__init__()\n", + " self.logger = getLogger(__name__)\n", + "\n", + " def _serialize(self, data):\n", + " return pickle.dumps(np.array(data))\n", + "\n", + " def _step_data_to_df(self, step_data):\n", + " step = {}\n", + " for dataset_key, element in step_data.items():\n", + " if dataset_key == \"observation\":\n", + " for obs_key, obs_element in element.items():\n", + " step[obs_key] = self._serialize(obs_element)\n", + " elif dataset_key == \"action\":\n", + " for action_key, action_element in element.items():\n", + " step[action_key] = self._serialize(action_element)\n", + " else:\n", + " step[dataset_key] = self._serialize(element)\n", + " return step\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " if not os.path.exists(output_path):\n", + " os.makedirs(output_path)\n", + " i = 0\n", + " for steps_data in loader:\n", + " step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]\n", + " combined_df = pd.DataFrame(step_data_as_df)\n", + " output_file = os.path.join(output_path, f'output_{i}.parquet')\n", + " fp.write(output_file, combined_df)\n", + " print(f'Exported {output_file}')\n", + " i += 1\n", + "\n", + "\n", + "class ParquetLoader(BaseLoader):\n", + " def __init__(self, data_path):\n", + " super(ParquetLoader, self).__init__(data_path)\n", + " self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')]\n", + " self.index = 0\n", + "\n", + " def __len__(self):\n", + " return len(self.files)\n", + " \n", + " def __iter__(self):\n", + " return self\n", + "\n", + " def __next__(self):\n", + " if self.index < len(self.files):\n", + " result = self._load_file(self.files[self.index])\n", + " self.index += 1\n", + " return result\n", + " else:\n", + " raise StopIteration\n", + "\n", + " def _parse_parquet_file(self, filename):\n", + " df = fp.ParquetFile(filename).to_pandas()\n", + " steps = [dict(row) for _, row in df.iterrows()]\n", + " return steps\n", + "\n", + "number_of_samples = 1\n", + "\n", + "rtx_loader = RTXLoader(os.path.expanduser(\"~/datasets/berkeley_autolab_ur5/0.1.0\"), split=f'train[:{number_of_samples}]')\n", + "\n", + "output_path = \"/home/davidh/fog_x_fork/examples/dataloader/pq_output\"\n", + "os.makedirs(output_path, exist_ok=True)\n", + "\n", + "exporter = ParquetExporter()\n", + "exporter.export(rtx_loader, output_path)\n", + "\n", + "parquet_loader = ParquetLoader(output_path)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "6b68d1eb", + "metadata": {}, + "outputs": [ + { + "ename": "ImportError", + "evalue": "cannot import name 'BaseExporter' from 'fog_x' (/home/davidh/miniconda3/envs/fog_x_env/lib/python3.8/site-packages/fog_x/__init__.py)", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mImportError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[16], line 7\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mnumpy\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mnp\u001b[39;00m\n\u001b[1;32m 6\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mlogging\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m getLogger\n\u001b[0;32m----> 7\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mfog_x\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m BaseExporter, BaseLoader\n\u001b[1;32m 9\u001b[0m \u001b[38;5;28;01mclass\u001b[39;00m \u001b[38;5;21;01mFeatherExporter\u001b[39;00m(BaseExporter):\n\u001b[1;32m 10\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m__init__\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n", + "\u001b[0;31mImportError\u001b[0m: cannot import name 'BaseExporter' from 'fog_x' (/home/davidh/miniconda3/envs/fog_x_env/lib/python3.8/site-packages/fog_x/__init__.py)" + ] + } + ], + "source": [ + "import os\n", + "import pickle\n", + "import pandas as pd\n", + "import pyarrow.feather as feather\n", + "import numpy as np\n", + "from logging import getLogger\n", + "\n", + "class FeatherExporter(BaseExporter):\n", + " def __init__(self):\n", + " super(FeatherExporter, self).__init__()\n", + " self.logger = getLogger(__name__)\n", + "\n", + " def _serialize(self, data):\n", + " return pickle.dumps(np.array(data))\n", + "\n", + " def _step_data_to_df(self, step_data):\n", + " step = {}\n", + " for dataset_key, element in step_data.items():\n", + " if dataset_key == \"observation\":\n", + " for obs_key, obs_element in element.items():\n", + " step[obs_key] = self._serialize(obs_element)\n", + " elif dataset_key == \"action\":\n", + " for action_key, action_element in element.items():\n", + " step[action_key] = self._serialize(action_element)\n", + " else:\n", + " step[dataset_key] = self._serialize(element)\n", + " return step\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " if not os.path.exists(output_path):\n", + " os.makedirs(output_path)\n", + " i = 0\n", + " for steps_data in loader:\n", + " step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]\n", + " combined_df = pd.DataFrame(step_data_as_df)\n", + " output_file = os.path.join(output_path, f'output_{i}.feather')\n", + " feather.write_feather(combined_df, output_file)\n", + " print(f'Exported {output_file}')\n", + " i += 1\n", + "import os\n", + "import pickle\n", + "import pandas as pd\n", + "import pyarrow.feather as feather\n", + "import numpy as np\n", + "from logging import getLogger\n", + "\n", + "class FeatherLoader(BaseLoader):\n", + " def __init__(self, data_path):\n", + " super(FeatherLoader, self).__init__(data_path)\n", + " self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.feather')]\n", + " self.index = 0\n", + "\n", + " def __len__(self):\n", + " return len(self.files)\n", + " \n", + " def __iter__(self):\n", + " return self\n", + "\n", + " def __next__(self):\n", + " if self.index < len(self.files):\n", + " result = self._load_file(self.files[self.index])\n", + " self.index += 1\n", + " return result\n", + " else:\n", + " raise StopIteration\n", + "\n", + " def _load_file(self, filename):\n", + " df = feather.read_feather(filename)\n", + " steps = [dict(row) for _, row in df.iterrows()]\n", + " return steps\n", + "import os\n", + "from fog_x import RTXLoader\n", + "\n", + "number_of_samples = 1\n", + "\n", + "rtx_loader = RTXLoader(os.path.expanduser(\"~/datasets/berkeley_autolab_ur5/0.1.0\"), split=f'train[:{number_of_samples}]')\n", + "\n", + "output_path = os.path.expanduser(\"~\") + \"/fog_x/examples/dataloader/feather_output/\"\n", + "os.makedirs(output_path, exist_ok=True)\n", + "exporter = FeatherExporter()\n", + "exporter.export(rtx_loader, output_path)\n", + "\n", + "feather_loader = FeatherLoader(output_path)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "7364020c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "218424901" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "os.path.getsize(\"/home/davidh/fog_x_fork/examples/dataloader/pq_output/output_0.parquet\")" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "d4566c75", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "I 2024-06-23 06:09:07,749 dataset_info.py:595] Load dataset info from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0\n", + "I 2024-06-23 06:09:07,996 logging_logger.py:49] Constructing tf.data.Dataset berkeley_autolab_ur5 for split train[:1], from /home/davidh/datasets/berkeley_autolab_ur5/0.1.0\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Exported /home/davidh/fog_x/examples/dataloader/feather_output/output_0.feather\n" + ] + } + ], + "source": [ + "import os\n", + "import pickle\n", + "import pandas as pd\n", + "import pyarrow.feather as feather\n", + "import numpy as np\n", + "from logging import getLogger\n", + "\n", + "\n", + "class FeatherExporter(BaseExporter):\n", + " def __init__(self):\n", + " super(FeatherExporter, self).__init__()\n", + " self.logger = getLogger(__name__)\n", + "\n", + " def _serialize(self, data):\n", + " return pickle.dumps(np.array(data))\n", + "\n", + " def _step_data_to_df(self, step_data):\n", + " step = {}\n", + " for dataset_key, element in step_data.items():\n", + " if dataset_key == \"observation\":\n", + " for obs_key, obs_element in element.items():\n", + " step[obs_key] = self._serialize(obs_element)\n", + " elif dataset_key == \"action\":\n", + " for action_key, action_element in element.items():\n", + " step[action_key] = self._serialize(action_element)\n", + " else:\n", + " step[dataset_key] = self._serialize(element)\n", + " return step\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " if not os.path.exists(output_path):\n", + " os.makedirs(output_path)\n", + " i = 0\n", + " for steps_data in loader:\n", + " step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]\n", + " combined_df = pd.DataFrame(step_data_as_df)\n", + " output_file = os.path.join(output_path, f'output_{i}.feather')\n", + " feather.write_feather(combined_df, output_file)\n", + " print(f'Exported {output_file}')\n", + " i += 1\n", + "\n", + "class FeatherLoader(BaseLoader):\n", + " def __init__(self, data_path):\n", + " super(FeatherLoader, self).__init__(data_path)\n", + " self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.feather')]\n", + " self.index = 0\n", + "\n", + " def __len__(self):\n", + " return len(self.files)\n", + " \n", + " def __iter__(self):\n", + " return self\n", + "\n", + " def __next__(self):\n", + " if self.index < len(self.files):\n", + " result = self._load_file(self.files[self.index])\n", + " self.index += 1\n", + " return result\n", + " else:\n", + " raise StopIteration\n", + "\n", + " def _load_file(self, filename):\n", + " df = feather.read_feather(filename)\n", + " steps = [dict(row) for _, row in df.iterrows()]\n", + " return steps\n", + "\n", + "number_of_samples = 1\n", + "\n", + "rtx_loader = RTXLoader(os.path.expanduser(\"~/datasets/berkeley_autolab_ur5/0.1.0\"), split=f'train[:{number_of_samples}]')\n", + "\n", + "output_path = os.path.expanduser(\"~\") + \"/fog_x/examples/dataloader/feather_output/\"\n", + "os.makedirs(output_path, exist_ok=True)\n", + "\n", + "exporter = FeatherExporter()\n", + "exporter.export(rtx_loader, output_path)\n", + "\n", + "feather_loader = FeatherLoader(output_path)\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "53c1529a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "91502858" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "os.path.getsize(\"/home/davidh/fog_x/examples/dataloader/feather_output/output_0.feather\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1d56f44d", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "class ParquetLoader():\n", + " def __init__(self, data_path):\n", + " super(ParquetLoader, self).__init__()\n", + " self.data_dir = data_path\n", + " self.logger = getLogger(__name__)\n", + "\n", + " def __len__(self):\n", + " raise NotImplementedError\n", + "\n", + " def __iter___(self):\n", + " raise NotImplementedError\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "80867561", + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "exporter = MKVExporter()\n", + "output_path = \"/home/davidh/fog_x_fork/examples/dataloader/mkv_output/\"\n", + "exporter.export(rtx_loader, output_path)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "fcfc1bdb", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/home/davidh/fog_x_fork/examples/dataloader/mkv_output/output_11.mkv\n", + "408 101 101 101 101\n", + "/home/davidh/fog_x_fork/examples/dataloader/mkv_output/output_16.mkv\n", + "436 108 108 108 108\n" + ] + } + ], + "source": [ + "mkv_loader = MKVLoader(output_path)\n", + "def iterate_dataset(loader: BaseLoader, number_of_samples = 50):\n", + " for i, data in enumerate(loader): \n", + " if i == number_of_samples:\n", + " break\n", + "iterate_dataset(mkv_loader, number_of_samples)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0822a45a", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "30ba398b", + "metadata": {}, + "outputs": [ + { + "ename": "ImportError", + "evalue": "cannot import name 'RTXLoader' from 'fog_x' (/home/davidh/miniconda3/envs/fog_x_env/lib/python3.8/site-packages/fog_x/__init__.py)", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mImportError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[9], line 7\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mpyarrow\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mparquet\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mpq\u001b[39;00m\n\u001b[1;32m 6\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mnumpy\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mnp\u001b[39;00m\n\u001b[0;32m----> 7\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mfog_x\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m RTXLoader, BaseLoader, BaseExporter\n\u001b[1;32m 9\u001b[0m \u001b[38;5;28;01mclass\u001b[39;00m \u001b[38;5;21;01mParquetExporter\u001b[39;00m(BaseExporter):\n\u001b[1;32m 10\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21m__init__\u001b[39m(\u001b[38;5;28mself\u001b[39m):\n", + "\u001b[0;31mImportError\u001b[0m: cannot import name 'RTXLoader' from 'fog_x' (/home/davidh/miniconda3/envs/fog_x_env/lib/python3.8/site-packages/fog_x/__init__.py)" + ] + } + ], + "source": [ + "import os\n", + "import pickle\n", + "import pandas as pd\n", + "import pyarrow as pa\n", + "import pyarrow.parquet as pq\n", + "import numpy as np\n", + "from fog_x import RTXLoader, BaseLoader, BaseExporter\n", + "\n", + "class ParquetExporter(BaseExporter):\n", + " def __init__(self):\n", + " super(ParquetExporter, self).__init__()\n", + "\n", + " def _serialize(self, data):\n", + " return pickle.dumps(np.array(data))\n", + "\n", + " def _step_data_to_df(self, step_data):\n", + " step = {}\n", + " for dataset_key, element in step_data.items():\n", + " if dataset_key == \"observation\":\n", + " for obs_key, obs_element in element.items():\n", + " step[obs_key] = self._serialize(obs_element)\n", + " elif dataset_key == \"action\":\n", + " for action_key, action_element in element.items():\n", + " step[action_key] = self._serialize(action_element)\n", + " else:\n", + " step[dataset_key] = self._serialize(element)\n", + " return step\n", + "\n", + " def export(self, loader: BaseLoader, output_path: str):\n", + " for steps_data in loader:\n", + " step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data]\n", + " combined_df = pd.DataFrame(step_data_as_df)\n", + " table = pa.Table.from_pandas(combined_df)\n", + " pq.write_table(table, output_path)\n", + "\n", + "class ParquetLoader(BaseLoader):\n", + " def __init__(self, data_path):\n", + " super(ParquetLoader, self).__init__(data_path)\n", + " self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')]\n", + " self.index = 0\n", + "\n", + " def __len__(self):\n", + " return len(self.files)\n", + " \n", + " def __iter__(self):\n", + " return self\n", + "\n", + " def __next__(self):\n", + " if self.index < len(self.files):\n", + " result = self._load_file(self.files[self.index])\n", + " self.index += 1\n", + " return result\n", + " else:\n", + " raise StopIteration\n", + "\n", + " def _load_file(self, filename):\n", + " table = pq.read_table(filename)\n", + " df = table.to_pandas()\n", + " steps = [dict(row) for _, row in df.iterrows()]\n", + " return steps\n", + "\n", + "# Example usage\n", + "rtx_loader = RTXLoader(os.path.expanduser(\"~/datasets/berkeley_autolab_ur5/0.1.0\"), split='train[:51]')\n", + "exporter = ParquetExporter()\n", + "exporter.export(rtx_loader, \"output.parquet\")\n", + "\n", + "parquet_loader = ParquetLoader(\"/home/davidh/fog_x_fork/benchmarking/\")\n", + "print(parquet_loader._load_file(\"/home/davidh/fog_x_fork/benchmarking/output.parquet\"))\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "359bb163", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.19" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 36169c5f325314405591629b6e851d002517c222 Mon Sep 17 00:00:00 2001 From: davidh Date: Sun, 23 Jun 2024 07:56:28 +0000 Subject: [PATCH 2/3] loaders --- benchmarking/tf_record.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/benchmarking/tf_record.py b/benchmarking/tf_record.py index 4bbfe42..b1e4a84 100644 --- a/benchmarking/tf_record.py +++ b/benchmarking/tf_record.py @@ -3,7 +3,8 @@ import time import numpy as np -PATH = os.path.expanduser("~") +PATH = "/shared/ryanhoque/datasets/datasets/" +os.path.expanduser("~") sys.path.append(PATH + "/fog_x_fork") os.environ['TF_CPP_MIN_LOG_LEVEL'] = "2" From b980f7f0064b084e49d2c2b58b59b0c6632734d6 Mon Sep 17 00:00:00 2001 From: davidh Date: Sun, 23 Jun 2024 08:06:07 +0000 Subject: [PATCH 3/3] loaders refactor --- benchmarking/pq_arrow_loaders.py | 259 +++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100644 benchmarking/pq_arrow_loaders.py diff --git a/benchmarking/pq_arrow_loaders.py b/benchmarking/pq_arrow_loaders.py new file mode 100644 index 0000000..9f3b3ea --- /dev/null +++ b/benchmarking/pq_arrow_loaders.py @@ -0,0 +1,259 @@ +import os +import time +from logging import getLogger +import numpy as np +import pickle +import av +import pandas as pd +import pyarrow.feather as feather +import pyarrow as pa +import pyarrow.parquet as pq +import tensorflow_datasets as tfds +import tensorflow as tf +import fastparquet as fp + +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # or any {'0', '1', '2'} + +class BaseLoader(): + def __init__(self, data_path): + super(BaseLoader, self).__init__() + self.data_dir = data_path + self.logger = getLogger(__name__) + self.index = 0 + + def __len__(self): + raise NotImplementedError + + def __iter___(self): + raise NotImplementedError + + +class BaseExporter(): + def __init__(self): + super(BaseExporter, self).__init__() + self.logger = getLogger(__name__) + + def export(self, loader: BaseLoader, output_path: str): + raise NotImplementedError + +class RTXLoader(BaseLoader): + def __init__(self, data_path, split): + super(RTXLoader, self).__init__(data_path) + builder = tfds.builder_from_directory(data_path) + self.ds = builder.as_dataset(split=split) + + def __len__(self): + return len(self.ds) + + def __next__(self): + if self.index < len(self.ds): + self.index += 1 + nested_dataset = self.ds.__iter__() + trajectory = list(nested_dataset)[0]["steps"] + ret = [] + for step_data in trajectory: + step = {} + for dataset_key, element in step_data.items(): + if dataset_key == "observation": + step["observation"] = {} + for obs_key, obs_element in element.items(): + step["observation"][obs_key] = np.array(obs_element) + elif dataset_key == "action": + step["action"] = {} + for action_key, action_element in element.items(): + step["action"][action_key] = np.array(action_element) + else: + step[dataset_key] = np.array(element) + ret.append(step) + return ret + else: + raise StopIteration + + def __iter__(self): + return self + +class RTXExporter(BaseExporter): + def __init__(self): + super(RTXExporter, self).__init__() + + def export(self, loader: BaseLoader, output_path: str): + raise NotImplementedError + + +class ParquetExporter(BaseExporter): + def __init__(self): + super(ParquetExporter, self).__init__() + self.logger = getLogger(__name__) + + def _serialize(self, data): + return pickle.dumps(np.array(data)) + + def _step_data_to_df(self, step_data): + step = {} + for dataset_key, element in step_data.items(): + if dataset_key == "observation": + for obs_key, obs_element in element.items(): + step[obs_key] = self._serialize(obs_element) + elif dataset_key == "action": + for action_key, action_element in element.items(): + step[action_key] = self._serialize(action_element) + else: + step[dataset_key] = self._serialize(element) + return step + + def export(self, loader: BaseLoader, output_path: str): + if not os.path.exists(output_path): + os.makedirs(output_path) + i = 0 + for steps_data in loader: + step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data] + combined_df = pd.DataFrame(step_data_as_df) + output_file = os.path.join(output_path, f'output_{i}.parquet') + fp.write(output_file, combined_df) + print(f'Exported {output_file}') + i += 1 + +class ParquetLoader(BaseLoader): + def __init__(self, data_path): + super(ParquetLoader, self).__init__(data_path) + self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.parquet')] + self.index = 0 + + def __len__(self): + return len(self.files) + + def __iter__(self): + return self + + def __next__(self): + if self.index < len(self.files): + result = self._load_file(self.files[self.index]) + self.index += 1 + return result + else: + raise StopIteration + + def _load_file(self, filename): + table = pq.read_table(filename) + df = table.to_pandas() + steps = [dict(row) for _, row in df.iterrows()] + return steps + +class FeatherExporter(BaseExporter): + def __init__(self): + super(FeatherExporter, self).__init__() + self.logger = getLogger(__name__) + + def _serialize(self, data): + return pickle.dumps(np.array(data)) + + def _step_data_to_df(self, step_data): + step = {} + for dataset_key, element in step_data.items(): + if dataset_key == "observation": + for obs_key, obs_element in element.items(): + step[obs_key] = self._serialize(obs_element) + elif dataset_key == "action": + for action_key, action_element in element.items(): + step[action_key] = self._serialize(action_element) + else: + step[dataset_key] = self._serialize(element) + return step + + def export(self, loader: BaseLoader, output_path: str): + if not os.path.exists(output_path): + os.makedirs(output_path) + i = 0 + for steps_data in loader: + step_data_as_df = [self._step_data_to_df(step_data) for step_data in steps_data] + combined_df = pd.DataFrame(step_data_as_df) + output_file = os.path.join(output_path, f'output_{i}.feather') + feather.write_feather(combined_df, output_file) + print(f'Exported {output_file}') + i += 1 + +class FeatherLoader(BaseLoader): + def __init__(self, data_path): + super(FeatherLoader, self).__init__(data_path) + self.files = [os.path.join(data_path, f) for f in os.listdir(data_path) if f.endswith('.feather')] + self.index = 0 + + def __len__(self): + return len(self.files) + + def __iter__(self): + return self + + def __next__(self): + if self.index < len(self.files): + result = self._load_file(self.files[self.index]) + self.index += 1 + return result + else: + raise StopIteration + + def _load_file(self, filename): + df = feather.read_feather(filename) + steps = [dict(row) for _, row in df.iterrows()] + return steps + +def iterate_parquet(rtx_loader, parquet_exporter, n): + read_time, write_time, data_size = 0, 0, 0 + path = os.path.expanduser("~/fog_x_fork/examples/dataloader/pq_output/") + os.makedirs(path, exist_ok=True) + + stop = time.time() + parquet_exporter.export(rtx_loader, path) + write_time += time.time() - stop + + parquet_loader = ParquetLoader(path) + + stop = time.time() + for i, data in enumerate(parquet_loader): + print(f"Reading trajectory {i}") + if i == n: break + read_time += time.time() - stop + + for i in range(n): + data_size += os.path.getsize(f"{path}/output_{i}.parquet") + + return read_time, write_time, data_size / 1024**2 + +def iterate_feather(rtx_loader, feather_exporter, n): + read_time, write_time, data_size = 0, 0, 0 + path = os.path.expanduser("~/fog_x/examples/dataloader/feather_output/") + os.makedirs(path, exist_ok=True) + + stop = time.time() + feather_exporter.export(rtx_loader, path) + write_time += time.time() - stop + + feather_loader = FeatherLoader(path) + + stop = time.time() + for i, data in enumerate(feather_loader): + print(f"Reading trajectory {i}") + if i == n: break + read_time += time.time() - stop + + for i in range(n): + data_size += os.path.getsize(f"{path}/output_{i}.feather") + + return read_time, write_time, data_size / 1024**2 + +# parquet use case +rtx_loader = RTXLoader(os.path.expanduser("~/datasets/berkeley_autolab_ur5/0.1.0"), split='train[:51]') +parquet_exporter = ParquetExporter() +rt, wt, mb = iterate_parquet(rtx_loader, parquet_exporter, 51) + +print(f"\nParquet Data: \nMem. size = {mb:.4f} MB; Num. traj = 51") +print(f"Read: latency = {rt:.4f} s; throughput = {mb / rt :.4f} MB/s, {51 / rt :.4f} traj/s") +print(f"Write: latency = {wt:.4f} s; throughput = {mb / wt :.4f} MB/s, {51 / wt :.4f} traj/s") + +# feather pyarrow version use case +feather_exporter = FeatherExporter() +rt, wt, mb = iterate_feather(rtx_loader, feather_exporter, 51) + +print(f"\nFeather Data: \nMem. size = {mb:.4f} MB; Num. traj = 51") +print(f"Read: latency = {rt:.4f} s; throughput = {mb / rt :.4f} MB/s, {51 / rt :.4f} traj/s") +print(f"Write: latency = {wt:.4f} s; throughput = {mb / wt :.4f} MB/s, {51 / wt :.4f} traj/s")