NATS Setup with JWT Authorization
NATS is a high-performance messaging system. This guide covers setup and JWT-based authorization using nsc (NATS Security CLI) for secure, decentralized authentication.
Installation
1# Install NATS server
2# macOS
3brew install nats-server
4
5# Linux
6wget https://github.com/nats-io/nats-server/releases/latest/download/nats-server-linux-amd64.zip
7unzip nats-server-linux-amd64.zip
8sudo mv nats-server /usr/local/bin/
9
10# Install nsc (NATS Security CLI)
11curl -L https://raw.githubusercontent.com/nats-io/nsc/master/install.sh | sh
12
13# Install nats CLI
14go install github.com/nats-io/natscli/nats@latest
Docker Setup
Docker Run
1# Run NATS server
2docker run -d \
3 --name nats \
4 -p 4222:4222 \
5 -p 8222:8222 \
6 -p 6222:6222 \
7 nats:latest
8
9# With JetStream enabled
10docker run -d \
11 --name nats-js \
12 -p 4222:4222 \
13 -p 8222:8222 \
14 -v nats-data:/data \
15 nats:latest -js
16
17# Connect
18docker exec -it nats nats-cli
Docker Compose
1version: '3.8'
2
3services:
4 nats:
5 image: nats:latest
6 container_name: nats
7 command:
8 - "-js" # Enable JetStream
9 - "-m=8222" # Monitoring port
10 - "-sd=/data" # Store directory
11 ports:
12 - "4222:4222" # Client connections
13 - "8222:8222" # HTTP monitoring
14 - "6222:6222" # Cluster routing
15 volumes:
16 - nats-data:/data
17 healthcheck:
18 test: ["CMD", "wget", "--spider", "http://localhost:8222/healthz"]
19 interval: 10s
20 timeout: 5s
21 retries: 5
22 restart: unless-stopped
23
24 # NATS with JWT auth
25 nats-auth:
26 image: nats:latest
27 container_name: nats-auth
28 command:
29 - "-c=/config/nats-server.conf"
30 ports:
31 - "4223:4222"
32 - "8223:8222"
33 volumes:
34 - ./nats-config:/config
35 - nats-auth-data:/data
36 restart: unless-stopped
37
38 # NATS Surveyor (monitoring UI)
39 nats-surveyor:
40 image: natsio/nats-surveyor:latest
41 container_name: nats-surveyor
42 command:
43 - "-s=http://nats:8222"
44 ports:
45 - "7777:7777"
46 depends_on:
47 - nats
48 restart: unless-stopped
49
50volumes:
51 nats-data:
52 nats-auth-data:
nats-server.conf (for JWT auth):
1# NATS Server Configuration with JWT
2
3port: 4222
4http_port: 8222
5
6# JetStream
7jetstream {
8 store_dir: /data
9 max_memory_store: 1GB
10 max_file_store: 10GB
11}
12
13# JWT Authorization
14operator: /config/operator.jwt
15
16resolver: {
17 type: full
18 dir: /config/jwt
19 allow_delete: false
20 interval: "2m"
21}
22
23# System account
24system_account: SYS
25
26# Logging
27debug: false
28trace: false
29logtime: true
Docker Compose with Multiple Services
1version: '3.8'
2
3services:
4 # NATS Server
5 nats:
6 image: nats:latest
7 container_name: nats
8 command: ["-js", "-m=8222"]
9 ports:
10 - "4222:4222"
11 - "8222:8222"
12 volumes:
13 - nats-data:/data
14 networks:
15 - app-network
16 restart: unless-stopped
17
18 # Publisher service
19 publisher:
20 build: ./publisher
21 container_name: publisher
22 environment:
23 NATS_URL: nats://nats:4222
24 depends_on:
25 - nats
26 networks:
27 - app-network
28 restart: unless-stopped
29
30 # Subscriber service
31 subscriber:
32 build: ./subscriber
33 container_name: subscriber
34 environment:
35 NATS_URL: nats://nats:4222
36 depends_on:
37 - nats
38 networks:
39 - app-network
40 restart: unless-stopped
41
42volumes:
43 nats-data:
44
45networks:
46 app-network:
47 driver: bridge
JWT Authorization Setup
Step 1: Create Operator
1# Initialize nsc
2nsc init
3
4# Create operator (top-level authority)
5nsc add operator -n MyOperator
6
7# Generate operator JWT
8nsc describe operator
Step 2: Create Account
1# Create account (tenant/organization)
2nsc add account -n MyAccount
3
4# Set account limits (optional)
5nsc edit account MyAccount \
6 --max-connections 1000 \
7 --max-data 10GB \
8 --max-exports 10 \
9 --max-imports 10 \
10 --max-payload 1MB \
11 --max-subscriptions 1000
12
13# Describe account
14nsc describe account MyAccount
Step 3: Create Users
1# Create user with permissions
2nsc add user -n alice \
3 --allow-pub "orders.>" \
4 --allow-sub "orders.alice.>" \
5 --allow-pub-response
6
7# Create user with different permissions
8nsc add user -n bob \
9 --allow-pub "inventory.>" \
10 --allow-sub "inventory.>" \
11 --deny-pub "inventory.delete"
12
13# Create admin user
14nsc add user -n admin \
15 --allow-pub ">" \
16 --allow-sub ">"
17
18# Generate user credentials
19nsc generate creds -a MyAccount -n alice > alice.creds
20nsc generate creds -a MyAccount -n bob > bob.creds
Step 4: Configure NATS Server
1# nats-server.conf
2port: 4222
3http_port: 8222
4
5# JWT Authentication
6operator: /path/to/operator.jwt
7resolver: {
8 type: full
9 dir: '/path/to/jwt'
10}
11
12# System Account (for monitoring)
13system_account: SYS
14
15# Logging
16debug: false
17trace: false
18logtime: true
19log_file: "/var/log/nats/nats-server.log"
20
21# Limits
22max_connections: 10000
23max_payload: 1MB
24max_pending: 10MB
Step 5: Push Account JWT to Server
1# Push account JWT to server resolver
2nsc push -a MyAccount
3
4# Or manually copy JWT files
5cp ~/.nsc/nats/MyOperator/accounts/MyAccount/*.jwt /path/to/jwt/
Step 6: Start Server
1# Start with config
2nats-server -c nats-server.conf
3
4# Or with operator JWT directly
5nats-server --operator /path/to/operator.jwt
Client Connection Examples
Go Client
1package main
2
3import (
4 "log"
5 "github.com/nats-io/nats.go"
6)
7
8func main() {
9 // Connect with credentials
10 nc, err := nats.Connect("nats://localhost:4222",
11 nats.UserCredentials("alice.creds"),
12 )
13 if err != nil {
14 log.Fatal(err)
15 }
16 defer nc.Close()
17
18 // Publish
19 err = nc.Publish("orders.new", []byte("Order #123"))
20 if err != nil {
21 log.Fatal(err)
22 }
23
24 // Subscribe
25 sub, err := nc.Subscribe("orders.alice.>", func(m *nats.Msg) {
26 log.Printf("Received: %s", string(m.Data))
27 })
28 if err != nil {
29 log.Fatal(err)
30 }
31 defer sub.Unsubscribe()
32
33 // Keep alive
34 select {}
35}
Python Client
1import asyncio
2from nats.aio.client import Client as NATS
3
4async def main():
5 nc = NATS()
6
7 # Connect with credentials
8 await nc.connect(
9 servers=["nats://localhost:4222"],
10 user_credentials="alice.creds"
11 )
12
13 # Subscribe
14 async def message_handler(msg):
15 print(f"Received: {msg.data.decode()}")
16
17 await nc.subscribe("orders.alice.>", cb=message_handler)
18
19 # Publish
20 await nc.publish("orders.new", b"Order #123")
21
22 # Keep alive
23 await asyncio.sleep(60)
24 await nc.close()
25
26if __name__ == '__main__':
27 asyncio.run(main())
CLI
1# Subscribe
2nats sub --creds=alice.creds "orders.alice.>"
3
4# Publish
5nats pub --creds=alice.creds "orders.new" "Order #123"
6
7# Request-Reply
8nats req --creds=alice.creds "orders.status" "123"
Permission Patterns
Subject-Based Permissions
1# Allow all under namespace
2--allow-pub "orders.>"
3--allow-sub "orders.>"
4
5# Specific subjects only
6--allow-pub "orders.create"
7--allow-pub "orders.update"
8
9# Deny specific subjects
10--deny-pub "orders.delete"
11
12# Wildcards
13--allow-sub "orders.*.status" # Single token wildcard
14--allow-sub "orders.>" # Multi-token wildcard
Response Permissions
1# Allow publishing responses (for request-reply)
2--allow-pub-response
3
4# Limit response time
5--response-ttl 5s
Time-Based Permissions
1# User expires after 30 days
2--expiry 30d
3
4# Bearer token (one-time use)
5--bearer
Account Limits
1# Connection limits
2--max-connections 100
3
4# Data limits
5--max-data 1GB
6--max-payload 1MB
7
8# Subscription limits
9--max-subscriptions 1000
10
11# Export/Import limits (for account-to-account communication)
12--max-exports 10
13--max-imports 10
Monitoring
Server Stats
1# HTTP monitoring endpoint
2curl http://localhost:8222/varz
3
4# Connection stats
5curl http://localhost:8222/connz
6
7# Subscription stats
8curl http://localhost:8222/subsz
9
10# Account stats
11curl http://localhost:8222/accountz
System Account
1# Create system account
2nsc add account -n SYS
3nsc add user -n sys --account SYS
4
5# Subscribe to system events
6nats sub --creds=sys.creds '$SYS.>'
7
8# Account stats
9nats req --creds=sys.creds '$SYS.REQ.ACCOUNT.<account-id>.CONNZ' ''
Common Patterns
Request-Reply
1// Server
2nc.Subscribe("orders.status", func(m *nats.Msg) {
3 status := getOrderStatus(string(m.Data))
4 m.Respond([]byte(status))
5})
6
7// Client
8msg, err := nc.Request("orders.status", []byte("123"), 2*time.Second)
9if err != nil {
10 log.Fatal(err)
11}
12fmt.Printf("Status: %s\n", msg.Data)
Queue Groups (Load Balancing)
1// Multiple workers in same queue group
2nc.QueueSubscribe("orders.process", "workers", func(m *nats.Msg) {
3 // Only one worker receives each message
4 processOrder(m.Data)
5})
JetStream (Persistence)
1// Enable JetStream
2js, err := nc.JetStream()
3
4// Create stream
5js.AddStream(&nats.StreamConfig{
6 Name: "ORDERS",
7 Subjects: []string{"orders.>"},
8 Storage: nats.FileStorage,
9})
10
11// Publish to stream
12js.Publish("orders.new", []byte("Order #123"))
13
14// Durable consumer
15js.Subscribe("orders.>", func(m *nats.Msg) {
16 m.Ack()
17}, nats.Durable("order-processor"))
Security Best Practices
- Least Privilege: Grant minimum necessary permissions
- Credential Rotation: Regularly rotate user credentials
- Expiry: Set expiration on user JWTs
- Monitoring: Monitor for unauthorized access attempts
- TLS: Enable TLS for production
- Separate Accounts: Use different accounts for different services
TLS Configuration
1# nats-server.conf with TLS
2tls {
3 cert_file: "/path/to/server-cert.pem"
4 key_file: "/path/to/server-key.pem"
5 ca_file: "/path/to/ca.pem"
6 verify: true
7}
Notes
- JWT auth is decentralized - server doesn't need central auth service
- Credentials files contain both JWT and seed (private key)
- Operator signs accounts, accounts sign users
- Permissions are enforced at server level
- Use JetStream for guaranteed delivery
Gotchas/Warnings
- ⚠️ Credentials security: Protect .creds files - they contain private keys
- ⚠️ Subject design: Plan subject namespace carefully
- ⚠️ Wildcard permissions: Be careful with
>wildcard - ⚠️ Account limits: Set appropriate limits to prevent abuse
Custom Resolver Implementation
Memory Resolver (Go)
1package main
2
3import (
4 "encoding/json"
5 "fmt"
6 "io/ioutil"
7 "sync"
8
9 "github.com/nats-io/jwt/v2"
10 "github.com/nats-io/nats-server/v2/server"
11)
12
13// MemoryResolver stores account JWTs in memory
14type MemoryResolver struct {
15 mu sync.RWMutex
16 accounts map[string]string // account public key -> JWT
17}
18
19func NewMemoryResolver() *MemoryResolver {
20 return &MemoryResolver{
21 accounts: make(map[string]string),
22 }
23}
24
25// Fetch implements the AccountResolver interface
26func (mr *MemoryResolver) Fetch(name string) (string, error) {
27 mr.mu.RLock()
28 defer mr.mu.RUnlock()
29
30 jwt, ok := mr.accounts[name]
31 if !ok {
32 return "", fmt.Errorf("account not found: %s", name)
33 }
34 return jwt, nil
35}
36
37// Store adds an account JWT to the resolver
38func (mr *MemoryResolver) Store(name, jwt string) error {
39 mr.mu.Lock()
40 defer mr.mu.Unlock()
41
42 mr.accounts[name] = jwt
43 return nil
44}
45
46// Delete removes an account JWT
47func (mr *MemoryResolver) Delete(name string) error {
48 mr.mu.Lock()
49 defer mr.mu.Unlock()
50
51 delete(mr.accounts, name)
52 return nil
53}
54
55// List returns all account public keys
56func (mr *MemoryResolver) List() []string {
57 mr.mu.RLock()
58 defer mr.mu.RUnlock()
59
60 keys := make([]string, 0, len(mr.accounts))
61 for k := range mr.accounts {
62 keys = append(keys, k)
63 }
64 return keys
65}
66
67// Usage with NATS Server
68func main() {
69 // Create custom resolver
70 resolver := NewMemoryResolver()
71
72 // Load account JWTs from files
73 accountJWT, _ := ioutil.ReadFile("account.jwt")
74
75 // Parse JWT to get public key
76 claim, _ := jwt.DecodeAccountClaims(string(accountJWT))
77
78 // Store in resolver
79 resolver.Store(claim.Subject, string(accountJWT))
80
81 // Configure NATS server with custom resolver
82 opts := &server.Options{
83 Port: 4222,
84 HTTPPort: 8222,
85 }
86
87 // Set operator JWT
88 operatorJWT, _ := ioutil.ReadFile("operator.jwt")
89 opts.TrustedOperators = []*jwt.OperatorClaims{
90 // Parse operator JWT
91 }
92
93 // Set custom resolver
94 opts.AccountResolver = resolver
95
96 // Start server
97 s, err := server.NewServer(opts)
98 if err != nil {
99 panic(err)
100 }
101
102 s.Start()
103 s.WaitForShutdown()
104}
HTTP Resolver (Go)
1package main
2
3import (
4 "fmt"
5 "io/ioutil"
6 "net/http"
7 "time"
8)
9
10// HTTPResolver fetches account JWTs from HTTP endpoint
11type HTTPResolver struct {
12 baseURL string
13 client *http.Client
14}
15
16func NewHTTPResolver(baseURL string) *HTTPResolver {
17 return &HTTPResolver{
18 baseURL: baseURL,
19 client: &http.Client{
20 Timeout: 5 * time.Second,
21 },
22 }
23}
24
25// Fetch implements the AccountResolver interface
26func (hr *HTTPResolver) Fetch(accountID string) (string, error) {
27 url := fmt.Sprintf("%s/accounts/%s", hr.baseURL, accountID)
28
29 resp, err := hr.client.Get(url)
30 if err != nil {
31 return "", err
32 }
33 defer resp.Body.Close()
34
35 if resp.StatusCode != http.StatusOK {
36 return "", fmt.Errorf("account not found: %s", accountID)
37 }
38
39 jwt, err := ioutil.ReadAll(resp.Body)
40 if err != nil {
41 return "", err
42 }
43
44 return string(jwt), nil
45}
46
47// HTTP Server for serving JWTs
48type JWTServer struct {
49 accounts map[string]string
50}
51
52func (s *JWTServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
53 // Extract account ID from URL
54 accountID := r.URL.Path[len("/accounts/"):]
55
56 jwt, ok := s.accounts[accountID]
57 if !ok {
58 http.NotFound(w, r)
59 return
60 }
61
62 w.Header().Set("Content-Type", "application/jwt")
63 w.Write([]byte(jwt))
64}
65
66// Start JWT server
67func startJWTServer() {
68 server := &JWTServer{
69 accounts: make(map[string]string),
70 }
71
72 // Load JWTs from files
73 // server.accounts[accountID] = jwtString
74
75 http.ListenAndServe(":8080", server)
76}
Database Resolver (Go with PostgreSQL)
1package main
2
3import (
4 "database/sql"
5 "fmt"
6
7 _ "github.com/lib/pq"
8)
9
10// DBResolver fetches account JWTs from database
11type DBResolver struct {
12 db *sql.DB
13}
14
15func NewDBResolver(connStr string) (*DBResolver, error) {
16 db, err := sql.Open("postgres", connStr)
17 if err != nil {
18 return nil, err
19 }
20
21 // Create table if not exists
22 _, err = db.Exec(`
23 CREATE TABLE IF NOT EXISTS account_jwts (
24 account_id VARCHAR(255) PRIMARY KEY,
25 jwt TEXT NOT NULL,
26 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
27 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
28 )
29 `)
30 if err != nil {
31 return nil, err
32 }
33
34 return &DBResolver{db: db}, nil
35}
36
37// Fetch implements the AccountResolver interface
38func (dr *DBResolver) Fetch(accountID string) (string, error) {
39 var jwt string
40 err := dr.db.QueryRow(
41 "SELECT jwt FROM account_jwts WHERE account_id = $1",
42 accountID,
43 ).Scan(&jwt)
44
45 if err == sql.ErrNoRows {
46 return "", fmt.Errorf("account not found: %s", accountID)
47 }
48 if err != nil {
49 return "", err
50 }
51
52 return jwt, nil
53}
54
55// Store adds or updates an account JWT
56func (dr *DBResolver) Store(accountID, jwt string) error {
57 _, err := dr.db.Exec(`
58 INSERT INTO account_jwts (account_id, jwt)
59 VALUES ($1, $2)
60 ON CONFLICT (account_id)
61 DO UPDATE SET jwt = $2, updated_at = CURRENT_TIMESTAMP
62 `, accountID, jwt)
63
64 return err
65}
66
67// Delete removes an account JWT
68func (dr *DBResolver) Delete(accountID string) error {
69 _, err := dr.db.Exec(
70 "DELETE FROM account_jwts WHERE account_id = $1",
71 accountID,
72 )
73 return err
74}
75
76// List returns all account IDs
77func (dr *DBResolver) List() ([]string, error) {
78 rows, err := dr.db.Query("SELECT account_id FROM account_jwts")
79 if err != nil {
80 return nil, err
81 }
82 defer rows.Close()
83
84 var accounts []string
85 for rows.Next() {
86 var accountID string
87 if err := rows.Scan(&accountID); err != nil {
88 return nil, err
89 }
90 accounts = append(accounts, accountID)
91 }
92
93 return accounts, nil
94}
Cached Resolver (Go)
1package main
2
3import (
4 "sync"
5 "time"
6)
7
8// CachedResolver wraps another resolver with caching
9type CachedResolver struct {
10 underlying AccountResolver
11 cache map[string]*cacheEntry
12 mu sync.RWMutex
13 ttl time.Duration
14}
15
16type cacheEntry struct {
17 jwt string
18 expiresAt time.Time
19}
20
21func NewCachedResolver(underlying AccountResolver, ttl time.Duration) *CachedResolver {
22 cr := &CachedResolver{
23 underlying: underlying,
24 cache: make(map[string]*cacheEntry),
25 ttl: ttl,
26 }
27
28 // Start cleanup goroutine
29 go cr.cleanup()
30
31 return cr
32}
33
34// Fetch implements the AccountResolver interface with caching
35func (cr *CachedResolver) Fetch(accountID string) (string, error) {
36 // Check cache first
37 cr.mu.RLock()
38 entry, ok := cr.cache[accountID]
39 cr.mu.RUnlock()
40
41 if ok && time.Now().Before(entry.expiresAt) {
42 return entry.jwt, nil
43 }
44
45 // Cache miss or expired, fetch from underlying resolver
46 jwt, err := cr.underlying.Fetch(accountID)
47 if err != nil {
48 return "", err
49 }
50
51 // Update cache
52 cr.mu.Lock()
53 cr.cache[accountID] = &cacheEntry{
54 jwt: jwt,
55 expiresAt: time.Now().Add(cr.ttl),
56 }
57 cr.mu.Unlock()
58
59 return jwt, nil
60}
61
62// Invalidate removes an entry from cache
63func (cr *CachedResolver) Invalidate(accountID string) {
64 cr.mu.Lock()
65 delete(cr.cache, accountID)
66 cr.mu.Unlock()
67}
68
69// cleanup removes expired entries periodically
70func (cr *CachedResolver) cleanup() {
71 ticker := time.NewTicker(cr.ttl / 2)
72 defer ticker.Stop()
73
74 for range ticker.C {
75 cr.mu.Lock()
76 now := time.Now()
77 for key, entry := range cr.cache {
78 if now.After(entry.expiresAt) {
79 delete(cr.cache, key)
80 }
81 }
82 cr.mu.Unlock()
83 }
84}
NATS Server Configuration with Custom Resolver
1# nats-server.conf with custom resolver
2
3port: 4222
4http_port: 8222
5
6# Operator JWT
7operator: /path/to/operator.jwt
8
9# Custom resolver (URL-based)
10resolver: URL(http://localhost:8080/accounts/)
11
12# Or use NATS-based resolver
13# resolver: NATS(nats://resolver-server:4222)
14
15# System account
16system_account: SYS
17
18# TLS
19tls {
20 cert_file: "/path/to/server-cert.pem"
21 key_file: "/path/to/server-key.pem"
22}
23
24# Logging
25debug: false
26trace: false
27logtime: true
Testing Custom Resolver
1package main
2
3import (
4 "testing"
5 "time"
6)
7
8func TestMemoryResolver(t *testing.T) {
9 resolver := NewMemoryResolver()
10
11 // Test Store and Fetch
12 accountID := "ACCOUNT123"
13 jwt := "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9..."
14
15 err := resolver.Store(accountID, jwt)
16 if err != nil {
17 t.Fatalf("Store failed: %v", err)
18 }
19
20 fetched, err := resolver.Fetch(accountID)
21 if err != nil {
22 t.Fatalf("Fetch failed: %v", err)
23 }
24
25 if fetched != jwt {
26 t.Errorf("Expected %s, got %s", jwt, fetched)
27 }
28
29 // Test Delete
30 err = resolver.Delete(accountID)
31 if err != nil {
32 t.Fatalf("Delete failed: %v", err)
33 }
34
35 _, err = resolver.Fetch(accountID)
36 if err == nil {
37 t.Error("Expected error after delete")
38 }
39}
40
41func TestCachedResolver(t *testing.T) {
42 underlying := NewMemoryResolver()
43 underlying.Store("ACC1", "jwt1")
44
45 cached := NewCachedResolver(underlying, 1*time.Second)
46
47 // First fetch - cache miss
48 jwt1, _ := cached.Fetch("ACC1")
49
50 // Second fetch - cache hit
51 jwt2, _ := cached.Fetch("ACC1")
52
53 if jwt1 != jwt2 {
54 t.Error("Cache not working")
55 }
56
57 // Wait for expiry
58 time.Sleep(2 * time.Second)
59
60 // Should fetch from underlying again
61 jwt3, _ := cached.Fetch("ACC1")
62 if jwt3 != jwt1 {
63 t.Error("Expired cache not refreshed")
64 }
65}
comments powered by Disqus