跳转至

Godot pitaya

godot 连接 pitaya

原由:

在选择游戏服务器框架的时候想到了一些东西
    1. 首先是 分布式,原因是大部分情况想需要部署不同的东西到不同的服务器,比如**数据库**,登录服务器游戏服务器,活动,在不同需求的试试,可能添加各种东西,那么分布式一个考虑的选项
    1. 语言的选择上,Java,C++,C#,erlang,python,go,等等,

      Java: 优:很稳定,需要jdk, 劣:写代码那个大呀 C++: 优:性能高 劣:容易bug,崩溃,要求高,编译起来麻烦 c#: 优劣和java类似,写代码是一件很痛苦的事情 erlang: 专门为服务器设计的,但是我没怎么接触,被搁置了 python: 脚本语言,还好,其实还是个人,那语法,缩进,我一直接的很崩溃,找尾部中间加代码,也是几个体力活,都无语了都 go: 优:为服务器设计,编译快,稳定还不错 劣:需要熟悉整体

    1. 跨平台,部署方面以及环境搭建方面,
最终选择go语言
框架选择方面
    1. 安装简单 安装起来却这却那的,linux太常见了,折磨
    1. 分布式 我自己在造轮子,写起来类
    1. 有在更新 技术更新快,没有更新可能都有可能淘汰了
暂时选择pitaya
  • 有在更新
  • 他们有在使用框架
  • 写起来也还算不难
客户端 选择godot 因为开源,整体使用还不错
  • unity 没钱,加各种登录,各种作妖
  • ue 太大了,我笔记本玩不转,有点牛刀戳蚂蚁的感觉

开始正题

**版本选择 - godot 4.6 - pitaya v2@2.11.21 - 客户端和服务器项目 上传到github godot_pitaya.7z 不想下载,可以直接下面看代码

服务器部分

服务器参考的是demo chat

main.go
package main

import (
    "context"
    "fmt"
    "strconv"
    "time"

    "github.com/topfreegames/pitaya/v2/groups"

    "strings"

    "github.com/topfreegames/pitaya/v2"
    "github.com/topfreegames/pitaya/v2/acceptor"
    "github.com/topfreegames/pitaya/v2/component"
    "github.com/topfreegames/pitaya/v2/config"
    "github.com/topfreegames/pitaya/v2/logger"
    "github.com/topfreegames/pitaya/v2/timer"
)

type (
    Room struct {
        component.Base
        timer *timer.Timer
        app   pitaya.Pitaya
    }

    UserMessage struct {
        Name    string `json:"name"`
        Content string `json:"content"`
    }

    NewUser struct {
        Content string `json:"content"`
    }

    AllMembers struct {
        Members []string `json:"members"`
    }

    JoinResponse struct {
        Code   int    `json:"code"`
        Result string `json:"result"`
    }
)

func NewRoom(app pitaya.Pitaya) *Room {
    return &Room{
        app: app,
    }
}

func (r *Room) AfterInit() {
    r.timer = pitaya.NewTimer(time.Minute, func() {
        count, err := r.app.GroupCountMembers(context.Background(), "room")
        logger.Log.Debugf("UserCount: Time=> %s, Count=> %d, Error=> %v", time.Now().String(), count, err)
    })
}

func (r *Room) Join(ctx context.Context, msg []byte) (*JoinResponse, error) {

    s := r.app.GetSessionFromCtx(ctx)
    fakeUID := s.ID()                              // just use s.ID as uid !!!
    err := s.Bind(ctx, strconv.Itoa(int(fakeUID))) // binding session uid

    if err != nil {
        return nil, pitaya.Error(err, "RH-000", map[string]string{"failed": "bind"})
    }

    uids, err := r.app.GroupMembers(ctx, "room")
    if err != nil {
        return nil, err
    }
    s.Push("onMembers", &AllMembers{Members: uids})
    r.app.GroupBroadcast(ctx, "chat", "room", "onNewUser", &NewUser{Content: fmt.Sprintf("New user: %s", s.UID())})
    r.app.GroupAddMember(ctx, "room", s.UID()) // add session to group

    s.OnClose(func() {
        r.app.GroupRemoveMember(ctx, "room", s.UID())
    })

    return &JoinResponse{Result: "success"}, nil
}

func (r *Room) Message(ctx context.Context, msg *UserMessage) {
    err := r.app.GroupBroadcast(ctx, "chat", "room", "onMessage", msg)
    if err != nil {
        fmt.Println("error broadcasting message", err)
    }
}

var app pitaya.Pitaya

func main() {
    conf := config.NewDefaultPitayaConfig()
    conf.Buffer.Handler.LocalProcess = 15
    conf.Heartbeat.Interval = time.Duration(15 * time.Second)
    conf.Buffer.Agent.Messages = 32
    conf.Handler.Messages.Compression = false
    // Standalone 独立模式
    builder := pitaya.NewDefaultBuilder(true, "chat", pitaya.Standalone, map[string]string{}, *conf)
    // Cluster 集群模式 需要 etcd, nats, 看情况还有可能需要开启redis
    //builder := pitaya.NewDefaultBuilder(true, "chat", pitaya.Cluster, map[string]string{}, *conf)
    builder.AddAcceptor(acceptor.NewTCPAcceptor(":3251"))
    builder.Groups = groups.NewMemoryGroupService(builder.Config.Groups.Memory)
    app = builder.Build()

    defer app.Shutdown()

    err := app.GroupCreate(context.Background(), "room")
    if err != nil {
        panic(err)
    }

    // rewrite component and handler name
    room := NewRoom(app)
    app.Register(room,
        component.WithName("room"),
        component.WithNameFunc(strings.ToLower),
    )

    app.Start()
}

客户端部分

  • 在 https://blog.csdn.net/weixin_44627989/article/details/130355014 上的代码基础上进行的修改完成

客户端的设计

    Control         Control   main.gd
        client      Node      client.gd

        txtContent  TextEdit
        txtInput    TextEdit
        txtNickName LineEdit

        btnConnected Button
        btnSend      Button

        txtHost      LineEdit
        txtPort      LineEdit

客户端代码设计3个脚本
main.gd
extends Control

@onready var client: Client = $client

@onready var txt_port: LineEdit = $txtPort
@onready var txt_host: LineEdit = $txtHost
@onready var txt_input: TextEdit = $txtInput
@onready var txt_nick_name: LineEdit = $txtNickName

@onready var txt_content: TextEdit = $txtContent
@onready var txt_rtlabel: RichTextLabel = $txtRTLabel

@onready var btn_connected: Button = $btnConnected
@onready var btn_send: Button = $btnSend


func _ready() -> void:
    btn_send.disabled = true;
    pass

func _on_btn_connected_pressed() -> void:
    var host = txt_host.text;
    var port = int(txt_port.text);

    client.connect_server(host, port, _rev_connected, _rev_msg);
    btn_connected.disabled = true;
    var tween:Tween = create_tween();
    tween.tween_interval(2);
    tween.tween_callback(func():
        if (!client.is_connected_server()):
            btn_connected.disabled = false;
        )

    pass # Replace with function body.

func _push_msg(uname:String, content:String)-> void:
    var tmp_str:String = uname + ":" + content + "\n";
    txt_content.insert_text_at_caret(tmp_str)
    #txt_rtlabel.add_text(str);
    pass

#----------------------------------------------`
func onNewUser(msg) -> void:
    var data = msg.jsonData();
    _push_msg('system', data.content);
    pass

func onMembers(msg) -> void:
    var data = msg.jsonData();
    _push_msg('system',  "members: " + JSON.stringify(data.members));
    pass

func onJoin(succ:bool, msg:Variant, rt:String) -> void:
    if (!succ):
        print("err not get message rep:",msg, rt);
        return;
    var data = msg.jsonData();
    if(data.code == 0):
        _push_msg('system', data.result);
        client.set_push_func('onMessage', onMessage);
        #starx.on('onMessage', onMessage)
    pass

func onMessage(msg) -> void:
    var data = msg.jsonData();
    _push_msg(data.name, data.content)
    pass

func _rev_connected(succ:bool) -> void:
    if (succ):
        print("connect to server");
        client.set_push_func("onNewUser", onNewUser);
        client.set_push_func("onMembers", onMembers);

        client.SendRequest("room.join", "{}", onJoin);
        btn_send.disabled = false;
    else:
        btn_send.disabled = true;
        btn_connected.disabled = false;
    pass

func _rev_msg(msg) -> void:
    print("--msg---:",msg);
    pass

func _on_btn_send_pressed() -> void:
    if (!client.is_connected_server()):
        print(" not connected ")
        return;

    var nn = txt_nick_name.text;
    var ctt = txt_input.text;

    var data = JSON.stringify({"name": nn, "content":ctt }).to_utf8_buffer();
    client.SendNotify('room.message', data);
    pass # Replace with function body.
StreamTCP.gd
extends Node
class_name StreamTCP

####
## 从 复制下来进行的修改
## https://www.bytesnsprites.com/posts/2021/creating-a-tcp-client-in-godot/
###

signal connected
signal data
signal disconnected
signal error

var _status: int = 0
var _stream: StreamPeerTCP = StreamPeerTCP.new()

func _ready() -> void:
    _status = _stream.get_status()

func _process(_delta: float) -> void:
    var new_status: int = _stream.get_status()
    if new_status != _status:
        _status = new_status
        match _status:
            _stream.STATUS_NONE:
                print("Disconnected from host.")
                emit_signal("disconnected")
            _stream.STATUS_CONNECTING:
                print("Connecting to host.")
            _stream.STATUS_CONNECTED:
                print("Connected to host.")
                emit_signal("connected")
            _stream.STATUS_ERROR:
                print("Error with socket stream.")
                emit_signal("error")

    if _status == _stream.STATUS_CONNECTED:
        var available_bytes: int = _stream.get_available_bytes()
        if available_bytes > 0:
            #print("available bytes: ", available_bytes)
            var buf_array: Array = _stream.get_partial_data(available_bytes)
            # Check for read error.
            if buf_array[0] != OK:
                print("Error getting data from stream: ", buf_array[0])
                emit_signal("error")
            else:
                emit_signal("data", buf_array[1])

func is_connected_server() -> bool:
    var state:int = _stream.get_status();
    return state == _stream.STATUS_CONNECTED;

func connect_to_host(host: String, port: int) -> void:
    print("Connecting to %s:%d" % [host, port])
    # Reset status so we can tell if it changes to error again.
    _status = _stream.STATUS_NONE
    if _stream.connect_to_host(host, port) != OK:
        print("Error connecting to host.")
        emit_signal("error")
    else:
        #_stream.set_no_delay(true)
        _stream.poll();

func disconnect_from_host() -> void:
    if (is_connected_server()):
        _stream.disconnect_from_host();

func send(buff: PackedByteArray) -> bool:
    if _status != _stream.STATUS_CONNECTED:
        print("Error: Stream is not currently connected.")
        return false
    var err: int = _stream.put_data(buff)
    if err != OK:
        print("Error writing to stream: ", err)
        return false
    return true
client.gd
extends Node
class_name  Client



 #/**
   #* Package protocol encode.
   #*
   #* Pomelo package format:
   #* +------+-------------+------------------+
   #* | type | body length |       body       |
   #* +------+-------------+------------------+
   #*
   #* Head: 4bytes
   #*   0: package type,
   #*      1 - handshake,
   #*      2 - handshake ack,
   #*      3 - heartbeat,
   #*      4 - data
   #*      5 - kick
   #*   1 - 3: big-endian body length
   #* Body: body length bytes
   #*
   #* @param  {Number}    type   package type
   #* @param  {ByteArray} body   body content in bytes
   #* @return {ByteArray}        new byte array that contains encode result
   #*/

# 5种 基本类型消息

const PACK_TYPE_HANDSHAKE:int = 0x01
const PACK_TYPE_HANDSHAKEACK:int = 0x02
const PACK_TYPE_HEARTBEAT:int = 0x03
const PACK_TYPE_DATA:int = 0x04
const PACK_TYPE_KICK:int = 0x05

const MSG_TYPE_REQUEST:int = 0x00
const MSG_TYPE_NOTIFY:int = 0x01
const MSG_TYPE_RESPONSE:int = 0x02
const MSG_TYPE_PUSH:int = 0x03

const MSG_MASK_ERROR:int = 0x20
const MSG_MASK_GZIP:int = 0x10
const MSG_MASK_MSGROUTECOMPRESS:int = 0x01
const MSG_MASK_MSGTYPE:int = 0x07
const MSG_MASK_SGROUTELENGTH:int = 0xFF
const MSG_MASK_MSGHEADLENGTH:int = 0x02

const HeadLength:int    = 4
const MaxPacketSize:int = 1 << 24 #16MB

var _wait_cb_array = {}
var lastMsgId = 0
var clientHandshakeData = null
var packet_encoder:PomeloPacketEncoder = null
var packet_decoder:PomeloPacketDecoder = null
var message_encoder:MessagesEncoder = null
var compression:Compression = null
var heartbeatTimeout = -1
var requestTimeout = 5
var serializerType = "json";

