
Built-in Replication

edgeProxy v0.3.0 includes built-in SQLite replication for automatic state synchronization across multiple POPs. This document provides a deep-dive into how the replication system works, targeted at developers who want to understand the internals.

Overview

The built-in replication system enables automatic synchronization of routing.db across multiple POPs (Points of Presence). When a backend is registered at one POP, it automatically propagates to all other POPs in the cluster.

Replication Architecture

Key Concepts

1. Hybrid Logical Clock (HLC)

The HLC is the foundation for ordering events across distributed nodes. It combines:

  • Wall Clock Time: Real timestamp in milliseconds
  • Logical Counter: Incremented when events happen at the same millisecond
// src/replication/types.rs
pub struct HlcTimestamp {
    pub wall_time: u64,   // milliseconds since epoch
    pub logical: u32,     // logical counter
    pub node_id: String,  // which node generated this timestamp
}

Why HLC?

Physical clocks can drift between servers. If Node A's clock is 100ms ahead of Node B, events on Node A would incorrectly appear newer. HLC solves this by:

  1. Using the maximum of local time and received message time
  2. Incrementing logical counter for ties
  3. Including node_id for deterministic tie-breaking
impl HlcTimestamp {
    pub fn tick(&mut self) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        if now > self.wall_time {
            self.wall_time = now;
            self.logical = 0;
        } else {
            self.logical += 1;
        }
    }
}
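
The tick() above covers locally generated events. When a timestamp arrives from a peer, rule 1 applies: the node takes the maximum of its own time and the received time. That receive path is not shown in the excerpt; the following is a minimal sketch, assuming a method named receive (the name and the lack of drift limits are assumptions, not the actual API):

use std::time::{SystemTime, UNIX_EPOCH};

// Sketch only: merge a timestamp received from a peer into the local clock.
impl HlcTimestamp {
    pub fn receive(&mut self, remote: &HlcTimestamp) {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64;

        // Take the maximum of local wall time, remote wall time, and "now".
        let max_wall = now.max(self.wall_time).max(remote.wall_time);
        if max_wall == self.wall_time && max_wall == remote.wall_time {
            // Same millisecond on both sides: advance past both logical counters.
            self.logical = self.logical.max(remote.logical) + 1;
        } else if max_wall == self.wall_time {
            self.logical += 1;
        } else if max_wall == remote.wall_time {
            self.logical = remote.logical + 1;
        } else {
            self.logical = 0;
        }
        self.wall_time = max_wall;
    }
}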

2. Last-Write-Wins (LWW) Conflict Resolution

When two nodes modify the same record simultaneously, we need deterministic conflict resolution. LWW uses the HLC timestamp:

impl Change {
    pub fn wins_over(&self, other: &Change) -> bool {
        // Compare wall time first
        if self.hlc_timestamp.wall_time != other.hlc_timestamp.wall_time {
            return self.hlc_timestamp.wall_time > other.hlc_timestamp.wall_time;
        }
        // Then logical counter
        if self.hlc_timestamp.logical != other.hlc_timestamp.logical {
            return self.hlc_timestamp.logical > other.hlc_timestamp.logical;
        }
        // Finally node_id for deterministic tie-breaking
        self.hlc_timestamp.node_id > other.hlc_timestamp.node_id
    }
}

Example scenario:

  1. Node SA updates backend b1 at HLC(1000, 0, "sa")
  2. Node US updates backend b1 at HLC(1000, 0, "us")
  3. Both changes arrive at Node EU
  4. Wall time and logical counter are identical, so the node_id breaks the tie
  5. "us" > "sa" lexicographically, so US's change wins (the highest node_id wins ties)

3. Change Detection

Changes are tracked via the Change struct:

pub struct Change {
    pub table: String,     // "backends"
    pub row_id: String,    // primary key
    pub kind: ChangeKind,  // Insert, Update, Delete
    pub data: String,      // JSON serialized row data
    pub hlc_timestamp: HlcTimestamp,
}

pub enum ChangeKind {
    Insert,
    Update,
    Delete,
}

The SyncService collects pending changes and flushes them as a ChangeSet:

pub struct ChangeSet {
    pub origin_node: String,
    pub changes: Vec<Change>,
    pub checksum: u32,  // CRC32 for integrity
}
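
For reference, a minimal sketch of how the checksum could be computed and verified, assuming the crc32fast crate, bincode serialization of the change list, and a serde derive on Change (the actual hashing scheme may differ):

// Sketch only: CRC32 over the serialized change list; assumptions noted above.
impl ChangeSet {
    pub fn new(origin_node: &str, changes: Vec<Change>) -> Self {
        let bytes = bincode::serialize(&changes).expect("changes serialize");
        let checksum = crc32fast::hash(&bytes);
        ChangeSet {
            origin_node: origin_node.to_string(),
            changes,
            checksum,
        }
    }

    pub fn verify_checksum(&self) -> bool {
        match bincode::serialize(&self.changes) {
            Ok(bytes) => crc32fast::hash(&bytes) == self.checksum,
            Err(_) => false,
        }
    }
}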

4. SWIM-like Gossip Protocol

The gossip protocol handles cluster membership and failure detection. It's inspired by SWIM (Scalable Weakly-consistent Infection-style Process Group Membership).

// src/replication/gossip.rs
pub enum GossipMessage {
    // Check if node is alive
    Ping {
        sender_id: String,
        sender_gossip_addr: SocketAddr,
        sender_transport_addr: SocketAddr,
        incarnation: u64,
    },
    // Response to ping
    Ack {
        sender_id: String,
        sender_gossip_addr: SocketAddr,
        sender_transport_addr: SocketAddr,
        incarnation: u64,
    },
    // Announce joining the cluster
    Join {
        node_id: String,
        gossip_addr: SocketAddr,
        transport_addr: SocketAddr,
    },
    // Share member list
    MemberList {
        members: Vec<(String, SocketAddr, SocketAddr, u64)>,
    },
}

Membership flow:

  1. New node sends Join to bootstrap peers
  2. Bootstrap peer adds new node to member list
  3. Bootstrap peer responds with MemberList
  4. New node adds all discovered members
  5. Periodic Ping/Ack maintains liveness

Failure detection:

  • Nodes ping random members every gossip_interval (default: 1s)
  • If no Ack is received within 30s, the member is marked Dead
  • Dead members are removed from routing
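
The Member type lives in src/replication/gossip.rs; its exact fields are not shown here, but the liveness rule above can be sketched roughly as follows (field and method names are assumptions, not the real API):

use std::time::{Duration, Instant};

// Sketch only: illustrates the 30s dead-member rule, not the actual Member type.
const DEAD_TIMEOUT: Duration = Duration::from_secs(30);

#[derive(PartialEq)]
pub enum MemberState {
    Alive,
    Dead,
}

pub struct Member {
    pub node_id: String,
    pub incarnation: u64,
    pub last_ack: Instant,  // refreshed whenever an Ack is received
    pub state: MemberState,
}

impl Member {
    // Called on every gossip tick: demote members that have been silent too long.
    pub fn check_liveness(&mut self, now: Instant) {
        if self.state == MemberState::Alive && now.duration_since(self.last_ack) > DEAD_TIMEOUT {
            self.state = MemberState::Dead;
        }
    }
}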

5. QUIC Transport

Data synchronization uses QUIC via the Quinn library:

// src/replication/transport.rs
pub struct TransportService {
    endpoint: Endpoint,
    peers: RwLock<HashMap<String, Connection>>,
    // ...
}

Why QUIC?

  • Multiplexed streams: Multiple ChangeSets can sync simultaneously
  • Built-in encryption: TLS 1.3 for secure peer communication
  • Connection migration: Handles IP changes gracefully
  • Low latency: 0-RTT handshakes for known peers

Self-signed certificates:

The transport generates self-signed certificates for cluster communication:

fn generate_self_signed_cert() -> (CertificateDer, PrivateKeyDer) {
    let cert = rcgen::generate_simple_self_signed(vec![
        "localhost".to_string(),
        "127.0.0.1".to_string(),
    ]).unwrap();
    // ...
}

Data Flow: End-to-End

Let's trace a backend registration from start to finish:

Step 1: Backend Registration

# Backend registers via Auto-Discovery API
curl -X POST http://pop-sa:8081/api/v1/register \
-H "Content-Type: application/json" \
-d '{"id": "sa-node-1", "app": "myapp", "region": "sa", "ip": "10.50.1.1", "port": 9000}'

Step 2: Local SQLite Write

The ApiServer inserts into local SQLite:

// adapters/inbound/api_server.rs
async fn register_backend(State(state): State<AppState>, Json(req): Json<RegisterRequest>) {
    // Insert into SQLite
    sqlx::query("INSERT INTO backends ...")
        .execute(&state.db)
        .await?;
}

Step 3: Change Recorded

The SyncService records the change with an HLC timestamp:

// replication/sync.rs
pub fn record_backend_change(&self, id: &str, kind: ChangeKind, data: &str) {
    let mut hlc = self.hlc.write();
    hlc.tick();

    let change = Change {
        table: "backends".to_string(),
        row_id: id.to_string(),
        kind,
        data: data.to_string(),
        hlc_timestamp: hlc.clone(),
    };

    self.pending_changes.write().push(change);
}

Step 4: Flush to ChangeSet

Periodically (default: 5s), pending changes are flushed:

pub async fn flush(&self) -> Option<ChangeSet> {
    let changes: Vec<Change> = {
        let mut pending = self.pending_changes.write();
        if pending.is_empty() { return None; }
        pending.drain(..).collect()
    };

    let changeset = ChangeSet::new(&self.node_id, changes);
    let _ = self.event_tx.send(SyncEvent::BroadcastReady(changeset.clone())).await;
    Some(changeset)
}

Step 5: Broadcast via QUIC

The ReplicationAgent receives the event and broadcasts to all peers:

// replication/agent.rs
async fn handle_sync_event(&self, event: SyncEvent) {
    match event {
        SyncEvent::BroadcastReady(changeset) => {
            let transport = self.transport.read().await;
            for member in self.gossip.alive_members() {
                transport.send_changeset(&member.transport_addr, &changeset).await;
            }
        }
        _ => {} // other sync events elided in this excerpt
    }
}

Step 6: Remote Node Receives

On the receiving POP (e.g., POP-US):

// replication/transport.rs
async fn handle_incoming_stream(&self, mut stream: RecvStream) -> anyhow::Result<()> {
    // Read the whole (bounded) stream, then decode the message
    let data = stream.read_to_end(1024 * 1024).await?;
    let msg: Message = bincode::deserialize(&data)?;
    match msg {
        Message::ChangeBroadcast(changeset) => {
            // Drop changesets that fail the CRC32 integrity check
            if changeset.verify_checksum() {
                let _ = self.event_tx.send(TransportEvent::ChangeSetReceived(changeset)).await;
            }
        }
        _ => {} // other message types elided in this excerpt
    }
    Ok(())
}

Step 7: Apply with LWW

The SyncService applies changes using LWW:

pub async fn apply_changeset(&self, changeset: &ChangeSet) -> anyhow::Result<usize> {
    let mut applied = 0;

    for change in &changeset.changes {
        // Check if we already have a newer version
        let existing = self.version_vector.read().get(&change.row_id).cloned();
        if let Some(existing_hlc) = existing {
            if !change.wins_over_hlc(&existing_hlc) {
                continue; // Skip, we have newer
            }
        }

        // Apply the change
        match change.kind {
            ChangeKind::Insert => self.apply_insert(change).await?,
            ChangeKind::Update => self.apply_update(change).await?,
            ChangeKind::Delete => self.apply_delete(change).await?,
        }

        // Update version vector
        self.version_vector.write().insert(change.row_id.clone(), change.hlc_timestamp.clone());
        applied += 1;
    }

    Ok(applied)
}

Step 8: Backend Available Everywhere

Now sa-node-1 is available on all POPs:

# Query from POP-US
curl http://pop-us:8081/api/v1/backends
# Returns: [{"id": "sa-node-1", "app": "myapp", "region": "sa", ...}]

# Query from POP-EU
curl http://pop-eu:8081/api/v1/backends
# Returns: [{"id": "sa-node-1", "app": "myapp", "region": "sa", ...}]

Configuration

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| EDGEPROXY_REPLICATION_ENABLED | false | Enable built-in replication |
| EDGEPROXY_REPLICATION_NODE_ID | hostname | Unique node identifier |
| EDGEPROXY_REPLICATION_GOSSIP_ADDR | 0.0.0.0:4001 | UDP address for gossip |
| EDGEPROXY_REPLICATION_TRANSPORT_ADDR | 0.0.0.0:4002 | QUIC address for data sync |
| EDGEPROXY_REPLICATION_BOOTSTRAP_PEERS | (none) | Comma-separated peer addresses |
| EDGEPROXY_REPLICATION_GOSSIP_INTERVAL_MS | 1000 | Gossip ping interval |
| EDGEPROXY_REPLICATION_SYNC_INTERVAL_MS | 5000 | Sync flush interval |
| EDGEPROXY_REPLICATION_CLUSTER_NAME | edgeproxy | Cluster name for isolation |

Example: 3-POP Cluster

POP-SA (Bootstrap)

EDGEPROXY_REPLICATION_ENABLED=true
EDGEPROXY_REPLICATION_NODE_ID=pop-sa
EDGEPROXY_REPLICATION_GOSSIP_ADDR=0.0.0.0:4001
EDGEPROXY_REPLICATION_TRANSPORT_ADDR=0.0.0.0:4002
# No bootstrap peers - this is the first node

POP-US (Joins SA)

EDGEPROXY_REPLICATION_ENABLED=true
EDGEPROXY_REPLICATION_NODE_ID=pop-us
EDGEPROXY_REPLICATION_GOSSIP_ADDR=0.0.0.0:4001
EDGEPROXY_REPLICATION_TRANSPORT_ADDR=0.0.0.0:4002
EDGEPROXY_REPLICATION_BOOTSTRAP_PEERS=10.50.1.1:4001

POP-EU (Joins SA and US)

EDGEPROXY_REPLICATION_ENABLED=true
EDGEPROXY_REPLICATION_NODE_ID=pop-eu
EDGEPROXY_REPLICATION_GOSSIP_ADDR=0.0.0.0:4001
EDGEPROXY_REPLICATION_TRANSPORT_ADDR=0.0.0.0:4002
EDGEPROXY_REPLICATION_BOOTSTRAP_PEERS=10.50.1.1:4001,10.50.2.1:4001

Troubleshooting

Nodes not discovering each other

# Check if gossip port is open
nc -zvu 10.50.1.1 4001

# Verify bootstrap peers are correct
echo $EDGEPROXY_REPLICATION_BOOTSTRAP_PEERS

# Check firewall rules
sudo ufw status

Changes not propagating

# Check transport connectivity
nc -zv 10.50.1.1 4002

# Verify cluster membership (check logs)
journalctl -u edgeproxy | grep "member joined"

# Ensure sync interval is reasonable
echo $EDGEPROXY_REPLICATION_SYNC_INTERVAL_MS

HLC drift warnings

If you see HLC drift warnings, ensure NTP is running:

# Check NTP status
timedatectl status

# Install and enable NTP
sudo apt install chrony
sudo systemctl enable chrony
sudo systemctl start chrony

Performance Tuning

Gossip Interval

  • Lower (500ms): Faster failure detection, more network traffic
  • Higher (2000ms): Less traffic, slower detection
  • Recommendation: 1000ms for most deployments

Sync Interval

  • Lower (1000ms): Near real-time sync, higher CPU usage
  • Higher (10000ms): Batches more changes, potential lag
  • Recommendation: 5000ms for balanced performance
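
For example, a deployment that favors fast failure detection and near-real-time sync might tighten both intervals (values here are illustrative, not recommendations for every workload):

EDGEPROXY_REPLICATION_GOSSIP_INTERVAL_MS=500
EDGEPROXY_REPLICATION_SYNC_INTERVAL_MS=1000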

Network Requirements

| Path | Protocol | Port | Bandwidth |
| --- | --- | --- | --- |
| Gossip | UDP | 4001 | ~1 KB/s per node |
| Transport | QUIC/UDP | 4002 | Varies with change rate |

Security Considerations

  1. Network Isolation: Run replication ports on WireGuard overlay
  2. Firewall: Only allow trusted POPs to connect to 4001/4002
  3. TLS: Transport uses TLS 1.3 (self-signed certs for cluster)
  4. Cluster Name: Use unique cluster names to prevent cross-cluster pollution
# Firewall rules example (UFW)
sudo ufw allow from 10.50.0.0/16 to any port 4001 proto udp
sudo ufw allow from 10.50.0.0/16 to any port 4002 proto udp

Advanced Features (v0.4.0)

The following features were added in v0.4.0 to improve replication efficiency, observability, and ease of use.

Delta Sync

Instead of sending the entire row on every update, edgeProxy now supports delta sync, which transmits only the changed fields.

// src/replication/types.rs
pub enum FieldOp {
    Set(serde_json::Value),  // Field was changed
    Unset,                   // Field was removed
}

pub struct DeltaData {
    pub fields: HashMap<String, FieldOp>,
}

pub enum ChangeData {
    Full(String),      // Full row (backward compatible)
    Delta(DeltaData),  // Only changed fields
}

Benefits:

  • Reduced bandwidth for large rows with small changes
  • Faster sync for high-frequency updates
  • Backward compatible with nodes running older versions

Usage:

// Compute delta between two JSON objects
let delta = DeltaData::diff(&old_value, &new_value);

// Apply delta to reconstruct new value
let result = delta.apply_to(&old_value);
assert_eq!(result, new_value);
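
The diffing logic itself is not shown in the source excerpt; conceptually it compares the two JSON objects field by field. A rough sketch of that idea (not the actual implementation, which may handle nesting or errors differently):

use std::collections::HashMap;
use serde_json::Value;

// Sketch only: field-by-field diff of two flat JSON objects.
impl DeltaData {
    pub fn diff(old: &Value, new: &Value) -> DeltaData {
        let empty = serde_json::Map::new();
        let old_obj = old.as_object().unwrap_or(&empty);
        let new_obj = new.as_object().unwrap_or(&empty);
        let mut fields = HashMap::new();

        // Fields that were added or whose value changed
        for (key, new_val) in new_obj {
            if old_obj.get(key) != Some(new_val) {
                fields.insert(key.clone(), FieldOp::Set(new_val.clone()));
            }
        }
        // Fields that existed before but are gone now
        for key in old_obj.keys() {
            if !new_obj.contains_key(key) {
                fields.insert(key.clone(), FieldOp::Unset);
            }
        }

        DeltaData { fields }
    }
}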

Merkle Tree Anti-Entropy

Merkle trees enable efficient detection and repair of divergence between nodes without comparing all data.

// src/replication/merkle.rs
pub struct MerkleTree {
    table: String,
    max_depth: u8,
    leaves: BTreeMap<u64, Hash>,
}

impl MerkleTree {
    pub fn root_hash(&mut self) -> Hash;
    pub fn diff(&mut self, other: &mut MerkleTree, depth: u8) -> Vec<u64>;
}

How it works:

  1. Each node maintains a Merkle tree per table
  2. Periodically, nodes exchange root hashes
  3. If roots differ, they recursively compare subtrees
  4. Only differing key ranges need to sync

Message types:

pub enum MerkleMessage {
    RootRequest { table: String },
    RootResponse { table: String, hash: Hash, depth: u8 },
    RangeRequest { table: String, depth: u8, prefixes: Vec<u64> },
    RangeResponse { table: String, depth: u8, hashes: Vec<(u64, Hash)> },
    DataRequest { table: String, prefix: u64, depth: u8 },
    DataResponse { table: String, entries: Vec<(String, Vec<u8>)> },
}
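
To make the range-narrowing step concrete, here is a standalone sketch of comparing two leaf maps bucket by bucket. It is a simplified stand-in: the Hash type is reduced to u64 and the bucketing scheme is assumed, so it does not mirror the real MerkleTree::diff.

use std::collections::BTreeMap;

// Sketch only: simplified hash type and bucketing for illustration.
type Hash = u64;

// Group leaves into 2^depth buckets by key prefix and XOR-combine their hashes,
// so two nodes can compare a handful of bucket hashes instead of every leaf.
// Assumes 1 <= depth <= 63.
fn bucket_hashes(leaves: &BTreeMap<u64, Hash>, depth: u8) -> BTreeMap<u64, Hash> {
    let shift = 64 - u32::from(depth);
    let mut buckets: BTreeMap<u64, Hash> = BTreeMap::new();
    for (key, hash) in leaves {
        *buckets.entry(*key >> shift).or_insert(0) ^= *hash;
    }
    buckets
}

// Prefixes whose bucket hashes disagree are the only key ranges that need a
// deeper comparison (RangeRequest) or a data exchange (DataRequest).
fn differing_prefixes(local: &BTreeMap<u64, Hash>, remote: &BTreeMap<u64, Hash>, depth: u8) -> Vec<u64> {
    let (a, b) = (bucket_hashes(local, depth), bucket_hashes(remote, depth));
    let mut prefixes: Vec<u64> = a.keys().chain(b.keys()).copied().collect();
    prefixes.sort_unstable();
    prefixes.dedup();
    prefixes.into_iter().filter(|p| a.get(p) != b.get(p)).collect()
}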

mDNS Auto-Discovery

Automatic peer discovery via multicast DNS eliminates the need for manual bootstrap_peers configuration on local networks.

// src/replication/mdns.rs
pub struct MdnsDiscovery {
    config: ReplicationConfig,
    discovered_tx: mpsc::Sender<DiscoveredPeer>,
}

pub struct DiscoveredPeer {
    pub node_id: String,
    pub cluster: String,
    pub gossip_addr: SocketAddr,
    pub transport_addr: SocketAddr,
}

Service registration:

  • Service type: _edgeproxy._udp.local.
  • TXT records: node_id, cluster, gossip, transport

Configuration:

| Variable | Default | Description |
| --- | --- | --- |
| EDGEPROXY_REPLICATION_MDNS_ENABLED | true | Enable mDNS discovery |
| EDGEPROXY_REPLICATION_MDNS_SERVICE_TYPE | _edgeproxy._udp.local. | mDNS service type |

Example: With mDNS enabled, nodes on the same LAN automatically discover each other:

# Node 1 - No bootstrap peers needed!
EDGEPROXY_REPLICATION_ENABLED=true
EDGEPROXY_REPLICATION_NODE_ID=pop-1
EDGEPROXY_REPLICATION_MDNS_ENABLED=true

# Node 2 - Automatically discovers Node 1
EDGEPROXY_REPLICATION_ENABLED=true
EDGEPROXY_REPLICATION_NODE_ID=pop-2
EDGEPROXY_REPLICATION_MDNS_ENABLED=true

Prometheus Metrics

Replication metrics are now exposed via the Prometheus endpoint for monitoring replication health.

New metrics:

| Metric | Type | Description |
| --- | --- | --- |
| edgeproxy_replication_lag_ms | Gauge | Time since last successful sync |
| edgeproxy_replication_pending_changes | Gauge | Changes waiting to be broadcast |
| edgeproxy_replication_applied_total | Counter | Total changes applied from peers |
| edgeproxy_replication_broadcast_total | Counter | Total changesets broadcast |
| edgeproxy_replication_errors_total | Counter | Replication errors |
| edgeproxy_replication_peers_alive | Gauge | Number of alive cluster members |
| edgeproxy_replication_merkle_repairs_total | Counter | Anti-entropy repairs performed |
| edgeproxy_replication_bytes_sent | Counter | Bytes sent for replication |
| edgeproxy_replication_bytes_received | Counter | Bytes received for replication |

Example Prometheus output:

# TYPE edgeproxy_replication_lag_ms gauge
edgeproxy_replication_lag_ms{region="sa"} 15

# TYPE edgeproxy_replication_peers_alive gauge
edgeproxy_replication_peers_alive{region="sa"} 3

# TYPE edgeproxy_replication_applied_total counter
edgeproxy_replication_applied_total{region="sa"} 1523

Grafana dashboard: Create alerts for:

  • edgeproxy_replication_lag_ms > 30000 (30s lag)
  • edgeproxy_replication_peers_alive < 2 (cluster degraded)
  • rate(edgeproxy_replication_errors_total[5m]) > 0 (errors occurring)

Read Replicas

Read replicas receive all changes but don't broadcast, enabling read scaling.

// src/replication/config.rs
pub enum ReplicaMode {
    Primary,      // Full read-write node
    ReadReplica,  // Read-only node
}

Behavior:

  • Primary: Records local changes, broadcasts to peers
  • ReadReplica: Receives changes, does NOT record or broadcast
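
One way to picture the difference is a mode guard at the point where local changes are recorded. This is a sketch under the assumption that SyncService can see the configured ReplicaMode; the field name and placement of the check are not from the source:

// Sketch only: illustrates the read-replica guard, not the real code path.
impl SyncService {
    pub fn record_backend_change(&self, id: &str, kind: ChangeKind, data: &str) {
        if self.mode == ReplicaMode::ReadReplica {
            // Read replicas apply changes received from peers but never originate any.
            return;
        }
        // ... primary path as shown earlier: tick the HLC, build a Change,
        // and push it onto pending_changes ...
    }
}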

Configuration:

| Variable | Default | Description |
| --- | --- | --- |
| EDGEPROXY_REPLICATION_MODE | primary | Node mode: primary or replica |

Use cases:

  • Scale read traffic across multiple nodes
  • Geographic read caching without write overhead
  • Disaster recovery standby nodes

Example:

# Primary node (writes go here)
EDGEPROXY_REPLICATION_MODE=primary
EDGEPROXY_REPLICATION_NODE_ID=primary-1

# Read replica (receives all data, no writes)
EDGEPROXY_REPLICATION_MODE=replica
EDGEPROXY_REPLICATION_NODE_ID=replica-1
EDGEPROXY_REPLICATION_BOOTSTRAP_PEERS=primary-1:4001

Source Code Reference

| File | Purpose |
| --- | --- |
| src/replication/mod.rs | Module exports |
| src/replication/config.rs | ReplicationConfig, ReplicaMode |
| src/replication/types.rs | HlcTimestamp, NodeId, Change, ChangeSet, DeltaData, FieldOp |
| src/replication/gossip.rs | GossipService, GossipMessage, Member |
| src/replication/sync.rs | SyncService, change tracking |
| src/replication/transport.rs | TransportService, QUIC peer communication |
| src/replication/agent.rs | ReplicationAgent orchestrator |
| src/replication/mdns.rs | MdnsDiscovery, mDNS service registration |
| src/replication/merkle.rs | MerkleTree, anti-entropy sync |