Tags: tutorial, websocket, realtime

Build a Real-Time Chat Application with Rust WebSockets and Ultimo

Ultimo Team · 4 min read

Real-time communication is a baseline expectation for modern applications. Chat, notifications, collaborative editing, live dashboards — they all depend on bidirectional server-client communication over persistent connections.

This tutorial walks through building a fully functional chat server in Rust using Ultimo's built-in WebSocket support with pub/sub channels. We will build a multi-room chat with presence tracking, message history, and a working browser client — all in a single Rust binary.

What We Are Building

By the end of this tutorial, you will have:

  • A WebSocket server handling multiple concurrent connections
  • Named chat rooms with pub/sub message fan-out
  • User presence tracking (join/leave notifications)
  • Message history for late joiners
  • A simple HTML/JavaScript client for testing
  • Type-safe message structures shared between server and client

The full working example is available in the ultimo examples.

Prerequisites

  • Rust 1.86+ installed (rustup)
  • Basic familiarity with async Rust
  • cargo and a terminal

Project Setup

Create a new project and add Ultimo with WebSocket support:

cargo init chat-server
cd chat-server

Add dependencies to Cargo.toml:

[dependencies]
ultimo = { version = "0.5", features = ["websocket"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

Understanding WebSocket Connections

HTTP is request-response: the client sends a request, the server sends one response, and the exchange is over (even if the underlying TCP connection is kept alive for reuse). WebSocket upgrades that HTTP connection into a full-duplex channel where either side can send messages at any time.

In Ultimo, WebSocket connections are first-class. You register a WebSocket route just like an HTTP route, and Ultimo handles the protocol upgrade, frame parsing, ping/pong heartbeats, and connection lifecycle:

use ultimo::prelude::*;
 
#[tokio::main]
async fn main() -> ultimo::Result<()> {
    let mut app = Ultimo::new();
 
    app.websocket("/chat/:room", handle_chat);
 
    app.listen("127.0.0.1:3000").await
}
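
To give a sense of what "frame parsing" covers, here is a minimal, std-only sketch of decoding a single WebSocket frame as laid out in RFC 6455, section 5.2. It is not Ultimo's implementation; `Frame` and `parse_frame` are hypothetical names, and the sketch ignores fragmentation, control-frame rules, and the reserved bits.

```rust
use std::convert::TryFrom;

/// A decoded WebSocket frame (RFC 6455, section 5.2). Hypothetical type.
#[derive(Debug, PartialEq)]
pub struct Frame {
    pub fin: bool,
    pub opcode: u8,
    pub payload: Vec<u8>,
}

/// Parses one frame from the front of `buf`.
/// Returns the frame and the number of bytes consumed, or `None` if `buf`
/// does not yet hold a complete frame.
pub fn parse_frame(buf: &[u8]) -> Option<(Frame, usize)> {
    if buf.len() < 2 {
        return None;
    }
    let fin = buf[0] & 0x80 != 0;
    let opcode = buf[0] & 0x0F;
    let masked = buf[1] & 0x80 != 0;
    let mut pos = 2;

    // 7-bit length; 126 and 127 mark a 16-bit or 64-bit extended length.
    let len = match buf[1] & 0x7F {
        126 => {
            let bytes = <[u8; 2]>::try_from(buf.get(pos..pos + 2)?).ok()?;
            pos += 2;
            u16::from_be_bytes(bytes) as usize
        }
        127 => {
            let bytes = <[u8; 8]>::try_from(buf.get(pos..pos + 8)?).ok()?;
            pos += 8;
            usize::try_from(u64::from_be_bytes(bytes)).ok()?
        }
        n => n as usize,
    };

    // Client-to-server frames carry a 4-byte masking key.
    let mask = if masked {
        let key = <[u8; 4]>::try_from(buf.get(pos..pos + 4)?).ok()?;
        pos += 4;
        Some(key)
    } else {
        None
    };

    let end = pos.checked_add(len)?;
    let mut payload = buf.get(pos..end)?.to_vec();
    if let Some(key) = mask {
        // Unmask: XOR each payload byte with the key, cycling every 4 bytes.
        for (i, byte) in payload.iter_mut().enumerate() {
            *byte ^= key[i % 4];
        }
    }
    Some((Frame { fin, opcode, payload }, end))
}
```

Feeding it the masked "Hello" text frame from the RFC's examples (bytes 81 85 37 fa 21 3d 7f 9f 4d 51 58) yields opcode 1 and the payload `Hello`. A truncated buffer yields `None`, which is how a reader loop would know to wait for more bytes.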

Defining Message Types

Before writing handlers, define the message types that flow between server and client. Shared types prevent the class of bugs where the client sends a shape the server doesn't expect:

use serde::{Deserialize, Serialize};
 
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClientMessage {
    #[serde(rename = "message")]
    ChatMessage { content: String },
    #[serde(rename = "typing")]
    Typing,
    #[serde(rename = "history")]
    RequestHistory { limit: usize },
}
 
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "type")]
pub enum ServerMessage {
    #[serde(rename = "message")]
    ChatMessage {
        user: String,
        content: String,
        timestamp: u64,
    },
    #[serde(rename = "join")]
    UserJoined { user: String },
    #[serde(rename = "leave")]
    UserLeft { user: String },
    #[serde(rename = "presence")]
    Presence { users: Vec<String> },
    #[serde(rename = "history")]
    History { messages: Vec<ChatMessageRecord> },
}
 
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ChatMessageRecord {
    pub user: String,
    pub content: String,
    pub timestamp: u64,
}

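Using #[serde(tag = "type")] together with the per-variant rename attributes means a chat message from the client serializes as {"type": "message", "content": "hello"}, and a unit variant such as Typing as {"type": "typing"}. That shape is easy to dispatch on the JavaScript side with a switch statement.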

The Chat Handler

The core handler processes WebSocket connections for a specific room:

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::RwLock;
use std::collections::HashMap;
 
type RoomState = Arc<RwLock<HashMap<String, Vec<ChatMessageRecord>>>>;
 
async fn handle_chat(ctx: Context) -> ultimo::Result<()> {
    let room = ctx.param("room")?;
    let user = ctx.query("user").unwrap_or_else(|| "anonymous".to_string());
    let rooms: &RoomState = ctx.state();
 
    // Subscribe to the room's pub/sub channel
    ctx.subscribe(format!("chat.{room}")).await;
 
    // Announce the join
    let join_msg = ServerMessage::UserJoined { user: user.clone() };
    ctx.publish(
        format!("chat.{room}"),
        serde_json::to_string(&join_msg)?,
    ).await;
 
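    // Handle incoming messages. The closure is `move`, so hand it its own
    // copies; `room` and `user` are still needed for the leave notice below.
    let (msg_room, msg_user) = (room.clone(), user.clone());
    ctx.on_message(move |msg| {
        let room = msg_room.clone();
        let user = msg_user.clone();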
        async move {
            let text = msg.to_text()?;
            let client_msg: ClientMessage = serde_json::from_str(text)?;
 
            match client_msg {
                ClientMessage::ChatMessage { content } => {
                    let timestamp = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap()
                        .as_secs();
 
                    let record = ChatMessageRecord {
                        user: user.clone(),
                        content: content.clone(),
                        timestamp,
                    };
 
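                    // Store in history. The inner block scopes the write guard
                    // so the lock is released before the broadcast awaits.
                    {
                        let mut rooms = rooms.write().await;
                        rooms.entry(room.clone())
                            .or_default()
                            .push(record);
                    }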
 
                    // Broadcast to room
                    let server_msg = ServerMessage::ChatMessage {
                        user,
                        content,
                        timestamp,
                    };
                    ctx.publish(
                        format!("chat.{room}"),
                        serde_json::to_string(&server_msg)?,
                    ).await;
                }
                ClientMessage::RequestHistory { limit } => {
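                    // Copy out the last `limit` records, then release the read
                    // lock before awaiting the send.
                    let messages: Vec<ChatMessageRecord> = {
                        let rooms = rooms.read().await;
                        rooms
                            .get(&room)
                            .map(|msgs| {
                                let start = msgs.len().saturating_sub(limit);
                                msgs[start..].to_vec()
                            })
                            .unwrap_or_default()
                    };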
 
                    let history_msg = ServerMessage::History { messages };
                    ctx.send(serde_json::to_string(&history_msg)?).await;
                }
                ClientMessage::Typing => {
                    // Could broadcast typing indicators
                }
            }
            Ok(())
        }
    }).await;
 
    // Handle disconnect — announce leave
    let leave_msg = ServerMessage::UserLeft { user: user.clone() };
    ctx.publish(
        format!("chat.{room}"),
        serde_json::to_string(&leave_msg)?,
    ).await;
 
    Ok(())
}
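
The history in the handler above is a plain Vec per room, so it grows for as long as the server runs. One possible way to cap it is a ring buffer that evicts the oldest record. The sketch below is std-only; `Record` mirrors the fields of ChatMessageRecord, and `BoundedHistory` is a hypothetical helper, not part of Ultimo.

```rust
use std::collections::VecDeque;

/// One stored chat line; mirrors the fields of `ChatMessageRecord`.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub user: String,
    pub content: String,
    pub timestamp: u64,
}

/// Per-room history that keeps only the newest `capacity` records.
/// Hypothetical helper for illustration.
pub struct BoundedHistory {
    capacity: usize,
    records: VecDeque<Record>,
}

impl BoundedHistory {
    pub fn new(capacity: usize) -> Self {
        Self { capacity, records: VecDeque::new() }
    }

    /// Appends a record, evicting the oldest one once the cap is reached.
    pub fn push(&mut self, record: Record) {
        if self.capacity == 0 {
            return; // a zero-capacity history stores nothing
        }
        if self.records.len() == self.capacity {
            self.records.pop_front();
        }
        self.records.push_back(record);
    }

    /// Returns up to `limit` of the newest records, oldest first,
    /// matching the order the handler sends to late joiners.
    pub fn last(&self, limit: usize) -> Vec<Record> {
        let skip = self.records.len().saturating_sub(limit);
        self.records.iter().skip(skip).cloned().collect()
    }
}
```

Swapping this in would mean storing a `BoundedHistory` per room instead of a `Vec<ChatMessageRecord>`. The `RequestHistory` branch would then call `last(limit)`, so a client asking for a very large `limit` still receives at most `capacity` records.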

Key Concepts

ctx.subscribe(channel) — joins a pub/sub channel. All messages published to that channel are sent to this connection automatically.

ctx.publish(channel, message) — sends a message to every subscriber of the channel, including the sender. This is the fan-out mechanism.

ctx.send(message) — sends a message only to the current connection (for private responses like history).

ctx.on_message(handler) — registers a callback for incoming messages from the client. This is where you parse and dispatch client messages.

Serving the HTML Client

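Ultimo can serve HTML alongside WebSocket and API routes. The version of main below embeds the chat page into the binary with include_str!, registers the shared room state, and adds a small REST endpoint: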

use serde_json::json; // needed for the json! macro in the /rooms handler
 
#[tokio::main]
async fn main() -> ultimo::Result<()> {
    let mut app = Ultimo::new();
 
    let rooms: RoomState = Arc::new(RwLock::new(HashMap::new()));
    app.state(rooms);
 
    // Serve the chat client
    app.get("/", |ctx: Context| async move {
        ctx.html(include_str!("../index.html")).await
    });
 
    // WebSocket endpoint
    app.websocket("/chat/:room", handle_chat);
 
    // REST endpoint for room list
    app.get("/rooms", |ctx: Context| async move {
        let rooms: &RoomState = ctx.state();
        let room_names: Vec<String> = rooms.read().await.keys().cloned().collect();
        ctx.json(json!({"rooms": room_names})).await
    });
 
    println!("Chat server running on http://127.0.0.1:3000");
    app.listen("127.0.0.1:3000").await
}

The Browser Client

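Create an index.html in the project root, next to Cargo.toml (include_str!("../index.html") resolves relative to src/main.rs). The page connects to the WebSocket and renders messages: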

<!DOCTYPE html>
<html>
<head>
    <title>Ultimo Chat</title>
    <style>
        body { font-family: system-ui, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
        #messages { height: 400px; overflow-y: auto; border: 1px solid #ddd; padding: 10px; margin: 10px 0; }
        .message { margin: 4px 0; }
        .system { color: #666; font-style: italic; }
        input, button { padding: 8px 12px; font-size: 14px; }
        #msg-input { width: 70%; }
    </style>
</head>
<body>
    <h1>Ultimo Chat</h1>
    <div>
        <input id="user-input" placeholder="Your name" value="">
        <input id="room-input" placeholder="Room name" value="general">
        <button onclick="connect()">Join</button>
    </div>
    <div id="messages"></div>
    <div>
        <input id="msg-input" placeholder="Type a message..." onkeydown="if(event.key==='Enter')send()">
        <button onclick="send()">Send</button>
    </div>
 
    <script>
        let ws;
        const messages = document.getElementById('messages');
 
        function connect() {
            const user = document.getElementById('user-input').value || 'anonymous';
            const room = document.getElementById('room-input').value || 'general';
 
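            // Close any previous connection before joining another room
            if (ws) ws.close();
            // Encode both values so spaces, '&', '#' or '/' cannot break the URL
            ws = new WebSocket(`ws://${location.host}/chat/${encodeURIComponent(room)}?user=${encodeURIComponent(user)}`);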
 
            ws.onopen = () => {
                addMessage('Connected to room: ' + room, 'system');
                ws.send(JSON.stringify({type: 'history', limit: 50}));
            };
 
            ws.onmessage = (event) => {
                const msg = JSON.parse(event.data);
                switch (msg.type) {
                    case 'message':
                        addMessage(`${msg.user}: ${msg.content}`);
                        break;
                    case 'join':
                        addMessage(`${msg.user} joined`, 'system');
                        break;
                    case 'leave':
                        addMessage(`${msg.user} left`, 'system');
                        break;
                    case 'history':
                        msg.messages.forEach(m => addMessage(`${m.user}: ${m.content}`));
                        break;
                }
            };
 
            ws.onclose = () => addMessage('Disconnected', 'system');
        }
 
        function send() {
            const input = document.getElementById('msg-input');
            if (ws && ws.readyState === WebSocket.OPEN && input.value) {
                ws.send(JSON.stringify({type: 'message', content: input.value}));
                input.value = '';
            }
        }
 
        function addMessage(text, cls = 'message') {
            const div = document.createElement('div');
            div.className = cls;
            div.textContent = text;
            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        }
    </script>
</body>
</html>

Running the Chat Server

cargo run

Open http://127.0.0.1:3000 in multiple browser tabs. Enter different names in each and start chatting. Messages are broadcast in real time through the pub/sub system.

How Pub/Sub Scales

Ultimo's pub/sub system is designed for multi-room fan-out without external dependencies. Under the hood:

  • Each channel maintains a subscriber list (lock-free reads, write-locked on join/leave)
  • Publishing iterates subscribers and sends to each connection's outbound buffer
  • Back-pressure is handled per connection — a slow client doesn't block other subscribers
  • Connection drops trigger automatic unsubscription and cleanup

For a single server, this handles thousands of concurrent connections per room. For horizontal scaling beyond one server, you would front the pub/sub layer with a message broker (Redis pub/sub, NATS, or Kafka) — but for many use cases, a single Ultimo instance is sufficient.
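
The fan-out idea in the list above can be sketched with nothing but the standard library. This is an illustration of the general technique, not Ultimo's internals: `Broker` is a hypothetical type, each subscriber gets a bounded channel as its outbound buffer, and dropping a message when that buffer is full is just one possible back-pressure policy, chosen here for brevity.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical stand-in for a pub/sub layer; not Ultimo's implementation.
#[derive(Default)]
pub struct Broker {
    channels: HashMap<String, Vec<SyncSender<String>>>,
}

impl Broker {
    /// Registers a subscriber with a bounded outbound buffer of `buffer` messages.
    pub fn subscribe(&mut self, channel: &str, buffer: usize) -> Receiver<String> {
        let (tx, rx) = sync_channel(buffer);
        self.channels.entry(channel.to_string()).or_default().push(tx);
        rx
    }

    /// Fans `message` out to every subscriber; returns how many accepted it.
    pub fn publish(&mut self, channel: &str, message: &str) -> usize {
        let subscribers = match self.channels.get_mut(channel) {
            Some(list) => list,
            None => return 0,
        };
        let mut delivered = 0;
        subscribers.retain(|tx| match tx.try_send(message.to_string()) {
            Ok(()) => {
                delivered += 1;
                true
            }
            // Buffer full: skip this message for the slow subscriber, but
            // keep it subscribed. `try_send` never blocks the publisher.
            Err(TrySendError::Full(_)) => true,
            // Receiver dropped: the connection is gone, so unsubscribe it.
            Err(TrySendError::Disconnected(_)) => false,
        });
        delivered
    }
}
```

Because `publish` uses `try_send`, one subscriber with a full buffer costs the publisher a failed call rather than a stall, and a dropped `Receiver` is pruned on the next publish. A real server would likely prefer disconnecting a persistently slow client over silently dropping its messages.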

Adding Presence Tracking

Presence (knowing who is currently online in a room) requires tracking connections at the application level. A simple approach using Ultimo's state:

use std::collections::HashSet;
 
type PresenceMap = Arc<RwLock<HashMap<String, HashSet<String>>>>;
 
async fn handle_chat(ctx: Context) -> ultimo::Result<()> {
    let room = ctx.param("room")?;
    let user = ctx.query("user").unwrap_or_else(|| "anonymous".to_string());
    let presence: &PresenceMap = ctx.state();
 
    // Track presence
    {
        let mut map = presence.write().await;
        map.entry(room.clone()).or_default().insert(user.clone());
    }
 
    // Send current presence to the new user
    {
        let map = presence.read().await;
        let users: Vec<String> = map
            .get(&room)
            .map(|s| s.iter().cloned().collect())
            .unwrap_or_default();
        let msg = ServerMessage::Presence { users };
        ctx.send(serde_json::to_string(&msg)?).await;
    }
 
    // ... message handling (same as before) ...
 
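    // On disconnect: remove from presence, and drop the room entry once it
    // is empty so abandoned rooms do not accumulate in the map
    {
        let mut map = presence.write().await;
        if let Some(room_users) = map.get_mut(&room) {
            room_users.remove(&user);
            if room_users.is_empty() {
                map.remove(&room);
            }
        }
    }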
 
    Ok(())
}
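
One weakness of cleaning up at the end of the handler is that an early return through `?` skips the removal. A common Rust answer is an RAII guard whose Drop impl does the cleanup. The sketch below is std-only and uses a std Mutex to stay synchronous, whereas the tutorial's PresenceMap uses tokio's RwLock. `PresenceGuard` and `users_in` are hypothetical names.

```rust
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

/// Room name -> users currently present.
pub type Presence = Arc<Mutex<HashMap<String, HashSet<String>>>>;

/// Hypothetical RAII guard: registers the user on creation and removes
/// them when dropped, including on early returns via `?`.
pub struct PresenceGuard {
    presence: Presence,
    room: String,
    user: String,
}

impl PresenceGuard {
    pub fn join(presence: &Presence, room: &str, user: &str) -> Self {
        presence
            .lock()
            .unwrap()
            .entry(room.to_string())
            .or_default()
            .insert(user.to_string());
        Self {
            presence: Arc::clone(presence),
            room: room.to_string(),
            user: user.to_string(),
        }
    }
}

impl Drop for PresenceGuard {
    fn drop(&mut self) {
        // Skip cleanup on a poisoned lock rather than panic inside drop.
        if let Ok(mut map) = self.presence.lock() {
            let now_empty = match map.get_mut(&self.room) {
                Some(users) => {
                    users.remove(&self.user);
                    users.is_empty()
                }
                None => false,
            };
            if now_empty {
                map.remove(&self.room); // forget rooms nobody is in
            }
        }
    }
}

/// Sorted snapshot of who is in `room`.
pub fn users_in(presence: &Presence, room: &str) -> Vec<String> {
    let map = presence.lock().unwrap();
    let mut users: Vec<String> = map
        .get(room)
        .map(|set| set.iter().cloned().collect())
        .unwrap_or_default();
    users.sort();
    users
}
```

In a handler, binding the guard to a local (`let _presence = PresenceGuard::join(...)`) keeps the user listed for exactly as long as the connection's scope lives. Note that a set of names treats two connections sharing a name as one user; tracking a per-name connection count would be needed to handle that case.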

Production Considerations

Heartbeats and Timeouts

Ultimo handles WebSocket ping/pong frames automatically. If a client stops responding to pings, the connection is closed after a timeout. This prevents resource leaks from orphaned connections (common in mobile clients that lose connectivity without sending a close frame).
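
The bookkeeping behind such a timeout is small. As a rough sketch of the idea (not Ultimo's code; `Heartbeat` and its fields are hypothetical), a server can remember when each connection last answered and compare that against a deadline. Time is passed in explicitly so the logic can be exercised without sleeping.

```rust
use std::time::{Duration, Instant};

/// Hypothetical per-connection liveness tracker.
pub struct Heartbeat {
    last_pong: Instant,
    timeout: Duration,
}

impl Heartbeat {
    pub fn new(now: Instant, timeout: Duration) -> Self {
        Self { last_pong: now, timeout }
    }

    /// Call when a pong arrives from the client.
    pub fn record_pong(&mut self, now: Instant) {
        self.last_pong = now;
    }

    /// True once the client has been silent for longer than `timeout`.
    pub fn is_expired(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_pong) > self.timeout
    }
}
```

A periodic task would send a ping, and on the next tick close any connection for which `is_expired(Instant::now())` returns true.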

Authentication

In production, you would validate the user before upgrading to WebSocket. Ultimo's middleware pipeline runs before the WebSocket upgrade, so JWT or session validation works naturally:

let mut app = Ultimo::new()
    .with(JwtAuth::new(jwt_secret));
 
// The WebSocket handler only runs if auth middleware passes
app.websocket("/chat/:room", handle_chat);

Message Persistence

For production chat, you would persist messages to a database (PostgreSQL via SQLx, for example) instead of the in-memory HashMap shown here. The handler pattern stays the same — just replace the RwLock<HashMap> with a database pool.

Rate Limiting Messages

Prevent message flooding with per-user rate limiting (rate_limiter below stands for whichever limiter you plug in):

ClientMessage::ChatMessage { content } => {
    // Enforce max 10 messages per second per user
    if rate_limiter.check(&user).is_err() {
        ctx.send(r#"{"type":"error","message":"Rate limited"}"#).await;
        return Ok(());
    }
    // ... broadcast as normal
}
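
The snippet above leaves `rate_limiter` undefined. One way to fill it in is a token bucket keyed by user name, sketched below with the standard library only. `RateLimiter` is a hypothetical type, and its `check` takes the current time as a second argument (unlike the call above) so the logic can be tested deterministically.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Hypothetical token-bucket limiter keyed by user name.
pub struct RateLimiter {
    capacity: f64,       // maximum burst size, in messages
    refill_per_sec: f64, // sustained messages per second
    buckets: HashMap<String, (f64, Instant)>, // (tokens left, last refill)
}

impl RateLimiter {
    pub fn new(capacity: u32, refill_per_sec: u32) -> Self {
        Self {
            capacity: f64::from(capacity),
            refill_per_sec: f64::from(refill_per_sec),
            buckets: HashMap::new(),
        }
    }

    /// Takes one token for `user`; `Err(())` means the message should be rejected.
    pub fn check(&mut self, user: &str, now: Instant) -> Result<(), ()> {
        let capacity = self.capacity;
        let rate = self.refill_per_sec;
        // New users start with a full bucket.
        let bucket = self
            .buckets
            .entry(user.to_string())
            .or_insert((capacity, now));

        // Refill in proportion to the time elapsed, capped at `capacity`.
        let elapsed = now.saturating_duration_since(bucket.1).as_secs_f64();
        bucket.0 = (bucket.0 + elapsed * rate).min(capacity);
        bucket.1 = now;

        if bucket.0 >= 1.0 {
            bucket.0 -= 1.0;
            Ok(())
        } else {
            Err(())
        }
    }
}
```

With `RateLimiter::new(10, 10)`, a user can burst 10 messages and then sustain 10 per second. In the handler, the limiter would sit behind a lock in shared state and be called as `check(&user, Instant::now())`. Entries for users who have left are never evicted in this sketch, so a long-running server would also need to prune the map.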

Summary

Ultimo's WebSocket support gives you:

  • Pub/sub channels for room-based message fan-out without external dependencies
  • Type-safe messages via serde enums with tagged variants
  • Automatic connection lifecycle — heartbeats, cleanup, back-pressure
  • Middleware integration — auth, rate limiting, and logging apply to WebSocket routes
  • Single binary — your chat server, REST API, and static HTML client all compile into one artifact

The combination of Rust's memory safety, Tokio's async runtime, and Ultimo's pub/sub abstraction produces a chat server that handles thousands of connections with predictable latency and minimal resource usage.

Next steps: