Customization
When developing services, we inevitably face the need to implement custom business logic. Wiredflow allows custom logic to be implemented flexibly at various stages of the pipeline using simple primitives: Python functions.
Integration does not require implementing classes with specific methods, inheritance, etc. Everything is transparent: a custom function has one entry point and one exit point.
In this document you will find detailed instructions on how to implement "self-made" functions and integrate them into a wiredflow-based service. We will also continue improving the service configured during the previous stages of the tutorial.
Stage customization capability
Wiredflow consists of several key blocks (stages). Some stages have no default implementation and always require a custom one:
- Configuration - Configure parameters for the next stage in the pipeline;
- Core logic - General abstraction to implement custom logic.
The following stages have default implementations, but they can easily be replaced with custom ones:
- HTTP connector - Retrieving data using HTTP requests (active requester);
- MQTT connector - Retrieving data using the MQTT protocol (passive subscriber);
- Storage - Database client;
- Send - Sending messages to external services.
NB: It is up to you to decide in which block to implement custom functions. Depending on the task, feel free to choose the right architecture for the application. For example, it is always possible to implement completely custom pipelines consisting only of core logic blocks.
To insert a custom function into a declared stage, simply pass it as the configuration parameter (the first argument of each method):
from wiredflow.main.build import FlowBuilder
def custom_function_first(**kwargs):
    print('Hello')
def custom_function_second(**kwargs):
    print('World!')
def wiredflow_simple_customization():
    """
    Quick example which shows how to configure and launch a very simple ETL pipeline
    with custom functions
    """
    flow_builder = FlowBuilder()
    flow_builder.add_pipeline('my_logic')\
        .with_core_logic(custom_function_first)\
        .with_http_connector(custom_function_second)
    # Configure service and launch it
    flow = flow_builder.build()
    flow.launch_flow(execution_seconds=30)
if __name__ == '__main__':
    wiredflow_simple_customization()
Usage example
We will continue to modify the service, which has already become quite complex. Let's take a look at the updated technical requirements:
- There is a need to 1) obtain random numbers via HTTP GET requests and 2) get ordered numbers via MQTT;
- Save the data into the MongoDB collections http_numbers and mqtt_numbers using the MongoDB Atlas service;
- Every 15 seconds, load all available data from the MongoDB collections, merge the lists of values into one list and sort it (sorted list), and subtract the sum of the numbers received via MQTT from the sum of the numbers received via HTTP (calculated value);
- Send messages via the MQTT protocol: the sorted list to topic demo/sorted and the calculated value to topic demo/subtraction. Configure the MQTT broker locally on port 1883.
As can be seen from the requirements above, we need to integrate custom business logic that sorts the lists and calculates the value according to the desired equation.
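To make the required arithmetic concrete, here is the computation on made-up numbers. This is a plain-Python sketch with hypothetical sample data (in the real service the values come from the MongoDB collections), using the same order of operands as order_and_calculate below: the HTTP sum minus the MQTT sum.

```python
# Hypothetical sample data: in the real service these values are loaded
# from the collections filled by the HTTP and MQTT pipelines
http_numbers = [42, 7, 19]
mqtt_numbers = [1, 2, 3, 4]

# Merge both lists into one and sort it ("sorted list")
sorted_list = sorted(http_numbers + mqtt_numbers)

# Difference between the two sums ("calculated value"): HTTP sum minus MQTT sum
calculated_value = sum(http_numbers) - sum(mqtt_numbers)

print(sorted_list)       # [1, 2, 3, 4, 7, 19, 42]
print(calculated_value)  # 58
```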
Custom logic
To implement custom business logic, you need to prepare a function, i.e. a Python callable.
Functions in Python can use either the return statement or the yield statement.
Depending on how exactly the data needs to be processed, we can choose the appropriate option.
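The difference between the two options can be illustrated with a toy sketch in plain Python. It is not Wiredflow's internal code: both functions follow the same contract (keyword arguments in, dictionaries out), and the hypothetical run_core helper uses inspect.isgeneratorfunction to decide whether to collect one result or many.

```python
import inspect


def return_logic(**kwargs):
    # One entry point (keyword arguments), one exit point (return)
    return {'calculated value': sum(kwargs['numbers'])}


def yield_logic(**kwargs):
    # Same contract, but results are produced portion by portion
    numbers = kwargs['numbers']
    for start in range(0, len(numbers), 2):
        yield {'calculated value': sum(numbers[start:start + 2])}


def run_core(func, **kwargs):
    """Hypothetical helper (not a Wiredflow API): always returns a list of outputs."""
    if inspect.isgeneratorfunction(func):
        return list(func(**kwargs))
    return [func(**kwargs)]


print(run_core(return_logic, numbers=[1, 2, 3]))  # [{'calculated value': 6}]
print(run_core(yield_logic, numbers=[1, 2, 3]))   # [{'calculated value': 3}, {'calculated value': 3}]
```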
Function (return something)
Suppose the function is quite capable of handling the volume of data it receives.
In this case there is no need to complicate things: just use return in the implementation.
from copy import deepcopy
from wiredflow.main.build import FlowBuilder
from wiredflow.mocks.demo_bindings_threads import launch_demo_for_complex_case
def order_and_calculate(**kwargs):
    # Through storages we can obtain data from other pipelines
    db_connectors = kwargs['db_connectors']
    # Load all numbers from databases
    numbers_via_http = list(db_connectors['http_numbers'].load())
    numbers_via_mqtt = list(db_connectors['mqtt_numbers'].load())
    http_numbers = list(map(lambda x: int(x['Generated random number']), numbers_via_http))
    mqtt_numbers = list(map(lambda x: int(x['Generated number']), numbers_via_mqtt))
    # Create a merged list and sort it
    full_list = deepcopy(http_numbers)
    full_list.extend(mqtt_numbers)
    full_list.sort()
    # Calculate the value
    response_value = sum(http_numbers) - sum(mqtt_numbers)
    return {'sorted list': str(full_list), 'calculated value': response_value}
def launch_custom_flow():
    """
    Example of how to configure and launch a flow with several pipelines, MongoDB storages and
    custom core logic
    """
    mongo_url = 'mongodb+srv://clusterdreamlone.ryubwzt.mongodb.net/?retryWrites=true&w=majority'
    mongo_user = 'dreamlone'
    mongo_password = 'mydreamlonepassword'
    flow_builder = FlowBuilder()
    # Pipeline for HTTP numbers processing
    flow_builder.add_pipeline('http_numbers', timedelta_seconds=10)\
        .with_http_connector(source='http://localhost:8027',
                             headers={'accept': 'application/json',
                                      'apikey': 'custom_key_1234'}) \
        .with_storage('mongo', source=mongo_url, database_name='wiredflow', collection_name='http_numbers',
                      username=mongo_user, password=mongo_password)
    # Pipeline for MQTT data processing
    flow_builder.add_pipeline('mqtt_numbers') \
        .with_mqtt_connector(source='localhost', port=1883,
                             topic='/demo/integers',
                             username='wiredflow', password='wiredflow') \
        .with_storage('mongo', source=mongo_url, database_name='wiredflow', collection_name='mqtt_numbers',
                      username=mongo_user, password=mongo_password)
    # Add pipeline with custom function and send stages
    flow_builder.add_pipeline('custom_logic', timedelta_seconds=15)\
        .with_core_logic(order_and_calculate)\
        .send(destination='localhost', port=1883, topic='demo/sorted', label_to_send='sorted list') \
        .send(destination='localhost', port=1883, topic='demo/subtraction', label_to_send='calculated value')
    flow = flow_builder.build()
    # Or simply flow.launch_flow()
    # if there is no need to launch local demo http server
    launch_demo_for_complex_case(flow, execution_seconds=30)
if __name__ == '__main__':
    launch_custom_flow()
As can be seen, a custom function accepts any number of keyword arguments (**kwargs).
All of them are passed to the body of the function as a dictionary. You can always access all available database connectors through the db_connectors key.
The interface of these connectors differs depending on which database you are using.
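Because the connectors arrive through kwargs, a custom function can be checked without a running MongoDB by passing stand-ins. A minimal sketch, assuming only what the example above uses: each connector exposes a load() method returning an iterable of documents. FakeConnector is hypothetical and not part of Wiredflow; order_and_calculate is repeated in compact form so the block runs on its own.

```python
from copy import deepcopy


class FakeConnector:
    """Hypothetical stand-in for a database connector: only provides load()."""

    def __init__(self, documents):
        self.documents = documents

    def load(self):
        # Return copies so the caller cannot mutate the stored documents
        return deepcopy(self.documents)


def order_and_calculate(**kwargs):
    # Same logic as in the example above, written compactly
    db_connectors = kwargs['db_connectors']
    http_numbers = [int(x['Generated random number']) for x in db_connectors['http_numbers'].load()]
    mqtt_numbers = [int(x['Generated number']) for x in db_connectors['mqtt_numbers'].load()]
    full_list = sorted(http_numbers + mqtt_numbers)
    return {'sorted list': str(full_list),
            'calculated value': sum(http_numbers) - sum(mqtt_numbers)}


fake_connectors = {
    'http_numbers': FakeConnector([{'Generated random number': '10'},
                                   {'Generated random number': '3'}]),
    'mqtt_numbers': FakeConnector([{'Generated number': 1}, {'Generated number': 2}]),
}
result = order_and_calculate(db_connectors=fake_connectors)
print(result)  # {'sorted list': '[1, 2, 3, 10]', 'calculated value': 10}
```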
Generator (yield something)
Sometimes the amount of data to be processed can be quite large.
Then you may want to divide the data into small portions and process them iteratively.
To do this while still applying all subsequent stages of the pipeline, in their declared order, to each portion, just use the yield construction:
def order_and_calculate_by_batches(**kwargs):
    # Set processing limit (maximum number of HTTP items per batch)
    max_number_of_items = 5
    db_connectors = kwargs['db_connectors']
    http_numbers = list(map(lambda x: int(x['Generated random number']), list(db_connectors['http_numbers'].load())))
    mqtt_numbers = list(map(lambda x: int(x['Generated number']), list(db_connectors['mqtt_numbers'].load())))
    if len(http_numbers) > max_number_of_items:
        # Process by batches; the last batch may be shorter than max_number_of_items
        for start_batch_id in range(0, len(http_numbers), max_number_of_items):
            end_batch_id = min(start_batch_id + max_number_of_items, len(http_numbers))
            print(f'Processing items from {start_batch_id} to {end_batch_id}')
            batch_http_numbers = http_numbers[start_batch_id: end_batch_id]
            # Merge the current HTTP batch with the MQTT numbers and sort
            batch_full_list = batch_http_numbers + mqtt_numbers
            batch_full_list.sort()
            yield {'sorted list': str(batch_full_list),
                   'calculated value': sum(batch_http_numbers) - sum(mqtt_numbers)}
    else:
        full_list = http_numbers + mqtt_numbers
        full_list.sort()
        yield {'sorted list': str(full_list), 'calculated value': sum(http_numbers) - sum(mqtt_numbers)}
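When splitting a list into batches with range, it is easy to lose the final, shorter batch. A minimal sketch of computing batch boundaries so that every item is covered; batch_bounds is a hypothetical helper for illustration, not a Wiredflow function.

```python
def batch_bounds(total_items, batch_size):
    """Hypothetical helper: (start, end) index pairs covering all items."""
    bounds = []
    for start in range(0, total_items, batch_size):
        # The last batch may be shorter than batch_size
        end = min(start + batch_size, total_items)
        bounds.append((start, end))
    return bounds


print(batch_bounds(17, 5))  # [(0, 5), (5, 10), (10, 15), (15, 17)]
```

Starting the range at 0 and clipping the end with min() avoids both skipping the tail and slicing past the end of the list.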
If we replace the core logic with the order_and_calculate_by_batches function, we obtain the following output in the terminal:
2023-04-26 10:31:06.715 | DEBUG | wiredflow.main.actions.stages.core_stage:launch:17 - Launch core logic stage
Processing items from 0 to 5
2023-04-26 10:31:06.925 | DEBUG | wiredflow.main.actions.stages.send_stage:send_to_subscribers:57 - Successfully send messages to topic demo/sorted via MQTT protocol
2023-04-26 10:31:06.982 | DEBUG | wiredflow.main.actions.stages.send_stage:send_to_subscribers:57 - Successfully send messages to topic demo/subtraction via MQTT protocol
Processing items from 5 to 10
2023-04-26 10:31:07.059 | DEBUG | wiredflow.main.actions.stages.send_stage:send_to_subscribers:57 - Successfully send messages to topic demo/sorted via MQTT protocol
2023-04-26 10:31:07.146 | DEBUG | wiredflow.main.actions.stages.send_stage:send_to_subscribers:57 - Successfully send messages to topic demo/subtraction via MQTT protocol
Processing items from 10 to 15
2023-04-26 10:31:07.223 | DEBUG | wiredflow.main.actions.stages.send_stage:send_to_subscribers:57 - Successfully send messages to topic demo/sorted via MQTT protocol
2023-04-26 10:31:07.301 | DEBUG | wiredflow.main.actions.stages.send_stage:send_to_subscribers:57 - Successfully send messages to topic demo/subtraction via MQTT protocol
Let's take a closer look at how this design works. First, the core logic stage is launched. The function then "understands" that there is too much data and splits the list into parts. Each part is processed according to the business logic, and as soon as the calculation for a part is finished, the resulting data structure is sent via the senders. That is, all subsequent stages are applied to each batch iteratively, as soon as it becomes available.
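This interleaving follows from the fact that Python generators are lazy: the next batch is not computed until the consumer asks for it. A toy simulation, not Wiredflow's actual scheduler; core_logic and send are hypothetical stand-ins for a core logic stage and a send stage.

```python
events = []


def core_logic():
    # Hypothetical generator core: yields two batches
    for batch_id in range(2):
        events.append(f'computed batch {batch_id}')
        yield {'calculated value': batch_id}


def send(data):
    # Hypothetical sender stage: only records what it would send
    events.append(f"sent {data['calculated value']}")


# Each yielded item passes through the following stage before the next one is computed
for output in core_logic():
    send(output)

print(events)
# ['computed batch 0', 'sent 0', 'computed batch 1', 'sent 1']
```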
Another example: imagine that the task for the designed service is to copy data from database 1 to database 2, and we plan to repeat this procedure once a day. The limitation is that we cannot copy more than 10 rows from database 1 at once.
In this case, if 100 new rows appeared in database 1 during the day, we need to repeat the load-and-save operation 10 times (10 rows each). Using a generator and the yield statement lets us adapt the original logic to batch mode natively. For example, if the pipeline consists of two stages, "core logic - storage", then with a generator the save action is applied to each data portion in a loop. All of these actions take place within one launch (in the current example, the launch is repeated once a day). The structure of the pipeline is the same as with a simple custom function; only the custom function itself changes!
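The daily copy scenario can be sketched in plain Python with in-memory lists standing in for the two databases. Everything here (database_1, database_2, copy_rows_by_batches, save_to_database_2) is hypothetical and only mimics a "core logic - storage" pipeline; it does not use Wiredflow's storage API.

```python
# Hypothetical in-memory "databases": 100 new rows appeared in database 1 during the day
database_1 = [{'id': row_id} for row_id in range(100)]
database_2 = []

MAX_ROWS_PER_COPY = 10


def copy_rows_by_batches(**kwargs):
    # Core logic stand-in: yield at most MAX_ROWS_PER_COPY rows at a time
    rows = kwargs['rows']
    for start in range(0, len(rows), MAX_ROWS_PER_COPY):
        yield rows[start:start + MAX_ROWS_PER_COPY]


def save_to_database_2(batch):
    # Storage stage stand-in: applied to every yielded portion
    database_2.extend(batch)


# One launch of the "core logic - storage" pipeline
save_calls = 0
for batch in copy_rows_by_batches(rows=database_1):
    save_to_database_2(batch)
    save_calls += 1

print(save_calls, len(database_2))  # 10 100
```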
Send stages
Below are a few words about senders. Senders are stages that send notifications, data, or other messages to third-party services: for example, a message via the MQTT protocol or a small data sample via the HTTP PUT method.
To add a send stage to a pipeline, use the send() method.
Parameters for configuration:
- configuration - name of the sender to apply, or a custom implementation. Possible options:
  - mqtt - send messages via the MQTT protocol (port and topic must be set if this option is used);
  - http_post - send messages via an HTTP POST request;
  - http_put - send messages via an HTTP PUT request;
- destination - URL of the destination (where to send messages);
- label_to_send - name of the data item to send (a key of the dictionary produced by the core logic, e.g. 'sorted list').
All stages in a pipeline are run sequentially, and each send stage sends only the required chunk of data (determined by label_to_send).
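A toy illustration (plain Python, not the Wiredflow sender implementation) of how label_to_send selects the part of the core logic output that each send stage transmits. It assumes, as in the example above, that the core logic returns a dictionary whose keys match the labels; the senders list is a hypothetical description of the two send() calls.

```python
# Output of the core logic stage (values as produced by order_and_calculate)
core_output = {'sorted list': '[1, 2, 3, 10]', 'calculated value': 10}

# Hypothetical sender descriptions mirroring the send() calls in the example
senders = [
    {'topic': 'demo/sorted', 'label_to_send': 'sorted list'},
    {'topic': 'demo/subtraction', 'label_to_send': 'calculated value'},
]

# Run the send stages sequentially; each one picks only its own chunk of data
outgoing = []
for sender in senders:
    payload = core_output[sender['label_to_send']]
    outgoing.append((sender['topic'], payload))

print(outgoing)
# [('demo/sorted', '[1, 2, 3, 10]'), ('demo/subtraction', 10)]
```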