python --实现sse推送(nginx配置)

flask-sse

import json

import redis
from flask import Flask, current_app
from flask_cors import CORS
from flask_sse import sse
from redis import Redis

app=Flask(__name__)
CORS(app, supports_credentials=True)

app.config["REDIS_URL"]="redis://localhost"
app.register_blueprint(sse,url_prefix='/stream')   # sse请求地址(订阅消息)

@app.route('/send')   # 发布消息
def send_message():
    sse.publish({"message":"Hello!"},type='greeting')
    return"Message sent!"

页面demo

<!DOCTYPE html>
<html>
<head>
    <title>SSE Demo</title>
</head>
    <body>
    <div id="message-div"></div>
    <script>
        var source = new EventSource("http://127.0.0.1:5000/stream") // 创建EventSource对象,连接到SSE流

        source.addEventListener("message", function (event) { // 监听"message"事件
            console.log(event.data)});
    </script>
    </body>
</html>

自实现:

import json

import loguru
import redis
from flask import Flask, current_app, request
from flask_cors import CORS

class PubSub(object):
    '''发布订阅者'''

    def __init__(self):
        self._conn = redis.Redis(connection_pool=redis.ConnectionPool(decode_responses=True))

    def pub(self, message, channel_name, type):
        '''
        发布
        @params message       --> 消息内容; 字符串格式;
        @params channel_name  --> 频道;
        @params type          --> 自定义类型;
        '''
        self._conn.publish(channel_name, json.dumps({
    
    'data': {
    
    'message': message}, 'type': type}))

    def sub(self, channel_name):
        '''
        订阅
        @params channel_name  --> 频道;
        响应格式:
            event:greeting
            data:{"message": "Hello!"}

            event:greeting
            data:{"message": "Hello!"}
        '''
        pubsub = self._conn.pubsub()  # 生成订阅对象
        pubsub.subscribe(channel_name)
        try:
            for pubsub_message in pubsub.listen():
                loguru.logger.debug(f'消息中间件:{
      
      pubsub_message}')
                yield getattr(self, pubsub_message['type'])(pubsub_message)
        finally:
            try:
                pubsub.unsubscribe(channel_name)
            except ConnectionError: ...

    def message(self, pubsub_message):
        '''常规消息'''
        msg_dict = json.loads(pubsub_message['data'])
        return f'event:{
      
      msg_dict["type"]}' + '\n' + f"data:{
      
      str(msg_dict['data'])}" + '\n\n'

    def subscribe(self, *args):
        '''发起连接消息'''
        return f'event:connect' + '\n' + f"data:ok" + '\n\n'


app=Flask(__name__)
CORS(app, supports_credentials=True)
p = PubSub()
@app.route('/sse')
def sse():
    '''
    订阅消息
    Eg:
    <script>
        var source = new EventSource("/sse?channel_name=demo") // 创建EventSource对象,连接到SSE流

        source.addEventListener("message", function (event) { // 监听"message"事件
            console.log(event.data)});
    </script>
    '''
    channel_name = request.args.get('channel_name')
    return current_app.response_class(
        p.sub(channel_name),
        mimetype='text/event-stream',
    )

@app.route('/send')
def send():
    '''
    发布消息
    /send?channel_name=demo&mes=测试下
    '''
    mes = request.args.get('mes')
    channel_name = request.args.get('channel_name')
    p.pub(channel_name=channel_name, message=mes, type='message')
    return f'send:{
      
      mes}'


if __name__ == '__main__':
    app.run(debug=True)

nginx配置

 server {
        listen       443 default ssl;
        listen       [::]:443 default ssl;
        server_name  _;

        ssl_certificate /root/project/code/tree_hole_gpt/ssl/(文件);
        ssl_certificate_key /root/project/code/tree_hole_gpt/ssl/(文件);
        ssl_session_timeout 5m;
        ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;
        ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
        ssl_prefer_server_ciphers on;

        gzip on;
        gzip_buffers 4 32k;
        gzip_types "*";
        gzip_vary on;
        gzip_min_length 1k;
        gzip_comp_level 6;
        gzip_http_version 1.1;

        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 120s;      #nginx代理等待后端服务器的响应时间
        proxy_connect_timeout 120s;  #nginx代理与后端服务器连接超时时间(代理连接超时)
        proxy_send_timeout 120s;    #后端服务器数据回传给nginx代理超时时间


        location / {
            proxy_pass http://127.0.0.1:8001;
        }

		location /ws/ {#ws协议时
            proxy_http_version 1.1;
            proxy_set_header Host  $host;
            proxy_set_header X-Real-Ip $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Nginx-Proxy true;
            proxy_redirect off;
            client_max_body_size 10m;
            proxy_pass http://127.0.0.1:8001;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
            proxy_connect_timeout 300s;
            proxy_read_timeout 300s;
            proxy_send_timeout 300s;
       }

        location /sse/ {
            proxy_set_header Host $http_host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_cache off;
            proxy_buffering off;
            proxy_pass http://127.0.0.1:8001;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "Upgrade";
        }
        location /media/ {
            alias /root/project/code/tree_hole_gpt/media/;
            expires 30d;
        }

        location /static/ {
            alias /root/project/static/;
            expires 30d;
        }
    }
}

猜你喜欢

转载自blog.csdn.net/weixin_44634704/article/details/131821259