var push_func = {}

var _is_handshake = false;
var _is_client_connected = false

const RECONNECT_TIMEOUT: float = 3.0

var _host:String = "127.0.0.1"
var _port:int = 40000;
#--------------------------------------------------------------
#--------------------------------------------------------
# 附加回调
var _cb_connected_func = null;
var _cb_msg_func = null;
var heartbeatTimer:Timer = null;

func is_handshaked() -> bool:
    return _is_handshake;

func is_connected_server() -> bool:
    return _is_client_connected;

func set_push_func(key:String, cb:Callable) -> void:
    push_func[key] = cb;

func del_push_func(key) -> bool:
    if (push_func.has(key)):
        push_func.erase(key);
        return true;
    return false;

func connect_server(host:String, port:int, cbconnected:Callable, cbmsg:Callable):
    _cb_connected_func = cbconnected;
    _cb_msg_func = cbmsg;
    _host = host;
    _port = port;
    _stream.connect_to_host(_host, _port);

#--------------------------------------------------------------
#--------------------------------------------------------------
var _stream: StreamTCP = StreamTCP.new()

func _ready() -> void:
    print("connect ready")
    compression = Compression.new()
    packet_encoder = PomeloPacketEncoder.new()
    packet_decoder = PomeloPacketDecoder.new()
    message_encoder = MessagesEncoder.new()
    message_encoder.compression = compression

    heartbeatTimer = Timer.new();
    heartbeatTimer.one_shot = false;
    heartbeatTimer.autostart = false;
    heartbeatTimer.timeout.connect(_handle_Heartbeats_send);
    add_child(heartbeatTimer);

    _stream.connect("connected", _handle_client_connected)
    _stream.connect("disconnected", _handle_client_disconnected)
    _stream.connect("error", _handle_client_error)
    _stream.connect("data", _handle_client_data)
    add_child(_stream)

func _connect_after_timeout(timeout: float) -> void:
    await get_tree().create_timer(timeout).timeout # Delay for timeout
    _stream.connect_to_host(_host, _port)

func _handle_client_connected() -> void:
    print("Client connected to server.")
    _send_handshake();

func _handle_client_data(buf: PackedByteArray) -> void:
    #print("rev data len:",len(buf));
    _try_read_packs(buf);

func _handle_client_disconnected() -> void:
    print("Client disconnected from server.")
    _done_client_disconcted();
    #_connect_after_timeout(RECONNECT_TIMEOUT) # Try to reconnect after 3 seconds

func _handle_client_error() -> void:
    print("Client error.")
    _done_client_disconcted();
    #_connect_after_timeout(RECONNECT_TIMEOUT) # Try to reconnect after 3 seconds

func _send_handshake() -> void:
    var hand_shake_data = {"sys":{"clientVersion":"1.0", "clientBuildNumber":"999","platform":"pc"}};
    var datstr = JSON.stringify(hand_shake_data)
    var p = packet_encoder.Encode(PACK_TYPE_HANDSHAKE, datstr.to_utf8_buffer())
    if p == null:
        print("net connect fail")
        return

    _stream.send(p)

func _try_read_packs(buf:PackedByteArray) -> void:
    var packets = packet_decoder.Decode(buf)
    if packets == null:
        print("net read fail len:" + str(len(buf)))
        return;

    if (!is_handshaked()):
        var handshakePacket:Packet = packets[0]
        if (handshakePacket.Type != PACK_TYPE_HANDSHAKE):
            print("------ get something:", handshakePacket.Type);
            return;

        print("get handshake type:", handshakePacket.Type)
        if compression.IsCompressed(handshakePacket.Data):
            handshakePacket.Data = compression.InflateData(handshakePacket.Data)
            if handshakePacket.Data == null:
                print("comp err")
                return
        var handshake = handshakePacket.jsonData();
        print("got handshake from sv, data: ", handshake)
        var routeDict = {}
        if (handshake["sys"]):
            var sys = handshake["sys"];
            if (sys["dict"]):
                routeDict = sys["dict"];
            if (sys["heartbeat"]):
                heartbeatTimeout = sys["heartbeat"];
            if (sys["serializer"]):
                serializerType =  sys["serializer"];

        message_encoder.SetDictionary(routeDict)

        var p = packet_encoder.Encode(PACK_TYPE_HANDSHAKEACK, PackedByteArray())
        if p == null:
            return

        _stream.send(p)

        _is_handshake = true;
        _is_client_connected = true

        heartbeatTimer.wait_time = heartbeatTimeout;
        heartbeatTimer.start();

        if (_cb_connected_func):
            _cb_connected_func.call(true);

        return;

    print(" rev pack num:",packets.size());
    for p in packets:
        _handle_packets(p)
    pass
