twsited(4)–不同模块用redis共享以及用web发送数据到tcpserver

上一章开头我们说,要连接之前flask系列文章中的用户,结果篇幅不够,没有实现。

今天我们把它实现一下。话说,不同模块之间,该如何联系在一起,通常都是mysql、redis、rabbitmq还有RPC等,之所以着重讲redis,因为我太喜欢这个内存数据库了。small stong simple。这就跟我喜欢flask、tornado而不太喜欢django和twisted一样(以后我们着重讲tornado,源码比较简单直观,哈哈,虽然自己用着twisted,但真不喜欢twisted这种大型库,好多虚类的继承,太过于注重设计模式。好了,废话说到这。)

这篇文章的主要目的就如下图所示,不但在http和tcp之间共享数据,而且能在http端发送数据到tcpserver服务器,然后下发到tcpclient。

在twisted中访问redis,就是找一个redis版的twisted库就好了,在redis官网,有推荐,其源码在这 https://github.com/fiorix/txredisapi   我一直使用,很稳定,操作也极其简单。

我们就用这个库,利用之前flask系列中,login的时候,返回的token进行验证,把http模块和tcp模块联系到一起。服务器端代码如下:

frontTCP.py

复制代码
# coding:utf-8
from twisted.internet.protocol import Factory, Protocol
from twisted.internet import reactor, task, defer
import struct
import json
from twisted.python import log
import sys
import time
import txredisapi as redis
log.startLogging(sys.stdout)

REDIS_HOST = 'localhost'
REDIS_PORT = 6380
REDIS_DB = 4
REDIS_PASSWORD = 'dahai123'

redis_store = redis.lazyConnectionPool(dbid=4, host='localhost', port=6380, password='dahai123')

@defer.inlineCallbacks
def check_token(phone_number, token):
    token_in_redis = yield redis_store.hget('user:%s' % phone_number, 'token')
    if token != token_in_redis:
        defer.returnValue(False)
    else:
        defer.returnValue(True)

class Chat(Protocol):
    def __init__(self, factory):
        self.factory = factory
        self.phone_number = None
        self.state = "VERIFY"
        self.version = 0
        self.last_heartbeat_time = 0
        self.command_func_dict = {
            1: self.handle_verify,
            2: self.handle_single_chat,
            3: self.handle_group_chat,
            4: self.handle_broadcast_chat,
            5: self.handle_heartbeat
        }
        self._data_buffer = bytes()

    def connectionMade(self):
        log.msg("New connection, the info is:", self.transport.getPeer())

    def connectionLost(self, reason):
        log.msg("[%s]:断线" % self.phone_number.encode('utf-8'))
        if self.phone_number in self.factory.users:
            del self.factory.users[self.phone_number]

    def dataReceived(self, data):
        """
        接受到数据以后的操作
        """
        self._data_buffer += data

        while True:
            length, self.version, command_id = struct.unpack('!3I', self._data_buffer[:12])

            if length > len(self._data_buffer):
                return

            content = self._data_buffer[12:length]

            if command_id not in [1, 2, 3, 4, 5]:
                return

            if self.state == "VERIFY" and command_id == 1:
                self.handle_verify(content)

            if self.state == "DATA":
                self.handle_data(command_id, content)

            self._data_buffer = self._data_buffer[length:]

            if len(self._data_buffer) < 12:
                return

    def handle_heartbeat(self, content):
        """
        处理心跳包
        """
        self.last_heartbeat_time = int(time.time())

    @defer.inlineCallbacks
    def handle_verify(self, content):
        """
        验证函数
        """
        content = json.loads(content)
        phone_number = content.get('phone_number')
        token = content.get('token')

        result = yield check_token(phone_number, token)

        if not result:
            send_content = json.dumps({'code': 0})
            self.send_content(send_content, 101, [phone_number])
            length = 12 + len(send_content)
            version = self.version
            command_id = 101
            header = [length, version, command_id]
            header_pack = struct.pack('!3I', *header)
            self.transport.write(header_pack + send_content)
            return

        if phone_number in self.factory.users:
            log.msg("电话号码<%s>存在老的连接." % phone_number.encode('utf-8'))
            self.factory.users[phone_number].connectionLost("")
            self.factory.users.pop(phone_number)

        log.msg("欢迎, %s!" % (phone_number.encode('utf-8'),))
        self.phone_number = phone_number
        self.factory.users[phone_number] = self
        self.state = "DATA"

        send_content = json.dumps({'code': 1})

        self.send_content(send_content, 101, [phone_number])

    def handle_data(self, command_id, content):
        """
        根据command_id来分配函数
        """
        self.command_func_dict[command_id](content)

    def handle_single_chat(self, content):
        """
        单播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(send_content, 102, [chat_to])

    def handle_group_chat(self, content):
        """
        组播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_to = content.get('chat_to')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = chat_to
        self.send_content(send_content, 103, phone_numbers)

    def handle_broadcast_chat(self, content):
        """
        广播
        """
        content = json.loads(content)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        send_content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        phone_numbers = self.factory.users.keys()
        self.send_content(send_content, 104, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.factory.users.keys():
                self.factory.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在线." % phone_number.encode('utf-8'))

class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]没有检测到心跳包,主动切断" % key.encode('utf-8'))
                value.transport.abortConnection()

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

