post-cover
Serverless is kinda booty...
(and go is great)
Written Thu Nov 02 2023
By Michael Freno
56 Hits
Serverless is kinda booty...
(and go is great)

So I have been trying out different languages the last few months.

Go, Rust, and OCaml. I had gotten tired of TypeScript and its many weirdnesses, and for a few months I really focused on learning those three aforementioned languages, mostly enjoying the experience immensely (at least with Go and OCaml). So after a little while of doing that I revisited this website and refined a few things here and there. I wanted the comment section to use a WebSocket for real-time updates. It was only the second time I had done a WebSocket (my first experience is covered in three parts: part 1, part 2, and part 3), and as the rest of this site is done with TypeScript I figured I would continue on with that and do the WebSocket with TS as I had done the first time.

I quickly decided to scrap that.

Maybe it was due to having a much better time with OCaml and Go, but I had very little patience for trying to build the WebSocket. I was running into both deployment issues and run-of-the-mill development issues, but the inability to actually see into the problem was really killing me; even finding where errors are being logged is annoying when dealing with serverless. And even when I found them, the delay between trying to fix the problem and actually seeing whether it was fixed was painfully long.

Eventually, I got it working. And the experience was dog water. The function execution time was anywhere from 200 ms to 2.5 seconds on a cold start. (For instance - REPORT RequestId: b2c5ce4f-a334-457e-985c-7bbf91d172a8 Duration: 322.21 ms Billed Duration: 323 ms Memory Size: 1024 MB Max Memory Used: 132 MB Init Duration: 1701.55 ms - taken from CloudWatch) Even when broadcasting to small numbers of clients without a cold start, the round-trip response often took a second or two. I was doing a whole bunch of workarounds so the sending user would see a response faster than waiting for the WebSocket response. Overall it was a terrible experience.

So I decided to scrap it and rewrite the WebSocket server in Go.

I decided to use gorilla for the WebSocket library. It was very easy to understand, and the documentation was great. The only rough edge is my comparative skill deficiency with Go vs JS/TS. And learning a new thing is never a bad thing.

It is by no means well done, but it works, and works so much better than the previous implementation that it's kind of embarrassing. The Go code is quite bad, but it runs something like 2 orders of magnitude faster and, more importantly, the workings are actually easily visible! Also, all it needs to build and deploy is a simple Dockerfile. A massive improvement over the Serverless Framework mess.

Here is the entire Go code. By all means critique the hell out of it; I would love to know where improvements can be made.

package main

import (
	"database/sql"
	"encoding/json"
	"fmt"
	_ "github.com/go-sql-driver/mysql"
	"github.com/google/uuid"
	"github.com/gorilla/websocket"
	"github.com/joho/godotenv"
	"log"
	"net/http"
	"os"
	"strings"
	"sync"
)

// Data is the JSON message a client sends over the WebSocket.
//
// Action selects the handler that reader dispatches to; the remaining fields
// are read by the individual handlers. Pointer fields are nil when the
// matching JSON key is absent or null, so handlers must check them before
// dereferencing.
//
//   - DeleteType is compared against "user", "admin" and "full" by commentDeletion.
//   - PostType and PostID together identify the channel a broadcast targets;
//     PostType is also used to build SQL table and column names.
//   - InvokerID is stored as the commenter / reacting / liking user ID.
//   - CommentBody, ParentCommentID and CommentID describe the comment acted on.
//   - Reaction carries the "reactionType" key; "upVote" and "downVote" are
//     routed to commentPoints, any other value is toggled by commentReaction.
type Data struct {
	Action          string  `json:"action"`
	DeleteType      *string `json:"deleteType"`
	PostType        string  `json:"postType"`
	PostID          *int    `json:"postID"`
	InvokerID       string  `json:"invokerID"`
	CommentBody     *string `json:"commentBody"`
	ParentCommentID *int    `json:"parentCommentID"`
	CommentID       *int    `json:"commentID"`
	Reaction        *string `json:"reactionType"`
}

