Godot pitaya
godot 连接 pitaya¶
原由:¶
在选择游戏服务器框架的时候想到了一些东西¶
-
- 首先是分布式,原因是大部分情况下需要把不同的东西部署到不同的服务器,比如**数据库**、登录服务器、游戏服务器、活动;在不同需求的时候,可能还要添加各种东西,那么分布式是一个需要考虑的选项
-
- 语言的选择上,Java,C++,C#,erlang,python,go,等等,
Java: 优:很稳定,需要jdk, 劣:写代码那个大呀 C++: 优:性能高 劣:容易bug,崩溃,要求高,编译起来麻烦 c#: 优劣和java类似,写代码是一件很痛苦的事情 erlang: 专门为服务器设计的,但是我没怎么接触,被搁置了 python: 脚本语言,还好,其实还是个人原因,那语法,缩进,我一直觉得很崩溃,找尾部、中间加代码,也是个体力活,都无语了 go: 优:为服务器设计,编译快,稳定还不错 劣:需要熟悉整体
- 语言的选择上,Java,C++,C#,erlang,python,go,等等,
-
- 跨平台,部署方面以及环境搭建方面,
最终选择go语言¶
框架选择方面¶
-
- 安装简单 安装起来缺这缺那的情况,在linux上太常见了,折磨
-
- 分布式 我自己造轮子的话,写起来累
-
- 有在更新 技术更新快,没有更新的话很可能就被淘汰了
暂时选择pitaya¶
- 有在更新
- 他们有在使用框架
- 写起来也还算不难
客户端 选择godot 因为开源,整体使用还不错¶
- unity 没钱,加各种登录,各种作妖
- ue 太大了,我笔记本玩不转,有点牛刀戳蚂蚁的感觉
开始正题¶
**版本选择** - godot 4.6 - pitaya v2@2.11.21 - 客户端和服务器项目已上传到github(godot_pitaya.7z),不想下载的话,可以直接在下面看代码
服务器部分¶
服务器参考的是demo chat
main.go¶
package main
import (
"context"
"fmt"
"strconv"
"time"
"github.com/topfreegames/pitaya/v2/groups"
"strings"
"github.com/topfreegames/pitaya/v2"
"github.com/topfreegames/pitaya/v2/acceptor"
"github.com/topfreegames/pitaya/v2/component"
"github.com/topfreegames/pitaya/v2/config"
"github.com/topfreegames/pitaya/v2/logger"
"github.com/topfreegames/pitaya/v2/timer"
)
// Room is the chat-room component registered with the pitaya application.
type Room struct {
	component.Base
	// timer is the periodic member-count reporter created in AfterInit.
	timer *timer.Timer
	// app is the pitaya application used for session and group operations.
	app pitaya.Pitaya
}

// UserMessage is a chat line sent by a client and broadcast to the room.
type UserMessage struct {
	Name    string `json:"name"`
	Content string `json:"content"`
}

// NewUser is the payload pushed on the "onNewUser" route.
type NewUser struct {
	Content string `json:"content"`
}

// AllMembers is the payload pushed on the "onMembers" route.
type AllMembers struct {
	Members []string `json:"members"`
}

// JoinResponse is the reply returned by Room.Join.
type JoinResponse struct {
	Code   int    `json:"code"`
	Result string `json:"result"`
}
// NewRoom returns a Room component that uses app for all session and group
// operations.
func NewRoom(app pitaya.Pitaya) *Room {
	room := new(Room)
	room.app = app
	return room
}
// AfterInit installs a one-minute timer that logs, at debug level, the number
// of members currently in the "room" group.
func (r *Room) AfterInit() {
	reportMembers := func() {
		count, err := r.app.GroupCountMembers(context.Background(), "room")
		logger.Log.Debugf("UserCount: Time=> %s, Count=> %d, Error=> %v", time.Now().String(), count, err)
	}
	r.timer = pitaya.NewTimer(time.Minute, reportMembers)
}
// Join binds the calling session to a UID, sends it the current member list,
// announces it to the room and adds it to the "room" group.
//
// The returned JoinResponse carries Result "success" (Code stays at its zero
// value). A bind failure is reported as pitaya error "RH-000"; failures to
// list the members or to add the session to the group are returned as-is.
// Failures of the informational pushes are only logged, because they do not
// affect membership.
func (r *Room) Join(ctx context.Context, msg []byte) (*JoinResponse, error) {
	s := r.app.GetSessionFromCtx(ctx)

	// Demo shortcut: the session ID doubles as the user ID. FormatInt keeps the
	// full 64-bit value instead of narrowing it through int first.
	uid := strconv.FormatInt(s.ID(), 10)
	if err := s.Bind(ctx, uid); err != nil {
		return nil, pitaya.Error(err, "RH-000", map[string]string{"failed": "bind"})
	}

	uids, err := r.app.GroupMembers(ctx, "room")
	if err != nil {
		return nil, err
	}
	if err := s.Push("onMembers", &AllMembers{Members: uids}); err != nil {
		logger.Log.Warnf("push onMembers to %s: %v", s.UID(), err)
	}

	// Announce before adding, so the newcomer does not receive its own notice.
	announcement := &NewUser{Content: fmt.Sprintf("New user: %s", s.UID())}
	if err := r.app.GroupBroadcast(ctx, "chat", "room", "onNewUser", announcement); err != nil {
		logger.Log.Warnf("broadcast onNewUser for %s: %v", s.UID(), err)
	}

	// Without group membership the client would never receive chat messages,
	// so this failure must not be reported as a successful join.
	if err := r.app.GroupAddMember(ctx, "room", s.UID()); err != nil {
		return nil, err
	}

	// Leave the group when the session closes.
	// NOTE(review): the callback reuses the request ctx after the request has
	// returned, as the original did; confirm that is fine for the group service in use.
	onClose := func() {
		if err := r.app.GroupRemoveMember(ctx, "room", s.UID()); err != nil {
			logger.Log.Warnf("remove %s from room: %v", s.UID(), err)
		}
	}
	if err := s.OnClose(onClose); err != nil {
		logger.Log.Warnf("register close hook for %s: %v", s.UID(), err)
	}

	return &JoinResponse{Result: "success"}, nil
}
// Message broadcasts msg to every member of the "room" group on the
// "onMessage" route. The handler returns nothing to the caller, so a broadcast
// failure is only logged — through the framework logger, like the rest of
// this file, rather than on stdout.
func (r *Room) Message(ctx context.Context, msg *UserMessage) {
	if err := r.app.GroupBroadcast(ctx, "chat", "room", "onMessage", msg); err != nil {
		logger.Log.Errorf("error broadcasting message: %v", err)
	}
}
// app is the process-wide pitaya application, assigned in main.
var app pitaya.Pitaya

// main configures a pitaya frontend server of type "chat" listening for TCP
// clients on :3251, creates the "room" group, registers the Room component
// and starts the application.
func main() {
	cfg := config.NewDefaultPitayaConfig()
	cfg.Buffer.Handler.LocalProcess = 15
	cfg.Buffer.Agent.Messages = 32
	cfg.Heartbeat.Interval = 15 * time.Second
	cfg.Handler.Messages.Compression = false

	// Standalone mode.
	builder := pitaya.NewDefaultBuilder(true, "chat", pitaya.Standalone, map[string]string{}, *cfg)
	// Cluster mode needs etcd and nats, and possibly redis depending on the setup:
	//builder := pitaya.NewDefaultBuilder(true, "chat", pitaya.Cluster, map[string]string{}, *cfg)
	builder.AddAcceptor(acceptor.NewTCPAcceptor(":3251"))
	builder.Groups = groups.NewMemoryGroupService(builder.Config.Groups.Memory)

	app = builder.Build()
	defer app.Shutdown()

	if err := app.GroupCreate(context.Background(), "room"); err != nil {
		panic(err)
	}

	// Register the component as "room" and lower-case its handler names.
	app.Register(
		NewRoom(app),
		component.WithName("room"),
		component.WithNameFunc(strings.ToLower),
	)

	app.Start()
}
客户端部分¶
- 在 https://blog.csdn.net/weixin_44627989/article/details/130355014 上的代码基础上进行的修改完成
客户端的设计
Control Control main.gd
client Node client.gd
txtContent TextEdit
txtInput TextEdit
txtNickName LineEdit
btnConnected Button
btnSend Button
txtHost LineEdit
txtPort LineEdit
客户端代码设计3个脚本¶
main.gd¶
extends Control
## Chat window controller: connects the UI widgets to the Client node, joins
## the "room" component after the handshake and prints everything the server
## pushes into the content box.

@onready var client: Client = $client
@onready var txt_port: LineEdit = $txtPort
@onready var txt_host: LineEdit = $txtHost
@onready var txt_input: TextEdit = $txtInput
@onready var txt_nick_name: LineEdit = $txtNickName
@onready var txt_content: TextEdit = $txtContent
@onready var txt_rtlabel: RichTextLabel = $txtRTLabel
@onready var btn_connected: Button = $btnConnected
@onready var btn_send: Button = $btnSend


func _ready() -> void:
	# Sending stays disabled until _rev_connected reports a successful handshake.
	btn_send.disabled = true


## Starts a connection attempt using the host/port typed into the UI.
func _on_btn_connected_pressed() -> void:
	var host = txt_host.text
	var port = int(txt_port.text)
	client.connect_server(host, port, _rev_connected, _rev_msg)
	btn_connected.disabled = true
	# Give the attempt two seconds; if it has not succeeded, allow a retry.
	var tween: Tween = create_tween()
	tween.tween_interval(2)
	tween.tween_callback(func():
		if not client.is_connected_server():
			btn_connected.disabled = false
	)


## Appends "uname:content" plus a newline at the caret of the content box.
func _push_msg(uname: String, content: String) -> void:
	var tmp_str: String = uname + ":" + content + "\n"
	txt_content.insert_text_at_caret(tmp_str)
	#txt_rtlabel.add_text(str)


## Returns the JSON object carried by msg, or an empty Dictionary when the
## payload is missing, malformed or not an object. Handlers use this so that a
## bad payload is logged instead of raising on a null dereference.
func _json_object(msg) -> Dictionary:
	var data = msg.jsonData()
	if typeof(data) != TYPE_DICTIONARY:
		print("err invalid json payload:", data)
		return {}
	return data


#----------------------------------------------
## Push handler for "onNewUser".
func onNewUser(msg) -> void:
	var data = _json_object(msg)
	# str() keeps the typed _push_msg call valid even for non-string JSON values.
	_push_msg('system', str(data.get("content", "")))


## Push handler for "onMembers".
func onMembers(msg) -> void:
	var data = _json_object(msg)
	_push_msg('system', "members: " + JSON.stringify(data.get("members", [])))


## Response callback for the "room.join" request.
## succ is false when the client layer reports a failure; rt is its reason text.
func onJoin(succ: bool, msg: Variant, rt: String) -> void:
	if not succ:
		print("err not get message rep:", msg, rt)
		return
	var data = _json_object(msg)
	# A missing "code" is treated as a failure rather than raising.
	if data.get("code", -1) == 0:
		_push_msg('system', str(data.get("result", "")))
		client.set_push_func('onMessage', onMessage)
		#starx.on('onMessage', onMessage)


## Push handler for "onMessage" (a chat line from any member).
func onMessage(msg) -> void:
	var data = _json_object(msg)
	_push_msg(str(data.get("name", "")), str(data.get("content", "")))


## Connection-state callback given to Client.connect_server.
func _rev_connected(succ: bool) -> void:
	if succ:
		print("connect to server")
		client.set_push_func("onNewUser", onNewUser)
		client.set_push_func("onMembers", onMembers)
		client.SendRequest("room.join", "{}", onJoin)
		btn_send.disabled = false
	else:
		btn_send.disabled = true
		btn_connected.disabled = false


## Fallback for pushed messages that have no registered route handler.
func _rev_msg(msg) -> void:
	print("--msg---:", msg)


## Sends the input box content as a "room.message" notify.
func _on_btn_send_pressed() -> void:
	if not client.is_connected_server():
		print(" not connected ")
		return
	var nn = txt_nick_name.text
	var ctt = txt_input.text
	var data = JSON.stringify({"name": nn, "content": ctt}).to_utf8_buffer()
	client.SendNotify('room.message', data)
StreamTCP.gd¶
extends Node
class_name StreamTCP
## Signal-based wrapper around StreamPeerTCP.
##
## Adapted from
## https://www.bytesnsprites.com/posts/2021/creating-a-tcp-client-in-godot/
##
## The node polls the socket every frame and reports status changes and
## incoming bytes through the signals below.

## Emitted when the peer reaches STATUS_CONNECTED.
signal connected
## Emitted with a PackedByteArray for every chunk read from the socket.
signal data
## Emitted when the peer goes back to STATUS_NONE.
signal disconnected
## Emitted on connect, read or socket errors.
signal error

# Last status reported through the signals; compared against the live status.
var _status: int = 0
var _stream: StreamPeerTCP = StreamPeerTCP.new()


func _ready() -> void:
	_status = _stream.get_status()


func _process(_delta: float) -> void:
	# In Godot 4 the status only advances when poll() is called; without it a
	# pending connect may never complete and a remote close goes unnoticed.
	_stream.poll()
	var new_status: int = _stream.get_status()
	if new_status != _status:
		_status = new_status
		match _status:
			StreamPeerTCP.STATUS_NONE:
				print("Disconnected from host.")
				disconnected.emit()
			StreamPeerTCP.STATUS_CONNECTING:
				print("Connecting to host.")
			StreamPeerTCP.STATUS_CONNECTED:
				print("Connected to host.")
				connected.emit()
			StreamPeerTCP.STATUS_ERROR:
				print("Error with socket stream.")
				error.emit()

	if _status == StreamPeerTCP.STATUS_CONNECTED:
		var available_bytes: int = _stream.get_available_bytes()
		if available_bytes > 0:
			#print("available bytes: ", available_bytes)
			# get_partial_data returns [error_code, bytes].
			var buf_array: Array = _stream.get_partial_data(available_bytes)
			if buf_array[0] != OK:
				print("Error getting data from stream: ", buf_array[0])
				error.emit()
			else:
				data.emit(buf_array[1])


## Returns true while the underlying peer is in STATUS_CONNECTED.
func is_connected_server() -> bool:
	return _stream.get_status() == StreamPeerTCP.STATUS_CONNECTED


## Starts a connection attempt; the outcome is reported through the
## "connected" / "error" signals.
func connect_to_host(host: String, port: int) -> void:
	print("Connecting to %s:%d" % [host, port])
	# A peer left in STATUS_ERROR refuses new connections until it is reset.
	if _stream.get_status() == StreamPeerTCP.STATUS_ERROR:
		_stream.disconnect_from_host()
	# Reset status so we can tell if it changes to error again.
	_status = StreamPeerTCP.STATUS_NONE
	if _stream.connect_to_host(host, port) != OK:
		print("Error connecting to host.")
		error.emit()
	else:
		#_stream.set_no_delay(true)
		_stream.poll()


## Closes the socket. Closing an established connection is reported through
## "disconnected" on the next frame; abandoning a pending or failed attempt is
## silent, because no connection existed from the listener's point of view.
func disconnect_from_host() -> void:
	var current: int = _stream.get_status()
	if current == StreamPeerTCP.STATUS_NONE:
		return
	_stream.disconnect_from_host()
	if current != StreamPeerTCP.STATUS_CONNECTED:
		_status = _stream.get_status()


## Writes buff to the socket. Returns false when not connected or on a write
## error.
func send(buff: PackedByteArray) -> bool:
	if _status != StreamPeerTCP.STATUS_CONNECTED:
		print("Error: Stream is not currently connected.")
		return false
	var err: int = _stream.put_data(buff)
	if err != OK:
		print("Error writing to stream: ", err)
		return false
	return true
client.gd¶
extends Node
class_name Client

# Pomelo package format, as used by pitaya:
#
#   +------+-------------+------------------+
#   | type | body length |       body       |
#   +------+-------------+------------------+
#
# Head: 4 bytes
#   byte 0    : package type
#                 1 - handshake
#                 2 - handshake ack
#                 3 - heartbeat
#                 4 - data
#                 5 - kick
#   bytes 1-3 : big-endian body length
# Body: "body length" bytes

# The five package types.
const PACK_TYPE_HANDSHAKE:int = 0x01
const PACK_TYPE_HANDSHAKEACK:int = 0x02
const PACK_TYPE_HEARTBEAT:int = 0x03
const PACK_TYPE_DATA:int = 0x04
const PACK_TYPE_KICK:int = 0x05

# Message types carried inside a data package.
const MSG_TYPE_REQUEST:int = 0x00
const MSG_TYPE_NOTIFY:int = 0x01
const MSG_TYPE_RESPONSE:int = 0x02
const MSG_TYPE_PUSH:int = 0x03

# Bit masks applied to the message flag byte by MessagesEncoder.
const MSG_MASK_ERROR:int = 0x20
const MSG_MASK_GZIP:int = 0x10
const MSG_MASK_MSGROUTECOMPRESS:int = 0x01
const MSG_MASK_MSGTYPE:int = 0x07
const MSG_MASK_SGROUTELENGTH:int = 0xFF
# Used by MessagesEncoder.Decode as the minimum message length.
const MSG_MASK_MSGHEADLENGTH:int = 0x02

const HeadLength:int = 4
const MaxPacketSize:int = 1 << 24 # 16MB

# Requests waiting for a response, keyed by message ID.
var _wait_cb_array = {}
var lastMsgId = 0
var clientHandshakeData = null

var packet_encoder:PomeloPacketEncoder = null
var packet_decoder:PomeloPacketDecoder = null
var message_encoder:MessagesEncoder = null
var compression:Compression = null

# Heartbeat interval taken from the server handshake; -1 until known.
var heartbeatTimeout = -1
# Seconds a request may stay unanswered before its callback gets a timeout.
var requestTimeout = 5
var serializerType = "json"

# Push handlers keyed by route.
var push_func = {}

var _is_handshake = false
var _is_client_connected = false

const RECONNECT_TIMEOUT: float = 3.0

var _host:String = "127.0.0.1"
var _port:int = 40000

#--------------------------------------------------------------
#--------------------------------------------------------
# Optional callbacks supplied through connect_server.
var _cb_connected_func = null
var _cb_msg_func = null

var heartbeatTimer:Timer = null
## Returns true once the handshake has been acknowledged on this connection.
func is_handshaked() -> bool:
	var done: bool = _is_handshake
	return done
## Returns the client-level connected flag, which is set after the handshake
## and cleared on disconnect.
func is_connected_server() -> bool:
	var online: bool = _is_client_connected
	return online
## Registers cb as the handler for pushes on route key, replacing any
## previous handler for that route.
func set_push_func(key:String, cb:Callable) -> void:
	push_func[key] = cb
## Removes the push handler for route key.
## Returns true when a handler was registered, false otherwise.
func del_push_func(key) -> bool:
	# Dictionary.erase() reports whether the key existed.
	return push_func.erase(key)
## Remembers the target address and callbacks, then starts connecting.
## cbconnected is called with true after the handshake and with false on
## disconnect; cbmsg receives pushes that have no route handler.
func connect_server(host:String, port:int, cbconnected:Callable, cbmsg:Callable):
	_host = host
	_port = port
	_cb_connected_func = cbconnected
	_cb_msg_func = cbmsg
	_stream.connect_to_host(_host, _port)
#--------------------------------------------------------------
#--------------------------------------------------------------
# Transport node; added as a child in _ready.
var _stream: StreamTCP = StreamTCP.new()


## Creates the codec helpers and the heartbeat timer, and subscribes to the
## transport's signals.
func _ready() -> void:
	print("connect ready")

	compression = Compression.new()
	packet_encoder = PomeloPacketEncoder.new()
	packet_decoder = PomeloPacketDecoder.new()
	message_encoder = MessagesEncoder.new()
	message_encoder.compression = compression

	# Repeating timer; it is started only after the handshake.
	heartbeatTimer = Timer.new()
	heartbeatTimer.one_shot = false
	heartbeatTimer.autostart = false
	heartbeatTimer.timeout.connect(_handle_Heartbeats_send)
	add_child(heartbeatTimer)

	_stream.connected.connect(_handle_client_connected)
	_stream.disconnected.connect(_handle_client_disconnected)
	_stream.error.connect(_handle_client_error)
	_stream.data.connect(_handle_client_data)
	add_child(_stream)
## Waits timeout seconds, then starts a new connection attempt to the
## remembered host and port.
func _connect_after_timeout(timeout: float) -> void:
	var delay := get_tree().create_timer(timeout)
	await delay.timeout
	_stream.connect_to_host(_host, _port)
## Transport "connected" handler: opens the protocol handshake.
func _handle_client_connected() -> void:
	print("Client connected to server.")
	_send_handshake()
## Transport "data" handler: passes the received bytes to the packet reader.
func _handle_client_data(buf: PackedByteArray) -> void:
	#print("rev data len:", len(buf))
	_try_read_packs(buf)
## Transport "disconnected" handler: tears down the client-side state.
func _handle_client_disconnected() -> void:
	print("Client disconnected from server.")
	_done_client_disconcted()
	# Automatic reconnect is disabled:
	#_connect_after_timeout(RECONNECT_TIMEOUT) # Try to reconnect after 3 seconds
## Transport "error" handler: tears down the client-side state.
func _handle_client_error() -> void:
	print("Client error.")
	_done_client_disconcted()
	# Automatic reconnect is disabled:
	#_connect_after_timeout(RECONNECT_TIMEOUT) # Try to reconnect after 3 seconds
## Sends the handshake package with a fixed client description as JSON.
func _send_handshake() -> void:
	var sys_info = {"clientVersion":"1.0", "clientBuildNumber":"999","platform":"pc"}
	var payload = JSON.stringify({"sys": sys_info}).to_utf8_buffer()
	var p = packet_encoder.Encode(PACK_TYPE_HANDSHAKE, payload)
	if p != null:
		_stream.send(p)
		return
	print("net connect fail")
## Decodes the received bytes into packages and dispatches them.
## Before the handshake is complete the first package must be the server's
## handshake reply; any further packages in the same chunk are then handled
## as regular packages instead of being dropped.
func _try_read_packs(buf:PackedByteArray) -> void:
	var packets = packet_decoder.Decode(buf)
	if packets == null:
		print("net read fail len:" + str(len(buf)))
		return
	# The decoder may legitimately yield nothing (for example, an incomplete body).
	if packets.is_empty():
		return

	var first = 0
	if not is_handshaked():
		if not _complete_handshake(packets[0]):
			return
		first = 1
	else:
		print(" rev pack num:", packets.size())

	for i in range(first, packets.size()):
		_handle_packets(packets[i])


## Processes the server's handshake reply: reads the route dictionary,
## heartbeat interval and serializer from its "sys" object, sends the
## handshake ack, starts the heartbeat timer and notifies the connected
## callback. Returns false when the package is not a usable handshake reply.
func _complete_handshake(handshakePacket:Packet) -> bool:
	if handshakePacket.Type != PACK_TYPE_HANDSHAKE:
		print("------ get something:", handshakePacket.Type)
		return false
	print("get handshake type:", handshakePacket.Type)

	if compression.IsCompressed(handshakePacket.Data):
		handshakePacket.Data = compression.InflateData(handshakePacket.Data)
		# InflateData is typed to return an array, so failure shows as "empty".
		if handshakePacket.Data.is_empty():
			print("comp err")
			return false

	var handshake = handshakePacket.jsonData()
	print("got handshake from sv, data: ", handshake)
	if typeof(handshake) != TYPE_DICTIONARY:
		print("handshake payload is not a json object")
		return false

	# .get() tolerates missing keys; indexing with [] would raise instead.
	var routeDict = {}
	var sys = handshake.get("sys")
	if typeof(sys) == TYPE_DICTIONARY:
		if sys.get("dict"):
			routeDict = sys["dict"]
		if sys.get("heartbeat"):
			heartbeatTimeout = sys["heartbeat"]
		if sys.get("serializer"):
			serializerType = sys["serializer"]
	message_encoder.SetDictionary(routeDict)

	var p = packet_encoder.Encode(PACK_TYPE_HANDSHAKEACK, PackedByteArray())
	if p == null:
		return false
	if not _stream.send(p):
		return false

	_is_handshake = true
	_is_client_connected = true
	# Timer.wait_time must be positive; skip heartbeats if none was announced.
	if heartbeatTimeout > 0:
		heartbeatTimer.wait_time = heartbeatTimeout
		heartbeatTimer.start()
	if _cb_connected_func:
		_cb_connected_func.call(true)
	return true
#--------------------------------------------------------------
## Returns the next message ID; IDs start at 1 and increase by one per call.
func _newMsgID():
	var next_id = lastMsgId + 1
	lastMsgId = next_id
	return lastMsgId
## Sends a request to the server.
## route is the handler route, data the payload as a string (sent as UTF-8).
## cb is stored with the request and later called as cb(success, message,
## reason), either by the response handler or by the timeout reaper.
func SendRequest(route:String, data:String, cb):
	var request = Message.new()
	request.Type = MSG_TYPE_REQUEST
	request.ID = _newMsgID()
	request.Route = route
	request.Data = data.to_utf8_buffer()
	request.SendAt = int(Time.get_unix_time_from_system())
	request.Callback = cb

	var packet = _buildPacket(request)
	# Registered before sending so the response can be matched by ID.
	_wait_cb_array[request.ID] = request
	print("--- SendRequest route:", route, " msgid:", request.ID)
	_stream.send(packet)
## Sends a notify to the server: fire-and-forget, no response is tracked.
## route is the handler route, data the already-encoded payload bytes.
func SendNotify(route:String, data:PackedByteArray):
	var notify = Message.new()
	notify.Type = MSG_TYPE_NOTIFY
	notify.ID = _newMsgID()
	notify.Route = route
	notify.Data = data
	notify.SendAt = int(Time.get_unix_time_from_system())

	var packet = _buildPacket(notify)
	print("--- SendNotify route:", route, " msgid:", notify.ID)
	_stream.send(packet)
## Encodes msg and wraps the result in a data package.
## Returns whatever the packet encoder returns.
func _buildPacket(msg:Message):
	var body = message_encoder.Encode(msg)
	return packet_encoder.Encode(PACK_TYPE_DATA, body)
## Heartbeat timer callback: sends an empty heartbeat package.
func _handle_Heartbeats_send() -> void:
	var no_body = PackedByteArray()
	var beat = packet_encoder.Encode(PACK_TYPE_HEARTBEAT, no_body)
	_stream.send(beat)
## Dispatches one decoded package by its type.
## Data packages are decoded into a Message and forwarded; a kick tears the
## connection down; heartbeats and unknown types are only logged.
func _handle_packets(p):
	if p.Type == PACK_TYPE_DATA:
		var m = message_encoder.Decode(p.Data)
		# Decode returns null for malformed messages; forwarding that would
		# raise on the first field access.
		if m == null:
			print("-- drop undecodable data pack, len:", len(p.Data))
			return
		_handle_rev_one_packet(m)
	elif p.Type == PACK_TYPE_KICK:
		_done_client_disconcted()
		print("kick from server")
	elif p.Type == PACK_TYPE_HEARTBEAT:
		print("------ heartbeat----")
	else:
		print("-- un oper pack:", p.Type)
## Expires pending requests older than requestTimeout seconds and reports
## them through _handle_timeout_msg. Does nothing while disconnected.
func _requestReaper() -> void:
	if not is_connected_server():
		return
	# keys() is a snapshot, so erasing inside the loop is safe.
	for pending_id in _wait_cb_array.keys():
		var pending:Message = _wait_cb_array[pending_id]
		var age = int(Time.get_unix_time_from_system()) - pending.SendAt
		if age <= requestTimeout:
			continue
		_wait_cb_array.erase(pending_id)
		_handle_timeout_msg(pending)
## Routes one decoded message.
## Responses are matched to the pending request with the same ID, and its
## callback is called as (success, message, reason); success is false when the
## response carries the error flag. Any other message type is treated as a
## push.
func _handle_rev_one_packet(m:Message):
	if m == null:
		return
	if m.Type != MSG_TYPE_RESPONSE:
		_handle_push_msg(m)
		return

	print("--- respones msgid:", m.ID, " Route:", m.Route)
	if not _wait_cb_array.has(m.ID):
		# Unknown ID, or already reaped as timed out.
		return
	var cb_msg = _wait_cb_array[m.ID]
	_wait_cb_array.erase(m.ID)
	# SendRequest accepts any value as callback, including null.
	if cb_msg.Callback == null:
		return
	if m.Err:
		cb_msg.Callback.call(false, m, "server returned an error response")
	else:
		cb_msg.Callback.call(true, m, "success")
## Delivers a pushed message to the handler registered for its route, or to
## the generic message callback when no such handler exists.
func _handle_push_msg(msg:Message):
	print(" _handle_push_msg ...msg.Route :",msg.Route, " type:" , msg.Type)
	var handler = push_func.get(msg.Route)
	if handler:
		handler.call(msg)
		return
	if _cb_msg_func:
		_cb_msg_func.call(msg)
## Reports a timed-out request to its callback as
## (false, message, reason).
func _handle_timeout_msg(msg:Message):
	var on_done = msg.Callback
	on_done.call(false, msg, " message request is time out")
## Per-frame hook: expires requests that have waited too long.
func _process(_delta: float) -> void:
	_requestReaper()
## Resets the client after the connection ended (close, error, kick or node
## exit): clears the state flags, closes the socket, stops heartbeats, fails
## every request that is still waiting for a response and finally calls the
## connected callback with false.
func _done_client_disconcted() -> void:
	_is_handshake = false
	_is_client_connected = false
	# Use a fresh decoder so nothing from the finished connection carries
	# over into the next one.
	packet_decoder = PomeloPacketDecoder.new()
	_stream.disconnect_from_host()
	if heartbeatTimer != null:
		heartbeatTimer.stop()

	# The reaper stops while disconnected, so requests left in the table
	# would never be answered; fail them now.
	var pending = _wait_cb_array.values()
	_wait_cb_array.clear()
	for msg in pending:
		# is_valid() also rejects callables whose target object was freed.
		if msg.Callback is Callable and msg.Callback.is_valid():
			msg.Callback.call(false, msg, " connection closed before response")

	if _cb_connected_func:
		_cb_connected_func.call(false)
## Closes the connection when the node leaves the scene tree.
func _exit_tree():
	_done_client_disconcted()
#--------------------------------------------------------------
# 可以在下面的位置找到对应的编码解码以及消息的对应处理
# github.com/topfreegames/pitaya/v2@v2.11.21/conn
#--------------------------------------------------------------
## One protocol message as produced and consumed by MessagesEncoder.
class Message:
	var Type:int = 0
	var ID:int = 0
	var Route:String = ""
	var Data:PackedByteArray = PackedByteArray()
	# Set by the decoder when the route arrived as a dictionary code.
	var compressed:bool = false
	# Mirrors the error bit of the flag byte.
	var Err:bool = false
	# Unix time in seconds at which the message was sent.
	var SendAt:int = 0
	var Callback = null

	## Parses Data as UTF-8 JSON text and returns JSON.parse_string's result.
	func jsonData() -> Variant:
		var text: String = Data.get_string_from_utf8()
		return JSON.parse_string(text)
## One framed package: type byte, body length and body bytes.
class Packet:
	var Type:int = -1
	var Length:int = -1
	var Data:PackedByteArray = PackedByteArray()

	## Parses Data as UTF-8 JSON text and returns JSON.parse_string's result.
	func jsonData() -> Variant:
		var text: String = Data.get_string_from_utf8()
		return JSON.parse_string(text)
## Frames package bodies for the wire.
## Protocol refs: https://github.com/NetEase/pomelo/wiki/Communication-Protocol
##
##   -<type>-|--------<length>--------|-<data>-
##   1 byte package type, 3 bytes body length (big-endian), then the body.
class PomeloPacketEncoder:
	## Builds the wire bytes for a package of type typ with body data.
	## Returns a PackedByteArray, or null when typ is not a known package type
	## or the body does not fit the 3-byte length field.
	func Encode(typ:int, data:PackedByteArray):
		if typ < PACK_TYPE_HANDSHAKE || typ > PACK_TYPE_KICK:
			return null
		# Three length bytes hold at most MaxPacketSize - 1; a body of exactly
		# MaxPacketSize bytes would wrap around to a length of zero.
		if len(data) >= MaxPacketSize:
			return null
		var buf = PackedByteArray()
		buf.append(typ)
		buf.append_array(IntToBytes(len(data)))
		buf.append_array(data)
		return buf

	## Returns the low 24 bits of n as three big-endian bytes.
	func IntToBytes(n):
		var buf = PackedByteArray()
		buf.append((n >> 16) & 0xFF)
		buf.append((n >> 8) & 0xFF)
		buf.append(n & 0xFF)
		return buf
## Splits the incoming byte stream into packages.
##
## TCP delivers a byte stream, so one read may end in the middle of a package.
## The decoder keeps the undecoded tail and continues with it on the next
## Decode call. Use one decoder per connection, or call Reset() when the
## connection is re-established.
class PomeloPacketDecoder:
	# Bytes received so far that do not yet form a complete package.
	var _pending: PackedByteArray = PackedByteArray()

	## Drops any buffered partial package.
	func Reset() -> void:
		_pending = PackedByteArray()

	## Interprets the bytes in b as one big-endian unsigned integer.
	func BytesToInt(b):
		var result = 0
		for bytev in b:
			result = (result << 8) | int(bytev)
		return result

	## Validates a 4-byte header.
	## Returns [body_size, package_type], or null when the header is invalid.
	func ParseHeader(header):
		if len(header) != HeadLength:
			return null
		var typ = header[0]
		if typ < PACK_TYPE_HANDSHAKE || typ > PACK_TYPE_KICK:
			print("err header nu" + str(typ))
			return null
		var size = BytesToInt(header.slice(1))
		if size > MaxPacketSize:
			print("err size" + str(size))
			return null
		return [size, typ]

	## Appends buf to the buffered bytes and extracts every complete package.
	## Returns:
	##   - null when fewer than HeadLength bytes are buffered in total, or when
	##     the first header is invalid;
	##   - otherwise an Array of Packet, which is empty when the first body is
	##     still incomplete.
	## After an invalid header the buffered bytes are discarded, because the
	## stream can no longer be re-synchronised.
	func Decode(buf):
		_pending.append_array(buf)
		if len(_pending) < HeadLength:
			return null

		var packets = []
		var offset = 0
		while len(_pending) - offset >= HeadLength:
			var body_start = offset + HeadLength
			var info = ParseHeader(_pending.slice(offset, body_start))
			if info == null:
				print("err header 1 " + str(len(_pending) - offset))
				_pending = PackedByteArray()
				if packets.is_empty():
					return null
				return packets
			var size = info[0]
			if len(_pending) - body_start < size:
				# Body not fully received yet; wait for more bytes.
				break
			var p = Packet.new()
			p.Type = info[1]
			p.Length = size
			if size > 0:
				p.Data = _pending.slice(body_start, body_start + size)
			packets.append(p)
			offset = body_start + size

		# Keep only the bytes that were not consumed.
		if offset >= len(_pending):
			_pending = PackedByteArray()
		elif offset > 0:
			_pending = _pending.slice(offset)
		return packets
