Skip to main content

Getting Started with Stateful Dataflows

This guide will get you started with SDF, an utility that helps developers build, troubleshoot, and run full-featured event-driven dataflows.

Overview

This SDF release includes several new features and improvements. The main feature is composition, which allows you to create individual packages and import them to create dataflows.

Example Dataflow

As an example, we'll create a dataflow that splits sentences into words, and counts the number of characters in each word.

Preview Example

The dataflow reads from sentences topic and writes to words topics. This example is available for download in github.

Prerequisites

Building a Stateful Dataflow requires the following software:

  • Rust 1.80 or beyond - Install Rust
  • wasm32-wasip1 Rust target installed. Typically, installed with: rustup target add wasm32-wasip1

Installing Fluvio & Start a Cluster

SDF requires a Fluvio Cluster to consume, produce, and stream records between services.

Download and install the CLI.

$ curl -fsS https://hub.infinyon.cloud/install/install.sh | bash

This command will download the Fluvio Version Manager (fvm), Fluvio CLI (fluvio) and config files into $HOME/.fluvio, with the executables in $HOME/.fluvio/bin. To complete the installation, you must add the executables to your shell $PATH.

Start a Local cluster:

$ fluvio cluster start

If you prefer to run your cluster in InfinyOn Cloud follow the instructions here.

Run the following command to check the CLI and the Cluster platform versions:

$ fluvio version

Your Fluvio cluster is ready for use.

Install and Setup SDF

SDF is in beta and it requires the following image:

fvm install sdf-beta1

You can validate prerequisites with:

sdf setup
All pre-requisites are installed!

Your SDF environment is ready to go.

Use Composition to build a Stateful Dataflow

Composition has two main components, packages and dataflows. You can build and test a package independently, then import it into a dataflow. For additional information, check out the Composition section.

1. Building a Package

The package is a collection of services, functions, and states that can be defined, tested, then be imported into a dataflow.

Let's build one:

1.1 Create a Package File

Open the terminal, create a fresh project directory, say split-sentence, where we'll add the dataflow later on. Inside the project directory, create a packages directory and a subdirectory sentence for the package itself:

$ mkdir -p split-sentence/packages/sentence
$ cd split-sentence/packages/sentence

Inside the sentence directory and create an file called sdf-package.yaml and add the following content:

apiVersion: 0.5.0

meta:
name: sentence-pkg
version: 0.1.0
namespace: example

functions:

sentence-to-words:
operator: flat-map
inputs:
- name: sentence
type: string
output:
type: string

augment-count:
operator: map
inputs:
- name: word
type: string
output:
type: string

dev:
converter: raw

The package file instructs the generator to build two functions, sentence-to-words and augment-count, that we intend to implement. In addition, it tells the sdf test runtime that our inputs should be ingested as raw for testing our routines.

1.2 Generate the Package Project

The SDF generate command parses the sdf-package.yaml file and builds the project:

$ sdf generate

The generator created several directories and files that we'll edit next.

1.3 Add the Custom Code

First let's update the first function sentence-to-words. Open rust/sentence-to-words/src/lib.rs and update the function body with the following code:

fn sentence_to_words(sentence: String) -> Result<Vec<String>> {
Ok(sentence.split_whitespace().map(String::from).collect())
}

Next update augment_count. Open rust/augment-count/src/lib.rs and replace the function body:

fn augment_count(word: String) -> Result<String> {
Ok(format!("{}({})", word, word.chars().count()))
}

Let's add some tests as well:

#[cfg(test)]
mod test {
use super::*;

#[test]
fn test_augment_count() {
let input = "Hello".to_string();
let output = Component::augment_count(input);
assert_eq!(output.unwrap(), "Hello(5)");
}
}

We've implemented both functions, it's time to compile and test our work.

1.4 Build and Test the Package

To build the package, run:

$ sdf build

SDF has a built-in test interactive shell. Let's bring it up:

$ sdf test

In the test shell, you can view the functions available for testing:

>> show functions
sentence-to-words
augment-count

Let's test sentence-to-words first:

>> test function sentence-to-words --value "Hello World"
Hello
World

Next, test augment-count:

>> test function augment-count --value "Hello"
Hello(5)

You may also test the rust code via Cargo:

cd rust/augment-count
cargo test

The tests passed, and package now is ready to use in the dataflow file.

2. Build the Stateful Dataflow

We are building a dataflow that reads words from sentences topic, and publishes the result to words topic. Let's get started.

2.1 Create a Dataflow File

Navigate to the base project directory.

$ cd ../../

Create a file called dataflow.yaml and copy/paste the following content:

apiVersion: 0.5.0

meta:
name: split-sentence
version: 0.1.0
namespace: example

imports:
- pkg: example/sentence-pkg@0.1.0
path: ./packages/sentence
functions:
- name: sentence-to-words
- name: augment-count

topics:
sentence:
schema:
value:
type: string
converter: raw
words:
schema:
value:
type: string
converter: raw

services:
sentence-words:
sources:
- type: topic
id: sentence

transforms:
- operator: flat-map
uses: sentence-to-words
- operator: map
uses: augment-count

sinks:
- type: topic
id: words

dev:
imports:
- pkg: example/sentence-pkg@0.1.0
path: ./packages/sentence

This example focuses on composition, and the area of interest is the imports section:

  • package - is the name of the package we are importing (a composition of the meta fields of the package)
  • path - is the relative directory where this package can be found.
  • functions - the name of the functions we want to import.

The imported functions are then referenced them by name in the transforms section.

2.2 Run the Dataflow

Let's run the project:

$ sdf run --ui --ephemeral
Please visit http://127.0.0.1:8000 to view your workflow visualization

>>

Note:

  • The run command performs multiple operations:
    • imports and links all packages
    • compiles inline code (if needed)
    • looks-up the topics in the cluster and automatically creates them if they don't exist.
  • The --ui flag generates a visual representation of the dataflow at http://127.0.0.1:8000.
  • When you close the run intractive editor, the dataflow stops processing records.

2.3 Test the Dataflow

To test the dataflow, we'll generate to one topic and read from the other.

Produce a sentence in the sentence topic:

$ echo 'Hello from stateful dataflows' | fluvio produce sentence

Consume from the words topic:

$ fluvio consume words -Bd
Consuming records from 'words' starting from the beginning of log
Hello(5)
from(4)
stateful(8)
dataflows(9)

Note, the both functions have been applied, with the first function spliting the sentence into words and the second counting the number of characters in each word.

SDF also exposes execution metrics. To view the metrics, run:

>> show state
Namespace Keys Type
sentence-words/augment-count/metrics 1 table
sentence-words/sentence/topic.offset 1 offset
sentence-words/sentence-to-words/metrics 1 table
sentence-words/sentence/metrics 1 table

Let's pick a metric:

>> show state  sentence-words/augment-count/metrics
Key Window succeeded failed
stats * 4 0

🎉 Congratulations! You have successfully used compositino to build a Stateful Dataflow.

Other Examples

For additional examples, check out stateful-dataflows-examples in github. The examples cover additional functionality shipped in prior preview releases.

We Love Feedback

Stateful Dataflows is an ambitious project with many possibilities and just as many hazards. Please get in touch, we would love to hear your feedback and help us steer the product in the right direction.