diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a0298e3..da692d2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,7 +43,7 @@ jobs: ecosystem-test: needs: unit-test - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: matrix: python-version: ['3.7', '3.8', '3.9', '3.10', '3.12'] diff --git a/CHANGELOG.md b/CHANGELOG.md index ef1d423..e05a469 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +## 0.1.12 (2025-01-23) + +- Add support for folder API +- Update `create_chunks` method to handle columnar format only (headers + inputs) +- Remove `None` values from the request payload (e.g., `Services.execute()`) +- Support extra metadata in the request payload (e.g., `Services.execute(extras={...})`) +- Update documentation and examples + ## 0.1.11 (2024-12-23) - Apply bug fixes and enhancements diff --git a/README.md b/README.md index 6563d52..d52ae13 100644 --- a/README.md +++ b/README.md @@ -235,8 +235,8 @@ OAuth2.0 Client Credentials flow: [ImpEx API](./docs/impex.md) - imports and exports Spark services: -- `Spark.impex.export(data)` exports Spark entities (versions, services, or folders). -- `Spark.impex.import_(data)` imports previously exported Spark entities into the platform. +- `Spark.impex.exp(data)` exports Spark entities (versions, services, or folders). +- `Spark.impex.imp(data)` imports previously exported Spark entities into the platform. [Other APIs](./docs/misc.md) - for other functionality: diff --git a/docs/batches.md b/docs/batches.md index ee44691..fd0889b 100644 --- a/docs/batches.md +++ b/docs/batches.md @@ -14,28 +14,25 @@ | `Spark.batches.of(id).dispose()` | [Close a batch pipeline](#close-a-batch-pipeline).| | `Spark.batches.of(id).cancel()` | [Cancel a batch pipeline](#cancel-a-batch-pipeline).| -The [Batches API][batch-apis] (in Beta) offers a set of endpoints that facilitate -executing a Spark service for a large volume of input data. Spark provides a dedicated -infrastructure specifically designed for parallel processing, capable of scaling up -or down based on the data volume that needs to be processed. - -Utilizing this API will ensure optimal performance and scalability for your data -processing tasks. +[Batch API][batch-apis] provides a series of endpoints for processing high-volume +data through Spark services. Built on a dedicated infrastructure, it enables efficient +parallel processing that automatically scales based on workload demands. This API delivers +optimal performance by dynamically adjusting computational resources to match your +data processing needs. > [!NOTE] -> It should be noted that the Batches API is highly recommended when dealing with -> datasets consisting of more than 10,000 records, with a calculation time longer than 500ms. -> Unless you have specific requirements or reasons to use a different approach, -> such as [Services API](./services.md) as an alternative, this API is the way to go. +> Batch API is the optimal choice for processing large datasets with over 10,000 records +> or calculations exceeding 500ms. While alternatives like the [Services API](./services.md) exist, +> the Batch APIs provide the most efficient solution for high-volume data processing unless +> specific requirements dictate otherwise. 
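+
+As a quick orientation, the typical flow is sketched below (a bare-bones outline only;
+each step is covered in detail in the sections that follow, and the input uses the
+columnar format of headers plus rows):
+
+```py
+batch = spark.batches.create('my-folder/my-service')    # start a new pipeline
+pipeline = spark.batches.of(batch.data['id'])           # client-side reference by id
+pipeline.push(inputs=[['input_1', 'input_2'], [1, 2]])  # headers + rows (columnar format)
+pipeline.pull()                                         # retrieve available results
+pipeline.dispose()                                      # close the pipeline when done
+```
+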
-For more information on the Batches API and its endpoints, refer to the [API reference][batch-apis]. +For more information on the Batch API and its endpoints, refer to the [API reference][batch-apis]. ## Describe batch pipelines across a tenant -This method retrieves detailed info on recently run batch pipelines across a tenant. -It helps you keep track of existing batches and their statuses. Remember that -this will only provide information about batches that are in progress or recently -completed (i.e., within the past hour). +This method provides a comprehensive overview of batch pipelines executed within a tenant. +It allows you to monitor and track both active and recently completed batches (within the +last hour). Use this method to efficiently manage and analyze your batch processing operations. ### Arguments @@ -95,19 +92,20 @@ been granted access (e.g., `supervisor:pf`) to view other users' batches. ## Create a new batch pipeline -This method allows you to start a new batch pipeline, which is a necessary step -before you can perform any operations on it. +This method initializes a new batch pipeline, a prerequisite step for any data +processing operations. > [!IMPORTANT] -> It is good practice to retain the `id` of the newly created pipeline. -> This identifier will be used to reference the pipeline in subsequent operations. +> Store the returned pipeline `id` securely. You'll need this unique identifier +> for all subsequent operations on this pipeline. ### Arguments -The method accepts a string or a `UriParams` object and optional keyword arguments, -which include metadata and other pipeline configurable settings (experimental). +The method requires a service identifier (either as a string or `UriParams` object) +and accepts optional configuration parameters for metadata and pipeline behavior +(experimental features). -For the first argument, the service URI locator can be a string or `UriParams` object: +For the first argument, provide the service URI as a string or `UriParams` object: | Property | Type | Description | | ----------- | -------------- | ------------------------------------------------ | @@ -123,8 +121,8 @@ spark.batches.create('my-folder/my-service') spark.batches.create(UriParams(folder='my-folder', service='my-service')) ``` -If needed, you can also provide additional keyword arguments to configure how the -pipeline, once created, will scale up or down and perform its operations. +For the second argument, you can provide optional parameters to customize the pipeline's behavior, +including scaling and operational configurations: | Property | Type | Description | | -------------------- | ------------- | ------------------------------------------------ | @@ -181,15 +179,15 @@ or the [`cancel` method](#cancel-a-batch-pipeline). ## Define a client-side batch pipeline by ID -This method performs no action on the pipeline (no API call). Instead, it -allows you to define a client-side reference for the pipeline using its -unique identifier (`id`), which can then be used to perform various operations -without having to specify it repeatedly in each method call. +This method creates a client-side reference to an existing batch pipeline using its +unique identifier (`id`). It serves as a convenient way to interact with a specific +pipeline without repeatedly providing its ID for each operation. ### Arguments -The expected argument is the pipeline's unique identifier as a string. -At this stage, no checks are performed to validate the provided `id`. 
+The method requires a single argument: the pipeline's unique identifier (`id`) as +a string. Note that this method does not validate the ID or make any API calls; it +simply establishes a local reference to the pipeline. ```py pipeline = spark.batches.of('uuid') @@ -197,14 +195,16 @@ pipeline = spark.batches.of('uuid') ### Returns -The method returns a batch `Pipeline` object that can be used to perform subsequent -actions on the pipeline. +The method returns a batch `Pipeline` object that serves as an interface for all +subsequent pipeline operations. + +This object provides more than just convenience in handling batch IDs. It maintains +valuable pipeline metrics and status information, enabling you to: -Apart from the convenience of not having to specify the batch ID repeatedly, some other -perks of using this object include the ability to build statistics and insights -about the pipeline usage. For instance, if you've built a mechanism for repeatedly -pushing and pulling data, you may retrieve details such as the total number of -records processed, the state of the pipeline, and so on. +- Track the pipeline's current state +- Monitor the total number of processed records +- Access processing statistics +- Manage chunk operations efficiently ```py print(pipeline.state) # 'open' (other values: 'closed' or 'cancelled') @@ -289,9 +289,13 @@ pipeline.get_status() ### Returns -The method returns a dictionary containing the current status of the pipeline -and other relevant details, such as the number of records processed, the time taken -to process the data, and the status of the pipeline. +The method returns a dictionary containing comprehensive pipeline metrics, including: + +- Current pipeline and batch status +- Record processing counts and progress +- Computation time metrics +- Buffer utilization statistics +- Worker allocation information ```json { @@ -327,15 +331,22 @@ Other available statuses (i.e., `batch_status`) are: ## Add input data to a batch pipeline -This method allows you to push input data in bulk to an existing pipeline. -It is also designed to facilitate data submission in different shapes and forms. +This method enables bulk submission of input data to an existing pipeline. It +supports various data formats and structures to provide flexible data ingestion +options. + +> [!WARNING] +> The SDK does **NOT** automatically convert regular JSON objects into JSON arrays +> when submitting to the pipeline. Submitting data in an incorrect format may result +> in unexpected behavior or processing errors, such as record count mismatches. +> Always ensure your input data is properly formatted as JSON arrays. ### Arguments The method accepts 4 mutually exclusive keyword arguments: -- `raw`: is the raw chunk data without no direct data manipulation. The raw dataset - can be of `string` or `bytes` as long as it is JSON serializable. +- `raw`: is the dataset in its most primitive shape and can be of `string` or `bytes` +as long as it is JSON serializable. ```py raw_string = """ @@ -347,8 +358,8 @@ raw_string = """ "data": { "inputs": [ ["input_1", "input_2", "input_N"], - [1, 2, 3], - [4, 5, 6] + [0, "zero", false], + [1, "one", true] ], "parameters": {"common_input": 0} } @@ -359,32 +370,32 @@ raw_string = """ pipeline.push(raw=raw_string) ``` -- `inputs`: a list of the records as input data. This is convenient when you have - a list of records that needs to be processed in a single chunk. 
That is, you are - in complete control of the data submission process: chunking and partitioning. +- `inputs`: is convenient when you have a list of records that needs to be processed + in one single chunk. That means, if you need to submit multiple chunks, you will + have to call the `push` method multiple times. ```py -pipeline.push(inputs=[{'value': 42}, {'value': 43}]) +pipeline.push(inputs=[['input_1', 'input_2', 'input_N'], [0, 'zero', False], [1, 'one', True]]) ``` -- `data`: an object of `ChunkData` type. Sometimes, you may want to perform certain - operations, such as applying aggregations to the output data post-processing. This +- `data`: is an object of `ChunkData` type. Sometimes, you may want to perform certain + operations, such as applying aggregations to the output data. This class lets you specify the `inputs`, `parameters` and `summary` separately. ```py from cspark.sdk import ChunkData data = ChunkData( - inputs=[{'value': 42}, {'value': 43}], + inputs=[['input_1', 'input_2', 'input_N'], [0, 'zero', False], [1, 'one', True]], parameters={'common': 40}, summary={'ignore_error': False, 'aggregation': [{'output_name': 'total', 'operator': 'SUM'}]}, ) pipeline.push(data=data) ``` -- `chunks`: an object of `BatchChunk` type. This gives you full control over the - chunk creation process, allowing you to specify the `inputs`, `parameters`, - and `summary`, and indicate the `id` and `size`. +- `chunks`: is an object of `BatchChunk` type. This gives you full control over the + chunk creation process, allowing you to indicate the `id` and `size` as well as + the `inputs`, `parameters`, and `summary`. ```py from cspark.sdk import BatchChunk, ChunkData @@ -393,7 +404,7 @@ chunk = BatchChunk( id='uuid', size=2, data=ChunkData( - inputs=[{'value': 42}, {'value': 43}], + inputs=[['input_1', 'input_2', 'input_N'], [0, 'zero', False], [1, 'one', True]], parameters={'common': 40}, summary={'ignore_error': False, 'aggregation': [{'output_name': 'total', 'operator': 'SUM'}]}, ), @@ -407,7 +418,10 @@ _evenly_ across the chunks. ```py from cspark.sdk import create_chunks -chunks = create_chunks(inputs=[{'value': 42}, {'value': 43}, {'value': 44}], chunk_size=2) +chunks = create_chunks( + inputs=[['input_1', 'input_2', 'input_N'], [0, 'zero', False], [1, 'one', True]], + chunk_size=2, +) pipeline.push(chunks=chunks) ``` @@ -440,14 +454,14 @@ reflecting the new data that was pushed. ## Retrieve the output data from a batch pipeline -Once you submit the input data, the pipeline will automatically start processing it. -Eventually, the pipeline will produce some output data, which can be pulled once available. +After submitting input data, the pipeline begins processing automatically. Output +data becomes available progressively and can be retrieved through the pull operation. > [!TIP] -> You do not have to wait for the previous chunk to be processed before submitting -> the next one. Spark will automatically queue and process the chunks once more -> compute resources are available. A good practice is monitoring the pipeline's -> status and ensuring the input and output buffers are not full. +> You can continue submitting chunks without waiting for previous ones to complete. +> Spark manages the queue and processing automatically, allocating compute resources +> as they become available. Best practice is to monitor the pipeline status periodically +> and ensure input/output buffers have sufficient capacity. 
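+
+As a rough illustration of that practice (a sketch only; the status keys mirror those
+used in the example script at the bottom of this page), you can interleave status
+checks with pull operations until every submitted record has been processed:
+
+```py
+import time
+
+status = pipeline.get_status().data
+while status['records_completed'] < status['record_submitted']:
+    if status['records_available'] > 0:
+        pipeline.pull()  # drain whatever results are ready
+    time.sleep(2)
+    status = pipeline.get_status().data
+```
+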
### Arguments @@ -512,13 +526,13 @@ Find out more about the output data structure in the ## Close a batch pipeline -Once you have finished processing all your input data, it is important to close -the pipeline to free up resources and ensure optimal performance. +Once all data processing is complete, it's essential to close the pipeline to release +system resources and maintain optimal performance. -After you close a batch, any pending chunks will still be processed and can be retrieved. -However, you won't be able to submit new chunks to a closed pipeline. The -SDK maintains an internal state of the pipeline and will generate an error -if you try to perform an unsupported operation on it. +After closing a batch, any pending chunks in the processing queue will still complete +and their results can be retrieved. However, the pipeline won't accept new chunk +submissions. The SDK tracks the pipeline's state internally and will throw an error +if you attempt any operations not supported on a closed pipeline. ### Arguments @@ -531,14 +545,13 @@ pipeline.dispose() > [!WARNING] > Do **NOT** use the `close()` method to close a pipeline. This method is reserved -> for closing the HTTP client of the `Pipeline` API resource and should not be -> used to close a pipeline. If that happens unintentionally, you will need to -> start over and build a new client-side pipeline using `Batches.of(id)`, which -> also means you'll lose the internal states handled by the old `Pipeline` object. +> for closing the HTTP client of the `Pipeline` API resource. If and when +> that happens unintentionally, you will need to start over and build a new client-side +> pipeline using `Batches.of(id)`, which also means you'll lose the internal states +> handled by the old `Pipeline` object. -Keep in mind that if the pipeline has been idle for longer than 30 minutes, -it will automatically be closed by the system to free up resources, i.e., disposing -of existing workers and buffers. +Note that pipelines automatically close after 30 minutes of inactivity to optimize +resource utilization. This automatic closure releases allocated workers and buffers. ### Returns @@ -622,17 +635,17 @@ To further illustrate the practical implementation of the Batches API, consider following example: the `create_and_run()` script. It's a self-contained script and should serve as a demonstration of how to harmoniously -use the various methods of the Batches API in one go. The script performs the following -tasks: +use the Batches API methods. The script performs the following tasks: -- reading a dataset (inputs) from a JSON file; -- pushing it in chunks to a newly created batch pipeline; -- checking the pipeline's status every 2 seconds; -- retrieving the output data from the pipeline when available; -- and finally, closing the pipeline. +- read a dataset (inputs) from a JSON file; +- push it in chunks to a newly created batch pipeline; +- check the pipeline's status every 2 seconds; +- retrieve the output data from the pipeline when available; +- and finally, close the pipeline. -The script will continue to interact with Spark till all data has been processed -unless an error occurs, which will force an early closure of the pipeline. +The script maintains continuous interaction with Spark until all data processing +is complete. If an error occurs during execution, the pipeline will be safely +terminated and all resources properly released. 
```py import json @@ -680,16 +693,11 @@ def create_and_run(batches: Spark.Batches): results.extend(r['outputs']) time.sleep(2) - - except Spark.SparkSdkError as err: - print(err.message) - if err.cause: - print(err.details) - except Spark.SparkApiError as err: - logger.warning(err.message) + except Spark.SparkError as err: + logger.error(err.message) logger.info(err.details) except Exception as exc: - logger.fatal(f'Unknown error: {exc}') + logger.critical(f'Unknown error: {exc}') finally: if pipeline: pipeline.dispose() @@ -703,7 +711,7 @@ def create_and_run(batches: Spark.Batches): if __name__ == '__main__': load_dotenv() # load Spark settings from .env file - spark = Spark.Client(timeout=120_000) # create a Spark client + spark = Spark.Client(timeout=90_000, logger={'context': 'Async Batch'}) # create a Spark client with spark.batches as b: create_and_run(b) ``` @@ -718,11 +726,10 @@ if __name__ == '__main__': > to consider how you read and feed the input data to the pipeline and how to handle > the output data once it's available. -In the example above, the script assumes that the input dataset is cleansed and -formatted correctly. However, there may be occasions when the dataset is not in the -desired format. In such cases, you will need to preprocess it. The `BatchChunk` and -`ChunkData` classes can be used to manipulate the data and structure it in a way -that is compatible with the pipeline. +The example assumes pristine input data, which is rarely the case in real applications. +When working with raw data, you'll likely need preprocessing steps. The `BatchChunk` +and `ChunkData` classes provide flexible data manipulation capabilities to ensure +your data meets pipeline requirements before processing. [Back to top](#batches-api) or [Next: Log History API](./history.md) diff --git a/docs/history.md b/docs/history.md index d7ea887..9622155 100644 --- a/docs/history.md +++ b/docs/history.md @@ -89,7 +89,7 @@ when successful, this method returns: } ``` -Here's a full example how to harness this method: +Here's a full example of how to harness this method: ```python import cspark.sdk as Spark @@ -105,38 +105,38 @@ with spark.logs as logs: ## Download service execution logs -This method allows you to download the service execution logs as a CSV or JSON file -to your local machine. Unlike the `rehydrate` method, this one initiates a download -job and continuously checks the status until the job is completed and finally downloads -the zip file. It throws a `SparkError` if the download job fails to produce a downloadable -file. +This method allows you to export service execution logs in either CSV or JSON +format to your local machine. It streamlines the download process by handling the +complete workflow: initiating the download job, monitoring its status, and retrieving +the final zip file once ready. If the download process encounters any issues or +fails to generate a downloadable file, the method raises a `SparkError`. If you want to have more fine-grained control over the download process, you can use respectively the `Spark.logs.downloads.initiate(uri, [type])` and `Spark.logs.downloads.get_status(uri, [type])` methods to initiate a download job and check its status until it's finished. Do note that the status check is -subject to a timeout when it reaches the maximum number of retries. +subject to `RetryTimeoutError` when it reaches the maximum number of retries. 
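+
+For instance, the manual flow might look like the sketch below (illustrative only; it
+relies solely on the `initiate` and `get_status` signatures described above and simply
+prints the raw response data):
+
+```python
+with spark.logs as logs:
+    # kick off a CSV download job, then check on it until it reports completion
+    job = logs.downloads.initiate('my-folder/my-service', 'csv')
+    print(job.data)
+    status = logs.downloads.get_status('my-folder/my-service', 'csv')
+    print(status.data)
+```
+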
### Arguments This method accepts the following keyword arguments: -| Property | Type | Description | -| ----------------- | -------------------- | ---------------------------------------------------------------- | -| _folder_ | `str` | The folder name. | -| _service_ | `str` | The service name. | -| _version\_id_ | `None \| string` | The particular service version for the download. | -| _type_ | `csv \| json` | The file type (defaults to `json`). | -| _call\_ids_ | `None \| List[str]` | An array of call IDs to download logs for. | +| Property | Type | Description | +| ----------------- | -------------------- | ----------------------------------------------------- | +| _folder_ | `str` | The folder name. | +| _service_ | `str` | The service name. | +| _version\_id_ | `None \| string` | The particular service version for the download. | +| _type_ | `csv \| json` | The file type (defaults to `json`). | +| _call\_ids_ | `None \| List[str]` | An array of call IDs to download logs for. | | _start\_date_ | `None \| str \| int \| datetime` | The start date (format: `YYYY-MM-DD[THH:MM:SS.SSSZ]`).| | _end\_date_ | `None \| str \| int \| datetime` | The end date (format: `YYYY-MM-DD[THH:MM:SS.SSSZ]`). | -| _correration\_id_ | `string` | The correlation ID (possible fallback for `call_ids`). | -| _source\_system_ | `string` | The source system (possible fallback for `call_ids`). | +| _correration\_id_ | `string` | The correlation ID (possible fallback for `call_ids`).| +| _source\_system_ | `string` | The source system (possible fallback for `call_ids`). | | _max\_retries_ | `None \| int` | The number of retries to attempt (defaults to `Config.max_retries`).| | _retry\_interval_ | `None \| float` | The interval between retries in seconds (defaults to `Config.retry_interval`).| ```python -logs.download( +spark.logs.download( folder='my-folder', service='my-service', call_ids=['uuid1', 'uuid2', 'uuid3'], diff --git a/docs/hybrid.md b/docs/hybrid.md index 0c5c835..7124504 100644 --- a/docs/hybrid.md +++ b/docs/hybrid.md @@ -17,8 +17,8 @@ support the Hybrid Runner API. To install it, run: pip install cspark ``` -Obviously, a runner offers a smaller subset of functionality compared to the SaaS API, -however, extending `cspark.sdk` to support the Hybrid Runner API is a good way +Hybrid runners offer a smaller subset of functionality compared to the SaaS API. +However, extending `cspark.sdk` to support the Hybrid Runner API is a good way to keep the codebase consistent and maintainable. This also means that you may want to check its [documentation][cspark] to learn about its client options, error handling, and other features. @@ -52,7 +52,7 @@ Explore the [examples] and [docs] folders to find out more about its capabilitie [cspark]: https://pypi.org/project/cspark/ -[version-img]: https://badge.fury.io/py/cspark.svg +[version-img]: https://img.shields.io/pypi/v/cspark [version-url]: https://pypi.python.org/pypi/cspark [user-guide]: https://docs.coherent.global/hybrid-runner/introduction-to-the-hybrid-runner [hybrid-runner]: https://github.com/orgs/Coherent-Partners/packages/container/package/nodegen-server diff --git a/docs/impex.md b/docs/impex.md index 7ca86cf..0c5602d 100644 --- a/docs/impex.md +++ b/docs/impex.md @@ -2,10 +2,10 @@ # ImpEx API -| Verb | Description | -| ----------------------------- | --------------------------------------------------------------------------------- | -| `Spark.impex.export(data)` | [Export Spark entities (versions, services, or folders)](#export-spark-entities). 
| -| `Spark.impex.import_(data)`| [Import exported Spark entities into your workspace](#import-spark-entities). | +| Verb | Description | +| ----------------------- | --------------------------------------------------------------------------------- | +| `Spark.impex.exp(data)` | [Export Spark entities (versions, services, or folders)](#export-spark-entities). | +| `Spark.impex.imp(data)` | [Import exported Spark entities into your workspace](#import-spark-entities). | ## Export Spark entities @@ -23,7 +23,7 @@ The expected keyword arguments are as follows: | _folders_ | `None \| list[str]` | 1+ folder name(s). | | _services_ | `None \| list[str]` | 1+ service URI(s). | | _version\_ids_ | `None \| list[str]` | 1+ version UUID(s) of the desired service. | -| _file\_filter_ | `migrate \| onpremises` | For data migration or hybrid deployments (defaults to `migrate`). | +| _file\_filter_ | `'migrate' \| 'onpremises'` | For data migration or hybrid deployments (defaults to `migrate`). | | _version\_filter_ | `latest \| all` | Which version of the file to export (defaults to `latest`). | | _source\_system_ | `None \| str` | Source system name to export from (e.g., `Spark Python SDK`). | | _correlation\_id_ | `None \| str` | Correlation ID for the export (useful for tagging). | @@ -41,7 +41,7 @@ Check out the [API reference](https://docs.coherent.global/spark-apis/impex-apis for more information. ```python -spark.impex.export( +spark.impex.exp( services=['my-folder/my-service[0.4.2]', 'my-other-folder/my-service-2'], file_filter='onpremises', max_retries=5, @@ -54,16 +54,15 @@ spark.impex.export( When successful, this method returns an array of exported entities, where each entity is an `HttpResponse` object with the buffer containing the exported entity. -### Non-Transactional Methods - -This method is transactional. It will initiate an export job, poll its status -until it completes, and download the exported files. If you need more control over -these steps, consider using the `exports` resource directly. You may use the following -methods: - -- `Spark.impex.exports.initiate(data)` creates an export job. -- `Spark.impex.exports.get_status(job_id)` gets an export job's status. -- `Spark.impex.exports.download(urls)` downloads the exported files as a ZIP. +> [!TIP] +> This method is transactional. It will initiate an export job, poll its status +> until it completes, and download the exported files. If you need more control over +> these steps, consider using the `exports` resource directly. You may use the following +> methods: +> +> - `Spark.impex.exports.initiate(data)` creates an export job. +> - `Spark.impex.exports.get_status(job_id)` gets an export job's status. +> - `Spark.impex.exports.download(urls)` downloads the exported files as a ZIP. ## Import Spark entities @@ -78,7 +77,7 @@ The expected keyword arguments are as follows: | --------------- | ------------------ | ------------------------------------------------------------ | | _file_ | `BinaryIO` | The ZIP file containing the exported entities. | | _destination_ | `str \| List[str] \| Mapping[str, str] \| List[Mapping[str, str]]`| The destination service URI(s). | -| _if\_present_ | `abort \| replace \| add_version` | What to do if the entity already exists in the destination (defaults to `add_version`). | +| _if\_present_ | `'abort' \| 'replace' \| 'add_version'` | What to do if the entity already exists in the destination (defaults to `add_version`). 
| | _source\_system_ | `None \| str` | Source system name to export from (e.g., `Spark Python SDK`).| | _correlation\_id_ | `None \| str` | Correlation ID for the export (useful for tagging). | | _max\_retries_ | `None \| int` | Maximum number of retries when checking the export status. | @@ -96,13 +95,13 @@ any of the formats indicated below: | --------- | ------- | ------------------------------------------ | | _source_ | `str` | The service URI of the source tenant. | | _target_ | `str \| None`| The service URI of the destination tenant (defaults to `source`) | -| _upgrade_ | `major \| minor \| patch` | The version upgrade strategy (defaults to `minor`). | +| _upgrade_ | `'major' \| 'minor' \| 'patch'` | The version upgrade strategy (defaults to `minor`). | Check out the [API reference](https://docs.coherent.global/spark-apis/impex-apis/import#request-body) for more information. ```python -spark.impex.import_( +spark.impex.imp( destination={'source': 'my-folder/my-service', 'target': 'this-folder/my-service', 'upgrade': 'patch'}, file=open('exported.zip', 'rb'), max_retries=7, @@ -175,8 +174,6 @@ See the sample response below. } ``` -### Non-Transactional Methods - Being transactional, this method will create an import job, and poll its status continuously until it completes the import process. You may consider using the `imports` resource directly and control the import process manually: diff --git a/docs/misc.md b/docs/misc.md index 8c1dded..1146e47 100644 --- a/docs/misc.md +++ b/docs/misc.md @@ -12,17 +12,14 @@ This method helps you download a service's [WebAssembly](https://webassembly.org/) module. -Roughly speaking, WebAssembly (or WASM) is a binary instruction format -for a stack-based virtual machine. It's designed as a portable compilation target -for programming languages, enabling deployment on the web for client and server -applications. - -In the context of Spark, a WebAssembly module refers to a cohesive bundle of -files designed for portability and execution across web and Node.js environments. -This bundle typically includes the WebAssembly representation of the Spark service's -encapsulated logic along with associated JavaScript files. By packaging these -components together, a Spark WASM module becomes executable within both browser and -Node environments. +[WebAssembly](https://webassembly.org/) (WASM) is a low-level binary format for +executing code in a stack-based virtual machine. It serves as a compilation target +for high-level programming languages, enabling efficient execution across web platforms. + +In Spark's context, a WebAssembly module is a self-contained package that bundles +the compiled service logic with its supporting files. This modular approach ensures +consistent execution in both web browsers and Node.js environments, making Spark +services highly portable and performant. Check out the [API reference](https://docs.coherent.global/spark-apis/webassembly-module-api) for more information. @@ -31,9 +28,9 @@ for more information. You may pass in the service URI as `string` in the following format: -- `version/uuid` (e.g., `version/123e4567-e89b-12d3-a456-426614174000`) - **preferred** -- `service/uuid` (e.g., `service/123e4567-e89b-12d3-a456-426614174000`) -- `folder/service` (e.g., `my-folder/my-service`) +- `version/{version_id}` - **preferred** +- `service/{service_id}` +- `{folder}/{service}` ```python spark.wasm.download('version/uuid') @@ -46,7 +43,7 @@ Alternatively, you can pass in the following parameters as an `object`. 
| _folder_ | `str \| None` | The folder name. | | _service_ | `str \| None` | The service name. | | _version\_id_ | `str \| None` | The version UUID of the service. | -| _service\_id_ | `str \| None` | The service UUID. | +| _service\_id_ | `str \| None` | The service UUID (points to the latest version).| > [!NOTE] > As of now, only the `version_id` should be used to download the WebAssembly module. diff --git a/docs/readme.md b/docs/readme.md index 61a7aab..d563772 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -60,8 +60,8 @@ which provides an elegant, feature-rich HTTP module. The SDK built a layer on top of it to simplify the process of making HTTP requests to the Spark platform. Presently, only the synchronous HTTP methods are supported. Hence, all the methods -under `Spark.Client()` are synchronous and return an `HttpResponse` object with -the following properties: +under `Spark.Client()` are synchronous (i.e., blocking) and return an `HttpResponse` +object with the following properties: - `status`: HTTP status code - `data`: Data returned by the API if any (usually JSON) @@ -74,18 +74,19 @@ the following properties: > when accessing the response data. > > As a side note, we intend to leverage the asynchronous methods in the future -> to provide a more efficient way to interact with the Spark platform. +> to provide a more efficient (i.e., non-blocking) way to interact with the Spark platform. ## HTTP Error -When attempting to communicate with the API, the SDK will wrap any sort of failure +When attempting to communicate with the API, the SDK will wrap any failure (any error during the roundtrip) into a `SparkApiError`, which will include the HTTP `status` code of the response and the `request_id`, a unique identifier of the request. The most common errors are: - `UnauthorizedError`: when the user is not authenticated/authorized - `NotFoundError`: when the requested resource is not found -- `BadRequestError`: when the request or payload is invalid. +- `BadRequestError`: when the request or payload is invalid +- `RetryTimeoutError`: when the maximum number of retries is reached. The following properties are available in a `SparkApiError`: @@ -101,23 +102,18 @@ as well as the obtained response, if available. ## API Resource -The Spark platform offers a wide range of functionalities that can be accessed -programmatically via RESTful APIs. For now, the SDK only supports [Services API](./services.md) -and [Batches API](./batches.md). +The Spark platform provides extensive functionality through its RESTful APIs, +with over 60 endpoints available. While the SDK currently implements a subset of +these endpoints, it's designed to be extensible. -Since the SDK does not cover all the endpoints in the platform, it provides a way -to cover additional endpoints. So, if there's an API resource you would like to -consume that's not available in the SDK, you can always extend this `ApiResource` -to include it. +If you need to consume an API endpoint that's not yet available in the SDK, you +can easily extend the `ApiResource` class to implement it. Here's how: ```py -from cspark.sdk import Client, Config, ApiResource, Uri +from cspark.sdk import Client, ApiResource, Uri # 1. Prepare the additional API resource you want to consume (e.g., MyResource). 
class MyResource(ApiResource): - def __init__(self, config: Config): - super().__init__(config) - def fetch_data(self): url = Uri.of(base_url=self.config.base_url.full, version='api/v4', endpoint='my/resource') return self.request(url, method='GET') @@ -140,7 +136,8 @@ some other goodies like the `base_url`, which can be used to build other URLs supported by the Spark platform. The `Uri` class is also available to help you build the URL for your custom resource. -In this particular example, the built URL will be: `https://excel.my-env.coherent.global/my-tenant/api/v4/my/resource`. +In this particular example, the built URL will be: +`https://excel.my-env.coherent.global/my-tenant/api/v4/my/resource`. ### Error Handling diff --git a/docs/services.md b/docs/services.md index 1ca5063..5607da2 100644 --- a/docs/services.md +++ b/docs/services.md @@ -40,7 +40,7 @@ This method accepts the following keyword arguments: | _folder_ | `str` | The folder name. | | _file_ | `BinaryIO` | The binary file (e.g., `open('path/to/file.xlsx', 'rb')`).| | _file\_name_ | `None \| str` | The name of the Excel file (defaults to service `name`). | -| _versioning_ | `major \| minor \| patch`| How to increment the service version (defaults to `minor`).| +| _versioning_ | `'major' \| 'minor' \| 'patch'`| How to increment the service version (defaults to `minor`).| | _start\_date_ | `None \| str \| int \| datetime` | The effective start date (defaults to `datetime.now()` ).| | _end\_date_ | `None \| str \| int \| datetime` | The effective end date (defaults to 10 years later).| | _draft\_name_ | `None \| str` | This overrides the `service` name to a custom name. | @@ -159,7 +159,7 @@ to learn more about Services API. ### Arguments -The method accepts a string or a `UriParams` object and optional keyword arguments, +The method accepts a `str`ing or a `UriParams` object and optional keyword arguments, which include the input data and metadata. See the use cases below. - **Default inputs**: @@ -180,7 +180,7 @@ spark.services.execute(UriParams(folder='my-folder', service='my-service')) a keyword argument. ```py -spark.services.execute('my-folder/my-service', inputs={'my_input': 42}) +spark.services.execute('my-folder/my-service', inputs={'my_input': 13}) ``` - **Inputs with metadata**: metadata can be provided along with the `inputs` data. @@ -190,7 +190,7 @@ spark.services.execute('my-folder/my-service', inputs={'my_input': 42}) ```py spark.services.execute( 'my-folder/my-service', - inputs={'my_input': 13}, + inputs=[{'my_input': 13}, {'my_input': 14}], subservices=['sub1', 'sub2'], call_purpose='Demo', ) @@ -271,8 +271,8 @@ For the other keyword arguments: | Property | Type | Description | | -------------------- | ------------- | ------------------------------------------------ | | _inputs_ | `None \| str \| Dict \| List` | The input data (single or many). | -| _response\_format_ | `original \| alike` | Response data format to use (defaults to `alike`).| -| _encoding_ | `gzip \| deflate` | Compress the payload using this encoding. | +| _response\_format_ | `'original' \| 'alike'` | Response data format to use (defaults to `alike`).| +| _encoding_ | `'gzip' \| 'deflate'` | Compress the payload using this encoding. | | _active\_since_ | `None \| str` | The transaction date (helps pinpoint a version). | | _source\_system_ | `None \| str` | The source system (defaults to `Spark Python SDK`).| | _correlation\_id_ | `None \| str` | The correlation ID. 
| @@ -314,7 +314,7 @@ when the `response_format` is set to `alike`: } ``` -You may wonder why the output data is wrapped in an array for a single input. +You may wonder why the output data is wrapped in an array for single inputs. This is because the `alike` format is designed to work with both single and multiple inputs. This should help maintain consistency in the output data format. But if you prefer the original format emitted by the API, you can set the `response_format` @@ -344,8 +344,8 @@ may provide the following keyword arguments: | Property | Type | Description | | ------------- | ---------------- | ------------------------------------------------ | | _using_ | `None \| str` | The transform name (defaults to the service name if any).| -| _api\_version_| `v3 \| v4` | The target API version (defaults to `v3`). | -| _encoding_ | `gzip \| deflate`| Apply this content encoding between client and server. | +| _api\_version_| `'v3' \| 'v4'` | The target API version (defaults to `v3`). | +| _encoding_ | `'gzip' \| 'deflate'`| Apply this content encoding between client and server. | > NOTE: When using `encoding`, the SDK will automatically compress and decompress the > payload using the specified encoding. @@ -525,7 +525,7 @@ original Excel file or the configured version of it. | Property | Type | Description | | ------------ | ------------------------ | ----------------------------------------------------- | | _file\_name_ | `str \| None` | Save the downloaded file with a different name. | -| _type_ | `original \| configured` | The type of file to download (defaults to `original`).| +| _type_ | `'original' \| 'configured'` | The type of file to download (defaults to `original`).| ```python spark.services.download('my-folder/my-service', type='configured') @@ -563,10 +563,10 @@ using a specific compiler version, you must use additional parameters. | -------------- | -------------------------- | ---------------------------------------------------------- | | _version\_id_ | `str \| None` | The UUID of a particular version of the service. | | _compiler_ | `str \| None` | The compiler version to use (do not confuse with type). | -| _upgrade_ | `major \| minor \| patch` | which type of versioning to apply (defaults to `patch`). | +| _upgrade_ | `'major' \| 'minor' \| 'patch'` | which type of versioning to apply (defaults to `patch`). | | _label_ | `str \| None` | The version label. | | _release\_notes_ | `str \| None` | The release notes. | -| _tags_ | `str \| List[str] \| None` | The comma-separted tags to apply to the service if string. | +| _tags_ | `str \| List[str] \| None` | The comma-separated tags to apply to the service if string. | | _start\_date_ | `int \| str \| datetime \| None` | The effective start date in ISO format. | | _end\_date_ | `int \| str \| datetime \| None` | The effective end date in ISO format. 
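+
+For illustration, a recompilation call might look like the sketch below (the values are
+placeholders, and the URI-first calling convention simply mirrors the other `services`
+methods shown on this page):
+
+```python
+spark.services.recompile(
+    'my-folder/my-service',
+    compiler='Neuron_v1.13.0',  # illustrative compiler version
+    upgrade='patch',
+    label='recompiled via SDK',
+    release_notes='Recompiled with a newer compiler version.',
+)
+```
+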
| diff --git a/examples/batches.py b/examples/batches.py index 8137089..721b495 100644 --- a/examples/batches.py +++ b/examples/batches.py @@ -1,3 +1,4 @@ +# type: ignore import json import time @@ -28,34 +29,27 @@ def log_status(status, msg='status check'): pipeline = None try: batch = batches.create('my-folder/my-service') - pipeline = batches.of(batch.data['id']) # type: ignore + pipeline = batches.of(batch.data['id']) pipeline.push(chunks=chunks) time.sleep(5) status = pipeline.get_status().data - while status['records_completed'] < status['record_submitted']: # type: ignore + while status['records_completed'] < status['record_submitted']: status = pipeline.get_status().data log_status(status) - if status['records_available'] > 0: # type: ignore + if status['records_available'] > 0: result = pipeline.pull() - log_status(result.data['status'], 'data retrieval status') # type: ignore - - for r in result.data['data']: # type: ignore - results.extend(r['outputs']) # type: ignore + log_status(result.data['status'], 'data retrieval status') + results.extend(r['outputs'] for r in result.data['data']) time.sleep(2) - - except Spark.SparkSdkError as err: - print(err.message) - if err.cause: - print(err.details) - except Spark.SparkApiError as err: - logger.warning(err.message) + except Spark.SparkError as err: + logger.error(err.message) logger.info(err.details) except Exception as exc: - logger.fatal(f'Unknown error: {exc}') + logger.critical(f'Unknown error: {exc}') finally: if pipeline: pipeline.dispose() @@ -69,7 +63,7 @@ def log_status(status, msg='status check'): if __name__ == '__main__': load_dotenv() - spark = Spark.Client(timeout=120_000) - with spark.batches as b: - describe(b) - create_and_run(b) + spark = Spark.Client(timeout=90_000, logger={'context': 'Async Batch'}) + with spark.batches as batches: + describe(batches) + create_and_run(batches) diff --git a/examples/config.py b/examples/config.py index 1c05931..5c7ddd2 100644 --- a/examples/config.py +++ b/examples/config.py @@ -26,11 +26,7 @@ def retrieve_token(): try: retrieve_token() print_logs() - except Spark.SparkSdkError as err: - print(err.message) - if err.cause: - print(err.details) - except Spark.SparkApiError as err: + except Spark.SparkError as err: print(err.message) print(err.details) except Exception as exc: diff --git a/examples/folders.py b/examples/folders.py new file mode 100644 index 0000000..bad40d0 --- /dev/null +++ b/examples/folders.py @@ -0,0 +1,56 @@ +import cspark.sdk as Spark +from dotenv import load_dotenv + + +def list_categories(folders: Spark.Folders): + response = folders.categories.list() + print(response.data) + + +def save_category(folders: Spark.Folders): + response = folders.categories.save('My Category', key='my-category-key') + print(response.data) + + +def delete_category(folders: Spark.Folders): + response = folders.categories.delete('my-category-key') + print(response.data) + + +def list(folders: Spark.Folders): + response = folders.find('my-folder') + print(response.data) + + +def create(folder: Spark.Folders): + response = folder.create('my-folder', cover=open('cover.png', 'rb')) + print(response.data) + + +def update(folder: Spark.Folders): + response = folder.update('uuid', description='sample description') + print(response.data) + + +def delete(folder: Spark.Folders): + response = folder.delete('uuid') + print(response.data) + + +if __name__ == '__main__': + load_dotenv() + + try: + with Spark.Client().folders as folders: + list_categories(folders) + save_category(folders) + 
delete_category(folders) + list(folders) + create(folders) + update(folders) + delete(folders) + except Spark.SparkError as err: + print(err.message) + print(err.details) + except Exception as exc: + print(f'Unknown error: {exc}') diff --git a/examples/history.py b/examples/history.py index ca801fb..e8a5a72 100644 --- a/examples/history.py +++ b/examples/history.py @@ -26,16 +26,11 @@ def download(logs: Spark.History): if __name__ == '__main__': load_dotenv() - spark = Spark.Client(timeout=90_000) try: - with spark.logs as logs: + with Spark.Client(timeout=90_000).logs as logs: rehydrate(logs) download(logs) - except Spark.SparkSdkError as err: - print(err.message) - if err.cause: - print(err.details) - except Spark.SparkApiError as err: + except Spark.SparkError as err: print(err.message) print(err.details) except Exception as exc: diff --git a/examples/impex.py b/examples/impex.py index 99511a2..b1db139 100644 --- a/examples/impex.py +++ b/examples/impex.py @@ -19,14 +19,14 @@ def download(wasm: Spark.Wasm): def export_entities_with(impex: Spark.ImpEx): - downloadables = impex.export(services=['my-folder/my-service'], max_retries=5, retry_interval=3) + downloadables = impex.exp(services=['my-folder/my-service'], max_retries=5, retry_interval=3) for count, download in enumerate(downloadables): save_file(download.buffer, f'exported-{count}.zip') def import_entities_with(impex: Spark.ImpEx): destination = {'source': 'my-folder/my-service', 'target': 'this-folder/my-service', 'upgrade': 'patch'} - response = impex.import_(destination, file=open('exported.zip', 'rb'), max_retries=7, retry_interval=3) + response = impex.imp(destination, file=open('exported.zip', 'rb'), max_retries=7, retry_interval=3) print(response.data) @@ -42,11 +42,7 @@ def import_entities_with(impex: Spark.ImpEx): with spark.wasm as wasm: download(wasm) - except Spark.SparkSdkError as err: - print(err.message) - if err.cause: - print(err.details) - except Spark.SparkApiError as err: + except Spark.SparkError as err: print(err.message) print(err.details) except Exception as exc: diff --git a/examples/services.py b/examples/services.py index 0fb7cb2..25578d6 100644 --- a/examples/services.py +++ b/examples/services.py @@ -24,7 +24,7 @@ def execute(services: Spark.Services): def transform(services: Spark.Services): inputs = {} # inputs data - response = services.transform('my-folder/my-service', inputs=inputs, using='my-transform', encoding='gzip') + response = services.transform('my-folder/my-service', inputs=inputs, using='my-transform') print(response.data) @@ -75,8 +75,7 @@ def delete(services: Spark.Services): load_dotenv() try: - spark = Spark.Client() - with spark.services as services: + with Spark.Client().services as services: create(services) execute(services) transform(services) @@ -88,11 +87,7 @@ def delete(services: Spark.Services): recompile(services) validate(services) delete(services) - except Spark.SparkSdkError as err: - print(err.message) - if err.cause: - print(err.details) - except Spark.SparkApiError as err: + except Spark.SparkError as err: print(err.message) print(err.details) except Exception as exc: diff --git a/examples/usecases/service_promotion/main.py b/examples/usecases/service_promotion/main.py index 4a84a4a..7449ecf 100644 --- a/examples/usecases/service_promotion/main.py +++ b/examples/usecases/service_promotion/main.py @@ -34,7 +34,7 @@ def imp(settings: str, auth: str, file: typing.BinaryIO): destination = options.pop('services') spark = Spark.Client(**options, oauth=json.loads(auth)) - 
imported = spark.impex.import_(destination, file=file, source_system=CICD_HANDLER) + imported = spark.impex.imp(destination, file=file, source_system=CICD_HANDLER) outputs = imported.data.get('outputs', {}) if isinstance(imported.data, dict) else {} total = len(outputs['services']) if 'services' in outputs else 0 if total == 0: diff --git a/pyproject.toml b/pyproject.toml index 9c6133e..0c5587b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "cspark" -version = "0.1.11" +version = "0.1.12" description = "A Python SDK for interacting with Coherent Spark APIs" license = "Apache-2.0" authors = [{ name = "Coherent", email = "team@coherent.global" }] diff --git a/src/cspark/sdk/_client.py b/src/cspark/sdk/_client.py index e2d2c0e..b027320 100644 --- a/src/cspark/sdk/_client.py +++ b/src/cspark/sdk/_client.py @@ -53,6 +53,11 @@ def __init__( logger=logger, ) + @property + def folders(self) -> API.Folders: + """The resource to manage Folders API.""" + return API.Folders(self.config) + @property def services(self) -> API.Services: """The resource to manage Services API.""" diff --git a/src/cspark/sdk/_version.py b/src/cspark/sdk/_version.py index b964906..adc403c 100644 --- a/src/cspark/sdk/_version.py +++ b/src/cspark/sdk/_version.py @@ -2,7 +2,7 @@ __all__ = ['__version__', '__title__', 'sdk_version', 'about'] -__version__ = sdk_version = '0.1.11' +__version__ = sdk_version = '0.1.12' __title__ = 'Coherent Spark Python SDK' diff --git a/src/cspark/sdk/resources/__init__.py b/src/cspark/sdk/resources/__init__.py index 0aeba49..d1328c7 100644 --- a/src/cspark/sdk/resources/__init__.py +++ b/src/cspark/sdk/resources/__init__.py @@ -1,5 +1,6 @@ from ._base import * from ._batches import * +from ._folders import * from ._history import * from ._impex import * from ._oauth2 import * diff --git a/src/cspark/sdk/resources/_batches.py b/src/cspark/sdk/resources/_batches.py index 5b28f7e..ffa53a5 100644 --- a/src/cspark/sdk/resources/_batches.py +++ b/src/cspark/sdk/resources/_batches.py @@ -129,7 +129,8 @@ def create( 'max_output_size': max_output_size, 'acceptable_error_percentage': math.ceil((1 - min(accuracy or 1, 1.0)) * 100), } - return self.request(url, method='POST', body=body) + + return self.request(url, method='POST', body={k: v for k, v in body.items() if v is not None}) def of(self, batch_id: str) -> 'Pipeline': return Pipeline(batch_id, self.config) @@ -182,18 +183,22 @@ def push( url = Uri.of(None, endpoint=f'batch/{self._id}/chunks', **self._base_uri) body = self.__build_chunk_data(chunks, data, inputs, raw, if_chunk_id_duplicated) + response = self.request(url, method='POST', body=body) total = response.data['record_submitted'] if isinstance(response.data, dict) else 0 self.logger.info(f'pushed {total} records to batch pipeline <{self._id}>') + return response def pull(self, max_chunks: int = 100): self.__assert_state(['cancelled']) endpoint = f'batch/{self._id}/chunkresults?max_chunks={max_chunks}' + response = self.request(Uri.of(None, endpoint=endpoint, **self._base_uri)) total = response.data['status']['records_available'] if isinstance(response.data, dict) else 0 self.logger.info(f'{total} available records from batch pipeline <{self._id}>') + return response def dispose(self): @@ -201,18 +206,22 @@ def dispose(self): self.__assert_state(['closed', 'cancelled']) url = Uri.of(None, endpoint=f'batch/{self._id}', **self._base_uri) + response = self.request(url, method='PATCH', body={'batch_status': 'closed'}) self._state = 'closed' 
self.logger.info(f'batch pipeline <{self._id}> has been closed') + return response def cancel(self): self.__assert_state(['closed', 'cancelled']) url = Uri.of(None, endpoint=f'batch/{self._id}', **self._base_uri) + response = self.request(url, method='PATCH', body={'batch_status': 'cancelled'}) self._state = 'cancelled' self.logger.info(f'batch pipeline <{self._id}> has been cancelled') + return response def __assert_state(self, states: List[str], throwable: bool = True) -> bool: @@ -279,27 +288,32 @@ def __assess_chunks(self, chunks: List['BatchChunk'], is_duplicated: str): f'chunk id <{id}> is duplicated for this pipeline <{self._id}> ' f'and has been replaced with <{chunk.id}>' ) - self._chunks[chunk.id] = chunk.size or len(chunk.data.inputs) + self._chunks[chunk.id] = chunk.size or len(chunk.data.inputs) - 1 assessed.append(chunk.to_dict()) return assessed def create_chunks( - dataset: List[Any], # input dataset + dataset: List[Any], # input dataset in json array format *, + headers: Optional[List[str]] = None, # headers for the input dataset chunk_size: int = 200, parameters: Optional[Dict[str, Any]] = None, summary: Optional[Dict[str, Any]] = None, ) -> List[BatchChunk]: - """Creates a list of batch chunks from a given input dataset.""" + """Creates a list of batch chunks from a JSON array dataset.""" length = len(dataset) chunk_size = max(1, chunk_size) batch_size = math.ceil(length / chunk_size) + headers = headers or (length > 0 and cast(List[str], dataset.pop(0))) or [] + if not headers: + raise SparkError.sdk('missing headers for the input dataset', cause=dataset) + chunks = [] for i in range(batch_size): start = i * chunk_size end = min(start + chunk_size, length) - inputs = dataset[start:end] - chunks.append(BatchChunk(id=get_uuid(), data=ChunkData(inputs, parameters or {}, summary), size=len(inputs))) + inputs = [headers] + dataset[start:end] + chunks.append(BatchChunk(get_uuid(), ChunkData(inputs, parameters or {}, summary), size=len(inputs) - 1)) return chunks diff --git a/src/cspark/sdk/resources/_folders.py b/src/cspark/sdk/resources/_folders.py new file mode 100644 index 0000000..24f3e97 --- /dev/null +++ b/src/cspark/sdk/resources/_folders.py @@ -0,0 +1,135 @@ +from datetime import datetime +from typing import Any, BinaryIO, Optional, Union + +from .._config import Config +from .._constants import SPARK_SDK +from .._errors import SparkApiError, SparkError +from .._utils import DateUtils, get_uuid +from ._base import ApiResource, Uri + +__all__ = ['Folders'] + + +class Folders(ApiResource): + def __init__(self, config: Config): + super().__init__(config) + self._base_uri = {'base_url': self.config.base_url.value, 'version': 'api/v1'} + + @property + def categories(self) -> 'Categories': + return Categories(self.config) + + def find( + self, + name: Optional[str] = None, + *, + favorite: Optional[bool] = None, + page: int = 1, + page_size: int = 100, + sort: str = '-updated', + **params, + ): + url = Uri.of(None, endpoint='product/list', **self._base_uri) + search = [{'field': k, 'value': v} for k, v in {**params, 'name': name, 'isstarred': favorite}.items() if v] + body = {'search': search, 'page': page, 'pageSize': page_size, 'sort': sort} + body.update({'shouldFetchActiveServicesCount': True}) + + return self.request(url, method='POST', body=body) + + def create( + self, + name: str, + *, + description: Optional[str] = None, + category: Optional[str] = None, + start_date: Union[None, str, int, datetime] = None, + launch_date: Union[None, str, int, datetime] = None, + 
status: Optional[str] = None, + cover: Optional[BinaryIO] = None, + ): + url = Uri.of(None, endpoint='product/create', **self._base_uri) + startdate, launchdate = DateUtils.parse(start_date, launch_date) + form = { + 'Name': name, + 'Description': description or f'Created by {SPARK_SDK}', + 'Category': category or 'Other', + 'StartDate': startdate.isoformat(), + 'LaunchDate': launchdate.isoformat(), + 'Status': status or 'Design', + } + response = self.request(url, method='POST', form=form) + + if response.status == 200 and isinstance(response.data, dict) and response.data['status'] == 'Success': + if cover: + self.upload_cover(response.data['data']['folderId'], cover) + return self.request(response.data['data']['get_product_url']) + + cause = SparkApiError.to_cause(request=response.raw_request, response=response.raw_response) + if response.data['errorCode'] == 'PRODUCT_ALREADY_EXISTS': # type: ignore + error = SparkError.api(409, {'message': f'folder name <{name}> already exists', 'cause': cause}) + else: + error = SparkError.api(response.status, {'message': f'failed to create folder <{name}>', 'cause': cause}) + self.logger.error(error.message) + raise error + + def update( + self, + id: str, + *, + description: Optional[str] = None, + category: Optional[str] = None, + start_date: Union[None, str, int, datetime] = None, + launch_date: Union[None, str, int, datetime] = None, + cover: Optional[BinaryIO] = None, + ): + url = Uri.of(None, endpoint=f'product/update/{id}', **self._base_uri) + if cover: + self.upload_cover(id, cover) + + body: dict[str, Any] = {'shouldTrackUserAction': True} + if description: + body['description'] = description + if category: + body['category'] = category + if DateUtils.is_date(start_date): + body['startDate'] = DateUtils.to_datetime(start_date).isoformat() # type: ignore + if DateUtils.is_date(launch_date): + body['launchDate'] = DateUtils.to_datetime(launch_date).isoformat() # type: ignore + return self.request(url, method='POST', body=body) + + def delete(self, id: str): + url = Uri.of(None, endpoint=f'product/delete/{id}', **self._base_uri) + self.logger.warning('deleting folder will also delete all its services') + return self.request(url, method='DELETE') + + def upload_cover(self, id: str, image: BinaryIO, filename: Optional[str] = None): + url = Uri.of(None, endpoint='product/uploadcoverimage', **self._base_uri) + files = {'coverImage': filename and (filename, image) or image} + return self.request(url, method='POST', form={'id': id}, files=files) + + +class Categories(ApiResource): + def __init__(self, config: Config): + super().__init__(config) + self._base_uri = {'base_url': self.config.base_url.value, 'version': 'api/v1'} + + def list(self): + return self.request(Uri.of(None, endpoint='lookup/getcategories', **self._base_uri)) + + def save(self, name: str, *, key: Optional[str] = None, icon: Optional[str] = None): + url = Uri.of(None, endpoint='lookup/savecategory', **self._base_uri) + body = {'value': name, 'key': key or get_uuid(), 'icon': icon or 'other.svg'} + response = self.request(url, method='POST', body=body) + return response.copy_with(data=self.__extract_data(response.data)) + + def delete(self, key: str): + url = Uri.of(None, endpoint=f'lookup/deletecategory/{key}', **self._base_uri) + response = self.request(url, method='DELETE') + return response.copy_with(data=self.__extract_data(response.data)) + + def __extract_data(self, data: Any): + if not isinstance(data, dict): + return data + + categories = data.get('data', 
diff --git a/src/cspark/sdk/resources/_impex.py b/src/cspark/sdk/resources/_impex.py
index 5442822..3417d6f 100644
--- a/src/cspark/sdk/resources/_impex.py
+++ b/src/cspark/sdk/resources/_impex.py
@@ -31,7 +31,7 @@ def exports(self):
     def imports(self):
         return Import(self.config)
 
-    def export(
+    def exp(
         self,
         *,
         folders: Optional[List[str]] = None,
@@ -68,7 +68,7 @@ def export(
 
         return exporter.download([f['file'] for f in files])
 
-    def import_(
+    def imp(
         self,
         destination: Union[str, List[str], Mapping[str, str], List[Mapping[str, str]]],
         file: BinaryIO,
@@ -105,6 +105,10 @@ def import_(
 
         return status
 
+    # aliases for export and import to avoid breaking changes.
+    export = exp
+    import_ = imp
+
 
 class Export(ApiResource):
     def __init__(self, config: Config):
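# Usage sketch (illustrative, not part of the patch): the shorter ImpEx method names
# and their backward-compatible aliases. `spark` stands for a configured client and
# the `spark.impex` accessor is assumed.
downloaded = spark.impex.exp(folders=['my-folder'])  # formerly export(); the alias still works

with open('exported.zip', 'rb') as file:
    spark.impex.imp(destination='other-folder', file=file)  # formerly import_(); the alias still works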
diff --git a/src/cspark/sdk/resources/_services.py b/src/cspark/sdk/resources/_services.py
index ffe0aea..904ecc1 100644
--- a/src/cspark/sdk/resources/_services.py
+++ b/src/cspark/sdk/resources/_services.py
@@ -147,6 +147,8 @@ def execute(
         tables_as_array: Union[None, str, List[str]] = None,
         selected_outputs: Union[None, str, List[str]] = None,
         outputs_filter: Optional[str] = None,
+        # extra metadata if needed
+        extras: Optional[Mapping[str, Any]] = None,
     ):
         uri = Uri.validate(uri)
 
@@ -166,6 +168,7 @@ def execute(
             tables_as_array=tables_as_array,
             selected_outputs=selected_outputs,
             outputs_filter=outputs_filter,
+            extras=extras,
         )
 
         if executable.is_batch:
@@ -206,6 +209,8 @@ def transform(
         tables_as_array: Union[None, str, List[str]] = None,
         selected_outputs: Union[None, str, List[str]] = None,
         outputs_filter: Optional[str] = None,
+        # extra metadata if needed
+        extras: Optional[Mapping[str, Any]] = None,
     ):
         uri = Uri.validate(uri)
 
@@ -224,6 +229,7 @@
             tables_as_array=tables_as_array,
             selected_outputs=selected_outputs,
             outputs_filter=outputs_filter,
+            extras=extras,
         )
 
         endpoint = f'transforms/{using or uri.service}/for/{uri.pick("folder", "service").encode()}'
@@ -257,6 +263,8 @@ def validate(
         tables_as_array: Union[None, str, List[str]] = None,
         selected_outputs: Union[None, str, List[str]] = None,
         outputs_filter: Optional[str] = None,
+        # extra metadata if needed
+        extras: Optional[Mapping[str, Any]] = None,
     ):
         uri = Uri.validate(uri)
         validation_type = StringUtils.is_not_empty(validation_type) and str(validation_type).lower() or None
@@ -277,6 +285,7 @@
             tables_as_array=tables_as_array,
             selected_outputs=selected_outputs,
             outputs_filter=outputs_filter,
+            extras=extras,
         )
         url = Uri.of(uri, base_url=self.config.base_url.full, endpoint='validation')
         body = {
@@ -298,10 +307,14 @@ def get_schema(
         version_id: Optional[str] = None,
     ):
         uri = Uri.validate(Uri.to_params(uri) if uri else UriParams(folder, service, version_id=version_id))
-        endpoint = f'product/{uri.folder}/engines/get/{uri.service}/{uri.version_id or ""}'
-        url = Uri.of(base_url=self.config.base_url.value, version='api/v1', endpoint=endpoint)
+        base, folder, service = self.config.base_url, uri.folder, uri.service
+        url = (
+            Uri.partial(f'GetEngineDetailByVersionId/versionid/{uri.version_id}', base_url=base.full)
+            if StringUtils.is_not_empty(uri.version_id)
+            else Uri.of(base_url=base.value, version='api/v1', endpoint=f'product/{folder}/engines/get/{service}')
+        )
 
-        return self.request(url)
+        return self.request(url, method='POST' if uri.version_id else 'GET')
 
     def get_metadata(
         self,
@@ -497,6 +510,8 @@ def __init__(
         tables_as_array: Union[None, str, List[str]] = None,
         selected_outputs: Union[None, str, List[str]] = None,
         outputs_filter: Optional[str] = None,
+        # extra metadata if needed
+        extras: Optional[Mapping[str, Any]] = None,
     ):
         self._uri = uri
         self._is_batch = is_batch
@@ -524,12 +539,13 @@
         self._tables_as_array = StringUtils.join(tables_as_array)
         self._selected_outputs = StringUtils.join(selected_outputs)
         self._outputs_filter = outputs_filter
+        self._extras = extras or {}
 
     @property
     def values(self) -> Dict[str, Any]:
         if self._is_batch:
             service_uri = self._uri.pick('folder', 'service', 'version').encode(long=False)
-            return {
+            values = {
                 'service': self._uri.service_id or service_uri or None,
                 'version_id': self._uri.version_id,
                 'version_by_timestamp': self._active_since,
@@ -538,32 +554,38 @@
                 'call_purpose': self._call_purpose,
                 'source_system': self._source_system,
                 'correlation_id': self._correlation_id,
+                # extra metadata
+                **self._extras,
+            }
+        else:
+            values = {
+                # URI locator via metadata (v3 also supports URI in url path)
+                'service_id': self._uri.service_id,
+                'version_id': self._uri.version_id,
+                'version': self._uri.version,
+                # v3 expects extra metadata
+                'transaction_date': self._active_since,
+                'call_purpose': self._call_purpose,
+                'source_system': self._source_system,
+                'correlation_id': self._correlation_id,
+                'array_outputs': self._tables_as_array,
+                'compiler_type': self._compiler_type,
+                'debug_solve': self._debug_solve,
+                'excel_file': self._downloadable,
+                'requested_output': self._selected_outputs,
+                'requested_output_regex': self._outputs_filter,
+                'response_data_inputs': self._echo_inputs,
+                'service_category': self._subservices,
+                # extra metadata
+                **self._extras,
             }
-            return {
-                # URI locator via metadata (v3 also supports URI in url path)
-                'service_id': self._uri.service_id,
-                'version_id': self._uri.version_id,
-                'version': self._uri.version,
-                # v3 expects extra metadata
-                'transaction_date': self._active_since,
-                'call_purpose': self._call_purpose,
-                'source_system': self._source_system,
-                'correlation_id': self._correlation_id,
-                'array_outputs': self._tables_as_array,
-                'compiler_type': self._compiler_type,
-                'debug_solve': self._debug_solve,
-                'excel_file': self._downloadable,
-                'requested_output': self._selected_outputs,
-                'requested_output_regex': self._outputs_filter,
-                'response_data_inputs': self._echo_inputs,
-                'service_category': self._subservices,
-            }
+        return {k: v for k, v in values.items() if v is not None}
 
     @property
     def as_header(self) -> Dict[str, str]:
         # NOTE: this has to be a single line string: "'{\"call_purpose\":\"Single Execution\"}'"
-        value = json.dumps({k: v for k, v in self.values.items() if v is not None}, separators=(',', ':'))
+        value = json.dumps(self.values, separators=(',', ':'))
         return {'x-meta' if self._is_batch else 'x-request-meta': "'{}'".format(value)}
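# Usage sketch (illustrative, not part of the patch): forwarding extra metadata to an
# execution call. `spark` stands for a configured client; `inputs` is assumed to be
# the existing data keyword, and the extras keys shown are arbitrary examples.
response = spark.services.execute(
    'my-folder/my-service',
    inputs={'value': 42},  # assumed existing parameter; other keywords are unchanged
    extras={'requested_by': 'jane.doe', 'batch_ref': 'Q1-2025'},
)
# The extras are merged into the request metadata (x-request-meta / x-meta), and any
# None-valued entries are now dropped before the payload is serialized.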
diff --git a/test/resources/test_batches.py b/test/resources/test_batches.py
index 4ba550e..17ba622 100644
--- a/test/resources/test_batches.py
+++ b/test/resources/test_batches.py
@@ -72,14 +72,16 @@ def test_batch_chunk_can_parse_raw_array_into_object():
 
 
 def test_create_chunks_distribute_input_dataset_evenly():
+    headers = ['sale_id', 'price', 'quantity']
     inputs = [[1, 20, 65], [2, 74, 73], [3, 20, 65], [4, 34, 73], [5, 62, 62]]  # 5 rows
-    chunks = create_chunks(inputs, chunk_size=3)
+    dataset = [headers] + inputs  # expecting JSON array format.
+    chunks = create_chunks(dataset, chunk_size=3)
 
     assert len(chunks) == 2
     assert chunks[0].size == 3
     assert chunks[1].size == 2
-    assert chunks[0].data.inputs == [[1, 20, 65], [2, 74, 73], [3, 20, 65]]
-    assert chunks[1].data.inputs == [[4, 34, 73], [5, 62, 62]]
+    assert chunks[0].data.inputs == [['sale_id', 'price', 'quantity'], [1, 20, 65], [2, 74, 73], [3, 20, 65]]
+    assert chunks[1].data.inputs == [['sale_id', 'price', 'quantity'], [4, 34, 73], [5, 62, 62]]
     assert all(chunk.data.parameters == {} for chunk in chunks)
     assert all(chunk.data.summary is None for chunk in chunks)
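# Illustrative companion test (not part of the patch): create_chunks now fails fast
# when no headers can be determined, and accepts explicit headers. Assumes the test
# module's existing imports (create_chunks, SparkError) and pytest, which the
# surrounding tests appear to rely on.
import pytest


def test_create_chunks_requires_headers():
    with pytest.raises(SparkError):
        create_chunks([])  # empty dataset: no header row to pop, no headers keyword

    chunks = create_chunks([[1, 20, 65]], headers=['sale_id', 'price', 'quantity'])
    assert chunks[0].size == 1  # size counts data rows; headers are re-prepended to inputs
    assert chunks[0].data.inputs[0] == ['sale_id', 'price', 'quantity']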