动手搓一个kubernetes管理平台(5)-WebSocket和TTY

2024-01-23 15:28

本文主要是介绍动手搓一个kubernetes管理平台(5)-WebSocket和TTY,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

所有的kubernetes管理平台,都会用到TTY的功能,既通过前端直接进入到容器内部,这是一个交互式的操作,或者说是一个流式操作,简单的http协议肯定不能满足这个需求,使用websocket就能很好的满足这个需求。

用通俗的话来描述websocket, 其实就三点:

  1. 可以直接在浏览器里使用
  2. 支持双向通信
  3. 封装简单

既然决定了使用websocket作为前端进入容器的方式,那么可以看看后端是如何进入容器的。

后端封装

golang中,一般使用github.com/gorilla/websocket 对websocket进行封装。

package wsconnectimport ("errors""github.com/gorilla/websocket""net/http""sync"
)// http升级websocket协议的配置
var wsUpgrader = websocket.Upgrader{// 允许所有CORS跨域请求CheckOrigin: func(r *http.Request) bool {return true},
}// websocket消息
type WsMessage struct {MessageType intData        []byte
}// 封装websocket连接
type WsConnection struct {wsSocket *websocket.Conn // 底层websocketinChan   chan *WsMessage // 读取队列outChan  chan *WsMessage // 发送队列mutex     sync.Mutex // 避免重复关闭管道isClosed  boolcloseChan chan byte // 关闭通知
}// 读取协程
func (wsConn *WsConnection) wsReadLoop() {var (msgType intdata    []bytemsg     *WsMessageerr     error)for {// 读一个messageif msgType, data, err = wsConn.wsSocket.ReadMessage(); err != nil {goto ERROR}msg = &WsMessage{msgType,data,}// 放入请求队列select {case wsConn.inChan <- msg:case <-wsConn.closeChan:goto CLOSED}}
ERROR:wsConn.WsClose()
CLOSED:
}// 发送协程
func (wsConn *WsConnection) wsWriteLoop() {var (msg *WsMessageerr error)for {select {// 取一个应答case msg = <-wsConn.outChan:// 写给websocketif err = wsConn.wsSocket.WriteMessage(msg.MessageType, msg.Data); err != nil {goto ERROR}case <-wsConn.closeChan:goto CLOSED}}
ERROR:wsConn.WsClose()
CLOSED:
}func InitWebsocket(resp http.ResponseWriter, req *http.Request) (wsConn *WsConnection, err error) {var (wsSocket *websocket.Conn)// 应答客户端告知升级连接为websocketif wsSocket, err = wsUpgrader.Upgrade(resp, req, nil); err != nil {return}wsConn = &WsConnection{wsSocket:  wsSocket,inChan:    make(chan *WsMessage, 1000),outChan:   make(chan *WsMessage, 1000),closeChan: make(chan byte),isClosed:  false,}// 读协程go wsConn.wsReadLoop()// 写协程go wsConn.wsWriteLoop()return wsConn, nil
}// 发送消息
func (wsConn *WsConnection) WsWrite(messageType int, data []byte) (err error) {select {case wsConn.outChan <- &WsMessage{messageType, data}:case <-wsConn.closeChan:err = errors.New("websocket closed")}return
}// 读取消息
func (wsConn *WsConnection) WsRead() (msg *WsMessage, err error) {select {case msg = <-wsConn.inChan:returncase <-wsConn.closeChan:err = errors.New("websocket closed")}return
}// 关闭连接
func (wsConn *WsConnection) WsClose() {wsConn.wsSocket.Close()wsConn.mutex.Lock()defer wsConn.mutex.Unlock()if !wsConn.isClosed {wsConn.isClosed = trueclose(wsConn.closeChan)}
}

上述代码就是一个简单的websocket的封装,来看看这段代码做了哪些事:

  • 首先封装websocket 连接和消息的结构体,这个和tcp传输的方式类似,在连接的结构体中,声明2读写2个channel, 以及对应的websocket消息体,考虑到并发安全,所以加了个mutex的锁。
  • 然后定义读写逻辑,由于上述消息体的Data的字节,所以需要在读写逻辑里定义一个结构体 ,来解析这个字节,这个需要前后端一起约定好,然后发起一个协程不停的调用
  • 最后进行初始化, 注意,初始化的时候需要使用upgrade的方法,将http请求升级成websocket协议,然后启动收发2个协程,返回wsconn的结构体。

上述代码是对websocket的封装,这段代码仅仅是用来获取前端传来的数据,并不会对kubernetes进行任何操作,好在kubernetes的标准库"k8s.io/client-go/tools/remotecommand"提供了一个解决思路,remotecommand提供了连接到容器的方法:

if executor, err = remotecommand.NewSPDYExecutor(restConf, "POST", sshReq.URL()); err != nil {goto END}

然后使用Stream方法,将输入输出以流的方式,连接到容器

// 配置与容器之间的数据流处理回调handler = &streamHandler{wsConn: wsConn, resizeEvent: make(chan remotecommand.TerminalSize)}if err = executor.Stream(remotecommand.StreamOptions{Stdin:             handler,Stdout:            handler,Stderr:            handler,TerminalSizeQueue: handler,Tty:               true,}); err != nil {goto END}return

当然,在实际的执行命令之前,需要提前做一些预处理,比如验证权限,初始化客户端等等,简单的描述一下后端的逻辑:

ws请求
截取token以及其他参数
基于参数和token生成客户端
使用客户端创建客户端到容器的连接
将wss请求转换成流
持续输出和返回

前端封装

前端由于需要模拟一个terminal的窗口,这块可以用到大名鼎鼎的xterm,使用xterm可以在前端模拟出一个完整的terminal,包括颜色 ,字体 ,窗口大小等等,都是可以可配置的。

由于我的前端使用的是ts+vue3的框架进行编写的,所以仅需要单独写一个页面即可。

<template><div class="container"><Breadcrumb:items="[{path: '../workload/listpods',label: $t('menu.dashboard.workload'),},{ path: '', label: $t('menu.dashboard.workload.terminal.get') },]"/><!-- 基础信息 --><div:style="{width: '100%',}"><a-cardclass="general-card":title="$t('menu.dashboard.workload.terminal.get')"><a-row style="margin-bottom: 16px"><a-col :span="4"><a-space size="mini"><a-tag size="large">命名空间:</a-tag><p>{{ route.query.namespace }}</p></a-space></a-col><a-col :span="8" :offset="1"><div><a-space size="mini"><a-tag size="large">Pod:</a-tag><p>{{ route.query.podname }}</p></a-space></div></a-col><a-col :span="4" :offset="1"><div><a-space size="mini"><a-tag size="large">Container:</a-tag><p>{{ route.query.container }}</p></a-space></div></a-col><!-- 选择bash or shell --><a-col :span="4" :offset="1"><div><a-space size="mini"><h4>Bash:</h4><a-select :style="{ width: '100px' }" v-model="currentBash"><a-option>bash</a-option><a-option>sh</a-option></a-select></a-space></div></a-col></a-row><div ref="terminal"></div></a-card></div></div>
</template><script lang="ts" setup>
import { ref, onMounted, onBeforeUnmount, watch } from "vue";
import useLoading from "@/hooks/loading";
import { debounce } from "lodash";
import { Terminal } from "xterm";
import { FitAddon } from "xterm-addon-fit";
import { useRoute } from "vue-router";
import "xterm/css/xterm.css";
import { LabelDesc } from "@/api/common";
import { getToken } from "@/utils/auth";const { loading } = useLoading(true);
const route = useRoute();const descData = ref<LabelDesc[]>([{ label: "命名空间", value: route.query.namespace as string },{ label: "POD", value: route.query.podname as string },{ label: "Container", value: route.query.container as string },
]);
const shrc = ref<string>("b");
const currentBash = ref<string>("bash");// websocket客户端初始化相关
// 打开terminal
const OpenTerminal = () => {loading.value = false;
};
// 关闭terminal
const CloseTerminal = () => {console.log("onclose");
};
// 处理消息
const OnMessage = (event: any) => {term.value.write(event.data);
};
// 处理terminal错误
const OnError = () => {console.log("onerror");
};const terminalSocket = ref();
// 判断连接是否打开
const isWsOpen = () => {const readyState = terminalSocket.value && terminalSocket.value.readyState;return readyState === 1;
};
// 创建WS
const createWS = () => {const token = getToken() as string;if (currentBash.value === "bash") {shrc.value = "b"} else if (currentBash.value === "sh") {shrc.value = "s"}const wsUrl = `wss://${window.location.host}/kubemgr/api/v1/ws/${route.params.clusteruuid}/${route.query.namespace}/${route.query.podname}/${route.query.container}/${shrc.value}/ssh?clusterinfo=${token}`;terminalSocket.value = new WebSocket(wsUrl);terminalSocket.value.onopen = OpenTerminal; // WebSocket 连接已建立terminalSocket.value.onmessage = OnMessage; // 收到服务器消息terminalSocket.value.onclose = CloseTerminal; // WebSocket 连接已关闭terminalSocket.value.onerror = OnError; // WebSocket 连接出错
};
// 初始化WS
const initWS = () => {if (!terminalSocket.value) {createWS();}if (terminalSocket.value && terminalSocket.value.readyState >= 1) {terminalSocket.value.close();createWS();}
};// terminal初始化相关
const term = ref();
const terminal = ref();
const fitAddon = new FitAddon();// 尺寸同步 发送给后端,调整后端终端大小,和前端保持一致,不然前端只是范围变大了,命令还是会换行
const resizeRemoteTerminal = () => {if (isWsOpen()) {const msg = {type: "resize",rows: term.value.rows,cols: term.value.cols,};terminalSocket.value.send(JSON.stringify(msg));}
};// 终端输入绑定事件
const termData = () => {// 输入与粘贴的情况,onData不能重复绑定,不然会发送多次term.value.onData((data: any) => {if (isWsOpen()) {// 写给服务端, 由服务端发给containerconst msg = { type: "input", input: data };terminalSocket.value.send(JSON.stringify(msg));}});// 终端尺寸变化触发term.value.onResize(() => {resizeRemoteTerminal();});
};const initTerm = () => {term.value = new Terminal({lineHeight: 1.2,fontSize: 12,fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",theme: {background: "#181d28",},// 光标闪烁cursorBlink: true,cursorStyle: "underline",scrollback: 100,tabStopWidth: 4,});term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据term.value.loadAddon(fitAddon); // 自适应尺寸// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作setTimeout(() => {fitAddon.fit();}, 1000);termData(); // Terminal 事件挂载
};const resetTerm = () => {term.value.reset()terminal.value.innerHTML = '';term.value = new Terminal({lineHeight: 1.2,fontSize: 12,fontFamily: "Monaco, Menlo, Consolas, 'Courier New', monospace",theme: {background: "#181d28",},// 光标闪烁cursorBlink: true,cursorStyle: "underline",scrollback: 100,tabStopWidth: 4,});term.value.open(terminal.value); // 挂载dom窗口,初始化为空数据term.value.loadAddon(fitAddon); // 自适应尺寸// 不能初始化的时候fit,需要等terminal准备就绪,可以设置延时操作setTimeout(() => {fitAddon.fit();}, 1000);termData(); // Terminal 事件挂载
};// 窗口大小适应相关
// 适应浏览器尺寸变化
const fitTerm = () => {fitAddon.fit();
};
const onResize = debounce(() => fitTerm(), 500);
const onTerminalResize = () => {window.addEventListener("resize", onResize);
};
const removeResizeListener = () => {window.removeEventListener("resize", onResize);
};onMounted(() => {loading.value = true;initWS();initTerm();onTerminalResize();
});onBeforeUnmount(() => {removeResizeListener();if (terminalSocket.value) {terminalSocket.value.close();}
});watch(() => currentBash.value, // 要监视的数据() => {// 回调函数loading.value = true;initWS();resetTerm();// initTerm();onTerminalResize();},{// immediate: true, // 立即执行回调deep: true, // 深层监视}
);
</script><script lang="ts">
export default {name: "GetTerminal",
};
</script><style lang="scss" scoped>
.terminal {width: 90%;// height: calc(100% - 62px);// height: 100%;margin-bottom: 16px;
}
</style>

最终的效果就类似这种

63BIQb

  • Tips: 一般服务部署后,服务暴露会使用nginx或者ingress, 默认是不支持websocket的,所以需要在nginx/ingress上添加一下配置, 将转发的http请求升级到websocket
	proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection upgrade;proxy_set_header Host $host;proxy_pass http://127.0.0.1:8082;
个人公众号, 分享一些日常开发,运维工作中的日常以及一些学习感悟,欢迎大家互相学习,交流

在这里插入图片描述

这篇关于动手搓一个kubernetes管理平台(5)-WebSocket和TTY的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636748

相关文章

gradle第三方Jar包依赖统一管理方式

《gradle第三方Jar包依赖统一管理方式》:本文主要介绍gradle第三方Jar包依赖统一管理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录背景实现1.顶层模块build.gradle添加依赖管理插件2.顶层模块build.gradle添加所有管理依赖包

基于Python打造一个智能单词管理神器

《基于Python打造一个智能单词管理神器》这篇文章主要为大家详细介绍了如何使用Python打造一个智能单词管理神器,从查询到导出的一站式解决,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 项目概述:为什么需要这个工具2. 环境搭建与快速入门2.1 环境要求2.2 首次运行配置3. 核心功能使用指

HTML5中的Microdata与历史记录管理详解

《HTML5中的Microdata与历史记录管理详解》Microdata作为HTML5新增的一个特性,它允许开发者在HTML文档中添加更多的语义信息,以便于搜索引擎和浏览器更好地理解页面内容,本文将探... 目录html5中的Mijscrodata与历史记录管理背景简介html5中的Microdata使用M

Spring 基于XML配置 bean管理 Bean-IOC的方法

《Spring基于XML配置bean管理Bean-IOC的方法》:本文主要介绍Spring基于XML配置bean管理Bean-IOC的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录一. spring学习的核心内容二. 基于 XML 配置 bean1. 通过类型来获取 bean2. 通过

python uv包管理小结

《pythonuv包管理小结》uv是一个高性能的Python包管理工具,它不仅能够高效地处理包管理和依赖解析,还提供了对Python版本管理的支持,本文主要介绍了pythonuv包管理小结,具有一... 目录安装 uv使用 uv 管理 python 版本安装指定版本的 Python查看已安装的 Python

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

在Android平台上实现消息推送功能

《在Android平台上实现消息推送功能》随着移动互联网应用的飞速发展,消息推送已成为移动应用中不可或缺的功能,在Android平台上,实现消息推送涉及到服务端的消息发送、客户端的消息接收、通知渠道(... 目录一、项目概述二、相关知识介绍2.1 消息推送的基本原理2.2 Firebase Cloud Me

基于Python和MoviePy实现照片管理和视频合成工具

《基于Python和MoviePy实现照片管理和视频合成工具》在这篇博客中,我们将详细剖析一个基于Python的图形界面应用程序,该程序使用wxPython构建用户界面,并结合MoviePy、Pill... 目录引言项目概述代码结构分析1. 导入和依赖2. 主类:PhotoManager初始化方法:__in

关于WebSocket协议状态码解析

《关于WebSocket协议状态码解析》:本文主要介绍关于WebSocket协议状态码的使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录WebSocket协议状态码解析1. 引言2. WebSocket协议状态码概述3. WebSocket协议状态码详解3

nvm如何切换与管理node版本

《nvm如何切换与管理node版本》:本文主要介绍nvm如何切换与管理node版本问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录nvm切换与管理node版本nvm安装nvm常用命令总结nvm切换与管理node版本nvm适用于多项目同时开发,然后项目适配no