在初次使用 ChatGPT 时,我就被打字机的视觉效果吸引。总是感觉似曾相识,因为经常在一些科幻电影中看到,高级文明回传的信息在通讯设备的屏幕上以打字机效果逐步出现,在紧张的氛围下,输出人类可读的内容,拉动着观众的神经,一步步将故事情节拉向高潮。
在很早之前我就了解过 Server-Sent Events 这门服务端推送技术,当时看过很多博客介绍其原理和使用场景,最后也没有留下深刻的印象。这一次 ChatGPT 的使用感受带给我一些触动,也激发了对技术的思考,究竟什么样的技术是一门好的技术 ”需要一个杀手级的应用,现实应用会促进技术发展“,技术不是冰冷无情的,贴近生活挖掘其实用价值,一样可以表现出感性的艺术效果。
Server-Sent Events(SSE)是一种允许服务器单向推送信息到客户端的技术,与传统的请求/响应模式相比,这种模式更加适合处理实时数据。以下是一些常见的 Server-Sent Events 应用场景:
2.1 SSE 工作原理
SSE 的基本工作原理是客户端首先向服务器发送一个 HTTP 请求,然后服务器保持这个连接打开,并周期性地通过这个连接向客户端发送数据。每个数据块都是一个独立的消息,每个消息都以一个空行结束。
使用 SSE 的主要步骤如下:
let source = new EventSource("http://xxx/chat/completions");
HTTP/1.1 200 OK Content-Type: text/event-stream Connection: keep-alive Cache-Control: no-cache
data: This is a message\n\n
source.onmessage = function(event) { console.log(event.data); };
注意,由于 SSE 是基于 HTTP 的,因此它受到同源策略的限制。如果你需要进行跨域 SSE,你需要在服务器端设置适当的 CORS 头部信息。另外,SSE 只支持文本数据,不支持二进制数据。如果你需要发送二进制数据,你可能需要考虑使用 WebSockets。
2.2 Fetch API 模拟 SSE
Fetch API 是一种通用的 HTTP 请求和响应模型,它可以用于发送和接收任何类型的 HTTP 请求,支持文本和二进制数据。由于其对流(Stream)的支持,可以模拟 Server-Sent Events (SSE),需要手动处理重连和流式数据。
在某些情况下,你可能会选择使用 Fetch API 模拟 SSE,而不是直接使用 SSE:
const url = 'https://your-server.com/events'; fetch(url) .then(response => { const reader = response.body.getReader(); const decoder = new TextDecoder(); // done 为数据是否接收完成 boolean 值 // value 为接收到的数据, Uint8Array 格式 return reader.read().then(function processMessage({ done, value }) { if (done) { return; } console.log(decoder.decode(value)); return reader.read().then(processMessage); }); });
在这个示例中,我们使用 fetch() 函数发起 HTTP 请求。然后,使用 response.body.getReader() 获取一个可读流的 reader,用来读取数据。还创建了一个 TextDecoder 对象,用来将二进制数据解码为文本,然后打印出来。然后,再次调用 reader.read() 方法,等待下一批数据。
这样,就可以使用 Fetch API 来接收服务器推送的实时更新,就像使用 SSE 一样,ChatGPT 采用的就是这种实现。
Server-Sent Events (SSE) 是一种服务器推送技术,允许服务器向客户端发送实时更新。在服务器端,我们需要创建一个 endpoint,发送正确的 HTTP 头部并持续推送数据。
func main() { http.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported!", http.StatusInternalServerError) return } // 事件流媒体 (MIME 类型) w.Header().Set("Content-Type", "text/event-stream") // 阻止缓存 w.Header().Set("Cache-Control", "no-cache") // 保持长连接 w.Header().Set("Connection", "keep-alive") // 跨域支持 w.Header().Set("Access-Control-Allow-Origin", "*") phrase := []string{"dolor ", "sit amet", ", consectetur", " adipiscing elit. ", "Ut consequat", " diam at ", "justo efficitur", " mattis."} for _, delta := range phrase { // 数据内容用 data 表示, 如果数据很长, 可以分成多行用 \n 结尾, fmt.Fprintf(w, "data: %s\n", delta) flusher.Flush() time.Sleep(200 * time.Millisecond) } // 最后一行使用 \n\n 结尾 fmt.Fprintf(w, "data: %s\n\n", "[DONE]") }) if err := http.ListenAndServe("127.0.0.1:8080", nil); err != nil { panic(err) } }
在 Go 语言中,http.Flusher 是一个接口,它允许 HTTP 响应数据在写入后立即发送到客户端,而不是等待所有响应数据都写入后再一次性发送。这对于长连接和服务器推送的场景非常有用。
// Flush 将用户层的数据写入到 TCP 缓冲区,内核会尽快将 TCP 缓存区数据发送出去 type Flusher interface { Flush() }
扩展:每个 TCP socket 连接在内核中都有一个发送缓存区和接收缓冲区
发送缓冲区用于暂存应用程序写入的数据,直到数据被发送出去并得到对方的确认。接收缓冲区用于暂存收到的数据,直到应用程序读取这些数据。
当应用程序调用发送数据的系统调用(如 write 或 send)时,数据会被复制到发送缓冲区。然后,内核会尽快将这些数据发送出去。但具体发送的时机取决于许多因素,包括但不限于以下几点:
当数据成功发送并得到确认后,内核会从发送缓冲区中删除这些数据,释放缓冲区空间。
上面我们讨论下 SSE 的工作原理,也知道由于 Web API EventSource 的局限性,ChatGPT 采用了 Fetch API 来手动处理和解析 SSE 服务端端点接收的数据流。那么接下来通过一个简单的打字机案例,加深对所学内容的理解。
这里借鉴了 《ChatGPT 打字机消息回复实现原理》 文章中的前端代码,在其基础上增加了消息处理逻辑,用于适配上面的 SSE 服务端。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Chat Completion</title> </head> <body> <button onclick="connectFetch()">建立 fetchSSE 连接</button> <button onclick="closeSSE()">断开 fetchSSE 连接</button> <br/> <br/> <div id="text"></div> <script> const divTyping = document.getElementById('text') let ctrl const connectFetch = () => { ctrl = new AbortController() fetchEventSource('http://127.0.0.1:8080/v1/chat/completions', { method: 'POST', body: JSON.stringify({ prompt: 'Lorem ipsum', max_tokens: 20, stream: true, }), signal: ctrl.signal, onopen: () => { console.log('Connection successful.') }, onclose: () => { console.log('Connection closed.') }, onmessage: (delta) => { let prefix = 'data: ' if (!delta.startsWith(prefix)) { return } delta = delta.slice(prefix.length) delta = delta.replace(/\n$/, '') if (delta === '[DONE]\n') { return } divTyping.innerText += delta } }) } const closeSSE = () => { if (ctrl) { ctrl.abort() ctrl = null } } const fetchEventSource = (url, options) => { fetch(url, options).then(resp => { if (resp.status === 200) { options.onopen && options.onopen() return resp.body } }).then(rb => { const reader = rb.getReader() const push = () => { // done 为数据是否接收完成 boolean 值 // value 为接收到的数据, Uint8Array 格式 return reader.read().then(({done, value}) => { if (done) { options.onclose && options.onclose() return } options.onmessage && options.onmessage(new TextDecoder().decode(value)) return push() }); } // 开始读取流信息 return push() }).catch((e) => { options.error && options.error(e) }) } </script> </body> </html>