Rust SDK Examples
- The Rust client is the core client for all language clients.
- New features arrive in the Rust client before any of the other clients.
- Full support for the Admin API.
- This client uses async Rust for all blocking calls.
Refer to the fluvio docs.rs page for full detail.
Example Workflow
Follow the installation instructions to run this example.
# Manifest for the example crate; `publish = false` keeps it from being published to a registry.
[package]
edition = "2021"
name = "fluvio-rust-example"
publish = false
version = "0.0.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# Async runtime; the "attributes" feature provides the `#[async_std::main]` macro used on `main`.
async-std = {version = "1", features = ["attributes"]}
# Used to timestamp the example record with the local time.
chrono = "0.4"
# The Fluvio Rust client.
fluvio = "0.23"
use async_std::stream::StreamExt;
use chrono::Local;
use fluvio::metadata::topic::TopicSpec;
use fluvio::{consumer::ConsumerConfigExtBuilder, Fluvio, RecordKey};
/// Name of the topic the example creates, produces to, and consumes from.
const TOPIC_NAME: &str = "hello-rust";
/// Partition the consumer is configured to read from.
const PARTITION_NUM: u32 = 0;
/// First argument to `TopicSpec::new_computed` when the topic is created.
const PARTITIONS: u32 = 1;
/// Second argument to `TopicSpec::new_computed` when the topic is created.
const REPLICAS: u32 = 1;
/// This is an example of a basic Fluvio workflow in Rust
///
/// 1. Establish a connection to the Fluvio cluster
/// 2. Create a topic to store data in
/// 3. Create a producer and send some bytes
/// 4. Create a consumer, and stream the data back
///
/// # Panics
///
/// Panics if connecting to the cluster, creating the producer, sending,
/// flushing, or building/opening the consumer fails. A failed topic creation
/// or a failed read is reported on stderr instead of aborting.
#[async_std::main]
async fn main() {
    // Connect to Fluvio cluster
    let fluvio = Fluvio::connect().await.unwrap();

    // Create a topic.
    // Creation stays best-effort (the example previously discarded the result),
    // but a failure is now reported instead of being silently dropped.
    // NOTE(review): presumably this fails on re-runs because the topic already
    // exists - confirm against the admin API.
    let admin = fluvio.admin().await;
    let topic_spec = TopicSpec::new_computed(PARTITIONS, REPLICAS, None);
    if let Err(err) = admin
        .create(TOPIC_NAME.to_string(), false, topic_spec)
        .await
    {
        eprintln!("Could not create topic {}: {:?}", TOPIC_NAME, err);
    }

    // Create a record
    let record = format!("Hello World! - Time is {}", Local::now().to_rfc2822());

    // Produce to a topic
    let producer = fluvio::producer(TOPIC_NAME).await.unwrap();
    producer.send(RecordKey::NULL, record).await.unwrap();
    // Fluvio batches outgoing records by default, so flush producer to ensure all records are sent
    producer.flush().await.unwrap();

    // Consume last record from topic, reusing the connection opened above
    // rather than connecting to the cluster a second time.
    let config = ConsumerConfigExtBuilder::default()
        .topic(TOPIC_NAME)
        .partition(PARTITION_NUM)
        .offset_start(fluvio::Offset::from_end(1))
        .build()
        .unwrap();
    let mut stream = fluvio.consumer_with_config(config).await.unwrap();

    // Report every outcome: previously an error or an ended stream made the
    // program exit successfully without printing anything.
    match stream.next().await {
        Some(Ok(record)) => {
            let string = String::from_utf8_lossy(record.value());
            println!("{}", string);
        }
        Some(Err(err)) => eprintln!("Failed to read record from {}: {:?}", TOPIC_NAME, err),
        None => eprintln!("Stream for {} ended before a record arrived", TOPIC_NAME),
    }
}
Run
$ cargo run
Additional Producer options
Alternatively, we can create a producer with custom configuration:
Example:
This is how to configure a Producer with a `batch_size` of 500 bytes, a `linger` of 500ms, and `Gzip` compression.
// Producer settings: 500-byte batch size, 500ms linger, Gzip compression.
let linger = std::time::Duration::from_millis(500);
let config = TopicProducerConfigBuilder::default()
    .batch_size(500)
    .linger(linger)
    .compression(Compression::Gzip)
    .build()
    .expect("Failed to create topic producer config");

// Open a producer on the topic using the custom configuration.
let producer = fluvio
    .topic_producer_with_config("my-fluvio-topic", config)
    .await
    .expect("Failed to create a producer");
Using a SmartModule with the Rust Consumer
Below is an example of how to use a SmartModule filter with the Rust consumer.
use std::io::Read;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use fluvio::{Fluvio, Offset, PartitionConsumer};
use fluvio::consumer::{
SmartModuleInvocation,
SmartModuleInvocationWasm,
SmartModuleKind,
ConsumerConfig
};
use async_std::stream::StreamExt;
// Load the compiled SmartModule from disk.
let raw_buffer = std::fs::read(
    "/my_projects/example_filter/target/wasm32-unknown-unknown/release/example_filter.wasm",
)
.expect("wasm file is missing");

// Gzip-compress the wasm bytes before handing them to the consumer config.
let mut encoder = GzEncoder::new(raw_buffer.as_slice(), Compression::default());
let mut buffer = Vec::with_capacity(raw_buffer.len());
// `read_to_end` returns an `io::Result`; check it so a failed compression
// does not pass a truncated or empty payload along.
encoder
    .read_to_end(&mut buffer)
    .expect("Failed to compress wasm file");

// Attach the compressed module as an ad-hoc SmartModule of kind `Filter`.
let mut builder = ConsumerConfig::builder();
builder.smartmodule(Some(SmartModuleInvocation {
    wasm: SmartModuleInvocationWasm::AdHoc(buffer),
    kind: SmartModuleKind::Filter,
    params: Default::default(),
}));
let filter_config = builder.build().expect("Failed to create config");

// create partition consumer
let consumer = fluvio
    .partition_consumer("my-topic", 0)
    .await
    .expect("failed to create consumer");

// stream from beginning
let mut stream = consumer
    .stream_with_config(Offset::beginning(), filter_config)
    .await
    .expect("Failed to create stream");

// Print each record; stop at the first stream error, reporting it rather
// than ending the loop silently.
while let Some(next) = stream.next().await {
    match next {
        Ok(record) => {
            let key = record.key().map(|key| String::from_utf8_lossy(key).to_string());
            let value = String::from_utf8_lossy(record.value()).to_string();
            println!("Got filter event: key={:?}, value={}", key, value);
        }
        Err(err) => {
            eprintln!("Failed to read filter event: {:?}", err);
            break;
        }
    }
}
Refer to the fluvio docs.rs page for full detail.