reactor.listenTCP(8124, cf)
reactor.run()
复制代码

上一章有点地方有错误,这次在这一起更正了。比如users,在每个Protocol里面不保存,直接存储在Factory里面,每次引用的时候,直接去取就可以了;还有,如果经过验证之前,如果验证错误,users里面是没有连接的值的,只能self.transport.write(),不能通过send_content()来发送。

好了,上面2处修改了,并且在此基础上,加了一个访问redis的函数,非常简单,跟http访问一样,就是要注意异步化的问题。

再看看客户端的代码,客户端这边要获取token,首先要引用flask restful api 系列中,我们模拟的client.py,这次我也把它引用进来了,先登录,获取token,拿到token以后,再用tcpclient进行验证,其实这个在生产环境中也这么做的。

下面是frontClient.py的代码。

复制代码
# coding:utf-8
from twisted.internet import reactor, task
from twisted.internet.protocol import Protocol, ClientFactory
import struct
from twisted.python import log
import sys
import json
from client import API_1_1

log.startLogging(sys.stdout)

class EchoClient(Protocol):
    def __init__(self):
        self.command_func_dict = {
            101: self.handle_verify_s,
            102: self.handle_single_chat_s,
            103: self.handle_group_chat_s,
            104: self.handle_broadcast_chat_s
        }
        self.version = 0
        self.state = "VERIFY"
        self.phone_number = ""

    def connectionMade(self):
        log.msg("New connection", self.transport.getPeer())

    def dataReceived(self, data):
        length, self.version, command_id = struct.unpack('!3I', data[:12])
        content = data[12:length]
        if self.state == "VERIFY" and command_id == 101:
            self.handle_verify_s(content)
        else:
            self.handle_data(command_id, content)

    def handle_data(self, command_id, pack_data):
        self.command_func_dict[command_id](pack_data)

    def connectionLost(self, reason):
        log.msg("connection lost")

    def handle_verify_s(self, pack_data):
        """
        接受验证结果
        """
        content = json.loads(pack_data)
        code = content.get('code')
        if code == 1:
            log.msg('验证通过')
        else:
            log.msg('验证没有通过,请重新输入,程序暂停')
            reactor.stop()
        self.state = "Data"

    def handle_single_chat_s(self, pack_data):
        """
        接受单聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[单聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_group_chat_s(self, pack_data):
        """
        接受组聊
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[组聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def handle_broadcast_chat_s(self, pack_data):
        """
        接受广播
        """
        content = json.loads(pack_data)
        chat_from = content.get('chat_from')
        chat_content = content.get('chat_content')
        log.msg("[群聊][%s]:%s" % (chat_from.encode('utf-8'), chat_content.encode('utf-8')))

    def send_verify(self, phone_number, token):
        """
        发送验证
        """
        content = json.dumps(dict(phone_number=phone_number, token=token))
        self.send_data(content, 1)

    def send_single_chat(self, chat_from, chat_to, chat_content):
        """
        发送单聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 2)

    def send_group_chat(self, chat_from, chat_to, chat_content):
        """
        发送组聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        self.send_data(content, 3)

    def send_broadcast_chat(self, chat_from, chat_content):
        """
        发送群聊内容
        """
        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))
        self.send_data(content, 4)

    def send_data(self, send_content, command_id):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = self.version
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack + send_content)

    def send_heartbeat(self):
        """
        发送心跳包
        """
        length = 12
        version = self.version
        command_id = 5
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        self.transport.write(header_pack)

class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.p = EchoClient()

    def startedConnecting(self, connector):
        log.msg("Started to connect")

    def buildProtocol(self, addr):
        log.msg("Connected.")
        return self.p

    def clientConnectionFailed(self, connector, reason):
        log.msg("Lost connection. Reason:", reason)

    def clientConnectionLost(self, connector, reason):
        log.msg("Connection failed. Reason:", reason)