// Client is one connected WebSocket peer.
//
// ID is the key under which the client is stored in the clients map (a UUID
// assigned in wsEndpoint). Conn is the underlying gorilla connection.
// ChannelID and ChannelType identify the post the client is currently
// subscribed to; both are nil until channelUpdate sets them, and they are
// matched against a post ID and post type by getAllConnectionsInChannel.
type Client struct {
	ID          string
	Conn        *websocket.Conn
	ChannelID   *int
	ChannelType *string
}

// Package-level state shared by every connection handler.
var (
	// clients indexes every registered connection by Client.ID.
	clients = make(map[string]*Client)

	// lock guards clients as well as the ChannelID/ChannelType fields that
	// channelUpdate rewrites and getAllConnectionsInChannel reads.
	lock sync.RWMutex

	// upgrader turns incoming HTTP requests into WebSocket connections.
	upgrader = websocket.Upgrader{
		ReadBufferSize:  1024,
		WriteBufferSize: 1024,
	}

	// db is the shared database handle; main assigns it at startup.
	db *sql.DB
)

// PrintClients logs the fields of every client in the given slice. It is a
// debugging aid only.
//
// The values themselves are printed (pointer fields are dereferenced, or shown
// as <nil>), and the connection is shown as its pointer value so distinct
// connections can be told apart. The read lock is held while printing because
// channelUpdate rewrites ChannelID and ChannelType under the same lock.
func PrintClients(clients []*Client) {
	lock.RLock()
	defer lock.RUnlock()
	for _, client := range clients {
		if client == nil {
			continue
		}
		log.Println("ID:", client.ID)
		log.Printf("Connection: %p", client.Conn)
		if client.ChannelID != nil {
			log.Println("ChannelID:", *client.ChannelID)
		} else {
			log.Println("ChannelID:", "<nil>")
		}
		if client.ChannelType != nil {
			log.Println("ChannelType:", *client.ChannelType)
		} else {
			log.Println("ChannelType:", "<nil>")
		}
		log.Println()
	}
}
// PrintClientsMap logs every entry of the package-level clients map. It is a
// debugging aid only.
//
// Pointer fields are dereferenced (or shown as <nil>) so the log contains the
// channel values rather than memory addresses; the connection is shown as its
// pointer value so distinct connections can be told apart.
func PrintClientsMap() {
	lock.RLock()
	defer lock.RUnlock()
	for key, client := range clients {
		log.Println("Client Key:", key)
		if client == nil {
			log.Println()
			continue
		}
		log.Println("ID:", client.ID)
		log.Printf("Connection: %p", client.Conn)
		if client.ChannelID != nil {
			log.Println("ChannelID:", *client.ChannelID)
		} else {
			log.Println("ChannelID:", "<nil>")
		}
		if client.ChannelType != nil {
			log.Println("ChannelType:", *client.ChannelType)
		} else {
			log.Println("ChannelType:", "<nil>")
		}
		log.Println()
	}
}

// createDBConnection opens the MySQL handle used by the whole program.
//
// It first tries to load a .env file (a failure there is only logged), then
// opens a connection using the DSN environment variable and pings it. Any
// failure to open or ping terminates the process via log.Fatalf.
func createDBConnection() *sql.DB {
	if loadErr := godotenv.Load(); loadErr != nil {
		log.Printf("Error loading .env file")
	}

	dsn := os.Getenv("DSN")
	conn, openErr := sql.Open("mysql", dsn)
	if openErr != nil {
		log.Fatalf("failed to connect: %v", openErr)
	}

	pingErr := conn.Ping()
	if pingErr != nil {
		log.Fatalf("failed to ping: %v", pingErr)
	}
	return conn
}