#--------------------------------------------------------------

func _newMsgID():
    lastMsgId += 1
    return lastMsgId

#SendRequest sends a request to the server
func SendRequest(route:String, data:String, cb):
    var m = Message.new()
    m.Type = MSG_TYPE_REQUEST
    m.ID = _newMsgID()
    m.Route = route
    m.Data = data.to_utf8_buffer();
    m.SendAt = int(Time.get_unix_time_from_system())
    m.Callback = cb
    var p = _buildPacket(m)
    _wait_cb_array[m.ID] = m

    print("--- SendRequest route:", route, " msgid:", m.ID);
    _stream.send(p)

#SendNotify sends a notify to the server
func SendNotify(route:String, data:PackedByteArray):
    var m = Message.new()
    m.Type = MSG_TYPE_NOTIFY
    m.ID = _newMsgID()
    m.Route = route
    m.Data = data
    m.SendAt = int(Time.get_unix_time_from_system())
    var p = _buildPacket(m)
    print("--- SendNotify route:", route, " msgid:", m.ID);
    _stream.send(p)

func _buildPacket(msg:Message):
    var encMsg = message_encoder.Encode(msg)
    var p = packet_encoder.Encode(PACK_TYPE_DATA, encMsg)
    return p

func _handle_Heartbeats_send() -> void:
    var p = packet_encoder.Encode(PACK_TYPE_HEARTBEAT, PackedByteArray())
    _stream.send(p)

func _handle_packets(p):
    if p.Type == PACK_TYPE_DATA:
        #handle data
        var m = message_encoder.Decode(p.Data)
        _handle_rev_one_packet(m)
    elif p.Type == PACK_TYPE_KICK:
        _done_client_disconcted();
        print("kick from server");
    elif p.Type == PACK_TYPE_HEARTBEAT:
        print("------ heartbeat----");
        pass
    else:
        print("-- un oper pack:", p.Type);

func _requestReaper() -> void:
    if (!is_connected_server()):
        return;
    for msgid in _wait_cb_array.keys():
        var msg:Message = _wait_cb_array[msgid];
        if (int(Time.get_unix_time_from_system()) - msg.SendAt > requestTimeout):
            _wait_cb_array.erase(msgid);
            _handle_timeout_msg(msg);

func _handle_rev_one_packet(m:Message):
    if m.Type == MSG_TYPE_RESPONSE:
        print("--- respones msgid:", m.ID, " Route:", m.Route);
        if m.ID in _wait_cb_array:
            var cb_msg = _wait_cb_array[m.ID]
            _wait_cb_array.erase(m.ID)
            cb_msg.Callback.call(true, m, "success")
    else:
        _handle_push_msg(m)

func _handle_push_msg(msg:Message):
    print(" _handle_push_msg ...msg.Route :",msg.Route, " type:" , msg.Type)
    var cb_func = push_func.get(msg.Route)
    if cb_func:
        cb_func.call(msg)
    elif (_cb_msg_func):
        _cb_msg_func.call(msg);

func _handle_timeout_msg(msg:Message):
    msg.Callback.call(false, msg, " message request is time out")

func _process(_delta: float) -> void:
    _requestReaper();

func _done_client_disconcted() -> void:
    _is_handshake = false;
    _is_client_connected = false
    _stream.disconnect_from_host();
    heartbeatTimer.stop();
    if (_cb_connected_func):
        _cb_connected_func.call(false);

func _exit_tree():
    _done_client_disconcted();

#--------------------------------------------------------------
# 可以在下面的位置好到对应的编码解码以及消息的对应处理
# github.com/topfreegames/pitaya/v2@v2.11.21/conn
#--------------------------------------------------------------
class Message:
    var Type:int = 0
    var ID:int = 0
    var Route:String = ""
    var Data:PackedByteArray = PackedByteArray()
    var compressed:bool = false
    var Err:bool = false
    var SendAt:int = 0
    var Callback = null

    func jsonData() -> Variant:
        return JSON.parse_string(Data.get_string_from_utf8());

