Skip to content

Commit

Permalink
fix(data-warehouse): Parse the datetime for vitally sources correctly (
Browse files Browse the repository at this point in the history
  • Loading branch information
Gilbert09 authored Dec 31, 2024
1 parent 729c8ab commit 9b81d45
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions posthog/temporal/data_imports/pipelines/vitally/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
from datetime import datetime
from dateutil import parser
from typing import Any, Optional
import dlt
Expand Down Expand Up @@ -31,7 +32,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -60,7 +61,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -88,7 +89,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -116,7 +117,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -144,7 +145,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -172,7 +173,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -200,7 +201,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -228,7 +229,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down Expand Up @@ -256,7 +257,7 @@ def get_resource(name: str, is_incremental: bool) -> EndpointResource:
"type": "incremental",
"cursor_path": "updatedAt",
"initial_value": "1970-01-01", # type: ignore
"convert": lambda x: parser.parse(x).timestamp(),
"convert": lambda x: parser.parse(x).timestamp() if not isinstance(x, datetime) else x,
}
if is_incremental
else None,
Expand Down

0 comments on commit 9b81d45

Please sign in to comment.