// getAllConnectionsInChannel returns every registered client whose channel
// matches the given post ID and post type. Clients that have not joined a
// channel yet (nil ChannelID or ChannelType) are skipped. The result is nil
// when nothing matches.
func getAllConnectionsInChannel(post_id int, post_type string) []*Client {
	lock.RLock()
	defer lock.RUnlock()

	var matches []*Client
	for _, candidate := range clients {
		if candidate.ChannelID == nil || candidate.ChannelType == nil {
			continue
		}
		if *candidate.ChannelID != post_id || *candidate.ChannelType != post_type {
			continue
		}
		matches = append(matches, candidate)
	}
	return matches
}

// broadcast sends message as a text frame to every client in clients.
//
// A client whose write fails has its connection closed; the failure is logged
// through the log package like every other error in this file. Nil clients and
// clients without a connection are skipped instead of panicking.
//
// NOTE(review): gorilla/websocket connections allow only one concurrent
// writer. broadcast is reached from every connection's reader loop, so two
// handlers can write to the same Conn at once. Fixing that needs a
// per-connection write lock (or a dedicated writer goroutine) shared with all
// other call sites that write to Conn.
func broadcast(message []byte, clients []*Client) {
	for _, client := range clients {
		if client == nil || client.Conn == nil {
			continue
		}
		if err := client.Conn.WriteMessage(websocket.TextMessage, message); err != nil {
			log.Println("write error:", err)
			client.Conn.Close()
		}
	}
}

// channelUpdate moves a registered client to the channel named by data: the
// channel type becomes data.PostType and the channel ID becomes data.PostID.
// A client that is no longer present in the clients map is left untouched.
func channelUpdate(data Data, client *Client) {
	lock.Lock()
	defer lock.Unlock()

	registered, found := clients[client.ID]
	if !found {
		return
	}
	registered.ChannelType = &data.PostType
	registered.ChannelID = data.PostID
}

// isColumnSafePostType reports whether s is non-empty and consists solely of
// ASCII letters, digits and underscores, i.e. whether it can be spliced into a
// SQL column name without allowing injection.
func isColumnSafePostType(s string) bool {
	if s == "" {
		return false
	}
	for _, r := range s {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
		default:
			return false
		}
	}
	return true
}

// commentCreation inserts a new comment and broadcasts it to every client in
// the post's channel.
//
// data.PostID and data.CommentBody are required; data.ParentCommentID is
// optional and is stored as NULL when absent (the broadcast reports -1 in that
// case). The request is validated before anything is written so a malformed
// message can neither panic the handler nor leave a row behind without a
// broadcast. data.PostType becomes part of a column name ("<postType>_id") and
// cannot be bound as a parameter, so it is restricted to identifier
// characters.
func commentCreation(data Data) {
	if data.PostID == nil || data.CommentBody == nil {
		log.Printf("commentCreation: missing postID or commentBody")
		return
	}
	if !isColumnSafePostType(data.PostType) {
		log.Printf("commentCreation: rejected postType %q", data.PostType)
		return
	}

	noParentSafe := -1
	if data.ParentCommentID != nil {
		noParentSafe = *data.ParentCommentID
	}

	query := fmt.Sprintf(`INSERT INTO Comment (body, %s, parent_comment_id, commenter_id) VALUES (?, ?, ?, ?)`, data.PostType+"_id")
	res, err := db.Exec(query, data.CommentBody, data.PostID, data.ParentCommentID, data.InvokerID)
	if err != nil {
		log.Printf("Failed to execute query: %v", err)
		return
	}
	commentID, err := res.LastInsertId()
	if err != nil {
		log.Printf("Failed to retrieve ID: %v", err)
		return
	}

	jsonMsg, err := json.Marshal(&struct {
		Action        string `json:"action"`
		CommentID     int64  `json:"commentID"`
		CommentParent int    `json:"commentParent"`
		CommentBody   string `json:"commentBody"`
		CommenterID   string `json:"commenterID"`
	}{
		Action:        "commentCreationBroadcast",
		CommentID:     commentID,
		CommentParent: noParentSafe,
		CommentBody:   *data.CommentBody,
		CommenterID:   data.InvokerID,
	})
	if err != nil {
		log.Printf("Failed to create JSON message: %v", err)
		return
	}
	broadcast(jsonMsg, getAllConnectionsInChannel(*data.PostID, data.PostType))
}

// commentUpdate replaces the body of an existing comment, marks it as edited,
// and broadcasts the new body to every client in the post's channel.
//
// data.CommentBody, data.CommentID and data.PostID are all required. They are
// checked before the UPDATE runs so a malformed message cannot change the row
// and then panic before the broadcast is sent.
//
// NOTE(review): the UPDATE is keyed on the comment ID alone; nothing here
// checks that data.InvokerID wrote the comment — confirm that is enforced
// elsewhere.
func commentUpdate(data Data) {
	if data.CommentBody == nil || data.CommentID == nil || data.PostID == nil {
		log.Printf("commentUpdate: missing commentBody, commentID or postID")
		return
	}

	const query = `UPDATE Comment SET body = ?, edited = ? WHERE id = ?`
	if _, err := db.Exec(query, *data.CommentBody, true, *data.CommentID); err != nil {
		log.Printf("Failed to execute query: %v", err)
		return
	}

	jsonMsg, err := json.Marshal(&struct {
		Action      string `json:"action"`
		CommentID   int    `json:"commentID"`
		CommentBody string `json:"commentBody"`
	}{
		Action:      "commentUpdateBroadcast",
		CommentID:   *data.CommentID,
		CommentBody: *data.CommentBody,
	})
	if err != nil {
		log.Printf("Failed to create JSON message: %v", err)
		return
	}
	broadcast(jsonMsg, getAllConnectionsInChannel(*data.PostID, data.PostType))
}

// commentDeletion removes a comment according to data.DeleteType and
// broadcasts the result to every client in the post's channel.
//
//   - "user":  the body is replaced with "[comment removed by user]".
//   - "admin": only honoured when data.InvokerID equals the ADMIN_ID
//     environment variable; the body is replaced with
//     "[comment removed by admin]" and commenter_id is reset to 0.
//   - "full":  the row is deleted; the broadcast carries no body.
//
// Any other delete type (including "admin" from a non-admin invoker) is
// ignored. data.DeleteType, data.CommentID and data.PostID are required and
// are checked up front so a malformed message cannot panic the handler.
//
// NOTE(review): the "user" and "full" paths do not check that data.InvokerID
// owns the comment — confirm that is enforced elsewhere.
func commentDeletion(data Data) {
	if data.DeleteType == nil || data.CommentID == nil || data.PostID == nil {
		log.Printf("commentDeletion: missing deleteType, commentID or postID")
		return
	}
	deleteType := *data.DeleteType
	commentID := *data.CommentID

	var (
		jsonMsg []byte
		err     error
	)
	switch {
	case deleteType == "user",
		deleteType == "admin" && data.InvokerID == os.Getenv("ADMIN_ID"):
		deletionBody := fmt.Sprintf("[comment removed by %s]", deleteType)
		if deleteType == "user" {
			_, err = db.Exec(`UPDATE Comment SET body = ?, edited = ? WHERE id = ?`,
				deletionBody, false, commentID)
		} else {
			_, err = db.Exec(`UPDATE Comment SET body = ?, edited = ?, commenter_id = ? WHERE id = ?`,
				deletionBody, false, 0, commentID)
		}
		if err != nil {
			log.Printf("Failed to execute query: %v", err)
			return
		}
		jsonMsg, err = json.Marshal(&struct {
			Action      string `json:"action"`
			CommentID   int    `json:"commentID"`
			CommentBody string `json:"commentBody"`
		}{
			Action:      "commentDeletionBroadcast",
			CommentID:   commentID,
			CommentBody: deletionBody,
		})
	case deleteType == "full":
		if _, err = db.Exec(`DELETE FROM Comment WHERE id = ?`, commentID); err != nil {
			log.Printf("Failed to execute query %v", err)
			return
		}
		jsonMsg, err = json.Marshal(&struct {
			Action    string `json:"action"`
			CommentID int    `json:"commentID"`
		}{
			Action:    "commentDeletionBroadcast",
			CommentID: commentID,
		})
	default:
		return
	}
	if err != nil {
		log.Printf("Failed to create JSON message: %v", err)
		return
	}
	broadcast(jsonMsg, getAllConnectionsInChannel(*data.PostID, data.PostType))
}