## Encodes and decodes the body of data packages.
## Wire format reference: github.com/topfreegames/pitaya/v2@v2.11.21/conn and
## https://github.com/topfreegames/pitaya/v2/blob/master/docs/communication_protocol.md
class MessagesEncoder:
	# When true, Encode tries to deflate the payload.
	var DataCompression = false
	# Compression helper; the owner assigns it after construction.
	var compression = null
	# Route compression dictionary: route -> code.
	var routes = {}
	# Reverse dictionary: code -> route.
	var codes = {}

	## Adds the route -> code pairs of dict to the compression dictionary.
	## Returns null on success (or when dict is null), -1 when a route is
	## already known, -2 when a code is already known.
	func SetDictionary(dict):
		if dict == null:
			return null
		for route in dict:
			# JSON numbers arrive as float; the codes are bit-shifted in Encode
			# and looked up with int keys in Decode, so store them as int.
			var code = int(dict[route])
			var r = route.replace(" ", "")
			# duplication check
			if r in routes:
				return -1
			if code in codes:
				return -2
			routes[r] = code
			codes[code] = r
		return null

	## Returns a copy of the route -> code dictionary.
	func GetDictionary() -> Dictionary:
		var dict = {}
		for k in routes:
			dict[k] = routes[k]
		return dict

	## True for message types that carry a route.
	func routable(t) -> bool:
		return t == MSG_TYPE_REQUEST || t == MSG_TYPE_NOTIFY || t == MSG_TYPE_PUSH

	## True when t is outside the known message types.
	func invalidType(t) -> bool:
		return t < MSG_TYPE_REQUEST || t > MSG_TYPE_PUSH

	## Reads the first two bytes of b as a big-endian unsigned 16-bit value.
	func Uint16(b) -> int:
		return ((b[0] & 0xFF) << 8) | (b[1] & 0xFF)

	## Marshals message to its binary form. The header depends on the type,
	## which is stored in bits 1-3 of the flag byte:
	##
	##   | type     | flag     | other                |
	##   |----------|----------|----------------------|
	##   | request  | ----000- | <message id> <route> |
	##   | notify   | ----001- | <route>              |
	##   | response | ----010- | <message id>         |
	##   | push     | ----011- | <route>              |
	##
	## Returns a PackedByteArray, or null when the type is invalid or the
	## route is longer than 255 bytes (the route length field is one byte).
	func Encode(message:Message):
		if invalidType(message.Type):
			return null
		var buf:PackedByteArray = PackedByteArray()
		var flag = (message.Type & 0xFF) << 1
		var compressed = message.Route in routes
		var code = routes.get(message.Route)
		if compressed:
			flag |= MSG_MASK_MSGROUTECOMPRESS
		if message.Err:
			flag |= MSG_MASK_ERROR
		buf.append(flag)

		if message.Type == MSG_TYPE_REQUEST || message.Type == MSG_TYPE_RESPONSE:
			# Variable-length ID: 7 bits per byte, low group first, high bit
			# set on every byte except the last.
			var n:int = message.ID
			while true:
				var b = n & 0x7F
				n = n >> 7
				if n != 0:
					buf.append(b + 128)
				else:
					buf.append(b)
					break

		if routable(message.Type):
			if compressed:
				var route_code = int(code)
				buf.append((route_code >> 8) & 0xFF)
				buf.append(route_code & 0xFF)
			else:
				# The length prefix counts bytes, not characters.
				var route_bytes = message.Route.to_utf8_buffer()
				if len(route_bytes) > MSG_MASK_SGROUTELENGTH:
					return null
				buf.append(len(route_bytes))
				buf.append_array(route_bytes)

		if DataCompression and compression != null:
			var d = compression.DeflateData(message.Data)
			if d == null:
				return null
			# Only keep the compressed form when it is non-empty and smaller.
			if not d.is_empty() and len(d) < len(message.Data):
				message.Data = d
				buf[0] |= MSG_MASK_GZIP

		buf.append_array(message.Data)
		return buf

	## Unmarshals data into a Message.
	## Returns null when data is too short, truncated, has an invalid type,
	## references an unknown route code, or cannot be inflated.
	func Decode(data) -> Message:
		if len(data) < MSG_MASK_MSGHEADLENGTH:
			return null
		var m = Message.new()
		var flag = data[0]
		var offset = 1
		m.Type = (flag >> 1) & MSG_MASK_MSGTYPE
		if invalidType(m.Type):
			return null

		if m.Type == MSG_TYPE_REQUEST || m.Type == MSG_TYPE_RESPONSE:
			# Variable-length ID, low group first (mirror of Encode).
			var id = 0
			var shift = 0
			var terminated = false
			while offset < len(data):
				var b = data[offset]
				offset += 1
				id |= (b & 0x7F) << shift
				shift += 7
				if b < 128:
					terminated = true
					break
			if not terminated:
				return null
			m.ID = id

		m.Err = flag & MSG_MASK_ERROR == MSG_MASK_ERROR

		if routable(m.Type):
			if flag & MSG_MASK_MSGROUTECOMPRESS == 1:
				if offset + 2 > len(data):
					return null
				m.compressed = true
				var code = Uint16(data.slice(offset, offset + 2))
				var route = codes.get(code, null)
				if route == null:
					return null
				m.Route = route
				offset += 2
			else:
				if offset >= len(data):
					return null
				m.compressed = false
				var rl = int(data[offset])
				offset += 1
				if offset + rl > len(data):
					return null
				m.Route = data.slice(offset, offset + rl).get_string_from_utf8()
				offset += rl

		if offset < len(data):
			m.Data = data.slice(offset)
		else:
			m.Data = PackedByteArray()

		if flag & MSG_MASK_GZIP == MSG_MASK_GZIP:
			if compression == null:
				return null
			m.Data = compression.InflateData(m.Data)
			if m.Data == null:
				return null
		return m
## Payload compression helpers built on Godot's PackedByteArray compression.
## NOTE(review): assumes Godot's COMPRESSION_DEFLATE mode reads and writes
## zlib-wrapped streams, which is what the server side is expected to
## produce — confirm against a server with compression enabled.
class Compression:
	# Upper bound for inflated output, so a hostile payload cannot expand
	# without limit.
	const MAX_INFLATED_SIZE: int = 1 << 24

	## Returns _data compressed in deflate mode, or an empty array when _data
	## is empty.
	func DeflateData(_data) -> PackedByteArray:
		var src: PackedByteArray = _data
		if src.is_empty():
			return PackedByteArray()
		return src.compress(FileAccess.COMPRESSION_DEFLATE)

	## Returns _data decompressed. The mode is chosen from the leading magic
	## bytes: gzip for 0x1F 0x8B, deflate otherwise. An empty array means the
	## input was too short or could not be decompressed.
	func InflateData(_data) -> PackedByteArray:
		var src: PackedByteArray = _data
		if src.size() < 2:
			return PackedByteArray()
		var mode = FileAccess.COMPRESSION_DEFLATE
		if src[0] == 0x1F and src[1] == 0x8B:
			mode = FileAccess.COMPRESSION_GZIP
		return src.decompress_dynamic(MAX_INFLATED_SIZE, mode)

	## Returns true when data starts with a zlib header (0x78 followed by
	## 0x9C, 0x01, 0xDA or 0x5E) or with the gzip magic bytes (0x1F 0x8B).
	func IsCompressed(data) -> bool:
		if len(data) <= 2:
			return false
		var is_zlib = data[0] == 0x78 and (
			data[1] == 0x9C or
			data[1] == 0x01 or
			data[1] == 0xDA or
			data[1] == 0x5E)
		var is_gzip = data[0] == 0x1F and data[1] == 0x8B
		return is_zlib or is_gzip