Skip to content

Commit

Permalink
Dispatch streams coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
cjohnhanson committed Nov 15, 2024
1 parent bed07e0 commit 4215ce2
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 1 deletion.
156 changes: 155 additions & 1 deletion tap_service_titan/streams/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from singer_sdk.helpers.jsonpath import extract_jsonpath
from singer_sdk.pagination import BaseAPIPaginator

from tap_service_titan.client import ServiceTitanStream
from tap_service_titan.client import ServiceTitanStream, ServiceTitanExportStream

if t.TYPE_CHECKING:
import requests
Expand Down Expand Up @@ -104,3 +104,157 @@ def prepare_request_payload(
def path(self) -> str:
"""Return the API path for the stream."""
return f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/capacity"


class ArrivalWindowsStream(ServiceTitanStream):
"""Define arrival windows stream."""

name = "arrival_windows"
primary_keys: t.ClassVar[list[str]] = ["id"]

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("start", th.StringType),
th.Property("duration", th.StringType),
th.Property("businessUnitIds", th.ArrayType(th.IntegerType)),
th.Property("active", th.BooleanType),
).to_dict()

@cached_property
def path(self) -> str:
"""Return the API path for the stream."""
return f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/arrival-windows"


class AppointmentAssignmentsStream(ServiceTitanExportStream):
"""Define appointment assignments stream."""

name = "appointment_assignments"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key: str = "assignedOn"

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("technicianId", th.IntegerType),
th.Property("technicianName", th.StringType),
th.Property("assignedById", th.IntegerType),
th.Property("assignedOn", th.DateTimeType),
th.Property("status", th.StringType),
th.Property("isPaused", th.BooleanType),
th.Property("jobId", th.IntegerType),
th.Property("appointmentId", th.IntegerType),
).to_dict()

@cached_property
def path(self) -> str:
"""Return the API path for the stream."""
return f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/export/appointment-assignments"


class NonJobAppointmentsStream(ServiceTitanStream):
"""Define non job appointments stream."""

name = "non_job_appointments"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key: str = "createdOn"

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("technicianId", th.IntegerType),
th.Property("start", th.DateTimeType),
th.Property("name", th.StringType),
th.Property("duration", th.StringType),
th.Property("timesheetCodeId", th.IntegerType),
th.Property("summary", th.StringType),
th.Property("clearDispatchBoard", th.BooleanType),
th.Property("clearTechnicianView", th.BooleanType),
th.Property("removeTechnicianFromCapacityPlanning", th.BooleanType),
th.Property("allDay", th.BooleanType),
th.Property("showOnTechnicianSchedule", th.BooleanType),
th.Property("active", th.BooleanType),
th.Property("createdOn", th.DateTimeType),
th.Property("createdById", th.IntegerType),
).to_dict()

@cached_property
def path(self) -> str:
"""Return the API path for the stream."""
return (
f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/non-job-appointments"
)


class TeamsStream(ServiceTitanStream):
"""Define teams stream."""

name = "teams"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key: str = "modifiedOn"

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("active", th.BooleanType),
th.Property("name", th.StringType),
th.Property("createdBy", th.IntegerType),
th.Property("createdOn", th.DateTimeType),
th.Property("modifiedOn", th.DateTimeType),
).to_dict()

@cached_property
def path(self) -> str:
"""Return the API path for the stream."""
return f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/teams"


class TechnicianShiftsStream(ServiceTitanStream):
"""Define technician shifts stream."""

name = "technician_shifts"
primary_keys: t.ClassVar[list[str]] = ["id"]

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("shiftType", th.StringType),
th.Property("title", th.StringType),
th.Property("note", th.StringType),
th.Property("active", th.BooleanType),
th.Property("technicianId", th.IntegerType),
th.Property("start", th.DateTimeType),
th.Property("end", th.DateTimeType),
th.Property("timesheetCodeId", th.IntegerType),
).to_dict()

@cached_property
def path(self) -> str:
"""Return the API path for the stream."""
return f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/technician-shifts"


class ZonesStream(ServiceTitanStream):
"""Define zones stream."""

name = "zones"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key: str = "modifiedOn"

schema = th.PropertiesList(
th.Property("id", th.IntegerType),
th.Property("active", th.BooleanType),
th.Property("name", th.StringType),
th.Property("zips", th.ArrayType(th.StringType)),
th.Property("cities", th.ArrayType(th.StringType)),
th.Property("territoryNumbers", th.ArrayType(th.StringType)),
th.Property("locnNumbers", th.ArrayType(th.StringType)),
th.Property("createdBy", th.IntegerType),
th.Property("createdOn", th.DateTimeType),
th.Property("modifiedOn", th.DateTimeType),
th.Property("serviceDaysEnabled", th.BooleanType),
th.Property("serviceDays", th.ArrayType(th.StringType)),
th.Property("businessUnits", th.ArrayType(th.IntegerType)),
th.Property("technicians", th.ArrayType(th.IntegerType)),
).to_dict()

@cached_property
def path(self) -> str:
"""Return the API path for the stream."""
return f"/dispatch/v2/tenant/{self._tap.config['tenant_id']}/zones"
7 changes: 7 additions & 0 deletions tap_service_titan/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ def discover_streams(self) -> list[ServiceTitanStream]:
streams.crm.LocationContactsStream(self),
streams.crm.LocationsStream(self),
streams.dispatch.CapacitiesStream(self),
streams.dispatch.AppointmentAssignmentsStream(self),
streams.dispatch.ArrivalWindowsStream(self),
streams.dispatch.AppointmentAssignmentsStream(self),
streams.dispatch.NonJobAppointmentsStream(self),
streams.dispatch.TeamsStream(self),
streams.dispatch.TechnicianShiftsStream(self),
streams.dispatch.ZonesStream(self),
streams.inventory.AdjustmentsStream(self),
streams.inventory.ReturnTypesStream(self),
streams.inventory.TransfersStream(self),
Expand Down

0 comments on commit 4215ce2

Please sign in to comment.