// commentReaction toggles a reaction by data.InvokerID on a comment and
// broadcasts the outcome to every client in the post's channel.
//
// "upVote" and "downVote" are delegated to commentPoints. Any other reaction
// type is toggled: an existing row is deleted (endEffect "deletion"),
// otherwise one is inserted (endEffect "creation").
//
// data.Reaction, data.CommentID and data.PostID are required and are checked
// up front so a malformed message cannot panic the handler.
func commentReaction(data Data) {
	if data.Reaction == nil || data.CommentID == nil || data.PostID == nil {
		log.Printf("commentReaction: missing reactionType, commentID or postID")
		return
	}
	reaction := *data.Reaction
	if reaction == "upVote" || reaction == "downVote" {
		commentPoints(data)
		return
	}
	commentID := *data.CommentID

	// Delete first: the affected-row count tells us whether the reaction existed.
	const deleteQuery = `DELETE FROM CommentReaction WHERE type = ? AND comment_id = ? AND user_id = ?`
	res, err := db.Exec(deleteQuery, reaction, commentID, data.InvokerID)
	if err != nil {
		log.Printf("Failed to execute query: %v", err)
		return
	}
	affectedRows, err := res.RowsAffected()
	if err != nil {
		log.Printf("Failed to get affected row count: %v", err)
		return
	}

	endEffect := "deletion"
	if affectedRows == 0 {
		const insertQuery = `INSERT INTO CommentReaction (type, comment_id, user_id) VALUES (?, ?, ?)`
		if _, err := db.Exec(insertQuery, reaction, commentID, data.InvokerID); err != nil {
			log.Printf("Failed to execute query: %v", err)
			return
		}
		endEffect = "creation"
	}

	jsonMsg, err := json.Marshal(&struct {
		Action         string `json:"action"`
		ReactionType   string `json:"reactionType"`
		EndEffect      string `json:"endEffect"`
		ReactingUserID string `json:"reactingUserID"`
		CommentID      int    `json:"commentID"`
	}{
		Action:         "commentReactionBroadcast",
		ReactionType:   reaction,
		EndEffect:      endEffect,
		ReactingUserID: data.InvokerID,
		CommentID:      commentID,
	})
	if err != nil {
		log.Printf("Failed to create JSON message: %v", err)
		return
	}
	broadcast(jsonMsg, getAllConnectionsInChannel(*data.PostID, data.PostType))
}

