Split Filter-Map Operator
Like the Split Filter operator, the Split Filter-Map operator extends the functionality of a traffic splitter with the filter-map transformation. The operator can selectively filter which entries enter which sinks while also applying a mapping function. The following example extends the filter-map example. We will implement a dataflow that detects whether an input is a valid addition or subtraction statement, computes the equation, and sends the result to the right sink.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the instructions here.
Transformation
The filter-map transformation is placed in the sink section.
sinks:
  - type: topic
    id: (...)
    transforms:
      - operator: filter-map
        run: |
          (... filter map function ...)
  (... more topics ...)
The function should take the input and return a Result<Option<...>>. It returns Ok(Some(...)) with the mapped value if the input is kept, or Ok(None) if the input should be filtered out. The following are our transformations. The code is shortened for brevity, but the full example is below. Compared to the filter-map example, the dosubtraction topic only replaces the regex and the calculation.
sinks:
  - type: topic
    id: doaddition
    transforms:
      - operator: filter-map
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_addition(input: String) -> Result<Option<String>> {
            (...)
          }
  - type: topic
    id: dosubtraction
    transforms:
      - operator: filter-map
        dependencies:
          - name: regex
            version: "1"
        run: |
          fn do_substraction(input: String) -> Result<Option<String>> {
            let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
            if let Some(num) = re.captures(&input) {
              (...)
              return Ok(Some(format!("{}{}", input, (a - b))));
            } else {
              return Ok(None);
            }
          }
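The Result<Option<...>> contract described above also leaves room for a third outcome: returning an error instead of panicking. The shortened code relies on unwrap(), which panics if an operand does not fit in an i32. The following is a minimal sketch of the addition case, not the example's actual implementation. It assumes a stand-in Result alias (the SDF runtime provides its own), replaces the regex with standard-library string parsing so it compiles without dependencies, and uses the hypothetical names parse_operands and do_addition_checked. How the runtime treats a returned Err is not covered by this guide.

```rust
// Stand-in for the Result alias available inside an SDF `run` block (assumption).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Hypothetical helper: split "<digits><op><digits>=" into its two operand strings.
// Unlike the regex `\d`, this accepts ASCII digits only.
fn parse_operands(input: &str, op: char) -> Option<(&str, &str)> {
    let body = input.strip_suffix('=')?;
    let (a, b) = body.split_once(op)?;
    let all_digits = |s: &str| !s.is_empty() && s.bytes().all(|c| c.is_ascii_digit());
    if all_digits(a) && all_digits(b) {
        Some((a, b))
    } else {
        None
    }
}

// Filter-map sketch: Ok(None) drops the record, Ok(Some(..)) keeps the mapped
// value, and Err(..) reports operands or sums that do not fit in an i32.
fn do_addition_checked(input: String) -> Result<Option<String>> {
    let (a, b) = match parse_operands(&input, '+') {
        Some(pair) => pair,
        None => return Ok(None), // not an addition statement: filter it out
    };
    let a = a.parse::<i32>()?; // Err instead of panic on out-of-range operands
    let b = b.parse::<i32>()?;
    let sum = a.checked_add(b).ok_or("addition overflowed i32")?;
    Ok(Some(format!("{}{}", input, sum)))
}
```

Calling do_addition_checked with "9999+1=" yields the mapped record, "Hello world" yields Ok(None), and "2147483647+1=" yields an error rather than a panic.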
Running the Example
Copy and paste the following config and save it as dataflow.yaml.
# dataflow.yaml
apiVersion: 0.5.0
meta:
  name: split-filter-map-example
  version: 0.1.0
  namespace: examples
config:
  converter: raw
topics:
  sentences:
    schema:
      value:
        type: string
  doaddition:
    schema:
      value:
        type: string
  dosubtraction:
    schema:
      value:
        type: string
services:
  filter-map-service:
    sources:
      - type: topic
        id: sentences
    sinks:
      - type: topic
        id: doaddition
        transforms:
          - operator: filter-map
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_addition(input: String) -> Result<Option<String>> {
                let re = regex::Regex::new(r"^(\d+)\+(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                  let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                  let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                  return Ok(Some(format!("{}{}", input, (a + b))));
                } else {
                  return Ok(None);
                }
              }
      - type: topic
        id: dosubtraction
        transforms:
          - operator: filter-map
            dependencies:
              - name: regex
                version: "1"
            run: |
              fn do_substraction(input: String) -> Result<Option<String>> {
                let re = regex::Regex::new(r"^(\d+)-(\d+)=$").unwrap();
                if let Some(num) = re.captures(&input) {
                  let a: i32 = num.get(1).unwrap().as_str().parse().unwrap();
                  let b: i32 = num.get(2).unwrap().as_str().parse().unwrap();
                  return Ok(Some(format!("{}{}", input, (a - b))));
                } else {
                  return Ok(None);
                }
              }
To run the example:
$ sdf run --ephemeral
Produce sentences to the sentences topic:
$ echo "Hello world" | fluvio produce sentences
$ echo "9999+1=" | fluvio produce sentences
$ echo "9999-1=" | fluvio produce sentences
In another terminal, consume the doaddition topic to retrieve the result:
$ fluvio consume doaddition -Bd
9999+1=10000
Consume the other topic, dosubtraction:
$ fluvio consume dosubtraction -Bd
9999-1=9998
We can see that the first entry, Hello world, is discarded, while the other two are sent to the right topic with the corresponding calculation applied.
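The routing observed here can be reproduced outside the cluster as a quick sanity check. The sketch below is only an illustration under stated assumptions: it assumes every sink is offered every source record and keeps only what its own filter-map returns as Some, it uses a stand-in Result alias, and it swaps the regex for standard-library parsing so it has no dependencies. The names FilterMap, operands, and route are hypothetical and are not part of SDF.

```rust
// Stand-in for the Result alias available inside an SDF `run` block (assumption).
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Signature shared by every filter-map function in this sketch.
type FilterMap = fn(String) -> Result<Option<String>>;

// Hypothetical helper: parse "<digits><op><digits>=" into two integers.
fn operands(input: &str, op: char) -> Result<Option<(i32, i32)>> {
    let parts = input.strip_suffix('=').and_then(|body| body.split_once(op));
    let (a, b) = match parts {
        Some(pair) => pair,
        None => return Ok(None),
    };
    let digits = |s: &str| !s.is_empty() && s.bytes().all(|c| c.is_ascii_digit());
    if !digits(a) || !digits(b) {
        return Ok(None);
    }
    Ok(Some((a.parse::<i32>()?, b.parse::<i32>()?)))
}

fn do_addition(input: String) -> Result<Option<String>> {
    match operands(&input, '+')? {
        Some((a, b)) => {
            let sum = a.checked_add(b).ok_or("addition overflowed i32")?;
            Ok(Some(format!("{}{}", input, sum)))
        }
        None => Ok(None),
    }
}

fn do_subtraction(input: String) -> Result<Option<String>> {
    match operands(&input, '-')? {
        // Both operands are non-negative, so the difference always fits in an i32.
        Some((a, b)) => Ok(Some(format!("{}{}", input, a - b))),
        None => Ok(None),
    }
}

// Offer each record to each sink and collect (topic, value) pairs for the
// records that the sink's filter-map keeps.
fn route(records: &[&str], sinks: &[(&str, FilterMap)]) -> Result<Vec<(String, String)>> {
    let mut delivered = Vec::new();
    for record in records {
        for (topic, filter_map) in sinks {
            if let Some(value) = filter_map(record.to_string())? {
                delivered.push((topic.to_string(), value));
            }
        }
    }
    Ok(delivered)
}
```

Calling route with the three records produced above and the sinks ("doaddition", do_addition as FilterMap) and ("dosubtraction", do_subtraction as FilterMap) returns one entry per computed statement and nothing for Hello world.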
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
In this example, we covered how to split traffic with the filter-map operator.