使用 Python、Golang 和 React 构建音乐流媒体服务:从系统设计到编码
流式处理是软件工程中的一个引人入胜的话题。无论是音乐、视频还是简单的数据,若考虑不周,从设计、架构到编码的过程中应用这一概念可能会迅速变得复杂。
在本系列文章中,我们将构建一个使用 Python、Golang 和 React 流式传输音乐的服务。在本系列文章结束时,您将了解:
- 如何使用 Python 和 Golang 构建 API
- 如何提供 HTTP 范围请求
- 如何为音乐流媒体服务创建系统设计/架构
在本系列的最终部分,我们构建了一个应用程序版本,用户可以从浏览器中检索歌曲列表并播放选定的音乐,同时该架构能够支持大量请求。
然而,我们注意到架构中存在一些缺陷,主要是关于歌曲的安全性和隐私保护。任何用户都可以直接下载歌曲,这削弱了音乐流媒体应用程序的必要性。在本文中,我们将通过增加streaming service组件来解决这一问题,并确保它在面临大量请求时能够扩展。
如果您对涵盖此类主题的更多内容感兴趣,请订阅我的时事通讯,以获取有关软件编程、架构和技术相关见解的定期更新。
设置
您可以按照以下说明克隆项目的 v1.5:
git clone -b v1.5 https://github.com/koladev32/golang-react-music-streaming.git
cd golang-react-music-streaming
make setup
完成项目设置需要克隆所需的分支并安装所有包和依赖项。
一旦项目设置完毕,我们就可以着手讨论架构决策及其增强了。
建筑
下图表示项目当前状态下的体系结构。
此体系结构是一个简单的整体,它将所有后端功能(例如缓存、歌曲管理和数据库连接)整合到单个服务器或域中。为了确保服务器的带宽不被异常使用,我们将文件服务委托给了外部存储,这在架构中体现为将存储组件置于服务器域之外。
面对这种新构建的可扩展架构,我们遇到了一个挑战:用户能够获取直接下载链接,可能会绕过流媒体服务,从而违背了通过我们的应用程序进行内容流式传输的初衷。
为了优化系统,我们将通过整合流服务组件来重新规划架构。
在更新后的架构中,我们在服务器上新增了一个名为Streaming Service的组件。此组件负责与数据库交互,以检索歌曲的相关信息。值得注意的是,存储组件不再直接与客户端进行通信,而是改为与Streaming Service进行通信。
那么,Streaming Service是如何运作的呢?当用户发起歌曲请求时,API Gateway会将该请求重定向至Streaming Service。随后,Streaming Service会查询数据库以获取歌曲信息,并利用存储文件的URL进行下载。下载完成后,文件会被分割成多个字节块,并通过HTTP范围请求向客户端进行缓冲传输。
尽管恶意用户可能会尝试寻找下载歌曲的途径,但我们可以通过加密缓冲的块来增强安全性,加密密钥由客户端和服务器共同持有。不过,这一加密选项我们将在后文中详细讨论。
我们计划使用Golang来编写Streaming Service。之所以选择Golang,是因为我们打算构建一个流式处理引擎,并充分利用其强大的并发处理能力,以应对每分钟可能需要处理的数千个请求。尽管Python也可用于构建流式处理服务,特别是在开发便捷性更为关键且性能要求相对宽松的场景下,但对于需要高效处理大量并发请求且延迟要求低的服务来说,Go通常是更优的选择。
现在,我们对架构的变更有了更深入的理解,接下来,让我们着手添加流式处理引擎。
添加 Golang 流引擎
Streaming Engine 将用 Golang 编写,并将作为单独的服务运行。以下是这项服务的要求:
- 流式处理引擎将仅服务一个特定终结点,格式为
/songs/listen/{id}
,其中<id>
代表用户希望收听的歌曲的 ID。该终结点将用于从存储组件中检索歌曲。 - 流式处理服务将使用 HTTP Range Requests 响应客户端。列出要求后,我们将了解有关该概念的更多信息。
- 流式处理服务应接受标头中的范围,并发送部分响应。例如,这可确保客户端可以根据 Internet 速度指定他可以处理的文件字节范围。
- 缓冲和读取下载文件的块等任务应同时处理。
在明确这些要求后,我们接下来讨论 HTTP 范围请求的相关知识。
解释 HTTP 范围请求
HTTP 范围请求是一种技术,它使客户端能够请求资源的特定部分,这一功能在处理大文件(如视频或音频流)时尤为重要。流式处理服务普遍采用这一方法,以确保内容能够几乎立即开始播放,而无需用户等待整个文件的预先下载。
为了深入理解HTTP范围请求的工作机制,让我们逐步剖析这一过程。
HTTP 范围请求的工作原理
当客户端请求资源时,服务器通常会使用整个文件进行响应。但是,在处理大文件时,服务器可能会表明它支持范围请求,从而允许客户端分批下载文件。
- 初始请求
* The client starts by sending a standard HTTP `GET` request to retrieve the resource.
* If the file is large, the server might respond with the full resource or include an `Accept-Ranges: bytes` header to indicate that range requests are supported.
接下来,客户端可以使用范围请求功能仅下载文件的必要部分。
- 客户端发送范围请求
* To request a specific portion of the resource, the client includes a `Range` header in its request:
```json
Range: bytes=0-1023
```
* This header specifies the desired byte range, in this case, the first 1024 bytes.
收到此请求后,服务器将仅使用文件的请求部分进行响应。
- 服务器使用部分内容进行响应
* The server responds with an HTTP `206 Partial Content` status, indicating that it is sending only a portion of the resource:
```json
Content-Range: bytes 0-1023/5000
```
客户端收到此部分后,可以根据需要继续请求文件的其他部分。
- 后续请求
* If more data is needed, the client requests the next segment:
```json
Range: bytes=1024-2047
```
* The server then responds with the next chunk, continuing this process until the entire file is downloaded or the client has obtained all the necessary parts.
HTTP 范围请求具有多种优势,使其在涉及大型文件的情况下特别有用。
HTTP 范围请求的好处
通过允许客户端仅下载他们需要的文件部分,HTTP 范围请求提供了几个关键优势。
- 改进的用户体验:用户几乎可以立即开始使用内容,例如流式传输视频或音频文件,而无需等待整个文件下载。此外,他们可以跳到文件的不同部分,而无需完整下载文件。
- 更好的带宽管理:仅下载文件的必要部分,从而减少不必要的数据传输。这也允许在中断时从上次中断的位置继续下载。
- 服务器的可扩展性: 服务器可以通过仅提供资源的所需部分来更好地管理其负载,从而更有效地分配带宽和资源。
为了说明此过程在实际场景中的工作原理,请考虑流式传输歌曲的示例。
示例流:流式传输歌曲
当用户流式传输歌曲时,客户端和服务器会通过一系列请求和响应进行通信。
- 客户端请求音频启动:客户端首先请求音频文件的第一个块:
GET /audio.mp3 HTTP/1.1 Range: bytes=0-2047
- 服务器以部分内容响应:服务器发送文件的前 2048 个字节:
HTTP/1.1 206 Partial Content Content-Range: bytes 0-2047/100000
- 客户端请求下一个片段:当音频播放时,客户端请求下一个片段:
GET /audio.mp3 HTTP/1.1 Range: bytes=2048-4095
- Server Responds with Next Chunk(服务器使用 Next Chunk 响应):服务器发送文件的下一部分:
HTTP/1.1 206 Partial Content Content-Range: bytes 2048-4095/100000
- Client Seek to another part(客户端查找另一部分):如果用户向前跳,则 Client 请求文件的不同部分:
GET /audio.mp3 HTTP/1.1 Range: bytes=8192-10239
- Server Responds with the New Range(服务器使用新范围响应):服务器使用请求的部分进行响应:
HTTP/1.1 206 Partial Content Content-Range: bytes 8192-10239/100000
此示例凸显了 HTTP 范围请求如何助力实现高效且用户友好的流式传输,通过允许立即播放和更顺畅的文件传输,从而提供更流畅的用户体验。
解释了 HTTP Range 请求后,让我们用 Golang 编写实现。我们将构建一个 API 来为终端节点提供服务,然后编写一个函数来处理通过 HTTP Range Requests 进行的流式处理。
使用 Golang 构建流式处理服务
在本节中,我们将使用 Golang 构建流式处理服务。在项目的根目录中,创建一个名为 的新文件夹。此目录将包含用 Golang 编写的流式处理后端。
mkdir streaming-engine
cd streaming-engine
然后,在此目录中,运行以下行以创建 Golang 项目。
go mod init streaming-engine
然后安装所需的依赖项,例如 Mux、Sqlite3 驱动程序和 gorm 以与数据库交互。
go get github.com/gorilla/mux
go get gorm.io/driver/sqlite
go get gorm.io/gorm
安装完成后,创建一个名为 main.go
的文件(我们将在这个文件中放置后端逻辑内容)。
编写流式处理引擎后端逻辑
现在项目已经设置完成,我们可以着手为流式处理引擎编写代码了。首先,我们从必要的导入和基本结构体定义开始。
package main
import (
"fmt"
"io"
"log"
"net/http"
"strconv"
"strings"
"github.com/gorilla/mux"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)
var db *gorm.DB
var err error
// Song represents the song model in the existing music_song table
type Song struct {
ID uint `gorm:"column:id"`
Name string `gorm:"column:name"`
File string `gorm:"column:file"`
Author string `gorm:"column:author"`
Thumbnail string `gorm:"column:thumbnail"`
}
// TableName overrides the table name used by Gorm
func (Song) TableName() string {
return "music_song"
}
在上面的代码中,我们导入了所需的包,以帮助编写流处理程序函数和设置 API。我们还定义了变量,例如用于数据库初始化和跟踪整个应用程序中的错误。该结构体被定义为表示表中的歌曲模型。我们覆盖了 Gorm 使用的默认表名,以确保它正确映射到现有的数据库表。
接下来,我们继续编写用于数据库初始化的函数:
func initDB() {
// Initialize SQLite connection
db, err = gorm.Open(sqlite.Open("../backend/db.sqlite3"), &gorm.Config{})
if err != nil {
log.Fatal("Failed to connect to database:", err)
}
}
在之前的步骤中,我们定义了使用特定包的函数来初始化数据库连接。请确保 SQLite 数据库文件的路径与您的项目结构相匹配,如有需要,请进行相应调整。接下来,我们将编写一个名为 initDBgorm
的函数来打开数据库。
继续进行,我们还将编写一个将在流处理程序函数中使用的函数。
- 从请求 URL 中提取歌曲 ID:
func getSongID(r *http.Request) (int, error) {
params := mux.Vars(r)
id, err := strconv.Atoi(params["id"])
return id, err
}
在上面的代码中,我们使用 从 URL 参数中提取歌曲 ID。该函数将 ID 从字符串转换为整数,并返回 ID 以及遇到的任何错误。
- 从数据库中检索歌曲详细信息:
func getSongFromDB(id int) (Song, error) {
var song Song
err := db.First(&song, id).Error
return song, err
}
该函数查询数据库以检索给定 ID 的歌曲详细信息。它返回歌曲数据以及查询期间出现的任何错误。
- 从 URL 获取文件:
func fetchFile(fileURL string) (*http.Response, error) {
fullURL := "http://localhost:8000/media/" + fileURL
resp, err := http.Get(fullURL)
if err != nil || resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("file not found on the server")
}
return resp, nil
}
在上面的代码中,通过将 附加到基 URL 来构建媒体文件的完整 URL,并执行 HTTP GET 请求以检索文件。如果未找到文件或出现问题,它将返回响应或错误。
- 解析 Range 标头以获取文件的开始和结束字节:
func parseRangeHeader(rangeHeader string, fileSize int64) (int64, int64, error) {
bytesRange := strings.Split(strings.TrimPrefix(rangeHeader, "bytes="), "-")
start, err := strconv.ParseInt(bytesRange[0], 10, 64)
if err != nil {
return 0, 0, err
}
var end int64
if len(bytesRange) > 1 && bytesRange[1] != "" {
end, err = strconv.ParseInt(bytesRange[1], 10, 64)
if err != nil {
return 0, 0, err
}
} else {
end = fileSize - 1
}
if start > end || end >= fileSize {
return 0, 0, fmt.Errorf("invalid range")
}
return start, end, nil
}
在上面的代码中,我们解析HTTP请求的头,以确定客户端想要接收的文件的开始和结束字节。我们处理范围规范中的任何错误,并返回开始和结束字节位置。
- 将部分内容写入响应:
func writePartialContent(w http.ResponseWriter, start, end, fileSize int64, resp *http.Response) error {
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, fileSize))
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Length", strconv.FormatInt(end-start+1, 10))
w.Header().Set("Content-Type", "audio/mpeg")
w.WriteHeader(http.StatusPartialContent)
// Create a channel for the buffered data and a wait group for synchronization
dataChan := make(chan []byte)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
buffer := make([]byte, 1024) // 1KB buffer size
bytesToRead := end - start + 1
for bytesToRead > 0 {
n, err := resp.Body.Read(buffer)
if err != nil && err != io.EOF {
http.Error(w, "Error reading file", http.StatusInternalServerError)
return
}
if n == 0 {
break
}
if int64(n) > bytesToRead {
n = int(bytesToRead)
}
dataChan <- buffer[:n]
bytesToRead -= int64(n)
}
close(dataChan)
}()
go func() {
defer wg.Wait()
for chunk := range dataChan {
if _, err := w.Write(chunk); err != nil {
http.Error(w, "Error writing response", http.StatusInternalServerError)
return
}
}
}()
// Skip the bytes until the start position
io.CopyN(io.Discard, resp.Body, start)
return nil
}
在上面的代码中,我们设置了必要的HTTP头部以传递请求所需的信息,并处理了指定字节范围的并发读取与写入操作。为了确保数据流的高效性,我们运用了goroutines来同步进行数据的缓冲与写入。若在此过程中遭遇任何错误,这些错误将被捕获并以HTTP错误的形式返回。此外,我们还实现了writePartialContent
函数来处理部分内容的写入。
现在,我们可以在流处理程序函数中使用这些函数,并创建 API 服务器来为流式处理终结点提供服务。
// Handles streaming of the file via HTTP range requests
func streamHandler(w http.ResponseWriter, r *http.Request) {
id, err := getSongID(r)
if err != nil {
http.Error(w, "Invalid song ID", http.StatusBadRequest)
return
}
song, err := getSongFromDB(id)
if err != nil {
http.Error(w, "Song not found", http.StatusNotFound)
return
}
resp, err := fetchFile(song.File)
if err != nil {
http.Error(w, err.Error(), http.StatusNotFound)
return
}
defer resp.Body.Close()
fileSize := resp.ContentLength
rangeHeader := r.Header.Get("Range")
if rangeHeader == "" {
http.ServeFile(w, r, song.File)
return
}
start, end, err := parseRangeHeader(rangeHeader, fileSize)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := writePartialContent(w, start, end, fileSize, resp); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
func main() {
initDB()
r := mux.NewRouter()
r.HandleFunc("/songs/listen/{id}", streamHandler).Methods("GET")
log.Println("Server is running on port 8005")
log.Fatal(http.ListenAndServe(":8005", r))
}
streamHandler
函数负责处理流文件的HTTP范围请求。它首先通过提取歌曲ID来工作,接着从数据库中检索该歌曲的详细信息,并从指定的URL获取文件。之后,该函数会解析HTTP请求中的范围标头(如果存在),以确定需要流式传输文件的哪一部分。一旦确定了要传输的部分,该函数就会将这部分内容写入HTTP响应中。
在 main
函数中,我们对数据库进行了初始化设置,并配置了一个HTTP路由器来处理流媒体歌曲的请求。最后,我们在8005端口上启动了服务器。要在流引擎目录中启动服务器,运行以下命令以启动服务器。
go run .
我们现已编写完成Golang服务,允许客户端通过 /songs/listen/{id}
路径(其中 {id}
是歌曲的 ID)来流式传输歌曲。
现在服务已经编写完毕,我们必须对 Django 后端和前端进行一些调整。
修改 Backend 和 Frontend 以使用 Streaming 服务
后端修改:限制公开的字段
在Django后端,为了控制通过API暴露的数据,我们对API响应进行了更新,排除了某些字段。具体来说,我们更新了 fileSongSerializer
配置,故意省略了某个字段,以确保其不会暴露给客户端。
# music/serializers.py
class SongSerializer(serializers.ModelSerializer):
class Meta:
model = Song
fields = ['id', 'name', 'artist', 'duration', 'thumbnail']
前端调整:利用流式处理终结点
前端方面,我们调整了逻辑以利用新创建的流式处理终结点。该前端包能够自动处理流,因此无需手动管理。我们使用了 react-h5-audio-player
组件。
此外,我们还更新了 playSongapp.js
中的函数,以正确设置歌曲的URL。
// app.js
...
const playSong = (song) => {
setCurrentSong(`http://localhost:8005/songs/listen/${song.id}`);
};
...
通过此更新,前端使用更新的 API 自动流式传输音频,为用户提供无缝体验。
在编写完应用程序的最终版本后,我们来探讨一些可能的增强措施。
增强
在构建此应用程序及规划其架构时,我们已确保架构能够支持大量请求,从而实现可扩展性和可靠性。鉴于我们选择了较为简洁的编码方案,因此有必要说明在架构和编码方面可以进行的一些增强。
架构增强功能
当前架构中,流式处理服务组件位于服务器上。尽管流式传输是通过HTTP范围请求实现的,但我们也必须考虑带宽的使用情况。以下是对架构可能的增强措施:
- 将流式处理引擎组件移出服务器,并将其放在另一个具有缓存组件的服务器/域上。缓存很重要,因为流式处理引擎已连接到数据库。
- API Gateway 也将从服务器中删除。这将有助于根据 URL 将请求重定向到流式处理引擎或 API 服务器。
下面是这些更改后的体系结构的新关系图。
现在我们有了一个更好的架构建议,让我们来讨论编码增强。
编码增强
许多流媒体服务通过使用加密来保护传输期间的数据,这可以保护内容免受未经授权的访问和篡改。该过程通常包括两个主要步骤:
- 首先,在后端,内容在通过网络发送之前被加密。AES-128或AES-256等加密算法通常用于此目的。然后,加密的内容通过HTTP或HTTPS传递到客户端。根据流媒体格式的不同,加密可以应用于文件级别,也可以应用于单个段(如果内容是分块的)。
- 其次,在前端,客户端(如web播放器、移动的应用程序或智能电视)接收加密的内容,并使用后端提供的密钥对其进行解密。解密通常发生在浏览器或应用程序运行时,通过HTTPS或数字版权管理(DRM)技术等协议保护密钥交换。
加密流的示例
- HLS(HTTP Live Streaming)支持AES-128加密:HLS作为一种广泛应用的流协议,能够实现对媒体文件的分段处理,并对每个片段进行单独的AES-128加密。解密密钥被安全地存储在服务器上,客户端则通过安全的HTTPS连接来检索这些密钥。
- DASH(Dynamic Adaptive Streaming over HTTP)with Widevine DRM:DASH是另一种流行的流媒体协议,通常与Google Widevine等DRM系统配对。内容被加密,解密密钥由DRM系统管理,确保安全的密钥管理和许可。此设置仅允许授权客户端解密和播放内容。
- 带有SSL/TLS的RTMP(实时消息传递协议):RTMP用于低延迟流,可以使用SSL/TLS进行加密传输。尽管与HLS和DASH相比,RTMP的使用现在已不太普遍,但在某些直播场景中,它仍然具有一定的应用价值。
- DRM系统(如PlayReady或FairPlay):DRM系统(如Microsoft PlayReady和Apple FairPlay)是Netflix、Hulu和Apple TV等服务不可或缺的组成部分。这些系统对服务器上的内容进行加密,并通过许可服务器控制对解密密钥的访问,确保只有授权或付费用户才能访问内容。
这是一个可以考虑添加到流媒体引擎中的有趣步骤,以确保流媒体传输的安全性和可靠性。
结论
在本文中,我们利用Golang和HTTP范围请求开发了一个流媒体服务,并深入探讨了关键的架构改进方案,旨在提升应用程序的安全性和效率。
本系列的这一部分内容至此告一段落,但请您持续关注我们的下一期。在接下来的一期中,我们将把这里所提及的所有概念融入一个全球性的架构体系中。我们将详细阐述如何构建一个能够向全球用户提供服务的流媒体平台,同时确保其高性能表现。
如果您喜欢本文,请考虑订阅我们的时事通讯,以便您不会错过后续的更新内容。
您的反馈对我们来说至关重要!如果您有任何建议、批评或问题,请在下方留言。
让我们共同期待更多精彩内容的呈现!🚀