class Packet:
    var Type:int = -1
    var Length:int = -1
    var Data:PackedByteArray = PackedByteArray()

    func jsonData() -> Variant:
        return JSON.parse_string(Data.get_string_from_utf8());

class PomeloPacketEncoder:
#Encode create a packet.Packet from  the raw bytes slice and then encode to network bytes slice
#Protocol refs: https://github.com/NetEase/pomelo/wiki/Communication-Protocol
#
# -<type>-|--------<length>--------|-<data>-
# --------|------------------------|--------
# 1 byte packet type, 3 bytes packet data length(big end), and data segment
    ## return null or PackedByteArray
    func Encode(typ:int, data:PackedByteArray):
        if typ < PACK_TYPE_HANDSHAKE || typ > PACK_TYPE_KICK:
            return null
        if len(data) > MaxPacketSize:
            return null
        var buf = PackedByteArray()
        buf.append(typ)
        buf.append_array(IntToBytes(len(data)))
        buf.append_array(data)
        return buf

    func IntToBytes(n):
        var buf = PackedByteArray()
        buf.append((n >> 16) & 0xFF)
        buf.append((n >> 8) & 0xFF)
        buf.append(n & 0xFF)
        return buf

class PomeloPacketDecoder:

    func BytesToInt(b):
        var result = 0
        for bytev in b:
            result = int(result<<8) + int(bytev)
        return result

    func ParseHeader(header):
        if len(header) != HeadLength:
            return null
        var typ = header[0]
        if typ < PACK_TYPE_HANDSHAKE || typ > PACK_TYPE_KICK:
            print("err header nu" + str(typ))
            return null
        var size = BytesToInt(header.slice(1))
        if size > MaxPacketSize:
            print("err size" + str(size))
            return null
        return [size, typ]

    #Decode decode the network bytes slice to packet.Packet(s)
    ## return null or [Packet]
    func Decode(buf):
        # check length
        if len(buf) < HeadLength:
            print("err header 0 ")
            return null

        # first time
        var header = buf.slice(0, HeadLength)
        buf = buf.slice(HeadLength) if len(buf) > HeadLength else []
        var info = ParseHeader(header)
        if info == null:
            print("err header 1 " + str(len(header)))
            return null
        var size = info[0]
        var typ = info[1] 
        var packets = []
        while size <= len(buf):
            var p = Packet.new()
            p.Type = typ
            p.Length = size
            packets.append(p)
            if size > 0:
                p.Data = buf.slice(0, size)
                buf = buf.slice(size) if size < len(buf) else []
            if len(buf) < HeadLength:
                break
            header = buf.slice(0, HeadLength)
            buf = buf.slice(HeadLength) if HeadLength < len(buf) else []
            info = ParseHeader(header)
            if info == null:
                break 
            size = info[0]
            typ = info[1]
        return packets

class MessagesEncoder:
    var DataCompression = false
    var compression = null
    var routes = {}#压缩字典
    var codes = {}

    func SetDictionary(dict):
        if dict == null:
            return null
        #routesCodesMutex.Lock()
        #defer routesCodesMutex.Unlock()

        for route in dict:
            var code = dict[route]
            var r = route.replace(" ", "")

            #duplication check
            if r in routes:
                return -1
            if code in codes:
                return -2
            #update map, using last value when key duplicated
            routes[r] = code
            codes[code] = r

        return null

    #GetDictionary gets the routes map which is used to compress route.
    func GetDictionary() -> Dictionary:
        #routesCodesMutex.RLock()
        #defer routesCodesMutex.RUnlock()
        var dict = {}
        for k in routes:
            dict[k] = routes[k]
        return dict

    func routable(t) -> bool:
        return t == MSG_TYPE_REQUEST || t == MSG_TYPE_NOTIFY || t == MSG_TYPE_PUSH

    func invalidType(t) -> bool:
        return t < MSG_TYPE_REQUEST || t > MSG_TYPE_PUSH

    func Uint16(b) -> int:
        var b1 = b[1] & 0xFFFF
        var b2 = (b[0] & 0xFFFF)<<8
        return b1 | b2

