NATS Setup with JWT Authorization

NATS is a high-performance messaging system. This guide covers setup and JWT-based authorization using nsc (NATS Security CLI) for secure, decentralized authentication.

Installation

 1# Install NATS server
 2# macOS
 3brew install nats-server
 4
 5# Linux
 6wget https://github.com/nats-io/nats-server/releases/latest/download/nats-server-linux-amd64.zip
 7unzip nats-server-linux-amd64.zip
 8sudo mv nats-server /usr/local/bin/
 9
10# Install nsc (NATS Security CLI)
11curl -L https://raw.githubusercontent.com/nats-io/nsc/master/install.sh | sh
12
13# Install nats CLI
14go install github.com/nats-io/natscli/nats@latest

Docker Setup

Docker Run

 1# Run NATS server
 2docker run -d \
 3  --name nats \
 4  -p 4222:4222 \
 5  -p 8222:8222 \
 6  -p 6222:6222 \
 7  nats:latest
 8
 9# With JetStream enabled
10docker run -d \
11  --name nats-js \
12  -p 4222:4222 \
13  -p 8222:8222 \
14  -v nats-data:/data \
15  nats:latest -js
16
17# Connect
18docker exec -it nats nats-cli

Docker Compose

 1version: '3.8'
 2
 3services:
 4  nats:
 5    image: nats:latest
 6    container_name: nats
 7    command: 
 8      - "-js"                    # Enable JetStream
 9      - "-m=8222"                # Monitoring port
10      - "-sd=/data"              # Store directory
11    ports:
12      - "4222:4222"              # Client connections
13      - "8222:8222"              # HTTP monitoring
14      - "6222:6222"              # Cluster routing
15    volumes:
16      - nats-data:/data
17    healthcheck:
18      test: ["CMD", "wget", "--spider", "http://localhost:8222/healthz"]
19      interval: 10s
20      timeout: 5s
21      retries: 5
22    restart: unless-stopped
23
24  # NATS with JWT auth
25  nats-auth:
26    image: nats:latest
27    container_name: nats-auth
28    command:
29      - "-c=/config/nats-server.conf"
30    ports:
31      - "4223:4222"
32      - "8223:8222"
33    volumes:
34      - ./nats-config:/config
35      - nats-auth-data:/data
36    restart: unless-stopped
37
38  # NATS Surveyor (monitoring UI)
39  nats-surveyor:
40    image: natsio/nats-surveyor:latest
41    container_name: nats-surveyor
42    command:
43      - "-s=http://nats:8222"
44    ports:
45      - "7777:7777"
46    depends_on:
47      - nats
48    restart: unless-stopped
49
50volumes:
51  nats-data:
52  nats-auth-data:

nats-server.conf (for JWT auth):

 1# NATS Server Configuration with JWT
 2
 3port: 4222
 4http_port: 8222
 5
 6# JetStream
 7jetstream {
 8    store_dir: /data
 9    max_memory_store: 1GB
10    max_file_store: 10GB
11}
12
13# JWT Authorization
14operator: /config/operator.jwt
15
16resolver: {
17    type: full
18    dir: /config/jwt
19    allow_delete: false
20    interval: "2m"
21}
22
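23# System account (in operator mode, replace SYS below with the system account's public key — the "A..." ID shown by `nsc describe account SYS` — not its name)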
24system_account: SYS
25
26# Logging
27debug: false
28trace: false
29logtime: true

Docker Compose with Multiple Services

 1version: '3.8'
 2
 3services:
 4  # NATS Server
 5  nats:
 6    image: nats:latest
 7    container_name: nats
 8    command: ["-js", "-m=8222"]
 9    ports:
10      - "4222:4222"
11      - "8222:8222"
12    volumes:
13      - nats-data:/data
14    networks:
15      - app-network
16    restart: unless-stopped
17
18  # Publisher service
19  publisher:
20    build: ./publisher
21    container_name: publisher
22    environment:
23      NATS_URL: nats://nats:4222
24    depends_on:
25      - nats
26    networks:
27      - app-network
28    restart: unless-stopped
29
30  # Subscriber service
31  subscriber:
32    build: ./subscriber
33    container_name: subscriber
34    environment:
35      NATS_URL: nats://nats:4222
36    depends_on:
37      - nats
38    networks:
39      - app-network
40    restart: unless-stopped
41
42volumes:
43  nats-data:
44
45networks:
46  app-network:
47    driver: bridge

JWT Authorization Setup

Step 1: Create Operator

1# Initialize nsc
2nsc init
3
4# Create operator (top-level authority)
5nsc add operator -n MyOperator
6
7# Show the operator; add --raw to print the encoded JWT (e.g. nsc describe operator --raw > operator.jwt)
8nsc describe operator

Step 2: Create Account

 1# Create account (tenant/organization)
 2nsc add account -n MyAccount
 3
 4# Set account limits (optional)
 5nsc edit account MyAccount \
 6    --max-connections 1000 \
 7    --max-data 10GB \
 8    --max-exports 10 \
 9    --max-imports 10 \
10    --max-payload 1MB \
11    --max-subscriptions 1000
12
13# Describe account
14nsc describe account MyAccount

Step 3: Create Users

 1# Create user with permissions
 2nsc add user -n alice \
 3    --allow-pub "orders.>" \
 4    --allow-sub "orders.alice.>" \
 5    --allow-pub-response
 6
 7# Create user with different permissions
 8nsc add user -n bob \
 9    --allow-pub "inventory.>" \
10    --allow-sub "inventory.>" \
11    --deny-pub "inventory.delete"
12
13# Create admin user
14nsc add user -n admin \
15    --allow-pub ">" \
16    --allow-sub ">"
17
18# Generate user credentials
19nsc generate creds -a MyAccount -n alice > alice.creds
20nsc generate creds -a MyAccount -n bob > bob.creds

Step 4: Configure NATS Server

 1# nats-server.conf
 2port: 4222
 3http_port: 8222
 4
 5# JWT Authentication
 6operator: /path/to/operator.jwt
 7resolver: {
 8    type: full
 9    dir: '/path/to/jwt'
10}
11
12# System Account (for monitoring); in operator mode the value must be the account's public key ("A..."), not its name
13system_account: SYS
14
15# Logging
16debug: false
17trace: false
18logtime: true
19log_file: "/var/log/nats/nats-server.log"
20
21# Limits
22max_connections: 10000
23max_payload: 1MB
24max_pending: 10MB

Step 5: Push Account JWT to Server

1# Push account JWT to server resolver
2nsc push -a MyAccount
3
4# Or manually copy JWT files
5cp ~/.nsc/nats/MyOperator/accounts/MyAccount/*.jwt /path/to/jwt/

Step 6: Start Server

1# Start with config
2nats-server -c nats-server.conf
3
4# Or with operator JWT directly
5nats-server --operator /path/to/operator.jwt

Client Connection Examples

Go Client

 1package main
 2
 3import (
 4    "log"
 5    "github.com/nats-io/nats.go"
 6)
 7
 8func main() {
 9    // Connect with credentials
10    nc, err := nats.Connect("nats://localhost:4222",
11        nats.UserCredentials("alice.creds"),
12    )
13    if err != nil {
14        log.Fatal(err)
15    }
16    defer nc.Close()
17
18    // Publish
19    err = nc.Publish("orders.new", []byte("Order #123"))
20    if err != nil {
21        log.Fatal(err)
22    }
23
24    // Subscribe
25    sub, err := nc.Subscribe("orders.alice.>", func(m *nats.Msg) {
26        log.Printf("Received: %s", string(m.Data))
27    })
28    if err != nil {
29        log.Fatal(err)
30    }
31    defer sub.Unsubscribe()
32
33    // Keep alive
34    select {}
35}

Python Client

 1import asyncio
 2from nats.aio.client import Client as NATS
 3
 4async def main():
 5    nc = NATS()
 6    
 7    # Connect with credentials
 8    await nc.connect(
 9        servers=["nats://localhost:4222"],
10        user_credentials="alice.creds"
11    )
12    
13    # Subscribe
14    async def message_handler(msg):
15        print(f"Received: {msg.data.decode()}")
16    
17    await nc.subscribe("orders.alice.>", cb=message_handler)
18    
19    # Publish
20    await nc.publish("orders.new", b"Order #123")
21    
22    # Keep alive
23    await asyncio.sleep(60)
24    await nc.close()
25
26if __name__ == '__main__':
27    asyncio.run(main())

CLI

1# Subscribe
2nats sub --creds=alice.creds "orders.alice.>"
3
4# Publish
5nats pub --creds=alice.creds "orders.new" "Order #123"
6
7# Request-Reply
8nats req --creds=alice.creds "orders.status" "123"

Permission Patterns

Subject-Based Permissions

 1# Allow all under namespace
 2--allow-pub "orders.>"
 3--allow-sub "orders.>"
 4
 5# Specific subjects only
 6--allow-pub "orders.create"
 7--allow-pub "orders.update"
 8
 9# Deny specific subjects
10--deny-pub "orders.delete"
11
12# Wildcards
13--allow-sub "orders.*.status"  # Single token wildcard
14--allow-sub "orders.>"          # Multi-token wildcard

Response Permissions

1# Allow publishing responses (for request-reply)
2--allow-pub-response
3
4# Limit response time
5--response-ttl 5s

Time-Based Permissions

1# User expires after 30 days
2--expiry 30d
3
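4# Bearer token (the JWT alone authenticates the connection; the client does not have to sign the server's nonce with the user's private key)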
5--bearer

Account Limits

 1# Connection limits
 2--max-connections 100
 3
 4# Data limits
 5--max-data 1GB
 6--max-payload 1MB
 7
 8# Subscription limits
 9--max-subscriptions 1000
10
11# Export/Import limits (for account-to-account communication)
12--max-exports 10
13--max-imports 10

Monitoring

Server Stats

 1# HTTP monitoring endpoint
 2curl http://localhost:8222/varz
 3
 4# Connection stats
 5curl http://localhost:8222/connz
 6
 7# Subscription stats
 8curl http://localhost:8222/subsz
 9
10# Account stats
11curl http://localhost:8222/accountz

System Account

1# Create system account
2nsc add account -n SYS
3nsc add user -n sys --account SYS
4
5# Subscribe to system events
6nats sub --creds=sys.creds '$SYS.>'
7
8# Account stats
9nats req --creds=sys.creds '$SYS.REQ.ACCOUNT.<account-id>.CONNZ' ''

Common Patterns

Request-Reply

 1// Server
 2nc.Subscribe("orders.status", func(m *nats.Msg) {
 3    status := getOrderStatus(string(m.Data))
 4    m.Respond([]byte(status))
 5})
 6
 7// Client
 8msg, err := nc.Request("orders.status", []byte("123"), 2*time.Second)
 9if err != nil {
10    log.Fatal(err)
11}
12fmt.Printf("Status: %s\n", msg.Data)

Queue Groups (Load Balancing)

1// Multiple workers in same queue group
2nc.QueueSubscribe("orders.process", "workers", func(m *nats.Msg) {
3    // Only one worker receives each message
4    processOrder(m.Data)
5})

JetStream (Persistence)

 1// Enable JetStream
 2js, err := nc.JetStream()
 3
 4// Create stream
 5js.AddStream(&nats.StreamConfig{
 6    Name:     "ORDERS",
 7    Subjects: []string{"orders.>"},
 8    Storage:  nats.FileStorage,
 9})
10
11// Publish to stream
12js.Publish("orders.new", []byte("Order #123"))
13
14// Durable consumer
15js.Subscribe("orders.>", func(m *nats.Msg) {
16    m.Ack()
17}, nats.Durable("order-processor"))

Security Best Practices

  1. Least Privilege: Grant minimum necessary permissions
  2. Credential Rotation: Regularly rotate user credentials
  3. Expiry: Set expiration on user JWTs
  4. Monitoring: Monitor for unauthorized access attempts
  5. TLS: Enable TLS for production
  6. Separate Accounts: Use different accounts for different services

TLS Configuration

1# nats-server.conf with TLS
2tls {
3    cert_file: "/path/to/server-cert.pem"
4    key_file: "/path/to/server-key.pem"
5    ca_file: "/path/to/ca.pem"
6    verify: true
7}

Notes

  • JWT auth is decentralized - server doesn't need central auth service
  • Credentials files contain both JWT and seed (private key)
  • Operator signs accounts, accounts sign users
  • Permissions are enforced at server level
  • Use JetStream for persistence and at-least-once delivery (core NATS pub/sub is at-most-once)

Gotchas/Warnings

  • ⚠️ Credentials security: Protect .creds files - they contain private keys
  • ⚠️ Subject design: Plan subject namespace carefully
  • ⚠️ Wildcard permissions: Be careful with > wildcard
  • ⚠️ Account limits: Set appropriate limits to prevent abuse

Custom Resolver Implementation

Memory Resolver (Go)

  1package main
  2
  3import (
  4    "encoding/json"
  5    "fmt"
  6    "io/ioutil"
  7    "sync"
  8    
  9    "github.com/nats-io/jwt/v2"
 10    "github.com/nats-io/nats-server/v2/server"
 11)
 12
 13// MemoryResolver stores account JWTs in memory
 14type MemoryResolver struct {
 15    mu       sync.RWMutex
 16    accounts map[string]string // account public key -> JWT
 17}
 18
 19func NewMemoryResolver() *MemoryResolver {
 20    return &MemoryResolver{
 21        accounts: make(map[string]string),
 22    }
 23}
 24
 25// Fetch implements the AccountResolver interface
 26func (mr *MemoryResolver) Fetch(name string) (string, error) {
 27    mr.mu.RLock()
 28    defer mr.mu.RUnlock()
 29    
 30    jwt, ok := mr.accounts[name]
 31    if !ok {
 32        return "", fmt.Errorf("account not found: %s", name)
 33    }
 34    return jwt, nil
 35}
 36
 37// Store adds an account JWT to the resolver
 38func (mr *MemoryResolver) Store(name, jwt string) error {
 39    mr.mu.Lock()
 40    defer mr.mu.Unlock()
 41    
 42    mr.accounts[name] = jwt
 43    return nil
 44}
 45
 46// Delete removes an account JWT
 47func (mr *MemoryResolver) Delete(name string) error {
 48    mr.mu.Lock()
 49    defer mr.mu.Unlock()
 50    
 51    delete(mr.accounts, name)
 52    return nil
 53}
 54
 55// List returns all account public keys
 56func (mr *MemoryResolver) List() []string {
 57    mr.mu.RLock()
 58    defer mr.mu.RUnlock()
 59    
 60    keys := make([]string, 0, len(mr.accounts))
 61    for k := range mr.accounts {
 62        keys = append(keys, k)
 63    }
 64    return keys
 65}
 66
 67// Usage with NATS Server
 68func main() {
 69    // Create custom resolver
 70    resolver := NewMemoryResolver()
 71    
 72    // Load account JWTs from files
 73    accountJWT, _ := ioutil.ReadFile("account.jwt")
 74    
 75    // Parse JWT to get public key
 76    claim, _ := jwt.DecodeAccountClaims(string(accountJWT))
 77    
 78    // Store in resolver
 79    resolver.Store(claim.Subject, string(accountJWT))
 80    
 81    // Configure NATS server with custom resolver
 82    opts := &server.Options{
 83        Port:     4222,
 84        HTTPPort: 8222,
 85    }
 86    
 87    // Set operator JWT
 88    operatorJWT, _ := ioutil.ReadFile("operator.jwt")
 89    opts.TrustedOperators = []*jwt.OperatorClaims{
 90        // Parse operator JWT
 91    }
 92    
 93    // Set custom resolver
 94    opts.AccountResolver = resolver
 95    
 96    // Start server
 97    s, err := server.NewServer(opts)
 98    if err != nil {
 99        panic(err)
100    }
101    
102    s.Start()
103    s.WaitForShutdown()
104}

HTTP Resolver (Go)

 1package main
 2
 3import (
 4    "fmt"
 5    "io/ioutil"
 6    "net/http"
 7    "time"
 8)
 9
// HTTPResolver fetches account JWTs from an HTTP endpoint.
type HTTPResolver struct {
	baseURL string       // scheme://host[:port] prefix; "/accounts/<id>" is appended
	client  *http.Client // shared client with a request timeout
}

// NewHTTPResolver returns a resolver that queries baseURL using an HTTP
// client with a five-second timeout.
func NewHTTPResolver(baseURL string) *HTTPResolver {
	return &HTTPResolver{
		baseURL: baseURL,
		client: &http.Client{
			Timeout: 5 * time.Second,
		},
	}
}

// Fetch requests "<baseURL>/accounts/<accountID>" and returns the response
// body as the account JWT.
//
// A 404 response yields an "account not found" error. Any other non-200
// status yields an error naming that status, so server-side failures are not
// mistaken for a missing account.
func (hr *HTTPResolver) Fetch(accountID string) (string, error) {
	url := fmt.Sprintf("%s/accounts/%s", hr.baseURL, accountID)

	resp, err := hr.client.Get(url)
	if err != nil {
		return "", fmt.Errorf("fetching account %s: %w", accountID, err)
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		// Fall through to read the body.
	case http.StatusNotFound:
		return "", fmt.Errorf("account not found: %s", accountID)
	default:
		return "", fmt.Errorf("fetching account %s: unexpected HTTP status %s", accountID, resp.Status)
	}

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("reading JWT for account %s: %w", accountID, err)
	}

	return string(body), nil
}
46
// JWTServer is an http.Handler that serves account JWTs from an in-memory
// map keyed by account ID.
type JWTServer struct {
	accounts map[string]string
}

// ServeHTTP answers "/accounts/<id>" with the stored JWT for <id>, using the
// "application/jwt" content type. Every other path, and any unknown ID,
// gets a 404.
func (s *JWTServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	const prefix = "/accounts/"

	// Validate the prefix before slicing: slicing a shorter path (e.g. "/")
	// at len(prefix) would panic.
	path := r.URL.Path
	if len(path) <= len(prefix) || path[:len(prefix)] != prefix {
		http.NotFound(w, r)
		return
	}
	accountID := path[len(prefix):]

	token, ok := s.accounts[accountID]
	if !ok {
		http.NotFound(w, r)
		return
	}

	w.Header().Set("Content-Type", "application/jwt")
	w.Write([]byte(token))
}
65
66// Start JWT server
67func startJWTServer() {
68    server := &JWTServer{
69        accounts: make(map[string]string),
70    }
71    
72    // Load JWTs from files
73    // server.accounts[accountID] = jwtString
74    
75    http.ListenAndServe(":8080", server)
76}

Database Resolver (Go with PostgreSQL)

 1package main
 2
 3import (
 4    "database/sql"
 5    "fmt"
 6    
 7    _ "github.com/lib/pq"
 8)
 9
// DBResolver fetches account JWTs from the account_jwts table of a SQL
// database.
type DBResolver struct {
	db *sql.DB
}

// NewDBResolver opens a "postgres" database handle for connStr and makes
// sure the account_jwts table exists. If the table cannot be created the
// handle is closed again before the error is returned, so a failed
// constructor does not leak a connection pool.
func NewDBResolver(connStr string) (*DBResolver, error) {
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, err
	}

	// Create table if not exists
	_, err = db.Exec(`
        CREATE TABLE IF NOT EXISTS account_jwts (
            account_id VARCHAR(255) PRIMARY KEY,
            jwt TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    `)
	if err != nil {
		db.Close() // best effort; the Exec error is the one worth reporting
		return nil, fmt.Errorf("creating account_jwts table: %w", err)
	}

	return &DBResolver{db: db}, nil
}

// Fetch returns the JWT stored for accountID. A missing row yields an
// "account not found" error; any other database error is returned as is.
func (dr *DBResolver) Fetch(accountID string) (string, error) {
	var jwt string
	err := dr.db.QueryRow(
		"SELECT jwt FROM account_jwts WHERE account_id = $1",
		accountID,
	).Scan(&jwt)

	if err == sql.ErrNoRows {
		return "", fmt.Errorf("account not found: %s", accountID)
	}
	if err != nil {
		return "", err
	}

	return jwt, nil
}

// Store inserts the JWT for accountID, or replaces the existing one and
// bumps updated_at when the account is already present.
func (dr *DBResolver) Store(accountID, jwt string) error {
	_, err := dr.db.Exec(`
        INSERT INTO account_jwts (account_id, jwt)
        VALUES ($1, $2)
        ON CONFLICT (account_id)
        DO UPDATE SET jwt = $2, updated_at = CURRENT_TIMESTAMP
    `, accountID, jwt)

	return err
}

// Delete removes the row for accountID. Deleting an unknown account is not
// reported as an error.
func (dr *DBResolver) Delete(accountID string) error {
	_, err := dr.db.Exec(
		"DELETE FROM account_jwts WHERE account_id = $1",
		accountID,
	)
	return err
}

// List returns every stored account ID. If reading the result set fails
// part-way through, the error is returned rather than a silently truncated
// list.
func (dr *DBResolver) List() ([]string, error) {
	rows, err := dr.db.Query("SELECT account_id FROM account_jwts")
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var accounts []string
	for rows.Next() {
		var accountID string
		if err := rows.Scan(&accountID); err != nil {
			return nil, err
		}
		accounts = append(accounts, accountID)
	}
	// rows.Next returns false both at the end of the data and on error;
	// rows.Err tells the two apart.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return accounts, nil
}

Cached Resolver (Go)

 1package main
 2
 3import (
 4    "sync"
 5    "time"
 6)
 7
 8// CachedResolver wraps another resolver with caching
 9type CachedResolver struct {
10    underlying AccountResolver
11    cache      map[string]*cacheEntry
12    mu         sync.RWMutex
13    ttl        time.Duration
14}
15
16type cacheEntry struct {
17    jwt       string
18    expiresAt time.Time
19}
20
21func NewCachedResolver(underlying AccountResolver, ttl time.Duration) *CachedResolver {
22    cr := &CachedResolver{
23        underlying: underlying,
24        cache:      make(map[string]*cacheEntry),
25        ttl:        ttl,
26    }
27    
28    // Start cleanup goroutine
29    go cr.cleanup()
30    
31    return cr
32}
33
34// Fetch implements the AccountResolver interface with caching
35func (cr *CachedResolver) Fetch(accountID string) (string, error) {
36    // Check cache first
37    cr.mu.RLock()
38    entry, ok := cr.cache[accountID]
39    cr.mu.RUnlock()
40    
41    if ok && time.Now().Before(entry.expiresAt) {
42        return entry.jwt, nil
43    }
44    
45    // Cache miss or expired, fetch from underlying resolver
46    jwt, err := cr.underlying.Fetch(accountID)
47    if err != nil {
48        return "", err
49    }
50    
51    // Update cache
52    cr.mu.Lock()
53    cr.cache[accountID] = &cacheEntry{
54        jwt:       jwt,
55        expiresAt: time.Now().Add(cr.ttl),
56    }
57    cr.mu.Unlock()
58    
59    return jwt, nil
60}
61
62// Invalidate removes an entry from cache
63func (cr *CachedResolver) Invalidate(accountID string) {
64    cr.mu.Lock()
65    delete(cr.cache, accountID)
66    cr.mu.Unlock()
67}
68
69// cleanup removes expired entries periodically
70func (cr *CachedResolver) cleanup() {
71    ticker := time.NewTicker(cr.ttl / 2)
72    defer ticker.Stop()
73    
74    for range ticker.C {
75        cr.mu.Lock()
76        now := time.Now()
77        for key, entry := range cr.cache {
78            if now.After(entry.expiresAt) {
79                delete(cr.cache, key)
80            }
81        }
82        cr.mu.Unlock()
83    }
84}

NATS Server Configuration with Custom Resolver

 1# nats-server.conf with custom resolver
 2
 3port: 4222
 4http_port: 8222
 5
 6# Operator JWT
 7operator: /path/to/operator.jwt
 8
 9# Custom resolver (URL-based)
10resolver: URL(http://localhost:8080/accounts/)
11
12# Or use NATS-based resolver
13# resolver: NATS(nats://resolver-server:4222)
14
15# System account (in operator mode the value must be the account's public key — the "A..." ID — not its name)
16system_account: SYS
17
18# TLS
19tls {
20    cert_file: "/path/to/server-cert.pem"
21    key_file: "/path/to/server-key.pem"
22}
23
24# Logging
25debug: false
26trace: false
27logtime: true

Testing Custom Resolver

 1package main
 2
 3import (
 4    "testing"
 5    "time"
 6)
 7
 8func TestMemoryResolver(t *testing.T) {
 9    resolver := NewMemoryResolver()
10    
11    // Test Store and Fetch
12    accountID := "ACCOUNT123"
13    jwt := "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
14    
15    err := resolver.Store(accountID, jwt)
16    if err != nil {
17        t.Fatalf("Store failed: %v", err)
18    }
19    
20    fetched, err := resolver.Fetch(accountID)
21    if err != nil {
22        t.Fatalf("Fetch failed: %v", err)
23    }
24    
25    if fetched != jwt {
26        t.Errorf("Expected %s, got %s", jwt, fetched)
27    }
28    
29    // Test Delete
30    err = resolver.Delete(accountID)
31    if err != nil {
32        t.Fatalf("Delete failed: %v", err)
33    }
34    
35    _, err = resolver.Fetch(accountID)
36    if err == nil {
37        t.Error("Expected error after delete")
38    }
39}
40
41func TestCachedResolver(t *testing.T) {
42    underlying := NewMemoryResolver()
43    underlying.Store("ACC1", "jwt1")
44    
45    cached := NewCachedResolver(underlying, 1*time.Second)
46    
47    // First fetch - cache miss
48    jwt1, _ := cached.Fetch("ACC1")
49    
50    // Second fetch - cache hit
51    jwt2, _ := cached.Fetch("ACC1")
52    
53    if jwt1 != jwt2 {
54        t.Error("Cache not working")
55    }
56    
57    // Wait for expiry
58    time.Sleep(2 * time.Second)
59    
60    // Should fetch from underlying again
61    jwt3, _ := cached.Fetch("ACC1")
62    if jwt3 != jwt1 {
63        t.Error("Expired cache not refreshed")
64    }
65}
comments powered by Disqus