diff --git a/posthog/temporal/data_imports/pipelines/vitally/__init__.py b/posthog/temporal/data_imports/pipelines/vitally/__init__.py index 3f070c48653f2..5be4c5ce9ed2c 100644 --- a/posthog/temporal/data_imports/pipelines/vitally/__init__.py +++ b/posthog/temporal/data_imports/pipelines/vitally/__init__.py @@ -1,4 +1,5 @@ import base64 +from datetime import datetime from dateutil import parser from typing import Any, Optional import dlt @@ -31,7 +32,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -60,7 +61,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -88,7 +89,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -116,7 +117,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -144,7 +145,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -172,7 +173,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -200,7 +201,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -228,7 +229,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None, @@ -256,7 +257,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource: "type": "incremental", "cursor_path": "updatedAt", "initial_value": "1970-01-01", # type: ignore - "convert": lambda x: parser.parse(x).timestamp(), + "convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x, } if is_incremental else None,