Skip to content

Commit

Permalink
Adding streams, pagination for non-export endpoitns
Browse files Browse the repository at this point in the history
  • Loading branch information
cjohnhanson committed Jul 17, 2024
1 parent 933d51e commit d792328
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 48 deletions.
48 changes: 26 additions & 22 deletions meltano.yml
Original file line number Diff line number Diff line change
@@ -1,41 +1,45 @@
version: 1
send_anonymous_usage_stats: true
project_id: "tap-service-titan"
project_id: tap-service-titan
default_environment: test
environments:
- name: test
plugins:
extractors:
- name: "tap-service-titan"
namespace: "tap_service_titan"
executable: "tap-service-titan"
- name: tap-service-titan
namespace: tap_service_titan
pip_url: -e .
executable: tap-service-titan
capabilities:
- state
- catalog
- discover
- about
- stream-maps
config:
api_url: 'https://api-integration.servicetitan.io'
auth_url: 'https://auth-integration.servicetitan.io/connect/token'
settings:
- name: client_id
kind: string
sensitive: true
- name: client_secret
kind: string
sensitive: true
- name: st_app_key
kind: string
sensitive: true
- name: tenant_id
kind: string
- name: api_url
kind: string
- name: auth_url
kind: string
- name: client_id
kind: string
sensitive: true
- name: client_secret
kind: string
sensitive: true
- name: st_app_key
kind: string
sensitive: true
- name: tenant_id
kind: string
- name: api_url
kind: string
- name: auth_url
kind: string
config:
api_url: https://api-integration.servicetitan.io
auth_url: https://auth-integration.servicetitan.io/connect/token
tenant_id: '1109320760'
loaders:
- name: target-jsonl
variant: andyh1203
pip_url: target-jsonl
- name: target-csv
variant: meltanolabs
pip_url: git+https://github.com/MeltanoLabs/target-csv.git
70 changes: 59 additions & 11 deletions tap_service_titan/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import requests
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator # noqa: TCH002
from singer_sdk.pagination import BaseAPIPaginator, BasePageNumberPaginator # noqa: TCH002
from singer_sdk.streams import RESTStream

from tap_service_titan.auth import ServiceTitanAuthenticator
Expand All @@ -25,8 +25,8 @@
SCHEMAS_DIR = importlib_resources.files(__package__) / "schemas"


class ServiceTitanStream(RESTStream):
"""ServiceTitan stream class."""
class ServiceTitanBaseStream(RESTStream):
"""ServiceTitan base stream class."""

@property
def url_base(self) -> str:
Expand All @@ -35,8 +35,6 @@ def url_base(self) -> str:

records_jsonpath = "$.data[*]" # Or override `parse_response`.

next_page_token_jsonpath = "$.continueFrom" # noqa: S105

@cached_property
def authenticator(self) -> _Auth:
"""Return a new authenticator object.
Expand Down Expand Up @@ -74,6 +72,23 @@ def get_new_paginator(self) -> BaseAPIPaginator:
"""
return super().get_new_paginator()

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result records.
Args:
response: The HTTP ``requests.Response`` object.
Yields:
Each record from the source.
"""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())


class ServiceTitanExportStream(ServiceTitanBaseStream):
"""ServiceTitan stream class for export endpoints."""

next_page_token_jsonpath = "$.continueFrom" # noqa: S105

def get_url_params(
self,
context: dict | None,
Expand Down Expand Up @@ -104,13 +119,46 @@ def get_url_params(

return params

def parse_response(self, response: requests.Response) -> Iterable[dict]:
"""Parse the response and return an iterator of result records.

class ServiceTitanPaginator(BasePageNumberPaginator):
"""ServiceTitan paginator class."""

def has_more(self, response: Response) -> bool:
"""Return True if there are more pages available."""
return response.json().get("hasMore", False)


class ServiceTitanStream(ServiceTitanBaseStream):
"""ServiceTitan stream class for endpoints without export support."""

def get_url_params(
self,
context: dict | None,
next_page_token: Any | None, # noqa: ANN401
) -> dict[str, Any]:
"""Return a dictionary of values to be used in URL parameterization.
Args:
response: The HTTP ``requests.Response`` object.
context: The stream context.
next_page_token: The next page index or value.
Yields:
Each record from the source.
Returns:
A dictionary of URL query parameters.
"""
yield from extract_jsonpath(self.records_jsonpath, input=response.json())
params: dict = {}
starting_date = self.get_starting_timestamp(context)

# The Service Titan API uses the "from" param for both continuation tokens
# and for the starting timestamp for the first request of an export
if self.replication_key and starting_date:
# "from" param is inclusive of start date
# this prevents duplicating of single record in each run
starting_date += timedelta(milliseconds=1)
params["modifiedOnOrAfter"] = starting_date.isoformat()
params["page"] = next_page_token

return params

def get_new_paginator(self) -> ServiceTitanPaginator:
"""Create a new pagination helper instance."""
return ServiceTitanPaginator(start_value=1)
Loading

0 comments on commit d792328

Please sign in to comment.