Files
godoxy-yusing/agent/pkg/agent/stream/header.go
Yuzerion 6fac5d2d3e feat(agent): agent stream tunneling with TLS and DTLS (UDP) (#188)
* **New Features**
  * Multiplexed TLS port: HTTP API and a custom stream protocol can share one port via ALPN.
  * Agent-side TCP and DTLS/UDP stream tunneling with health-check support and runtime capability detection.
  * Agents now advertise per-agent stream support (TCP/UDP).

* **Documentation**
  * Added comprehensive stream protocol documentation.

* **Tests**
  * Extended integration and concurrency tests covering multiplexing, TCP/UDP streams, and health checks.

* **Chores**
  * Compose/template updated to expose both TCP and UDP ports.
2026-01-09 10:52:35 +08:00

118 lines
2.9 KiB
Go

package stream
import (
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"reflect"
"unsafe"
)
// Sizes, in bytes, of the individual fields of a StreamRequestHeader.
const (
	versionSize  = 8
	hostSize     = 255
	portSize     = 5
	flagSize     = 1
	checksumSize = 4 // CRC-32 checksum
)

// headerSize is the total size of a StreamRequestHeader in bytes. The two
// literal 1s account for the single-byte length prefixes stored in front of
// the host and the port.
const headerSize = versionSize + 1 + hostSize + 1 + portSize + flagSize + checksumSize
// version is the protocol version tag written at the start of every header:
// the ASCII string "0.1.0", with the remaining bytes of the array left as
// zero padding.
var version = [versionSize]byte{'0', '.', '1', '.', '0'}
// Sentinel errors exported by the stream package.
var (
	// ErrInvalidHeader reports an invalid stream header.
	// NOTE(review): nothing in this file returns it; presumably callers use it
	// when a header fails Validate — confirm against the call sites.
	ErrInvalidHeader = errors.New("invalid header")

	// ErrCloseImmediately carries the message "close immediately".
	// NOTE(review): presumably paired with FlagCloseImmediately by callers —
	// confirm against the call sites.
	ErrCloseImmediately = errors.New("close immediately")
)
// FlagType is the bit set stored in the single-byte Flag field of a
// StreamRequestHeader.
type FlagType uint8

const (
	// FlagCloseImmediately is the lowest flag bit (value 1). It is set on
	// health-check headers and reported by ShouldCloseImmediately.
	FlagCloseImmediately FlagType = 1 << iota
)
// StreamRequestHeader is the fixed-size header of a stream request.
//
// Every field is a byte or a byte array, and the struct is reinterpreted
// directly as raw bytes (see ToHeader, Bytes and BytesWithoutChecksum), so the
// order and size of the fields define the header's byte layout. init panics if
// the struct size ever differs from headerSize.
type StreamRequestHeader struct {
// Version is the protocol version tag; Validate compares it with the
// package-level version value.
Version [versionSize]byte
// HostLength is the number of leading bytes of Host that hold the host.
HostLength byte
// Host holds the host bytes; the constructors leave the unused tail zeroed.
Host [hostSize]byte
// PortLength is the number of leading bytes of Port that hold the port.
// Validate rejects values greater than portSize.
PortLength byte
// Port holds the port as text bytes; the constructors leave the unused tail zeroed.
Port [portSize]byte
// Flag is a bit set of FlagType values (see FlagCloseImmediately).
Flag FlagType
// Checksum is the big-endian CRC-32 (IEEE) of all preceding header bytes.
Checksum [checksumSize]byte
}
func init() {
if headerSize != reflect.TypeFor[StreamRequestHeader]().Size() {
panic("headerSize does not match the size of StreamRequestHeader")
}
}
// NewStreamRequestHeader builds a checksummed header addressed to host and
// port.
//
// It returns a nil header and an error when host is longer than hostSize bytes
// or port is longer than portSize bytes. The host length is checked first.
func NewStreamRequestHeader(host, port string) (*StreamRequestHeader, error) {
	switch {
	case len(host) > hostSize:
		return nil, fmt.Errorf("host is too long: max %d characters, got %d", hostSize, len(host))
	case len(port) > portSize:
		return nil, fmt.Errorf("port is too long: max %d characters, got %d", portSize, len(port))
	}

	h := &StreamRequestHeader{
		Version:    version,
		HostLength: byte(len(host)),
		PortLength: byte(len(port)),
	}
	copy(h.Host[:], host)
	copy(h.Port[:], port)

	// The checksum covers every other field, so it is computed last.
	h.updateChecksum()
	return h, nil
}
// NewStreamHealthCheckHeader builds a checksummed header that carries no host
// or port and has FlagCloseImmediately set.
func NewStreamHealthCheckHeader() *StreamRequestHeader {
	h := &StreamRequestHeader{
		Version: version,
		Flag:    FlagCloseImmediately,
	}
	h.updateChecksum()
	return h
}
// ToHeader reinterprets the headerSize bytes in buf as a StreamRequestHeader
// and returns it by value, so the result does not alias buf. No validation is
// performed here.
func ToHeader(buf *[headerSize]byte) StreamRequestHeader {
	view := (*StreamRequestHeader)(unsafe.Pointer(buf))
	return *view
}
// GetHostPort returns the host and port stored in the header as strings: the
// first HostLength bytes of Host and the first PortLength bytes of Port.
//
// Both lengths are clamped to the size of their backing array. A header built
// from raw bytes with ToHeader can carry a PortLength larger than the Port
// array, and slicing with it unclamped would panic when the header is read
// before Validate has rejected it. Callers should still call Validate before
// trusting the result.
func (h *StreamRequestHeader) GetHostPort() (string, string) {
	hostLen, portLen := int(h.HostLength), int(h.PortLength)
	if hostLen > len(h.Host) {
		hostLen = len(h.Host)
	}
	if portLen > len(h.Port) {
		portLen = len(h.Port)
	}
	return string(h.Host[:hostLen]), string(h.Port[:portLen])
}
// Validate reports whether the header is well formed: the version tag matches
// the package version, both length fields fit their arrays, and the stored
// checksum matches the header contents. The checksum is only computed once the
// cheaper checks have passed.
func (h *StreamRequestHeader) Validate() bool {
	wellFormed := h.Version == version &&
		h.HostLength <= hostSize &&
		h.PortLength <= portSize
	return wellFormed && h.validateChecksum()
}
// ShouldCloseImmediately reports whether the FlagCloseImmediately bit is set
// in the header's Flag field. Other bits are ignored.
func (h *StreamRequestHeader) ShouldCloseImmediately() bool {
	masked := h.Flag & FlagCloseImmediately
	return masked != 0
}
// updateChecksum recomputes the CRC-32 (IEEE) of every header byte that
// precedes the Checksum field and stores it in Checksum in big-endian order.
func (h *StreamRequestHeader) updateChecksum() {
	payload := h.BytesWithoutChecksum()
	sum := crc32.Checksum(payload, crc32.IEEETable)
	binary.BigEndian.PutUint32(h.Checksum[:], sum)
}
// validateChecksum reports whether the big-endian value stored in Checksum
// equals the CRC-32 (IEEE) of the header bytes that precede it.
func (h *StreamRequestHeader) validateChecksum() bool {
	stored := binary.BigEndian.Uint32(h.Checksum[:])
	computed := crc32.ChecksumIEEE(h.BytesWithoutChecksum())
	return stored == computed
}
// BytesWithoutChecksum returns the header's raw bytes up to, but not
// including, the Checksum field. The slice aliases the header's memory rather
// than copying it, so writes through it modify the header.
func (h *StreamRequestHeader) BytesWithoutChecksum() []byte {
	return unsafe.Slice((*byte)(unsafe.Pointer(h)), headerSize-checksumSize)
}
// Bytes returns all headerSize raw bytes of the header, checksum included.
// The slice aliases the header's memory rather than copying it, so writes
// through it modify the header.
func (h *StreamRequestHeader) Bytes() []byte {
	raw := (*[headerSize]byte)(unsafe.Pointer(h))
	return raw[:]
}