// commentPoints applies an "upVote" or "downVote" by data.InvokerID to a
// comment and broadcasts the outcome to every client in the post's channel.
//
// The vote is a three-state toggle:
//   - the same vote already exists: it is removed (endEffect "deletion");
//   - the opposite vote exists: it is removed and the new vote inserted
//     (endEffect "inversion");
//   - no vote exists: the new vote is inserted (endEffect "creation").
//
// Any other reaction type is ignored. data.Reaction, data.CommentID and
// data.PostID are required and are checked up front so a malformed message
// cannot panic the handler.
func commentPoints(data Data) {
	if data.Reaction == nil || data.CommentID == nil || data.PostID == nil {
		log.Printf("commentPoints: missing reactionType, commentID or postID")
		return
	}
	reaction := *data.Reaction
	commentID := *data.CommentID

	// queryBase keeps the "query N" numbering used in the error logs:
	// 0-2 for an upVote request, 3-5 for a downVote request.
	var (
		opposite  string
		queryBase int
	)
	switch reaction {
	case "upVote":
		opposite, queryBase = "downVote", 0
	case "downVote":
		opposite, queryBase = "upVote", 3
	default:
		return
	}

	const (
		deleteQuery = `DELETE FROM CommentReaction WHERE type = ? AND comment_id = ? AND user_id = ?`
		insertQuery = `INSERT INTO CommentReaction (type, comment_id, user_id) VALUES (?, ?, ?)`
	)

	// Try to remove the same vote first; if a row went away the user re-clicked
	// the button and the vote simply returns to neutral.
	res, err := db.Exec(deleteQuery, reaction, commentID, data.InvokerID)
	if err != nil {
		log.Printf("Failed to execute query %d: %v", queryBase, err)
		return
	}
	removed, err := res.RowsAffected()
	if err != nil {
		log.Printf("Failed to get affected row count: %v", err)
		return
	}

	endEffect := "deletion"
	if removed == 0 {
		// Drop the opposite vote (it may or may not exist), then record the new one.
		res, err = db.Exec(deleteQuery, opposite, commentID, data.InvokerID)
		if err != nil {
			log.Printf("Failed to execute query %d: %v", queryBase+1, err)
			return
		}
		flipped, err := res.RowsAffected()
		if err != nil {
			log.Printf("Failed to get affected row count: %v", err)
			return
		}
		if _, err := db.Exec(insertQuery, reaction, commentID, data.InvokerID); err != nil {
			log.Printf("Failed to execute query %d: %v", queryBase+2, err)
			return
		}
		endEffect = "inversion"
		if flipped == 0 {
			endEffect = "creation"
		}
	}

	jsonMsg, err := json.Marshal(&struct {
		Action         string `json:"action"`
		ReactionType   string `json:"reactionType"`
		EndEffect      string `json:"endEffect"`
		ReactingUserID string `json:"reactingUserID"`
		CommentID      int    `json:"commentID"`
	}{
		Action:         "commentReactionBroadcast",
		ReactionType:   reaction,
		EndEffect:      endEffect,
		ReactingUserID: data.InvokerID,
		CommentID:      commentID,
	})
	if err != nil {
		log.Printf("Failed to create JSON message: %v", err)
		return
	}
	broadcast(jsonMsg, getAllConnectionsInChannel(*data.PostID, data.PostType))
}

// isTableSafePostType reports whether s is non-empty and consists solely of
// ASCII letters, digits and underscores, i.e. whether it can be spliced into
// SQL table and column names without allowing injection.
func isTableSafePostType(s string) bool {
	if s == "" {
		return false
	}
	for _, r := range s {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z', r >= '0' && r <= '9', r == '_':
		default:
			return false
		}
	}
	return true
}

// postLike toggles data.InvokerID's like on a post and broadcasts the change
// (+1 or -1) to every client in the post's channel.
//
// The like lives in table "<PostType>Like" (first letter upper-cased) under
// column "<postType>_id". An existing like is deleted (change -1); otherwise a
// new one is inserted (change +1).
//
// data.PostID is required. data.PostType becomes part of table and column
// names, which cannot be bound as parameters, so it is restricted to
// identifier characters before being spliced into the query.
func postLike(data Data) {
	if data.PostID == nil {
		log.Printf("postLike: missing postID")
		return
	}
	if !isTableSafePostType(data.PostType) {
		log.Printf("postLike: rejected postType %q", data.PostType)
		return
	}
	table := strings.ToUpper(data.PostType[:1]) + data.PostType[1:] + "Like"
	column := data.PostType + "_id"

	// Delete first: the affected-row count tells us whether a like existed.
	deleteQuery := fmt.Sprintf(`DELETE FROM %s WHERE user_id = ? AND %s = ?`, table, column)
	res, err := db.Exec(deleteQuery, data.InvokerID, *data.PostID)
	if err != nil {
		log.Printf("Failed to execute query: %v", err)
		return
	}
	affectedRows, err := res.RowsAffected()
	if err != nil {
		log.Printf("Failed to get affected row count: %v", err)
		return
	}

	change := -1
	if affectedRows == 0 {
		insertQuery := fmt.Sprintf(`INSERT INTO %s (user_id, %s) VALUES (?, ?)`, table, column)
		if _, err := db.Exec(insertQuery, data.InvokerID, *data.PostID); err != nil {
			log.Printf("Failed to execute query: %v", err)
			return
		}
		change = 1
	}

	jsonMsg, err := json.Marshal(&struct {
		Action  string `json:"action"`
		LikerID string `json:"likerID"`
		Change  int    `json:"change"`
	}{
		Action:  "postLikeBroadcast",
		LikerID: data.InvokerID,
		Change:  change,
	})
	if err != nil {
		log.Printf("Failed to create JSON message: %v", err)
		return
	}
	broadcast(jsonMsg, getAllConnectionsInChannel(*data.PostID, data.PostType))
}

