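Teleport is a versatile, high-performance and flexible socket framework.
It can be used for peer-to-peer communication, RPC, long-connection gateways, microservices, push services, game services and similar areas.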

Benchmark
Test case
- One server process and one client process, running on the same machine
- CPU: Intel Xeon E312xx (Sandy Bridge) 16 cores 2.53GHz
- Memory: 16G
- OS: Linux 2.6.32-696.16.1.el6.centos.plus.x86_64, CentOS 6.4
- Go: 1.9.2
- Message size: 581 bytes
- Message codec: protobuf
- Messages sent: 1000000
Test results
- teleport
| concurrent clients | mean (ms) | median (ms) | max (ms) | min (ms) | throughput (TPS) |
|---|---|---|---|---|---|
| 100 | 1 | 0 | 16 | 0 | 75505 |
| 500 | 9 | 11 | 97 | 0 | 52192 |
| 1000 | 19 | 24 | 187 | 0 | 50040 |
| 2000 | 39 | 54 | 409 | 0 | 42551 |
| 5000 | 96 | 128 | 1148 | 0 | 46367 |
- teleport/socket
| concurrent clients | mean (ms) | median (ms) | max (ms) | min (ms) | throughput (TPS) |
|---|---|---|---|---|---|
| 100 | 0 | 0 | 14 | 0 | 225682 |
| 500 | 2 | 1 | 24 | 0 | 212630 |
| 1000 | 4 | 3 | 51 | 0 | 180733 |
| 2000 | 8 | 6 | 64 | 0 | 183351 |
| 5000 | 21 | 18 | 651 | 0 | 133886 |
- CPU time flame graph of teleport/socket

- Heap profile flame graph of teleport/socket

Version
| Version | Status | Branch |
|---|---|---|
| v3 | release | |
| v2 | release | |
| v1 | release | |
Install
go get -u -f github.com/henrylee2cn/teleport
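Features
- A packet consists of two parts: Header and Body
- Body codecs include JSON, Protobuf and string
- Supported network types: tcp, tcp4, tcp6, unix and unixpacket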
Example
server.go
package main

import (
	"fmt"
	"time"

	tp "github.com/henrylee2cn/teleport"
)

func main() {
	srv := tp.NewPeer(tp.PeerConfig{
		CountTime:     true,
		ListenAddress: ":9090",
	})
	srv.RoutePull(new(math))
	srv.ListenAndServe()
}

type math struct {
	tp.PullCtx
}

func (m *math) Add(args *[]int) (int, *tp.Rerror) {
	if m.Query().Get("push_status") == "yes" {
		m.Session().Push(
			"/push/status",
			fmt.Sprintf("%d numbers are being added...", len(*args)),
		)
		time.Sleep(time.Millisecond * 10)
	}
	var r int
	for _, a := range *args {
		r += a
	}
	return r, nil
}
client.go
package main

import (
	tp "github.com/henrylee2cn/teleport"
)

func main() {
	tp.SetLoggerLevel("ERROR")
	cli := tp.NewPeer(tp.PeerConfig{})
	defer cli.Close()
	cli.RoutePush(new(push))
	sess, err := cli.Dial(":9090")
	if err != nil {
		tp.Fatalf("%v", err)
	}
	var reply int
	rerr := sess.Pull("/math/add?push_status=yes",
		[]int{1, 2, 3, 4, 5},
		&reply,
	).Rerror()
	if rerr != nil {
		tp.Fatalf("%v", rerr)
	}
	tp.Printf("reply: %d", reply)
}

type push struct {
	tp.PushCtx
}

func (p *push) Status(args *string) *tp.Rerror {
	tp.Printf("server status: %s", *args)
	return nil
}
Design
Terminology
Packet.Body
Packet
The content of every packet is as follows:
// in .../teleport/socket package
// Packet a socket data packet.
type Packet struct {
	// Has unexported fields.
}
func GetPacket(settings ...PacketSetting) *Packet
func NewPacket(settings ...PacketSetting) *Packet
func (p *Packet) Body() interface{}
func (p *Packet) BodyCodec() byte
func (p *Packet) Context() context.Context
func (p *Packet) MarshalBody() ([]byte, error)
func (p *Packet) Meta() *utils.Args
func (p *Packet) Ptype() byte
func (p *Packet) Reset(settings ...PacketSetting)
func (p *Packet) Seq() string
func (p *Packet) SetBody(body interface{})
func (p *Packet) SetBodyCodec(bodyCodec byte)
func (p *Packet) SetNewBody(newBodyFunc NewBodyFunc)
func (p *Packet) SetPtype(ptype byte)
func (p *Packet) SetSeq(seq string)
func (p *Packet) SetSize(size uint32) error
func (p *Packet) SetUri(uri string)
func (p *Packet) SetUriObject(uriObject *url.URL)
func (p *Packet) Size() uint32
func (p *Packet) String() string
func (p *Packet) UnmarshalBody(bodyBytes []byte) error
func (p *Packet) Uri() string
func (p *Packet) UriObject() *url.URL
func (p *Packet) XferPipe() *xfer.XferPipe
// NewBodyFunc creates a new body by header.
type NewBodyFunc func(Header) interface{}
Codec
The codec used to encode and decode the Body of a packet.
type Codec interface {
	// Id returns codec id.
	Id() byte
	// Name returns codec name.
	Name() string
	// Marshal returns the encoding of v.
	Marshal(v interface{}) ([]byte, error)
	// Unmarshal parses the encoded data and stores the result
	// in the value pointed to by v.
	Unmarshal(data []byte, v interface{}) error
}
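As a rough illustration of what implementing this interface could look like, here is a minimal sketch of a JSON-backed codec. It redeclares the Codec interface locally so that it compiles without the framework; the type name jsonCodec, the id value 'j' and the helper roundTrip are assumptions made for this example and are not taken from the teleport codec package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codec mirrors the interface quoted above so the sketch is self-contained.
type Codec interface {
	Id() byte
	Name() string
	Marshal(v interface{}) ([]byte, error)
	Unmarshal(data []byte, v interface{}) error
}

// jsonCodec is a hypothetical codec backed by encoding/json.
type jsonCodec struct{}

// Id returns an illustrative id; the value 'j' is an assumption.
func (jsonCodec) Id() byte { return 'j' }

// Name returns the codec name used in this sketch.
func (jsonCodec) Name() string { return "json" }

// Marshal encodes v as JSON.
func (jsonCodec) Marshal(v interface{}) ([]byte, error) { return json.Marshal(v) }

// Unmarshal decodes JSON data into the value pointed to by v.
func (jsonCodec) Unmarshal(data []byte, v interface{}) error { return json.Unmarshal(data, v) }

// roundTrip encodes the ints with c and decodes them again.
func roundTrip(c Codec, in []int) ([]int, error) {
	data, err := c.Marshal(in)
	if err != nil {
		return nil, err
	}
	var out []int
	if err := c.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	out, err := roundTrip(jsonCodec{}, []int{1, 2, 3})
	fmt.Println(out, err)
}
```

How a codec is registered with the framework is not covered by this section, so the sketch stops at the interface itself.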
Transfer filter pipe
The filter pipe applied to the data being transferred.
// XferFilter handles byte stream of packet when transfer.
type XferFilter interface {
	// Id returns transfer filter id.
	Id() byte
	// Name returns transfer filter name.
	Name() string
	// OnPack performs filtering on packing.
	OnPack([]byte) ([]byte, error)
	// OnUnpack performs filtering on unpacking.
	OnUnpack([]byte) ([]byte, error)
}
// Get returns transfer filter by id.
func Get(id byte) (XferFilter, error)
// GetByName returns transfer filter by name.
func GetByName(name string) (XferFilter, error)
// XferPipe transfer filter pipe, handlers from outer-most to inner-most.
// Note: the length can not be bigger than 255!
type XferPipe struct {
	// Has unexported fields.
}
func NewXferPipe() *XferPipe
func (x *XferPipe) Append(filterId ...byte) error
func (x *XferPipe) AppendFrom(src *XferPipe)
func (x *XferPipe) Ids() []byte
func (x *XferPipe) Len() int
func (x *XferPipe) Names() []string
func (x *XferPipe) OnPack(data []byte) ([]byte, error)
func (x *XferPipe) OnUnpack(data []byte) ([]byte, error)
func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool)
func (x *XferPipe) Reset()
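To make the filter contract more concrete, the following sketch implements the XferFilter interface with gzip compression from the standard library. The interface is redeclared locally; the type gzipFilter, its id 'g' and the helper roundTrip are hypothetical names chosen for the example, and registration with the xfer package is deliberately left out because it is not described here.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// XferFilter mirrors the interface quoted above so the sketch is self-contained.
type XferFilter interface {
	Id() byte
	Name() string
	OnPack([]byte) ([]byte, error)
	OnUnpack([]byte) ([]byte, error)
}

// gzipFilter is a hypothetical filter that compresses on pack
// and decompresses on unpack.
type gzipFilter struct{}

// Id returns an illustrative id; the value 'g' is an assumption.
func (gzipFilter) Id() byte { return 'g' }

// Name returns the filter name used in this sketch.
func (gzipFilter) Name() string { return "gzip" }

// OnPack compresses src with gzip.
func (gzipFilter) OnPack(src []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(src); err != nil {
		return nil, err
	}
	// Close flushes the remaining compressed data and the gzip footer.
	if err := w.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// OnUnpack reverses OnPack.
func (gzipFilter) OnUnpack(src []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(src))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// roundTrip packs and then unpacks data through a single filter.
func roundTrip(f XferFilter, data []byte) (string, error) {
	packed, err := f.OnPack(data)
	if err != nil {
		return "", err
	}
	unpacked, err := f.OnUnpack(packed)
	if err != nil {
		return "", err
	}
	return string(unpacked), nil
}

func main() {
	s, err := roundTrip(gzipFilter{}, []byte("hello teleport"))
	fmt.Println(s, err)
}
```

A pipe with several filters would need each OnUnpack call to undo the matching OnPack call in the opposite order; the sketch keeps to one filter to avoid guessing at the pipe's ordering rules.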
Plugin
Plugins are mounted as hooks and executed at specific points while the framework is running.
type (
	// Plugin plugin background
	Plugin interface {
		Name() string
	}
	// PreNewPeerPlugin is executed before creating peer.
	PreNewPeerPlugin interface {
		Plugin
		PreNewPeer(*PeerConfig, *PluginContainer) error
	}
	...
)
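One common way to run hook-style plugins is to keep them in a list and use a type assertion to find those that implement a given hook. The sketch below shows that idea only. PeerConfig and PluginContainer are simplified stand-ins, and the methods Add and runPreNewPeer as well as the plugins defaultAddr and nameOnly are hypothetical; none of this is the framework's actual container implementation.

```go
package main

import "fmt"

// PeerConfig is a stand-in with a single field, for illustration only.
type PeerConfig struct{ ListenAddress string }

// Plugin and PreNewPeerPlugin follow the shapes quoted above.
type Plugin interface {
	Name() string
}

type PreNewPeerPlugin interface {
	Plugin
	PreNewPeer(*PeerConfig, *PluginContainer) error
}

// PluginContainer is a simplified stand-in that only stores plugins.
type PluginContainer struct{ plugins []Plugin }

// Add mounts plugins (hypothetical method name).
func (c *PluginContainer) Add(p ...Plugin) { c.plugins = append(c.plugins, p...) }

// runPreNewPeer calls the hook on every mounted plugin that implements it
// and stops at the first error.
func (c *PluginContainer) runPreNewPeer(cfg *PeerConfig) error {
	for _, p := range c.plugins {
		h, ok := p.(PreNewPeerPlugin)
		if !ok {
			continue // this plugin does not care about the hook
		}
		if err := h.PreNewPeer(cfg, c); err != nil {
			return fmt.Errorf("plugin %s: %v", p.Name(), err)
		}
	}
	return nil
}

// defaultAddr fills in a listen address when none is configured.
type defaultAddr struct{}

func (defaultAddr) Name() string { return "defaultAddr" }

func (defaultAddr) PreNewPeer(cfg *PeerConfig, _ *PluginContainer) error {
	if cfg.ListenAddress == "" {
		cfg.ListenAddress = ":9090"
	}
	return nil
}

// nameOnly implements Plugin but no hook, so it is skipped.
type nameOnly struct{}

func (nameOnly) Name() string { return "nameOnly" }

func main() {
	c := &PluginContainer{}
	c.Add(nameOnly{}, defaultAddr{})
	cfg := &PeerConfig{}
	err := c.runPreNewPeer(cfg)
	fmt.Println(cfg.ListenAddress, err)
}
```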
Protocol
You can define your own communication protocol by implementing the following interface:
type (
	// Proto pack/unpack protocol scheme of socket packet.
	Proto interface {
		// Version returns the protocol's id and name.
		Version() (byte, string)
		// Pack writes the Packet into the connection.
		// Note: Make sure to write only once or there will be package contamination!
		Pack(*Packet) error
		// Unpack reads bytes from the connection to the Packet.
		// Note: Concurrent unsafe!
		Unpack(*Packet) error
	}
	ProtoFunc func(io.ReadWriter) Proto
)
Then you can specify your own protocol in any of the following ways:
func SetDefaultProtoFunc(socket.ProtoFunc)
type Peer interface {
	...
	ServeConn(conn net.Conn, protoFunc ...socket.ProtoFunc) Session
	DialContext(ctx context.Context, addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	Dial(addr string, protoFunc ...socket.ProtoFunc) (Session, *Rerror)
	Listen(protoFunc ...socket.ProtoFunc) error
	...
}
FastProto
{4 bytes packet length}
{1 byte protocol version}
{1 byte transfer pipe length}
{transfer pipe IDs}
# The following data is processed by the transfer pipe
{4 bytes sequence length}
{sequence}
{1 byte packet type}
{4 bytes URI length}
{URI}
{4 bytes metadata length}
{metadata(urlencoded)}
{1 byte body codec id}
{body}
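The layout above can be exercised with a small encoder and decoder. This is only a sketch under stated assumptions: multi-byte lengths are written big-endian, the leading packet length counts the whole frame including its own four bytes, and the transfer pipe is always empty so the remaining fields are written unfiltered. The layout does not state any of these, and the names frame, putField, readField, encode and decode are hypothetical, as are the sample version, packet type and codec id values.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// frame holds the fields named in the FastProto layout.
type frame struct {
	Version   byte
	Seq       string
	Ptype     byte
	URI       string
	Meta      string // urlencoded metadata
	BodyCodec byte
	Body      []byte
}

// putField writes a 4-byte big-endian length followed by the bytes of s.
func putField(buf *bytes.Buffer, s string) {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(s)))
	buf.Write(n[:])
	buf.WriteString(s)
}

// readField reads one length-prefixed field written by putField.
func readField(r *bytes.Reader) (string, error) {
	var n [4]byte
	if _, err := io.ReadFull(r, n[:]); err != nil {
		return "", err
	}
	size := binary.BigEndian.Uint32(n[:])
	if uint64(size) > uint64(r.Len()) {
		return "", errors.New("field length exceeds frame")
	}
	b := make([]byte, size)
	_, err := io.ReadFull(r, b)
	return string(b), err
}

// encode lays the frame out in the documented order with an empty transfer pipe.
func encode(f frame) []byte {
	var p bytes.Buffer
	putField(&p, f.Seq)
	p.WriteByte(f.Ptype)
	putField(&p, f.URI)
	putField(&p, f.Meta)
	p.WriteByte(f.BodyCodec)
	p.Write(f.Body)

	out := make([]byte, 4, 6+p.Len())
	// Assumption: the length field counts the whole frame, itself included.
	binary.BigEndian.PutUint32(out, uint32(6+p.Len()))
	out = append(out, f.Version, 0) // 0 = no transfer pipe IDs
	return append(out, p.Bytes()...)
}

// decode parses a frame produced by encode; frames with pipe IDs are rejected.
func decode(data []byte) (frame, error) {
	var f frame
	if len(data) < 6 || uint64(binary.BigEndian.Uint32(data)) != uint64(len(data)) || data[5] != 0 {
		return f, errors.New("bad frame header")
	}
	f.Version = data[4]
	r := bytes.NewReader(data[6:])
	var err error
	if f.Seq, err = readField(r); err != nil {
		return f, err
	}
	if f.Ptype, err = r.ReadByte(); err != nil {
		return f, err
	}
	if f.URI, err = readField(r); err != nil {
		return f, err
	}
	if f.Meta, err = readField(r); err != nil {
		return f, err
	}
	if f.BodyCodec, err = r.ReadByte(); err != nil {
		return f, err
	}
	f.Body = make([]byte, r.Len()) // the body is whatever remains
	_, err = io.ReadFull(r, f.Body)
	return f, err
}

func main() {
	raw := encode(frame{Version: 1, Seq: "1", Ptype: 1, URI: "/math/add",
		Meta: "push_status=yes", BodyCodec: 'j', Body: []byte("[1,2,3]")})
	got, err := decode(raw)
	fmt.Println(len(raw), got.URI, string(got.Body), err)
}
```

The body carries no length prefix of its own in this layout, so the decoder treats everything after the codec id as the body, which is why the total packet length has to be known first.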
Usage
Peer (server or client) example
// Start a server
var peer1 = tp.NewPeer(tp.PeerConfig{
	ListenAddress: "0.0.0.0:9090", // for server role
})
peer1.Listen()
...
// Start a client; dial the port the server above listens on
var peer2 = tp.NewPeer(tp.PeerConfig{})
var sess, err = peer2.Dial("127.0.0.1:9090")
Pull-Controller-Struct API template
type Aaa struct {
	tp.PullCtx
}
func (x *Aaa) XxZz(args *<T>) (<T>, *tp.Rerror) {
	...
	return r, nil
}
- Register it to the root router:
// register the pull route: /aaa/xx_zz
peer.RoutePull(new(Aaa))
// or register the pull route: /xx_zz
peer.RoutePullFunc((*Aaa).XxZz)
Pull-Handler-Function API template
func XxZz(ctx tp.PullCtx, args *<T>) (<T>, *tp.Rerror) {
	...
	return r, nil
}
- Register it to the root router:
// register the pull route: /xx_zz
peer.RoutePullFunc(XxZz)
Push-Controller-Struct API template
type Bbb struct {
	tp.PushCtx
}
func (b *Bbb) YyZz(args *<T>) *tp.Rerror {
	...
	return nil
}
- Register it to the root router:
// register the push route: /bbb/yy_zz
peer.RoutePush(new(Bbb))
// or register the push route: /yy_zz
peer.RoutePushFunc((*Bbb).YyZz)
Push-Handler-Function API template
// YyZz register the route: /yy_zz
func YyZz(ctx tp.PushCtx, args *<T>) *tp.Rerror {
	...
	return nil
}
- Register it to the root router:
// register the push route: /yy_zz
peer.RoutePushFunc(YyZz)
Unknown-Pull-Handler-Function API template
func XxxUnknownPull(ctx tp.UnknownPullCtx) (interface{}, *tp.Rerror) {
	...
	return r, nil
}
- Register it to the root router:
// register the unknown pull route: /*
peer.SetUnknownPull(XxxUnknownPull)
Unknown-Push-Handler-Function API template
func XxxUnknownPush(ctx tp.UnknownPushCtx) *tp.Rerror {
	...
	return nil
}
- Register it to the root router:
// register the unknown push route: /*
peer.SetUnknownPush(XxxUnknownPush)
Rules for mapping a struct (function) name to a URI path:
- AaBb -> /aa_bb
- Aa_Bb -> /aa/bb
- aa_bb -> /aa/bb
- Aa__Bb -> /aa_bb
- aa__bb -> /aa_bb
- ABC_XYZ -> /abc/xyz
- ABcXYz -> /abc_xyz
- ABC__XYZ -> /abc_xyz
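The mapping rules above can be reproduced with a short function. The sketch below is inferred from the listed examples only and is not the framework's own implementation: a single underscore becomes a path separator, a double underscore becomes a literal underscore, and an uppercase letter that follows a lowercase letter starts a new underscore-separated word. The function name toURIPath is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// toURIPath converts a struct or function name to a URI path
// following the rules inferred from the examples above.
func toURIPath(name string) string {
	rs := []rune(name)
	var b strings.Builder
	b.WriteByte('/')
	for i := 0; i < len(rs); i++ {
		r := rs[i]
		switch {
		case r == '_':
			if i+1 < len(rs) && rs[i+1] == '_' {
				// "__" maps to a literal underscore.
				b.WriteByte('_')
				i++
			} else {
				// "_" maps to a path separator.
				b.WriteByte('/')
			}
		case unicode.IsUpper(r):
			// A lower-to-upper transition starts a new word.
			if i > 0 && unicode.IsLower(rs[i-1]) {
				b.WriteByte('_')
			}
			b.WriteRune(unicode.ToLower(r))
		default:
			b.WriteRune(r)
		}
	}
	return b.String()
}

func main() {
	for _, name := range []string{"AaBb", "Aa_Bb", "Aa__Bb", "ABC_XYZ", "ABcXYz"} {
		fmt.Printf("%s -> %s\n", name, toURIPath(name))
	}
}
```

Inputs that the examples do not cover, such as digits or three consecutive underscores, are handled in whatever way falls out of these three rules and may differ from the framework's behaviour.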
Plugin example
// NewIgnoreCase returns an ignoreCase plugin.
func NewIgnoreCase() *ignoreCase {
	return &ignoreCase{}
}

type ignoreCase struct{}

// Compile-time checks that ignoreCase implements both hooks.
var (
	_ tp.PostReadPullHeaderPlugin = new(ignoreCase)
	_ tp.PostReadPushHeaderPlugin = new(ignoreCase)
)

func (i *ignoreCase) Name() string {
	return "ignoreCase"
}

func (i *ignoreCase) PostReadPullHeader(ctx tp.ReadCtx) *tp.Rerror {
	// Convert the request path to lowercase.
	ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
	return nil
}

func (i *ignoreCase) PostReadPushHeader(ctx tp.ReadCtx) *tp.Rerror {
	// Convert the request path to lowercase.
	ctx.UriObject().Path = strings.ToLower(ctx.UriObject().Path)
	return nil
}
Register the handlers and the plugin above with the router
// add router group
group := peer.SubRoute("test")
// register to test group
group.RoutePull(new(Aaa), NewIgnoreCase())
peer.RoutePullFunc(XxZz, NewIgnoreCase())
group.RoutePush(new(Bbb))
peer.RoutePushFunc(YyZz)
peer.SetUnknownPull(XxxUnknownPull)
peer.SetUnknownPush(XxxUnknownPush)
Configuration
type PeerConfig struct {
	Network            string        `yaml:"network" ini:"network" comment:"Network; tcp, tcp4, tcp6, unix or unixpacket"`
	ListenAddress      string        `yaml:"listen_address" ini:"listen_address" comment:"Listen address; for server role"`
	DefaultDialTimeout time.Duration `yaml:"default_dial_timeout" ini:"default_dial_timeout" comment:"Default maximum duration for dialing; for client role; ns,µs,ms,s,m,h"`
	RedialTimes        int32         `yaml:"redial_times" ini:"redial_times" comment:"The maximum times of attempts to redial, after the connection has been unexpectedly broken; for client role"`
	DefaultBodyCodec   string        `yaml:"default_body_codec" ini:"default_body_codec" comment:"Default body codec type id"`
	DefaultSessionAge  time.Duration `yaml:"default_session_age" ini:"default_session_age" comment:"Default session max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
	DefaultContextAge  time.Duration `yaml:"default_context_age" ini:"default_context_age" comment:"Default PULL or PUSH context max age, if less than or equal to 0, no time limit; ns,µs,ms,s,m,h"`
	SlowCometDuration  time.Duration `yaml:"slow_comet_duration" ini:"slow_comet_duration" comment:"Slow operation alarm threshold; ns,µs,ms,s ..."`
	PrintDetail        bool          `yaml:"print_detail" ini:"print_detail" comment:"Is print body and metadata or not"`
	CountTime          bool          `yaml:"count_time" ini:"count_time" comment:"Is count cost time or not"`
}
Communication tuning
func SetPacketSizeLimit(maxPacketSize uint32)
func SetSocketKeepAlive(keepalive bool)
func SetSocketKeepAlivePeriod(d time.Duration)
func SetSocketNoDelay(_noDelay bool)
func SetSocketReadBuffer(bytes int)
func SetSocketWriteBuffer(bytes int)
Extensions
Codec
import "github.com/henrylee2cn/teleport/codec"
Plugin
import "github.com/henrylee2cn/teleport/plugin"
import binder "github.com/henrylee2cn/tp-ext/plugin-binder"
import heartbeat "github.com/henrylee2cn/tp-ext/plugin-heartbeat"
import secure "github.com/henrylee2cn/tp-ext/plugin-secure"
Protocol
import "github.com/henrylee2cn/teleport/socket"
import jsonproto "github.com/henrylee2cn/tp-ext/proto-jsonproto"
import pbproto "github.com/henrylee2cn/tp-ext/proto-pbproto"
Transfer filter
import "github.com/henrylee2cn/teleport/xfer"
import md5Hash "github.com/henrylee2cn/tp-ext/xfer-md5Hash"
Other modules
import cliSession "github.com/henrylee2cn/tp-ext/mod-cliSession"
import websocket "github.com/henrylee2cn/tp-ext/mod-websocket"
import html "github.com/xiaoenai/ants/helper/mod-html"
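Projects based on Teleport
| project | description |
|---|---|
| TP-Micro | A simple yet powerful microservice framework customized on top of Teleport |
| Ants | A highly available microservice platform solution based on TP-Micro and Teleport |
| Pholcus | A distributed, high-concurrency, heavyweight crawler written in pure Go and aimed at Internet data collection; it gives people with some Go or JS programming background a powerful crawling tool in which they only need to focus on customizing rules |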
Business users

License
Teleport is released under the business-friendly Apache 2.0 license.