feat(websocket): add deduplication support to PeriodicWrite function and introduce DeepEqual utility

This commit is contained in:
yusing
2025-09-13 22:37:51 +08:00
parent a483e15a20
commit 99c1922342
3 changed files with 285 additions and 4 deletions

View File

@@ -16,6 +16,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
"github.com/yusing/go-proxy/internal/common"
)
@@ -121,10 +122,21 @@ func NewManagerWithUpgrade(c *gin.Context) (*Manager, error) {
return cm, nil
}
// Periodic writes data to the connection periodically.
func (cm *Manager) Context() context.Context {
return cm.ctx
}
// Periodic writes data to the connection periodically, with deduplication.
// If the connection is closed, the error is returned.
// If the write timeout is reached, ErrWriteTimeout is returned.
func (cm *Manager) PeriodicWrite(interval time.Duration, getData func() (any, error)) error {
func (cm *Manager) PeriodicWrite(interval time.Duration, getData func() (any, error), deduplicate ...DeduplicateFunc) error {
var lastData any
var equals DeduplicateFunc
if len(deduplicate) > 0 {
equals = deduplicate[0]
}
write := func() {
data, err := getData()
if err != nil {
@@ -133,6 +145,13 @@ func (cm *Manager) PeriodicWrite(interval time.Duration, getData func() (any, er
return
}
// skip if the data is the same as the last data
if equals != nil && equals(data, lastData) {
return
}
lastData = data
if err := cm.WriteJSON(data, interval); err != nil {
cm.err = err
cm.Close()
@@ -214,6 +233,17 @@ func (cm *Manager) ReadJSON(out any, timeout time.Duration) error {
}
}
func (cm *Manager) ReadBinary(timeout time.Duration) ([]byte, error) {
select {
case <-cm.ctx.Done():
return nil, cm.err
case data := <-cm.readCh:
return data, nil
case <-time.After(timeout):
return nil, ErrReadTimeout
}
}
// Close closes the connection and cancels the context
func (cm *Manager) Close() {
cm.closeOnce.Do(cm.close)
@@ -230,6 +260,12 @@ func (cm *Manager) close() {
cm.conn.Close()
cm.pingCheckTicker.Stop()
if cm.err != nil {
log.Debug().Caller(4).Msg("Closing WebSocket connection: " + cm.err.Error())
} else {
log.Debug().Caller(4).Msg("Closing WebSocket connection")
}
}
// Done returns a channel that is closed when the context is done or the connection is closed

View File

@@ -7,14 +7,16 @@ import (
apitypes "github.com/yusing/go-proxy/internal/api/types"
)
func PeriodicWrite(c *gin.Context, interval time.Duration, get func() (any, error)) {
type DeduplicateFunc func(last, current any) bool
func PeriodicWrite(c *gin.Context, interval time.Duration, get func() (any, error), deduplicate ...DeduplicateFunc) {
manager, err := NewManagerWithUpgrade(c)
if err != nil {
c.Error(apitypes.InternalServerError(err, "failed to upgrade to websocket"))
return
}
defer manager.Close()
err = manager.PeriodicWrite(interval, get)
err = manager.PeriodicWrite(interval, get, deduplicate...)
if err != nil {
c.Error(apitypes.InternalServerError(err, "failed to write to websocket"))
}