NOTE:
The StreamPipes wrapper for Python is currently under development. As a result, the processor model description still needs to be implemented externally in Java.
Apache StreamPipes (incubating) enables flexible modeling of stream processing pipelines by providing a graphical modeling editor on top of existing stream processing frameworks.
It enables non-technical users to quickly define and execute processing pipelines based on an easily extensible toolbox of data sources, data processors and data sinks. StreamPipes has an exchangeable runtime execution layer and executes pipelines using one of the provided wrappers, e.g., for Apache Flink or Apache Kafka Streams.
Pipeline elements in StreamPipes can be installed at runtime - the built-in SDK allows you to easily implement new pipeline elements according to your needs. Pipeline elements are standalone microservices that can run anywhere - centrally on your server, in a large-scale cluster or at the edge.
NOTE: The example below only works in combination with a processor model description implemented in Java!
from streampipes.core import StandaloneModelSubmitter
from streampipes.manager import Declarer
from streampipes.model.pipeline_element_config import Config
from streampipes.core import EventProcessor


class HelloWorldProcessor(EventProcessor):

    def on_invocation(self):
        pass

    def on_event(self, event):
        event['greeting'] = 'hello world'
        return event

    def on_detach(self):
        pass


def main():
    # Configurations to be stored in key-value store (consul)
    config = Config(app_id='pe/org.apache.streampipes.processor.python')

    config.register(type='host',
                    env_key='SP_HOST',
                    default='processor-python',
                    description='processor hostname')

    config.register(type='port',
                    env_key='SP_PORT',
                    default=8090,
                    description='processor port')

    config.register(type='service',
                    env_key='SP_SERVICE_NAME',
                    default='Python Processor',
                    description='processor service name')

    processors = {
        'org.apache.streampipes.processors.python.helloworld': HelloWorldProcessor,
    }

    # Declarer
    # add the dict of processors to the Declarer
    # This is an abstract class that holds the specified processors
    Declarer.add(processors=processors)

    # StandaloneModelSubmitter
    # Initializes the REST api
    StandaloneModelSubmitter.init(config=config)


if __name__ == '__main__':
    main()
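To see what HelloWorldProcessor does to a single event without a running StreamPipes instance, the on_event logic can be exercised against a plain dict. The following is only a minimal sketch: StubEventProcessor is a hypothetical stand-in for streampipes.core.EventProcessor, and run_once and its call order (invoke, process, detach) are assumptions made for illustration, not the wrapper's actual runtime behavior.

```python
class StubEventProcessor:
    """Hypothetical stand-in for streampipes.core.EventProcessor (not the real class)."""

    def on_invocation(self):
        pass

    def on_event(self, event):
        raise NotImplementedError

    def on_detach(self):
        pass


class HelloWorldProcessor(StubEventProcessor):
    def on_event(self, event):
        # Same logic as the processor above: add a greeting field to the event.
        event['greeting'] = 'hello world'
        return event


def run_once(processor, event):
    """Assumed lifecycle for illustration: invoke, process one event, detach."""
    processor.on_invocation()
    result = processor.on_event(event)
    processor.on_detach()
    return result


if __name__ == '__main__':
    # The input event is made up for this sketch.
    print(run_once(HelloWorldProcessor(), {'sensor': 'temp-1', 'value': 21.5}))
```

The returned event keeps its original fields and gains a 'greeting' field, which is all this processor contributes to a pipeline.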
Apache StreamPipes is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation status is not necessarily a reflection of the completeness or stability of the code, it does indicate that the project has yet to be fully endorsed by the ASF.
Some of the incubating project’s releases may not be fully compliant with ASF policy. For example, releases may have incomplete or un-reviewed licensing conditions. What follows is a list of known issues the project is currently aware of (note that this list, by definition, is likely to be incomplete): (currently no issues are known)
If you are planning to incorporate this work into your product/project, please be aware that you will need to conduct a thorough licensing review to determine the overall implications of including this work. For the current status of this project through the Apache Incubator visit: https://incubator.apache.org/projects/streampipes.html