Skip to content

Superfluid Streams

Superfluid streams enable continuous, real-time payments for subscription-based APIs.

How it Works & Architecture

Stream Flow

For API Consumers

Create Stream

There are several ways to create a stream:

  1. Use the Superfluid Dashboard to start a stream to the provider's address with your configured flow rate.

  2. Use Pipegate Hub to subscribe to a provider directly.

  3. For CLI users, create a stream using the cast command (requires wrapped or native streamable tokens):

terminal
cast send $CFA_FORWARDER_ADDRESS "createFlow(address, address, address, int96, bytes)" $TOKEN_ADDRESS $YOUR_ADDRESS $PROVIDER_ADDRESS $FLOW_RATE 0x --rpc-url $RPC_URL --private-key $PRIVATE_KEY

Flow Rate Calculation

flowRate = "761035007610"; // 2 USDC per month

Setup API Client

import { ClientInterceptor } from "pipegate-sdk";
 
const pipeGate = new ClientInterceptor();
 
const api = axios.create({
  baseURL: "https://api.example.com",
});
 
const streamSender = "0x..."; // Your address
 
api.interceptors.request.use(
  pipeGate.createStreamRequestInterceptor(streamSender).request
);

For API Providers

Configure Stream Settings

Configure the token you want to receive the streams in and the amount converted to flow rate ( /sec ).

use pipegate::middleware::stream_payment::types::StreamsConfig;
use pipegate::utils::{Address, Url, I96};
 
let rpc_url: Url = "https://base-rpc.publicnode.com".parse().unwrap();
 
let stream_payment_config = StreamsConfig {
    recipient: Address::from_str("YOUR_ADDRESS").unwrap(),
    token_address: Address::from_str("TOKEN_ADDRESS").unwrap(),
    amount: "761035007610".parse::<I96>().unwrap(), // 2 USDC/month
    cfa_forwarder: Address::from_str("CFA_ADDRESS").unwrap(),
    rpc_url: rpc_url.to_string(),
    cache_time: 86400, // 1 day
};

Setup Middleware

use pipegate::middleware::stream_payment::{state::StreamState, StreamMiddlewareLayer};
 
let stream_state = StreamState::new();
 
let app = Router::new()
    .route("/", get(root))
    .layer(StreamMiddlewareLayer::new(
        stream_payment_config,
        stream_state
    ));

Attach listener

Attach the stream listener to the server that will maintain a local cache of the streams verification and status, later invalidating the cache if the stream is terminated.

use pipegate::middleware::stream_payment::{types::StreamListenerConfig, StreamListner};
 
let stream_state_clone = stream_state.clone();
 
let stream_payment_config_clone = stream_payment_config.clone();
 
// create configuration for the stream listener
let stream_listener_config = StreamListenerConfig {
    wss_url: "wss://base-sepolia-rpc.publicnode.com".to_string(),
    cfa: Address::from_str("0x6836F23d6171D74Ef62FcF776655aBcD2bcd62Ef").unwrap(),
};
 
// start the stream listener
let _stream_listener = StreamListner::new(
    stream_state_clone,
    stream_payment_config_clone,
    stream_listener_config,
).await;

To check the current streams you are receiving, you can check the superfluid app for now

Best Practices

  • Monitor stream health
  • Handle stream termination
  • Implement grace periods
  • Cache stream status