一个基于 Netty + Protobuf 构建的高性能 TCP 网关开源组件
您目前处于:开发&语言  -  Java  2017年08月18日  阅读 3050

TCP 网关

本文将为大家介绍一个基于 Netty + Protobuf 构建的高性能 TCP 网关开源组件。该组件部署业务化运行2年以上,实现TCP 双向通道通信,维持高并发在线长连接,优化传输字节码等。


安装

从 GitHub(https://github.com/linkedkeeper/tcp-gateway)克隆这个工程,并将它作为一个依赖包添加到 Maven 项目中。


使用

1. 创建 TCP 服务

基于 Spring 配置文件:spring-tcp-server.xml 启动服务器

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	               http://www.springframework.org/schema/beans/spring-beans.xsd"
       default-autowire="byName">

    <!-- tcp server config start. -->
    <bean id="tcpServer" class="com.linkedkeeper.tcp.connector.tcp.server.TcpServer" init-method="init"
          destroy-method="shutdown">
        <!-- port is tcp server port -->
        <property name="port" value="2000"/>
    </bean>
    <bean id="tcpSessionManager" class="com.linkedkeeper.tcp.connector.tcp.TcpSessionManager">
        <property name="maxInactiveInterval" value="500"/>
        <!-- you can add listener to listen session event, include session create, destroy and so on. -->
        <property name="sessionListeners">
            <list>
                <ref bean="logSessionListener"/>
            </list>
        </property>
    </bean>
    <!-- logSessionListener is related tcpSessionManager, those listener should implements SessionListener -->
    <bean id="logSessionListener" class="com.linkedkeeper.tcp.connector.api.listener.LogSessionListener"/>
    <!-- tcp sender is a container that can send message to client from server -->
    <bean id="tcpSender" class="com.linkedkeeper.tcp.remoting.TcpSender">
        <constructor-arg ref="tcpConnector"/>
    </bean>
    <!-- server config is combine the config, don't modify -->
    <bean id="serverConfig" class="com.linkedkeeper.tcp.connector.tcp.config.ServerTransportConfig">
        <constructor-arg ref="tcpConnector"/>
        <constructor-arg ref="proxy"/>
        <constructor-arg ref="notify"/>
    </bean>
    <!-- tcp connector is container that manage the connection between server and client -->
    <bean id="tcpConnector" class="com.linkedkeeper.tcp.connector.tcp.TcpConnector" init-method="init"
          destroy-method="destroy"/>
    <!-- notify proxy is proxy that implement send notify to client -->
    <bean id="notify" class="com.linkedkeeper.tcp.notify.NotifyProxy">
        <constructor-arg ref="tcpConnector"/>
    </bean>
    <!-- default tcp server config end. -->
    
    <!-- this proxy is your proxy that can receive message from client -->
    <bean id="proxy" class="com.linkedkeeper.tcp.server.TestSimpleProxy"/>
</beans>

说明:

  • TcpServer:提供 TCP 连接的服务

  • TcpSessionManager:你可以添加监听事件,用于监听 TCP 会话的创建、销毁等

  • LogSessionListener:一个日志监听器,它和 tcpSessionManager 关联,监听器必须事先 SessionListener

  • TcpSender:TCP 发送者,用于向客户端发送通知消息,实现下行逻辑

  • ServerConfig:TCP 的配置管理类

  • TcpConnector:TCP 容器,用于管理服务端和客户端的连接

  • NotifyProxy:发送通知的代理类

上面的配置都是默认的,你可以不更改,但是你可能需要换个 TCP 端口。


例子1 创建测试代理用于从客户端接收消息

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.linkedkeeper.tcp.connector.tcp.codec.MessageBuf;
import com.linkedkeeper.tcp.data.Login;
import com.linkedkeeper.tcp.data.Protocol;
import com.linkedkeeper.tcp.invoke.ApiProxy;
import com.linkedkeeper.tcp.message.MessageWrapper;
import com.linkedkeeper.tcp.message.SystemMessage;

public class TestSimpleProxy implements ApiProxy {

    public MessageWrapper invoke(SystemMessage sMsg, MessageBuf.JMTransfer message) {
        ByteString body = message.getBody();

        if (message.getCmd() == 1000) {
            try {
                Login.MessageBufPro.MessageReq messageReq 
                    = Login.MessageBufPro.MessageReq.parseFrom(body);
                if (messageReq.getCmd().equals(Login.MessageBufPro.CMD.CONNECT)) {
                    return new MessageWrapper(MessageWrapper.MessageProtocol.CONNECT, 
                        message.getToken(), null);
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        } else if (message.getCmd() == 1002) {
            try {
                Login.MessageBufPro.MessageReq messageReq 
                    = Login.MessageBufPro.MessageReq.parseFrom(body);
                if (messageReq.getCmd().equals(Login.MessageBufPro.CMD.HEARTBEAT)) {
                    MessageBuf.JMTransfer.Builder resp = Protocol.generateHeartbeat();
                    return new MessageWrapper(MessageWrapper.MessageProtocol.HEART_BEAT, 
                        message.getToken(), resp);
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
        return null;
    }
}

输入参数

  • SystemMessage:TcpServer 构建的参数,包括远端地址,本地地址等

  • MessageBuf.JMTransfer:这个很重要,这是由 Protobuf 创建的,它包括 header 和 body,header 是系统及参数,你可以从项目中看到具体参数,body 是 protobuf 序列化的字节码,也是有 protobuf 类生成

输出参数

  • MessageWrapper:这是一个消息响应的包装类,它包括 protocol,sessionId 和 body。body 是一个 bytes,它和 protobuf 对应。


例子2 发送通知到客户端

private NotifyProxy notify;

final int timeout = 10 * 1000;
final int NOTIFY = 3;

public boolean send(long seq, String sessionId, int cmd, ByteString body) throws Exception {
    boolean success = false;
    MessageBuf.JMTransfer.Builder builder = generateNotify(sessionId, seq, cmd, body);
    if (builder != null) {
        MessageWrapper wrapper = new MessageWrapper(MessageWrapper.MessageProtocol.NOTIFY, sessionId, builder);
        int ret = notify.notify(seq, wrapper, timeout);
        if (ret == Constants.NOTIFY_SUCCESS) {
            success = true;
        } else if (ret == Constants.NOTIFY_NO_SESSION) {
            /** no session on this machine **/
            success = true;
        }
    } else {
        /** no session in the cache **/
        success = true;
    }
    return success;
}

/**
 * session
 */
final String VERSION = "version";
final String DEVICE_ID = "deviceId";
final String PLATFORM = "platform";
final String PLATFORM_VERSION = "platformVersion";
final String TOKEN = "token";
final String APP_KEY = "appKey";
final String TIMESTAMP = "timestamp";
final String SIGN = "sign";

/**
 * need session into redis, then when you notify you can get info from redis by session
 */
final Map<String, Map<String, Object>> testSessionMap = null;

protected MessageBuf.JMTransfer.Builder generateNotify(String sessionId, long seq, int cmd, ByteString body) 
    throws Exception {
    Map<String, Object> map = testSessionMap.get(sessionId);

    MessageBuf.JMTransfer.Builder builder = MessageBuf.JMTransfer.newBuilder();
    builder.setVersion(String.valueOf(map.get(VERSION)));
    builder.setDeviceId(String.valueOf(map.get(DEVICE_ID)));
    builder.setCmd(cmd);
    builder.setSeq(seq);
    builder.setFormat(NOTIFY);
    builder.setFlag(0);
    builder.setPlatform(String.valueOf(map.get(PLATFORM)));
    builder.setPlatformVersion(String.valueOf(map.get(PLATFORM_VERSION)));
    builder.setToken(String.valueOf(map.get(TOKEN)));
    builder.setAppKey(String.valueOf(map.get(APP_KEY)));
    builder.setTimeStamp(String.valueOf(map.get(TIMESTAMP)));
    builder.setSign(String.valueOf(map.get(SIGN)));
    builder.setBody(body);

    return builder;
}

2. 创建 TCP 客户端

支持 iOS,Android,C++ 等语言构建的客户端


3. 序列化 Protobuf

Java

/protobuf/protoc --proto_path=/protobuf/ --java_out=/protobuf/MessageBuf.proto

Object-C

protoc --plugin=/protobuf/protoc-gen-objc MessageBuf.proto --object_out="/protobuf/"


附件:

你可以点击 下载 protobuf 编译器


本文受原创保护,未经作者授权,禁止转载。 linkedkeeper.com (文/张松然)

如果觉得我的文章对您有用,请随意赞赏。您的支持将鼓励我继续创作!

赞赏支持
分享到: 更多
作者  张强  发布于 2017年12月13日  阅读 410
作者  张强  发布于 2017年12月04日  阅读 845
作者  张强  发布于 2017年11月20日  阅读 2364
Spring AOP是我们日常开发中经常使用的工具,常被用来做统一的日志、异常处理、监控等功能,使用方法在此不多赘述,有兴趣的读者可以自行去网上查阅资料进行学习,我们以注解的使用方式为例,分析其相关源码,其他方式大同小异。开启Spring AOP注解方式首先要配置标签,我们就以这个标签的解析作为入口来分析,这里需要读者对Spring自定义标签解析的过程有一定的了解,笔者后续也会出相关的文章。锁定A...
作者  张强  发布于 2017年11月06日  阅读 2959
Spring事务是我们日常工作中经常使用的一项技术,Spring提供了编程、注解、aop切面三种方式供我们使用Spring事务,其中编程式事务因为对代码入侵较大所以不被推荐使用,注解和aop切面的方式可以基于需求自行选择,我们以注解的方式为例来分析Spring事务的原理和源码实现。首先我们简单看一下Spring事务的使用方式,配置: 在需要开启事务的方法上加上@Transactiona...
作者  张松然  发布于 2017年11月05日  阅读 7998
分布式事务分布式系统的特性分布式事务的基本介绍常用的分布式技术说明理解2PC和3PC协议「点击阅读」分布式服务协调技术什么是ZookeeperZookeeper和CAP的关系Zookeeper节点特性及节点属性分析Zookeeper的实现原理分析Zookeeper实践,共享锁,Master选举「点击阅读」分布式消息技术Kafka的基本介绍Kafka的设计原理分析Kafka数据传输的事务特点Kafk...