小能豆

How to create topics over WebSocket in Go, like STOMP or a similar protocol

go

How can I create topics and send WebSocket messages in Go? I’m using the gorilla/websocket library for my WebSocket server, and I would like to add topics that clients can send messages to and receive messages from. Does anyone have any ideas on how to do this? My code is below:

package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/gorilla/websocket"
)

// upgrader is used by wsHandler to switch an incoming HTTP request over to a
// WebSocket connection. Both of its I/O buffer sizes are set to 1024.
var upgrader = websocket.Upgrader{
    ReadBufferSize:  1024,
    WriteBufferSize: 1024,
}

// main registers wsHandler on /gs-guide-websocket and serves HTTP on
// 192.168.18.8:8080 until the server fails.
func main() {
    http.HandleFunc("/gs-guide-websocket", wsHandler)

    // ListenAndServe only returns on failure, always with a non-nil error,
    // and log.Fatal then exits the process, so no cleanup code placed after
    // this point would ever run.
    err := http.ListenAndServe("192.168.18.8:8080", nil)
    log.Fatal(err)
}

// wsHandler upgrades the request to a WebSocket connection and echoes every
// received message back to the sender, using the same message type, until a
// read or a write fails. The connection is closed when the handler returns.
func wsHandler(w http.ResponseWriter, r *http.Request) {
    ws, upgradeErr := upgrader.Upgrade(w, r, nil)
    if upgradeErr != nil {
        log.Println(upgradeErr)
        return
    }
    defer ws.Close()

    for {
        kind, payload, readErr := ws.ReadMessage()
        if readErr != nil {
            log.Println(readErr)
            return
        }
        fmt.Println("Received message:", string(payload))

        // Echo the payload back unchanged.
        if writeErr := ws.WriteMessage(kind, payload); writeErr != nil {
            log.Println(writeErr)
            return
        }
    }
}

Thank you very much to anyone who can help.


阅读 71

收藏
2023-12-18

共1个答案

小能豆

To implement a publish-subscribe pattern (topics) with WebSocket connections in Go using the gorilla/websocket library, you can maintain a set of connected clients and manage topics by grouping clients based on the topics they are interested in. Here’s a simple first example, which does not use topics yet and simply broadcasts every message to all connected clients:

package main

import (
    "fmt"
    "log"
    "net/http"
    "sync"

    "github.com/gorilla/websocket"
)

var (
    // upgrader is used by wsHandler to switch an incoming HTTP request over
    // to a WebSocket connection. Both of its I/O buffer sizes are set to 1024.
    upgrader = websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
    }

    // clientsMu guards clients, the set of connections that broadcast writes
    // to. wsHandler adds a connection after a successful upgrade and removes
    // it again when the handler returns.
    clientsMu sync.Mutex
    clients   = make(map[*websocket.Conn]struct{})
)

// main registers wsHandler on /gs-guide-websocket and serves HTTP on
// 192.168.18.8:8080; log.Fatal reports the error ListenAndServe returns and
// exits the process.
func main() {
    const addr = "192.168.18.8:8080"

    http.Handle("/gs-guide-websocket", http.HandlerFunc(wsHandler))
    log.Fatal(http.ListenAndServe(addr, nil))
}

// wsHandler upgrades the request to a WebSocket connection, adds the
// connection to the clients set, and passes every message it reads to
// broadcast. When a read fails the handler returns, closing the connection
// and removing it from the set.
func wsHandler(w http.ResponseWriter, r *http.Request) {
    ws, upgradeErr := upgrader.Upgrade(w, r, nil)
    if upgradeErr != nil {
        log.Println(upgradeErr)
        return
    }

    // Deferred calls run last-in-first-out: the connection is closed first
    // and only then dropped from the set.
    defer func() {
        clientsMu.Lock()
        delete(clients, ws)
        clientsMu.Unlock()
    }()
    defer ws.Close()

    clientsMu.Lock()
    clients[ws] = struct{}{}
    clientsMu.Unlock()

    for {
        kind, payload, readErr := ws.ReadMessage()
        if readErr != nil {
            log.Println(readErr)
            break
        }
        fmt.Println("Received message:", string(payload))

        // Fan the message out to every registered connection, sender included.
        broadcast(kind, payload)
    }
}

// broadcast writes message, with the given WebSocket message type, to every
// connection in clients.
//
// clientsMu is held for the whole loop, so the writes issued here never
// overlap with each other or with changes to the set. A connection whose
// write fails is closed and removed from the set: closing it makes the
// pending ReadMessage in that connection's wsHandler fail, so the handler
// returns instead of lingering on a connection that no longer receives
// broadcasts.
func broadcast(messageType int, message []byte) {
    clientsMu.Lock()
    defer clientsMu.Unlock()

    for client := range clients {
        if err := client.WriteMessage(messageType, message); err != nil {
            log.Println(err)
            // The connection has already failed, so the error from Close is
            // not interesting. Deleting the current key while ranging over a
            // map is safe in Go.
            client.Close()
            delete(clients, client)
        }
    }
}

This example maintains a set of connected clients and broadcasts incoming messages to all connected clients. To implement topics, you can extend this by introducing a topic-based system where clients subscribe to specific topics and messages are only sent to clients subscribed to those topics.

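Here’s a simplified example. It is a fragment: it reuses the package clause, the imports, and the upgrader variable from the previous listing, and replaces that listing’s clients set, main, and wsHandler: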

// Client pairs a WebSocket connection with the set of topics associated
// with it.
type Client struct {
    // conn is the connection that broadcastToTopic writes to.
    conn  *websocket.Conn
    // topics is a set keyed by topic name; broadcastToTopic only writes to
    // clients whose set contains the topic being broadcast.
    topics map[string]struct{}
}

var (
    // clientsMu guards clients, the set of registered clients.
    // broadcastToTopic also reads each client's topics while holding it.
    clientsMu sync.Mutex
    clients   = make(map[*Client]struct{})
)

// main registers wsHandler on /gs-guide-websocket and serves HTTP on
// 192.168.18.8:8080, exiting through log.Fatal with the error that
// ListenAndServe returns.
func main() {
    const (
        route = "/gs-guide-websocket"
        addr  = "192.168.18.8:8080"
    )

    http.HandleFunc(route, wsHandler)

    serveErr := http.ListenAndServe(addr, nil)
    log.Fatal(serveErr)
}

// wsHandler upgrades the request to a WebSocket connection, registers a
// Client for it with an empty topics set, and passes every message it reads
// to handleMessage until a read fails.
//
// When the handler returns, the connection is closed and the client is
// removed from the clients set, so a disconnected peer does not stay in the
// set (and stay reachable by broadcastToTopic) after its connection is gone.
func wsHandler(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        log.Println(err)
        return
    }

    client := &Client{
        conn:   conn,
        topics: make(map[string]struct{}),
    }

    clientsMu.Lock()
    clients[client] = struct{}{}
    clientsMu.Unlock()

    // Close the connection and unregister the client on every exit path.
    // delete is a no-op if the client was already removed elsewhere.
    defer func() {
        conn.Close()
        clientsMu.Lock()
        delete(clients, client)
        clientsMu.Unlock()
    }()

    for {
        messageType, message, err := conn.ReadMessage()
        if err != nil {
            log.Println(err)
            return
        }

        fmt.Println("Received message:", string(message))

        // Handle message and topics here
        handleMessage(client, messageType, message)
    }
}

// handleMessage is the hook wsHandler calls for every message read from
// client. It is a placeholder: the body contains no statements, so messages
// are currently ignored.
//
// The commented-out sketch below shows one possible text protocol. It relies
// on names this listing does not define (the strings package and
// subscribe/unsubscribe methods on Client), so those must be added before it
// can be enabled.
func handleMessage(client *Client, messageType int, message []byte) {
    // Intended steps:
    //   1. Parse the message and extract the topic information.
    //   2. Subscribe or unsubscribe the client to/from the topic.
    //   3. Broadcast the message to clients subscribed to the topic.
    //
    // NOTE(review): broadcastToTopic reads client.topics while holding
    // clientsMu, so any code that modifies client.topics should hold
    // clientsMu as well.
    //
    // Example:
    // if strings.HasPrefix(string(message), "subscribe:") {
    //     topic := strings.TrimPrefix(string(message), "subscribe:")
    //     client.subscribe(topic)
    // } else if strings.HasPrefix(string(message), "unsubscribe:") {
    //     topic := strings.TrimPrefix(string(message), "unsubscribe:")
    //     client.unsubscribe(topic)
    // } else {
    //     broadcastToTopic(messageType, message, "default")
    // }
}

// broadcastToTopic writes message, with the given WebSocket message type, to
// every registered client whose topics set contains topic.
//
// clientsMu is held for the whole loop, which covers both the clients set
// and the reads of each client's topics. A client whose write fails has its
// connection closed and is removed from the set: closing the connection
// makes the pending ReadMessage in that client's wsHandler fail, so the
// handler returns instead of lingering on a connection that no longer
// receives messages.
func broadcastToTopic(messageType int, message []byte, topic string) {
    clientsMu.Lock()
    defer clientsMu.Unlock()

    for client := range clients {
        if _, subscribed := client.topics[topic]; !subscribed {
            continue
        }
        if err := client.conn.WriteMessage(messageType, message); err != nil {
            log.Println(err)
            // The connection has already failed, so the error from Close is
            // not interesting. Deleting the current key while ranging over a
            // map is safe in Go.
            client.conn.Close()
            delete(clients, client)
        }
    }
}

This is a basic example, and you might need to adapt it based on your specific requirements. Additionally, you may want to implement proper error handling, topic subscription/unsubscription logic, and secure communication with appropriate authentication.

2023-12-18