if __name__ == '__main__':
    api = API_1_1()
    chat_from = sys.argv[1]
    chat_password = sys.argv[2]

    u = api.login(chat_from, chat_password)
    token = api.token

    cf = EchoClientFactory()
    chat_from = sys.argv[1]

    all_phone_numbers = ['13565208554', '13764408552', '1390854961g']
    all_phone_numbers.remove(chat_from)
    import random

    task_send_heartbeat = task.LoopingCall(cf.p.send_heartbeat)
    task_send_heartbeat.start(2, now=False)

    reactor.callLater(10, cf.p.send_verify, chat_from, token)
    reactor.callLater(20, cf.p.send_group_chat, chat_from, all_phone_numbers, '你好,这是10秒的时候发送')
    reactor.callLater(30, cf.p.send_group_chat, chat_from, all_phone_numbers, '你好,这是20秒的时候发送')

    reactor.connectTCP('192.168.5.60', 8124, cf)

    reactor.run()
复制代码

分别把之前项目中的账号拉出来运行一下吧。客户端的认证函数也改变了,先引用之前的api客户端,直接获取正确的token,把token拿来发给tcp服务器端,tcp服务器端再到redis里面去找,如果正确,就验证通过,否则,返回code=0给客户端,这时候服务器端的记录当前客户端状态还是未验证通过,因此下面的客户端再发其他请求,服务器端全部丢弃。这跟http的思想是一样的。

这是一个客户端的调试结果,看,一切都正常。

复制代码
yudahai@yudahaiPC:tcpserver$ python frontClient.py 13565208554 123456
2016-06-24 13:28:36+0800 [-] Log opened.
2016-06-24 13:28:36+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fe377d25440>
2016-06-24 13:28:36+0800 [-] Started to connect
2016-06-24 13:28:36+0800 [Uninitialized] Connected.
2016-06-24 13:28:36+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-24 13:28:46+0800 [EchoClient,client] 验证通过
2016-06-24 13:28:53+0800 [EchoClient,client] [组聊][13764408552]:你好,这是10秒的时候发送
2016-06-24 13:29:03+0800 [EchoClient,client] [组聊][13764408552]:你好,这是20秒的时候发送
复制代码

其实利用redis把http模块和tcp模块集合起来比较简单,但难的地方在于设计思想,如何很好的通过redis把几个模块联系起来。

上面其实本质上讲的就是如何通过redis来共享状态。

下面我们再深入一下,通过纯web端发送命令到后台,然后后台接受到以后,通过redis来做消息系统,本来这部分应该是rabbitmq的事,毕竟rabbitmq是专门做消息系统的。但简单的消息系统可以用redis做,redis中有个list模型,每个消息发送的时候从左边push进来,接受的时候从右边pop,这样就是一个简单的消息系统。这边用redis先做一个简单的,主要可以让大家非常直观的看到twisted如何作为消费者客户端运行的,下一章讲rabbitmq的时候,就更简单了。

好了,废话少说,现在之前flask restful api那一个系列的项目中,加一个web页面,进入web页面,只有一个按钮,输入内容,发送一次,就广播一次。

下面是代码和具体的页面。

先是原来的flask项目中,我们增加一个web蓝图,这个我在flask restful api的第七篇  http://www.cnblogs.com/yueerwanwan0204/p/5522749.html 中讲过,增加相应的文件夹web,然后在里面添加2个文件__init__.py,view.py。

结构图下下所示:

编辑原来的run.py文件,添加蓝图指向

复制代码
    from app_1_0 import api as api_1_0_blueprint
    app.register_blueprint(api_1_0_blueprint, url_prefix='/api/v1000')

    from api_1_1 import api as api_1_1_blueprint
    app.register_blueprint(api_1_1_blueprint, url_prefix='/api/v1100')

    from web import web as web_blueprint
    app.register_blueprint(web_blueprint, url_prefix='/web')
复制代码

然后在web/__init__.py下面添加蓝图对象

复制代码
# coding:utf-8
from flask import Blueprint

web = Blueprint('web', __name__)

from . import view
复制代码

web/view.py就跟简单了,就渲染一个页面,同时具有get和post方法

复制代码
# coding:utf-8
from flask import Flask, request, jsonify, g, render_template, redirect, url_for, session, current_app
from app.model import User, db_session
import json
from . import web

@web.teardown_request
def handle_teardown_request(exception):
    db_session.remove()