#// Encode marshals message to binary format. Different message types is corresponding to
#// different message header, message types is identified by 2-4 bit of flag field. The
#// relationship between message types and message header is presented as follows:
#// ------------------------------------------
#// |   type   |  flag  |       other        |
#// |----------|--------|--------------------|
#// | request  |----000-|<message id>|<route>|
#// | notify   |----001-|<route>             |
#// | response |----010-|<message id>        |
#// | push     |----011-|<route>             |
#// ------------------------------------------
#// The figure above indicates that the bit does not affect the type of message.
#// See ref: https://github.com/topfreegames/pitaya/v2/blob/master/docs/communication_protocol.md

    func Encode(message:Message):
        if invalidType(message.Type):
            return null
        var buf:PackedByteArray = PackedByteArray()
        var flag = (message.Type & 0xFF) << 1

        #routesCodesMutex.RLock()
        var compressed = message.Route in routes
        var code = routes.get(message.Route)

        #routesCodesMutex.RUnlock()
        if compressed:
            flag |= MSG_MASK_MSGROUTECOMPRESS

        if message.Err:
            flag |= MSG_MASK_ERROR

        buf.append(flag)

        if message.Type == MSG_TYPE_REQUEST || message.Type == MSG_TYPE_RESPONSE:
            var n:int = message.ID
            #variant length encode
            while true:
                var b = (n % 128) & 0xFF
                n = int(n/128.0);
                if n != 0:
                    buf.append(b+128)
                else:
                    buf.append(b)
                    break

        if routable(message.Type):
            if compressed:
                buf.append((code>>8)&0xFF)
                buf.append(code&0xFF)
            else:
                buf.append(len(message.Route)&0xFF)
                buf.append_array(message.Route.to_utf8_buffer())


        if DataCompression:
            var d = compression.DeflateData(message.Data)
            if d == null:
                return null

            if len(d) < len(message.Data):
                message.Data = d
                buf[0] |= MSG_MASK_GZIP

        buf.append_array(message.Data)
        return buf


    # github.com/topfreegames/pitaya/v2@v2.11.21/conn
    # Decode unmarshal the bytes slice to a message
    # See ref: https://github.com/topfreegames/pitaya/v2/blob/master/docs/communication_protocol.md
    func Decode(data) -> Message:
        if len(data) < MSG_MASK_MSGHEADLENGTH:
            return null
        var m = Message.new()
        var flag = data[0]
        var offset = 1
        m.Type = (0xFF & (flag >> 1)) & MSG_MASK_MSGTYPE

        if invalidType(m.Type):
            return null

        if m.Type == MSG_TYPE_REQUEST || m.Type == MSG_TYPE_RESPONSE:
            var id = 0
            # little end byte order
            # WARNING: must can be stored in 64 bits integer
            # variant length encode
            for i in range(offset, len(data)):
                var b = data[i];
                id += (b&0x7F) << int(7*(i-offset));
                if b < 128:
                    offset = i + 1
                    break

            #var i = offset
            #while i < len(data):
                #var b = data[i]
                #id += (b&0x7F) * int(pow(2, (7 *i)))
                #if b < 128:
                    #offset = i + 1
                    #break
                #i += 1 

            m.ID = id

        m.Err = flag&MSG_MASK_ERROR == MSG_MASK_ERROR

        if routable(m.Type):
            if flag&MSG_MASK_MSGROUTECOMPRESS == 1:
                m.compressed = true
                var code = Uint16(data.slice(offset, offset + 2))
                #routesCodesMutex.RLock()
                var route = codes.get(code, null)
                #routesCodesMutex.RUnlock()
                if route == null:
                    return null

                m.Route = route
                offset += 2
            else:
                m.compressed = false
                var rl = data[offset]
                offset += 1
                var valb = data.slice(offset, offset + int(rl))
                m.Route = valb.get_string_from_utf8()
                offset += int(rl)


        m.Data = data.slice(offset)
        if flag&MSG_MASK_GZIP == MSG_MASK_GZIP:
            m.Data = compression.InflateData(m.Data)
            if m.Data == null:
                return null

        return m

class Compression:
    func DeflateData(_data) -> PackedByteArray:
        var bb = PackedByteArray()
        #未实现
        return bb

    func InflateData(_data) -> PackedByteArray:
        var bb = PackedByteArray()
        #未实现
        return bb

    func IsCompressed(data) -> bool:
        return len(data) > 2 &&(
            # zlib
            (data[0] == 0x78 &&
            (data[1] == 0x9C ||
            data[1] == 0x01 ||
            data[1] == 0xDA ||
            data[1] == 0x5E)) ||
            #gzip
            (data[0] == 0x1F && data[1] == 0x8B))

评论