Skip to main content
Version: 0.11.11 (stable)

Python SDK Examples

To see the full docs, visit our pdoc page.

Example Workflow

Follow the installation instructions to run this example.

Prerequisites

Create the topic used to produce and consume records:

fluvio topic create python-data

Login

Login to Infinyon Cloud using username/password

from fluvio import cloud
cloud.login(email="my@email.com", password="mypassword")

You can also use the oauth method to log in. However this is only for interactive sessions.

from fluvio import cloud
cloud.login(Oauth2=true)

Producer

Create a file called python-produce.py:

#!/usr/bin/env python
from datetime import datetime
from fluvio import Fluvio

TOPIC_NAME = "python-data"
PARTITION = 0

if __name__ == "__main__":
# Connect to cluster
fluvio = Fluvio.connect()

# Produce 10 records to topic
producer = fluvio.topic_producer(TOPIC_NAME)
for x in range(10):
producer.send_string("{}: timestamp: {}".format(x, datetime.now()))

# Flush the last entry
producer.flush()

Let's run the file:

$ python python-produce.py

Consumer

Create a file called python-consume.py:

#!/usr/bin/env python
from fluvio import Fluvio, Offset

TOPIC_NAME = "python-data"
PARTITION = 0

if __name__ == "__main__":
# Connect to cluster
fluvio = Fluvio.connect()

# Consume last 10 records from topic
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)
for idx, record in enumerate( consumer.stream(Offset.from_end(10)) ):
print("{}".format(record.value_string()))

if idx >= 9:
break

Let's run the file:

$ python python-consume.py

Limitations

  • Fluvio cluster administration is not supported.
  • Python async is not supported.

Example with a SmartModule

#!/usr/bin/env python
import os
from datetime import datetime
from fluvio import Fluvio, Offset, ConsumerCoonfig

TOPIC_NAME = "hello-python-smartmodule"
PARTITION = 0

# This is an example of a basic Fluvio workflow in Python
#
# 1. Create a topic to store data in via CLI
# 2. Establish a connection to the Fluvio cluster
# 3. Create a producer and send some bytes
# 4. Create a consumer, and stream the data back
if __name__ == "__main__":
# Currently the Python client does not support creating topics
# Using the Fluvio CLI
os.popen("fluvio topic create {}".format(TOPIC_NAME))

# Connect to cluster
fluvio = Fluvio.connect()

# Produce to topic
producer = fluvio.topic_producer(TOPIC_NAME)
producer.send_string("Hello World! - Time is: {}".format(datetime.now()))

# Consume from topic
# We're just going to get the last record
consumer = fluvio.partition_consumer(TOPIC_NAME, PARTITION)


# Create a ConsumerConfig using your "uppercase-map" smartmodule
config = ConsumerConfig()
config.smartmodule(name="uppercase-map")

for record in consumer.stream_with_config(Offset.from_end(0), config):
print("{}".format(record.value_string()))
break