Superfluid Streams
Superfluid streams enable continuous, real-time payments for subscription-based APIs.
How it Works & Architecture
For API Consumers
Create Stream
There are several ways to create a stream:
Use the Superfluid Dashboard to start a stream to the provider's address with your configured flow rate.
Use Pipegate Hub to subscribe to a provider directly.
For CLI users, create a stream using the
command (requires wrapped or native streamable tokens):
cast send $CFA_FORWARDER_ADDRESS "createFlow(address, address, address, int96, bytes)" $TOKEN_ADDRESS $YOUR_ADDRESS $PROVIDER_ADDRESS $FLOW_RATE 0x --rpc-url $RPC_URL --private-key $PRIVATE_KEY
Flow Rate Calculation
flowRate = "761035007610"; // 2 USDC per month
Setup API Client
import { ClientInterceptor } from "pipegate-sdk";
const pipeGate = new ClientInterceptor();
const api = axios.create({
baseURL: "",
const streamSender = "0x..."; // Your address
For API Providers
Configure Stream Settings
Configure the token you want to receive the streams in and the amount converted to flow rate ( /sec ).
use pipegate::middleware::stream_payment::types::StreamsConfig;
use pipegate::utils::{Address, Url, I96};
let rpc_url: Url = "".parse().unwrap();
let stream_payment_config = StreamsConfig {
recipient: Address::from_str("YOUR_ADDRESS").unwrap(),
token_address: Address::from_str("TOKEN_ADDRESS").unwrap(),
amount: "761035007610".parse::<I96>().unwrap(), // 2 USDC/month
cfa_forwarder: Address::from_str("CFA_ADDRESS").unwrap(),
rpc_url: rpc_url.to_string(),
cache_time: 86400, // 1 day
Setup Middleware
use pipegate::middleware::stream_payment::{state::StreamState, StreamMiddlewareLayer};
let stream_state = StreamState::new();
let app = Router::new()
.route("/", get(root))
Attach listener
Attach the stream listener to the server that will maintain a local cache of the streams verification and status, later invalidating the cache if the stream is terminated.
use pipegate::middleware::stream_payment::{types::StreamListenerConfig, StreamListner};
let stream_state_clone = stream_state.clone();
let stream_payment_config_clone = stream_payment_config.clone();
// create configuration for the stream listener
let stream_listener_config = StreamListenerConfig {
wss_url: "wss://".to_string(),
cfa: Address::from_str("0x6836F23d6171D74Ef62FcF776655aBcD2bcd62Ef").unwrap(),
// start the stream listener
let _stream_listener = StreamListner::new(
To check the current streams you are receiving, you can check the superfluid app for now
Best Practices
- Monitor stream health
- Handle stream termination
- Implement grace periods
- Cache stream status