// reader is the per-connection receive loop. It reads messages until the
// connection fails, decodes each one into a Data value, dispatches on
// data.Action, and finally echoes the raw message back to the sender.
//
// A message that is not valid JSON is logged and skipped; the loop then waits
// for the next message. On every exit path the client is removed from the
// clients map and its connection is closed.
func reader(client *Client) {
	defer func() {
		lock.Lock()
		delete(clients, client.ID)
		lock.Unlock()
		client.Conn.Close()
	}()

	for {
		messageType, p, err := client.Conn.ReadMessage()
		if err != nil {
			log.Println(err)
			return
		}

		var data Data
		if parseErr := json.Unmarshal(p, &data); parseErr != nil {
			// Log the decode error itself; err from ReadMessage is nil here.
			log.Printf("Error occurred during unmarshaling. Error: %s", parseErr.Error())
			continue
		}

		log.Println(data.Action)

		switch data.Action {
		case "channelUpdate":
			channelUpdate(data, client)
		case "commentCreation":
			commentCreation(data)
		case "commentUpdate":
			commentUpdate(data)
		case "commentDeletion":
			commentDeletion(data)
		case "commentReaction":
			commentReaction(data)
		case "postLike":
			postLike(data)
		default:
			log.Printf("Unrecognized action: %s", data.Action)
		}

		if err := client.Conn.WriteMessage(messageType, p); err != nil {
			log.Println(err)
			return
		}
	}
}

// wsEndpoint upgrades an HTTP request to a WebSocket connection, registers the
// connection as a new Client under a fresh UUID, and then runs the client's
// read loop until the connection ends.
//
// If the upgrade fails the error is logged and the handler returns; no client
// is registered.
//
// NOTE(review): CheckOrigin accepts every origin, so any website can open a
// socket to this server from a visitor's browser. Restrict it to the site's
// own origin(s) if that is not intended.
func wsEndpoint(writer http.ResponseWriter, req *http.Request) {
	// Configure a per-request copy: assigning to the shared upgrader from
	// concurrently running handlers would be a data race.
	up := upgrader
	up.CheckOrigin = func(req *http.Request) bool { return true }

	ws, err := up.Upgrade(writer, req, nil)
	if err != nil {
		log.Println(err)
		return
	}

	client := &Client{
		ID:   uuid.New().String(),
		Conn: ws,
	}
	lock.Lock()
	clients[client.ID] = client
	lock.Unlock()

	reader(client)
}

// setupRoutes registers wsEndpoint as the handler for the root path on the
// default ServeMux.
func setupRoutes() {
	http.Handle("/", http.HandlerFunc(wsEndpoint))
}

// main opens the database, registers the WebSocket route, and serves HTTP on
// the port given by the PORT environment variable (8080 when unset).
//
// The .env file is loaded inside createDBConnection, so PORT from that file is
// already in the environment by the time it is read here.
func main() {
	db = createDBConnection()
	setupRoutes()

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080" // Default port if not specified
	}

	// ListenAndServe only returns on failure. Close the database explicitly:
	// log.Fatal exits the process without running deferred calls.
	err := http.ListenAndServe(":"+port, nil)
	if closeErr := db.Close(); closeErr != nil {
		log.Printf("closing database: %v", closeErr)
	}
	log.Fatal(err)
}
Comments
No Comments Yet