服务器主逻辑代码的重构

时间:2014-04-29 16:48:52   收藏:0   阅读:491

      不知道前主程是处于什么目的,总之我接手这套程序的时候,出现了超级多的问题,也发现了超级多的问题。

     比如说吧,接受网络消息逻辑是线程独立的,而发送消息给客户端缺阻塞在了逻辑线程里面;原本可以放在一个进程里面处理的逻辑,却分散在了四个进程里面去处理,导致我完成一个功能,大部分时间要话费了进程之间的玩家信息的同步上面,在我无法忍受的情况下,我终于是用NIO将网络底层从新写了,而且将四个进程合并,但是在很多逻辑上还是尽量保持了和原逻辑处理的兼容!

      先说说这个重构的底层吧!

     我们看下最重要的ClientHandle类,主要处理每个连接的收发数据的!

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ClientHandle implements ISession
{
    public final static int RW_BUFFER_SIZE = 1024;
    private SocketChannel socket = null;
     
    private java.nio.ByteBuffer reader = java.nio.ByteBuffer.allocate(RW_BUFFER_SIZE);
    private java.nio.ByteBuffer writer = java.nio.ByteBuffer.allocate(4*RW_BUFFER_SIZE);
     
    BlockingQueue<ByteBuffer> writeQueue = new LinkedBlockingQueue<ByteBuffer>();
     
    private IPlayer player = null;
     
    private boolean active = false;

   包含SocketChannel对象不用说了,reader和writer是用来做消息收发的缓冲的,因为服务器广播的压力会大一些,所以将writer的大小设置为reader的4倍,当然这个可以调整。

     writeQueue是用来存储需要发送给客户端的ByteBuffer,每次在这个链接可以写数据的时候,就将writeQueue里面存储的数据转移到writer中,并且一次发送,减少了writer的系统调用次数。ByteBuffer的结构简单说下,不同于java.nio.ByteBuffer,而是自己封装的一个消息解析器,给出源代码

   下面看下 ClientHandle的可读逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public int handleRead() throws IOException
    {
        int r = this.socket.read(this.reader);
        if(r <= 0)
        {
            return -1;
        }
        this.reader.flip();
         
        ByteBuffer data = this.createBuffer();
        while(data != null)
        {
            this.reader.get(data.getByteArray(), data.top(), data.capacity());
            this.processData(data);
            data = this.createBuffer();
        }
         
        this.reader.clear();
        return 0;
    }

  依次将数据读入到reader中,并且按照LC(L表示长度,C表示内容)结构将reader中的数据解析成一个个ByteBuffer对象处理。下面是createBuffer函数和processData函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private ByteBuffer createBuffer()
    {
        if(reader.remaining() < 4)
        {
            return null;
        }
         
        int len = reader.getInt();
        if(len > reader.remaining())
        {
            reader.clear();
            return null;
        }
         
        if (len > 0 && len <= 10 * 1024)
        {
            return new ByteBuffer(len);
        }
         
        return null;
    }
 
public void processData(ByteBuffer data)
{
       player.insertData(data);
}

   这里要注意,1:解析reader中的消息一定要做容错处理;2:将解析的待处理包放到玩家身上,让玩家自己处理!

     发送函数的处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public int handleWrite() throws IOException
    {
        ByteBuffer data = writeQueue.poll();
        while(data != null)
        {
            this.writer.putInt(data.length());
            this.writer.put(data.toByteArray(), 0, data.length());
            data = writeQueue.poll();
        }
         
        this.writer.flip();
        if(!this.writer.hasRemaining())
        {
            this.writer.limit(this.writer.capacity());
            return 0;
        }
         
        this.socket.write(writer);
         
        if(this.writer.hasRemaining())
        {
            this.writer.compact();
            this.writer.position(this.writer.limit());
            this.writer.limit(this.writer.capacity());
        }
        else
        {
            this.writer.compact();
            this.writer.limit(this.writer.capacity());
        }
         
        return 0;
    }

  发送函数的处理相对复杂些,首先要做的就是每个连接的发送函数每100ms(可以调整)触发一次,每次触发时候,要将待发送的数据包bytebuffer填充到writer缓冲区,然后一次发送!

     管理协调这些链接的新建和处理都是使用了java nio的selector结构,具体的代码就不贴出来了,想要的可以联系我,需要注意的有两点,1:对于空闲连接的处理,2:对于发送数据的处理

     大致讲完了网络线程,那么讲一讲主逻辑线程,逻辑线程采用线程绑定地图的设计;在服务器启动之时,启动n(可以调整)个地图线程,每个地图线程绑定N(可以调整)个地图,这N个地图上的所有玩家的逻辑处理,都有地图所在线程来处理,具体处理方式:

     地图线程的主逻辑:

  场景Scene的心跳函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Scene implements IScene
{
        public void beatHeart()
    {
        long now = System.currentTimeMillis();
        List<IPlayer> players = null;
        synchronized (idPlayerMap)
        {
            players = new ArrayList<IPlayer>(idPlayerMap.values());  
        }
         
        for(IPlayer player : players)
        {
            player.beatHeart(now);
        }
         
    }
         
}

  玩家的心跳函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
BlockingQueue<ByteBuffer> dataToProcess = new LinkedBlockingDeque<ByteBuffer>();
     
    public void insertData(ByteBuffer data)
    {
        this.dataToProcess.offer(data);
    }
 
public void beatHeart(long now)
{
         
        ByteBuffer data = this.dataToProcess.poll();
        while(data != null)
        {
            this.processData(data);
            data = this.dataToProcess.poll();
        }
                //.....处理心跳定时器,上一篇有讲到
}

  好了,大概的服务器的主逻辑就这些了,是不是精简小巧。晚上的时候还做了一下广播压力测试,效果还不错!

     欢迎大家提出宝贵意见!

服务器主逻辑代码的重构,码迷,mamicode.com

评论(0
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!