@web.route('/send-command', methods=['GET', 'POST'])
def send_command():
    if request.method == 'GET':
        users = User.query.all()
        return render_template('web/send-command.html', users=users)
    else:
        data = request.get_json()
        command_id = data.get('command_id')
        chat_from = '13764408552'
        chat_to = data.get('chat_to')
        chat_content = data.get('content')

        print data

        if not chat_to or not chat_content or not command_id:
            return jsonify({'code': 0, 'message': '信息不完整'})

        send_data = json.dumps(dict(command_id=command_id, chat_from=chat_from, chat_to=chat_to, chat_content=chat_content))
        current_app.redis.lpush('front_tcp', send_data)

        return jsonify({'code': 1, 'message': '发送成功'})
复制代码

get的时候,就渲染;post的时候,接受页面上传的数据。

页面就一个页面,主要使用ajax上传,由于本人好长时间没有开发html了,所以页面丑了一点,js代码也丑了一点,但是能用,以后有空,我优化一下,大家先看吧,功能达到了。

templates/web/send-command.html的代码如下:

复制代码
<!DOCTYPE html>
<html  lang="zh_CN">
<head>
    <meta charset="UTF-8">
    <title>发送命令</title>
    <script src="../../static/js/jquery-2.1.4.min.js"></script>
</head>
<body>
<div>
    <select id="single_object">
        {% for user in users %}
            <option value="{{ user.phone_number }}">{{ user.phone_number }}</option>
        {% endfor %}
    </select>
    <input type="text" name="single_content">
    <button id="single_chat">单聊</button>
</div>
<br>
<br>
<br>
<br>
<div>
    <select multiple id="group_object">
        {% for user in users %}
            <option value="{{ user.phone_number }}">{{ user.phone_number }}</option>
        {% endfor %}
    </select>
    <input type="text" name="group_content">
    <button id="group_chat">组聊</button>
</div>
<br>
<br>
<br>
<br>
<div>
    <input type="text" name="broadcast_content">
    <button id="broadcast_chat">群聊</button>
</div>
<script>
    var baseurl = '/web/';
    $(function(){
        $("#single_chat").click(function(){
            var chat_to = [];
            chat_to.push($("#single_object option:selected").val());
            var content = $("input[name=single_content]").val();
            console.log("chat_to:" + chat_to + "    content:" + content);
            $.ajax({
                type: "POST",
                url: baseurl + "send-command",
                data: JSON.stringify({chat_to:chat_to, content:content, command_id:102}),
                dataType: "json",
                contentType: "application/json",
                success: function(data){
                    if (data["code"] == 1){
                        $("input[name=single_content]").val("");
                        console.log(data["message"]);
                    }else
                    {
                        console.log(data["message"]);
                    }

                }
            });
        });

        $("#group_chat").click(function(){
            var chat_tos = [];
            var chat_to = $("#group_object  option:selected").each(function(){
                chat_tos.push($(this).val());
            });
            var content = $("input[name=group_content]").val();
            console.log("chat_to:" + chat_tos + "    content:" + content);
            $.ajax({
                type: "POST",
                url: baseurl + "send-command",
                data: JSON.stringify({chat_to:chat_tos, content:content, command_id:103}),
                dataType: "json",
                contentType: "application/json",
                success: function(data){
                    if (data["code"] == 1){
                        $("input[name=group_content]").val("");
                        console.log(data["message"]);
                    }else
                    {
                        console.log(data["message"]);
                    }
                }
            });
        });

        $("#broadcast_chat").click(function(){
            var chat_to = [];
            {% for user in users %}
                chat_to.push("{{ user.phone_number }}");
            {% endfor %}
            var content = $("input[name=broadcast_content]").val();
            console.log("content:" + content);
            $.ajax({
                type: "POST",
                url: baseurl + "send-command",
                data: JSON.stringify({chat_to:chat_to, content:content, command_id:104}),
                dataType: "json",
                contentType: "application/json",
                success: function(data){
                    if (data["code"] == 1){
                        $("input[name=broadcast_content]").val("");
                        console.log(data["message"]);
                    }else
                    {
                        console.log(data["message"]);
                    }

                }
            });
        });
    });

</script>
</body>
</html>
复制代码

效果有点丑,

这样就可以直接发送了,发送到http服务器端,http服务器再把数据打包成json格式,发送到frontTCP端,那自然,frontTCP需要增加一点代码,之前的Protocol不变,只是在Factory里面增加2个函数,再增加一个循环任务,不停的接受redis的消息。

frontTCP新增代码如下:

复制代码
class ChatFactory(Factory):
    def __init__(self):
        self.users = {}

    def buildProtocol(self, addr):
        return Chat(self)

    def check_users_online(self):
        for key, value in self.users.items():
            if value.last_heartbeat_time != 0 and int(time.time()) - value.last_heartbeat_time > 4:
                log.msg("[%s]没有检测到心跳包,主动切断" % key.encode('utf-8'))
                value.transport.abortConnection()

    @defer.inlineCallbacks
    def receive_from_mq(self):
        data = yield redis_store.rpop('front_tcp')
        if data:
            log.msg("接受到来自消息队列的消息:", data)
            self.process_data_from_mq(data)

    def process_data_from_mq(self, data):
        loads_data = json.loads(data)
        command_id = loads_data.get('command_id')
        phone_numbers = loads_data.get('chat_to')
        chat_from = loads_data.get('chat_from')
        chat_content = loads_data.get('chat_content')

        content = json.dumps(dict(chat_from=chat_from, chat_content=chat_content))

        self.send_content(content, command_id, phone_numbers)

    def send_content(self, send_content, command_id, phone_numbers):
        """
        发送函数
        """
        length = 12 + len(send_content)
        version = 1100
        command_id = command_id
        header = [length, version, command_id]
        header_pack = struct.pack('!3I', *header)
        for phone_number in phone_numbers:
            if phone_number in self.users.keys():
                self.users[phone_number].transport.write(header_pack + send_content)
            else:
                log.msg("Phone_number:%s 不在线." % phone_number.encode('utf-8'))

cf = ChatFactory()

task1 = task.LoopingCall(cf.check_users_online)
task1.start(3, now=False)

task_receive_data_from_mq = task.LoopingCall(cf.receive_from_mq)
task_receive_data_from_mq.start(0.1, now=False)

reactor.listenTCP(8124, cf)
reactor.run()
复制代码
    receive_from_mq就是接受来之redis的消息,异步化一下,然后建一个循环任务task_receive_data_from_mq,这个循环任务,每0.1秒触发一次(以后rabbitmq也是这样),
如果队列消息里面有数据,就处理,否则继续循环。
  process_data_from_mq这是拿到具体的data,然后处理的过程,基本就解包、打包,然后发送。
  send_content这就是发送函数,我基本就把Protocol里面的发送函数重新抄了一遍,以后我们会做一个虚类,然后具体的处理函数来继承它,这次我就直接抄了。
  好了,整个过程就这样,我们来运行一下,启动2个客户端,看看客户端接受情况吧。

复制代码
yudahai@yudahaiPC:tcpserver$ python frontClient.py 13764408552 123456
2016-06-24 16:01:09+0800 [-] Log opened.
2016-06-24 16:01:09+0800 [-] Starting factory <__main__.EchoClientFactory instance at 0x7fd9a7eff3b0>
2016-06-24 16:01:09+0800 [-] Started to connect
2016-06-24 16:01:09+0800 [Uninitialized] Connected.
2016-06-24 16:01:09+0800 [Uninitialized] New connection IPv4Address(TCP, '192.168.5.60', 8124)
2016-06-24 16:01:10+0800 [EchoClient,client] 验证通过
2016-06-24 16:01:19+0800 [EchoClient,client] [单聊][13764408552]:ddddddddd
2016-06-24 16:01:32+0800 [EchoClient,client] [组聊][13764408552]:fghytjhnuyjuyjmuikiuk
2016-06-24 16:01:41+0800 [EchoClient,client] [群聊][13764408552]:ffffffffffffffffffffffffffffff
2016-06-24 16:18:26+0800 [EchoClient,client] [组聊][13764408552]:你好,这是web组聊
2016-06-24 16:18:27+0800 [EchoClient,client] [群聊][13764408552]:你好,这是web群聊
复制代码

看,是不是全接受到了?

这章就讲到这,主要讲到了如果通过redis把不同的模块联系在一起,其实本质上就是把客户端的状态在模块之间共享;之后我们讲了如何通过redis做一个简单的消息队列,这个其实是rabbitmq的特性,之所以要先讲一下,就是用最简单的方式来预热一下,因为rabbitmq的应用很广,可能一下子接受不了。还有就是把上一章的一些小bug解决掉。至于异步化,这个概念稍微有点大(好吧,我也不是研究特别的深,以后我会专门抽出一章讲这个内容)。



发表评论

电子邮件地址不会被公开。 必填项已用 * 标注

*

您可以使用这些 HTML 标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>

(Spamcheck Enabled)