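Anyone who has tried OpenAI's ChatGPT will have noticed that its replies appear piece by piece, a few characters at a time. Readers familiar with how it works know that, apart from some pre-tuned canned replies, the AI's answers are generated in real time, or, more precisely, predicted in real time. This display style is therefore something of a necessity. But have you noticed that it actually looks pretty cool? At the very least, it is unlikely to annoy anyone.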
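Since the content is generated in real time, the backend naturally has to feed data to the frontend in real time. Typical APIs return all of their data in one go, so how can a server keep sending data to the client continuously? Let's first look at how OpenAI does it.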
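Open the browser's developer console and inspect the request. The key part is marked with a red box in the screenshot:

text/event-stream

So what is it? It is Server-Sent Events (SSE), a mechanism that lets the server push data to the client in real time over a single long-lived HTTP response. Below we implement the server in Golang. First, we prepare some canned text taken from ChatGPT's answer to "tell me a story". It is fairly long, which makes it convenient for the demo: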
```go
var data = `Once upon a time in a small village nestled deep in the heart of a lush forest, there lived a young and adventurous girl named Lily. She had always been enchanted by the stories her grandmother told her about magical creatures and hidden treasures that lay beyond their village. Lily's curiosity grew day by day, and she yearned for an adventure of her own. One sunny morning, as she roamed the outskirts of the village, she stumbled upon an old, tattered map hidden beneath a mossy rock. The map depicted a path through the dense forest, leading to a legendary waterfall said to possess magical powers. With a surge of excitement, Lily decided to embark on a grand quest to find the hidden waterfall and uncover the secrets it held. She packed a small bag with provisions, put on her sturdy boots, and set off on her journey, following the twists and turns marked on the map. As Lily ventured deeper into the forest, she encountered numerous challenges along the way. She traversed treacherous ravines, crossed roaring rivers using makeshift bridges, and navigated through thickets of thorns. Yet, her determination never wavered, fueled by the hope of discovering something extraordinary. After days of relentless exploration, Lily finally emerged from the thick foliage and found herself standing before a breathtaking sight—the legendary waterfall. It cascaded down from great heights, sparkling in the sunlight, and filling the air with a melodious symphony of rushing water. Lily approached the shimmering pool at the base of the waterfall and cupped her hands, taking a sip of the crystal-clear water. Instantly, a wave of warmth coursed through her body, and she felt a surge of newfound energy. The legends were true—the waterfall possessed magical powers! Overwhelmed with joy, Lily spent hours basking in the magical aura of the waterfall. 
As she sat there, she realized that the real treasure she had discovered was not material wealth but the courage and resilience she had found within herself during her quest. Filled with a sense of contentment, Lily retraced her steps back to the village, carrying the memories of her adventure close to her heart. Word of her incredible journey spread throughout the village, inspiring others to embrace their own dreams and embark on their own quests. From that day forward, Lily became known as the village's greatest storyteller. She shared tales of her adventure, captivating both young and old with her vivid descriptions of the magical waterfall and the wonders that lay hidden in the world beyond their village. And so, the spirit of adventure lived on in the hearts of the villagers, thanks to the courage and curiosity of a young girl named Lily, who had dared to chase her dreams and discovered the true magic that lies within us all.`
```
Next, we create the endpoint with `net/http`:
```go
package main

import (
	"log"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/event", eventHandler)
	log.Println("Server is running on port 8000")
	log.Fatal(http.ListenAndServe(":8000", nil))
}

func eventHandler(w http.ResponseWriter, r *http.Request) {
	// Headers for a long-lived streaming response
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Flush sends buffered bytes to the client right away
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	// Send the story one character (rune) per line.
	// These are bare lines, not "data: ..." SSE frames.
	for _, item := range data {
		w.Write([]byte(string(item) + "\n"))
		flusher.Flush()
		select {
		case <-r.Context().Done():
			return // client disconnected: stop streaming
		case <-time.After(50 * time.Millisecond):
		}
	}
	w.Write([]byte("\n"))
}
```
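Strictly speaking, the handler above only sets the `text/event-stream` content type and then writes bare lines. That is enough for the simple Python client below. The SSE format itself (specified in the WHATWG HTML standard) expects each message to be a group of field lines such as `data: ...` followed by a blank line, and a browser `EventSource` only dispatches an event when that blank line arrives. The Python sketch below shows that framing. `format_sse` is a hypothetical helper, not part of any library.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Encode one Server-Sent Events message (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")   # optional event type
    if event_id is not None:
        lines.append(f"id: {event_id}")   # optional id for reconnection
    # A payload containing "\n" must be split into several "data:" lines.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # The trailing blank line ends the message.
    return "\n".join(lines) + "\n\n"


print(format_sse("Once"), end="")
print(format_sse("a\nb", event="token", event_id="1"), end="")
```

In the Go handler, the equivalent change would be to write this framing for each chunk before calling `flusher.Flush()`.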
On the client side, we use Python to make the request:
```python
import requests

url = 'http://127.0.0.1:8000/event'
r = requests.get(url, stream=True)  # stream=True: don't wait for the whole body

for line in r.iter_lines():
    if line:
        print(line.decode(), end='', flush=True)
```
The result looks like this:
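Note: this client uses `iter_lines`, so line breaks are not reproduced faithfully. Every character arrives followed by "\n", which means the story's own line break turns into an empty line that the `if line:` check discards. You can pass the `delimiter` parameter to split the stream on some other special character instead.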
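Splitting on newlines alone also loses message boundaries. If the server emits spec-style `data:` frames, a client can group lines into messages and treat a blank line as the end of each one. Below is a minimal sketch. It assumes the input is decoded lines without their terminators, as `iter_lines` produces. `parse_sse` is a hypothetical name, and fields other than `data` are ignored for brevity.

```python
from typing import Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data of each complete SSE message (hypothetical helper)."""
    data_lines: List[str] = []
    for line in lines:
        if line == "":
            # Blank line: dispatch the buffered message, if it had data.
            if data_lines:
                yield "\n".join(data_lines)
                data_lines = []
            continue
        if line.startswith(":"):
            continue  # comment line, e.g. a keep-alive ping
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # one leading space after the colon is dropped
        if field == "data":
            data_lines.append(value)
    # A trailing message without its blank line is discarded, as the spec requires.
```

With `requests`, this could be driven as `parse_sse(line.decode('utf-8') for line in r.iter_lines())`, provided the server sends `data:` frames. Against the bare-line handler above, it would yield nothing.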
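A counterpart to this is `application/x-ndjson`. NDJSON stands for Newline Delimited JSON. As the name suggests, the server keeps returning new data as JSON, one document per line. For example, let's define a simple struct and some test data: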
```go
type Person struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

var jsonData = []Person{
	{ID: 1, Name: "John"},
	{ID: 2, Name: "Jane"},
	{ID: 3, Name: "Alice"},
}
```
Next, register the route and its handler:
```go
// ...
http.HandleFunc("/ndjson", ndjsonHandler)
// ...

// Needs the "encoding/json", "log", "net/http" and "time" imports.
func ndjsonHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/x-ndjson")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	for _, item := range jsonData {
		// One record per line; "line" avoids shadowing the package-level jsonData.
		line, err := json.Marshal(item)
		if err != nil {
			log.Println("Error marshaling JSON:", err)
			continue
		}
		w.Write(line)
		w.Write([]byte("\n")) // the newline is the record delimiter
		flusher.Flush()
		time.Sleep(500 * time.Millisecond)
	}
}
```
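To make the wire format concrete, the small Python sketch below builds the same body this handler streams for the three test records: one compact JSON document per line. It is an illustration only, not part of the original server. Compact separators are used so that, for these particular records, the output matches what Go's `json.Marshal` produces.

```python
import json

# Mirrors the Go test data; keys follow the struct's json tags.
people = [
    {"id": 1, "name": "John"},
    {"id": 2, "name": "Jane"},
    {"id": 3, "name": "Alice"},
]


def to_ndjson(records) -> str:
    # Each record becomes one compact JSON line terminated by "\n".
    return "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in records)


body = to_ndjson(people)
print(body, end="")
# {"id":1,"name":"John"}
# {"id":2,"name":"Jane"}
# {"id":3,"name":"Alice"}
```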
Again, we use a Python client to make the request:
```python
import requests

url = 'http://127.0.0.1:8000/ndjson'
r = requests.get(url, stream=True)

# Each non-empty line is one complete JSON record
for line in r.iter_lines():
    if line:
        print(line.decode(), flush=True)
```
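The client above prints each line as raw text. Because every line is a complete JSON document, it can also be decoded record by record. Below is a minimal sketch. `decode_ndjson` is a hypothetical helper that accepts the byte lines yielded by `r.iter_lines()`.

```python
import json
from typing import Any, Iterable, Iterator


def decode_ndjson(lines: Iterable[bytes]) -> Iterator[Any]:
    """Decode each non-blank NDJSON line into a Python object (hypothetical helper)."""
    for line in lines:
        if line.strip():               # skip blank keep-alive lines
            yield json.loads(line)     # json.loads accepts UTF-8 bytes
```

Usage with the request above: `for person in decode_ndjson(r.iter_lines()): print(person["id"], person["name"])`.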
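JavaScript developers can also make the request with axios. The `responseType: 'stream'` option combined with `.on('data', ...)` is the Node.js stream API, so this snippet runs in Node.js: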
```javascript
const axios = require('axios');

const url = 'http://127.0.0.1:8000/ndjson';
const headers = {
  'Accept': 'application/x-ndjson'
};

axios.get(url, { headers: headers, responseType: 'stream' })
  .then(response => {
    // response.data is a readable stream; each chunk is a Buffer
    response.data.on('data', chunk => {
      console.log(chunk.toString());
    });
  })
  .catch(error => {
    console.error(error);
  });
```
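The axios example prints each `data` chunk as-is. A stream chunk is not guaranteed to end on a newline, so one JSON record can arrive split across two chunks, and a multi-byte UTF-8 character can be split as well. A client that wants parsed objects typically buffers bytes until it sees a newline. The sketch below shows that buffering in Python to match the other client examples. The `NdjsonDecoder` class is hypothetical.

```python
import json
from typing import Any, List


class NdjsonDecoder:
    """Reassemble NDJSON records from arbitrarily split byte chunks (hypothetical helper)."""

    def __init__(self) -> None:
        self._buffer = b""

    def feed(self, chunk: bytes) -> List[Any]:
        self._buffer += chunk
        # Everything before the last "\n" is complete; keep the remainder buffered.
        *complete, self._buffer = self._buffer.split(b"\n")
        # Decode only whole lines, so split UTF-8 sequences are never decoded early.
        return [json.loads(line) for line in complete if line.strip()]
```

Each call returns only the records that are complete so far. Any partial line stays in the buffer until the rest of it arrives.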
On a Java backend, the same can be done with reactive Spring WebFlux:
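```java
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
public class CommentController {

    // Streamed as Server-Sent Events (text/event-stream)
    @GetMapping(path = "/comment/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Comment> feed() {
        //...
    }

    // Streamed as newline-delimited JSON (application/x-ndjson)
    @GetMapping(path = "/comment/stream/ndjson", produces = MediaType.APPLICATION_NDJSON_VALUE)
    public Flux<Comment> feedNdjson() {
        //...
    }
}
```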
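Streaming responses can also be handled over a long-lived WebSocket connection. WebSocket, however, supports two-way communication and is mostly used for chat applications, so we won't cover it here.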
For readers interested in how ChatGPT is trained, here is a screenshot from the recent talk "State of GPT" by OpenAI co-founder Andrej Karpathy: