當前位置:
首頁 > 最新 > 全棧開發——動手打造屬於自己的直播間

全棧開發——動手打造屬於自己的直播間

前言

大學的學習時光臨近尾聲,感嘆時光匆匆,三年一晃而過。同學們都忙著找工作,我也在這裡拋一份簡歷吧,歡迎各位老闆和獵手誠邀。我們進入正題。直播行業是當前火熱的行業,誰都想從中分得一杯羹,直播養活了一大批人,一個平台主播粗略估計就有幾千號人,但是實時在線觀看量有的居然到了驚人的百萬級別,特別是遊戲主播,可想而知,直播間是一個磁鐵式的廣告傳播媒介,也難怪這麼多巨頭公司都搶著做直播。我不太清楚直播行業技術有多深,畢竟自己沒做過,但是咱們可以自己實現一個滿足幾百號人同時觀看的直播間呀。

最終成果

手機端效果

GIF/2611K

這個場景很熟悉吧~~ 通過obs推流軟體來推流。

戶外直播,通過yasea手機端推流軟體,使用手機攝像頭推流。

電腦端效果

播放香港衛視

直播畫面

項目總覽

項目分為三個部分:

客戶端直播間視頻拉流、播放和聊天室,炫酷的彈幕以及直播間信息

服務端處理直播間、用戶的數據業務,聊天室消息的處理

伺服器部署視頻伺服器和web伺服器

技術棧移動客戶端

VUE全家桶

UI層vonic

axios

websocket客戶端: vue-stomp

彈幕插件: vue-barrage

打包工具:webpack

電腦端客戶端服務端

IDE: IntelliJ IDEA

項目架構: SpringBoot1.5.4 +Maven3.0

主資料庫: Mysql5.7

輔資料庫: redis3.2

資料庫訪問層: spring-boot-starter-data-jpa + spring-boot-starter-data-redis

websocket: spring-boot-starter-websocket

消息中間件: RabbitMQ/3.6.10

伺服器部署

視頻直播模塊: nginx-rtmp-module

web應用伺服器: tomcat8.0

伺服器: 騰訊雲centos6.5

技術點講解直播間主要涉及到兩個主要功能:第一是視頻直播、第二是聊天室。這兩個都是非常講究實時性。視頻直播

說到直播我們先了解下幾個常用的直播流協議,看了挺多的流媒體協議文章博客,但都是非常粗略,這裡有個比較詳細的流媒體協議介紹,如果想詳細了解協議內容估計去要看看專業書籍了。這裡我們用到的只是rtmp和hls,實踐後發現:rtmp只能夠在電腦端播放,hls只能夠在手機端播放。而且rtmp是相當快的儘管沒有rtsp那麼快,延遲只有幾秒,我測試的就差不多2-5秒,但是hls大概有10幾秒。所以如果你體驗過demo,就會發現手機延遲比較多。

直播的流程:

直播分為推流和拉流兩個過程,那麼流推向哪裡,拉流又從哪裡拉取呢?那當然需要視頻伺服器啦,千萬不要以為視頻直播伺服器很複雜,其實在nginx伺服器中一切都變得簡單。後面我會講解如何部署Nginx伺服器並配置視頻模塊(nginx-rtmp-module).

首先主播通過推流軟體,比如OBS Studio推流軟體,這個是比較專業級別的,很多直播平台的推薦主播使用這個軟體來推送視頻流,這裡我也推薦一個開源的安卓端推流工具Yasea,下載地址,文件很小,但是很強大。

聊天室

直播間裡面的聊天室跟我們的群聊天差不多,只不過它變成了web端,web端的即時通信方案有很多,這裡我們選擇websocket協議來與服務端通信,websocket是基於http之上的傳輸協議,客戶端向服務端發送http請求,並攜帶Upgrade:websocket升級頭信息表示轉換websocket協議,通過與服務端握手成功後就可以建立tcp通道,由此來傳遞消息,它與http最大的差別就是,服務端可以主動向客戶端發送消息。

既然建立了消息通道,那我們就需要往通道里發消息,但是總得需要一個東西來管控消息該發給誰吧,要不然全亂套了,所以我們選擇了消息中間件RabbitMQ.使用它來負責消息的路由去向。

拉取伺服器的直播視頻流(hls)並播放直播畫面

與服務端創建websocket連接,收發聊天室消息

通過websocket獲取消息並發送到彈幕

通過websocket實時更新在線用戶

結合服務端獲取訪問歷史記錄

問題反饋模塊

效果圖

GIF/507K

項目說明服務端實操

由於個人比較喜歡接觸新的東西,所以後端選擇了springboot,前端選擇了Vue.js年輕人嘛總得跟上潮流。SpringBoot實踐過後發現真的太省心了,不用再理會各種配置文件,全自動化裝配。

這裡貼一下pom.xml

4.0.0 com.hushangjie rtmp-demo 0.0.1-SNAPSHOT jar rtmp-demo Demo project for Spring Boot org.springframework.boot spring-boot-starter-parent 1.5.4.RELEASE UTF-8 UTF-8 1.8 org.springframework.boot spring-boot-devtools true org.springframework.boot spring-boot-starter-actuator org.springframework.boot spring-boot-actuator-docs org.springframework.boot spring-boot-starter-data-jpa org.springframework.boot spring-boot-starter-data-redis org.springframework.boot spring-boot-starter-thymeleaf net.sourceforge.nekohtml nekohtml 1.9.22 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-websocket org.springframework.boot spring-boot-starter-test test org.webjars vue 2.1.3 mysql mysql-connector-java joda-time joda-time 2.9.2 io.projectreactor reactor-core 2.0.8.RELEASE io.projectreactor reactor-net 2.0.8.RELEASE io.netty netty-all 4.1.6.Final org.springframework.boot spring-boot-maven-plugin true

application.properties文件

spring.datasource.url=jdbc:mysql://host:3306/database?characterEncoding=utf8&useSSL=false spring.datasource.username=username spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.thymeleaf.mode=LEGACYHTML5 server.port=8085 # REDIS (RedisProperties) # Redis資料庫索引(默認為0) spring.redis.database=0 # Redis伺服器地址 spring.redis.host=127.0.0.1 # Redis伺服器連接埠 spring.redis.port=6379 # Redis伺服器連接密碼(默認為空) spring.redis.password= # 連接池最大連接數(使用負值表示沒有限制) spring.redis.pool.max-active=8 # 連接池最大阻塞等待時間(使用負值表示沒有限制) spring.redis.pool.max-wait=-1 # 連接池中的最大空閑連接 spring.redis.pool.max-idle=8 # 連接池中的最小空閑連接 spring.redis.pool.min-idle=0 # 連接超時時間(毫秒) spring.redis.timeout=0websocket配置@Configuration @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { //攔截器注入service失敗解決辦法 @Bean public MyChannelInterceptor myChannelInterceptor(){ return new MyChannelInterceptor(); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //添加訪問域名限制可以防止跨域socket連接 //setAllowedOrigins("http://localhost:8085") registry.addEndpoint("/live").setAllowedOrigins("*").addInterceptors(new HandShkeInceptor()).withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { /*.enableSimpleBroker("/topic","/queue");*/ //假如需要第三方消息代理,比如rabitMQ,activeMq,在這裡配置 registry.setApplicationDestinationPrefixes("/demo") .enableStompBrokerRelay("/topic","/queue") .setRelayHost("127.0.0.1") .setRelayPort(61613) .setClientLogin("guest") .setClientPasscode("guest") .setSystemLogin("guest") .setSystemPasscode("guest") .setSystemHeartbeatSendInterval(5000) .setSystemHeartbeatReceiveInterval(4000); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { ChannelRegistration channelRegistration = registration.setInterceptors(myChannelInterceptor()); super.configureClientInboundChannel(registration); } @Override public void configureClientOutboundChannel(ChannelRegistration registration) { super.configureClientOutboundChannel(registration); } }

配置類繼承了消息代理配置類,意味著我們將使用消息代理rabbitmq.使用registerStompEndpoints方法註冊一個websocket終端連接。這裡我們需要了解兩個東西,第一個是stomp和sockjs,sockjs是啥呢,其實它是對於websocket的封裝,因為如果單純使用websocket的話效率會非常低,我們需要的編碼量也會增多,而且如果瀏覽器不支持websocket,sockjs會自動降級為輪詢策略,並模擬websocket,保證客戶端和服務端可以通信。

stomp有是什麼看這裡

stomp是一種簡單(流)文本定向消息協議,它提供了一個可互操作的連接格式,允許STOMP客戶端與任意STOMP消息代理(Broker)進行交互,也就是我們上面的RabbbitMQ,它就是一個消息代理。

我們可以通過configureMessageBroker來配置消息代理,需要注意的是我們將要部署的伺服器也應該要有RabbitMQ,因為它是一個中間件,安裝非常容易,這裡就不說明了。這裡我們配置了「/topic,/queue」兩個代理轉播策略,就是說客戶端訂閱了前綴為「/topic,/queue」頻道都會通過消息代理(RabbitMQ)來轉發。跟spring沒啥關係啦,完全解耦。

websocke如何保證安全

一開始接觸 stomp的時候一直有個問題困擾我,客戶端只要與服務端通過websocket建立了連接,那麼他就可以訂閱任何內容,意味著可以接受任何消息,這樣豈不是亂了套啦,於是我翻閱了大量博客文章,很多都是官方的例子並沒有解決實際問題。經過琢磨,其實websocket是要考慮安全性的。具體在以下幾個方面

跨域websocket連接

協議升級前握手攔截器

消息信道攔截器

對於跨域問題,我們可以通過setAllowedOrigins方法來設置可連接的域名,防止跨站連接。

對於站內用戶是否允許連接我們可以如下配置

public class HandShkeInceptor extends HttpSessionHandshakeInterceptor { private static final Set ONLINE_USERS = new HashSet(); @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { System.out.println("握手前"+request.getURI()); //http協議轉換websoket協議進行前,通常這個攔截器可以用來判斷用戶合法性等 //鑒別用戶 if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; //這句話很重要如果getSession(true)會導致移動端無法握手成功 //request.getSession(true):若存在會話則返回該會話,否則新建一個會話。 //request.getSession(false):若存在會話則返回該會話,否則返回NULL //HttpSession session = servletRequest.getServletRequest().getSession(false); HttpSession session = servletRequest.getServletRequest().getSession(); UserEntity user = (UserEntity) session.getAttribute("user"); if (user != null) { //這裡只使用簡單的session來存儲用戶,如果使用了springsecurity可以直接使用principal return super.beforeHandshake(request, response, wsHandler, attributes); }else { System.out.println("用戶未登錄,握手失敗!"); return false; } } return false; } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception ex) { //握手成功後,通常用來註冊用戶信息 System.out.println("握手後"); super.afterHandshake(request, response, wsHandler, ex); } }

HttpSessionHandshakeInterceptor 這個攔截器用來管理握手和握手後的事情,我們可以通過請求信息,比如token、或者session判用戶是否可以連接,這樣就能夠防範非法用戶。

那如何限制用戶只能訂閱指定內容呢?我們接著往下看

public class MyChannelInterceptor extends ChannelInterceptorAdapter { @Autowired private StatDao statDao; @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public boolean preReceive(MessageChannel channel) { System.out.println("preReceive"); return super.preReceive(channel); } @Override public Message preSend(Message message, MessageChannel channel) { StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); //檢測用戶訂閱內容(防止用戶訂閱不合法頻道) if (StompCommand.SUBSCRIBE.equals(command)) { //從資料庫獲取用戶訂閱頻道進行對比(這裡為了演示直接使用set集合代替) Set subedChannelInDB = new HashSet(); subedChannelInDB.add("/topic/group"); subedChannelInDB.add("/topic/online_user"); if (subedChannelInDB.contains(accessor.getDestination())) { //該用戶訂閱的頻道合法 return super.preSend(message, channel); } else { //該用戶訂閱的頻道不合法直接返回null前端用戶就接受不到該頻道信息。 return null; } } else { return super.preSend(message, channel); } } @Override public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex) { //System.out.println("afterSendCompletion"); //檢測用戶是否連接成功,搜集在線的用戶信息如果數據量過大我們可以選擇使用緩存資料庫比如redis, //這裡由於需要頻繁的刪除和增加集合內容,我們選擇set集合來存儲在線用戶 StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); StompCommand command = accessor.getCommand(); if (StompCommand.SUBSCRIBE.equals(command)){ Map map = (Map) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.add(map.get("user")); UserEntity user = map.get("user"); if(user != null){ statDao.pushOnlineUser(user); Guest guest = new Guest(); guest.setUserEntity(user); guest.setAccessTime(Calendar.getInstance().getTimeInMillis()); statDao.pushGuestHistory(guest); //通過websocket實時返回在線人數 this.simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } //如果用戶斷開連接,刪除用戶信息 if (StompCommand.DISCONNECT.equals(command)){ Map map = (Map) accessor.getHeader("simpSessionAttributes"); //ONLINE_USERS.remove(map.get("user")); UserEntity user = map.get("user"); if (user != null){ statDao.popOnlineUser(user); simpMessagingTemplate.convertAndSend("/topic/online_user",statDao.getAllUserOnline()); } } super.afterSendCompletion(message, channel, sent, ex); } }

在stomp裡面,Channel信道就是消息傳送的通道,客戶端與服務端建立了連接就相當於建立了通道,以後的信息就是通過這個通道來傳輸。所有的消息都有消息頭,被封裝在了spring 的messag介面中,比如建立連接時候消息頭就含有CONNECT,當然還有一些其他的信息。客戶端訂閱的時候也有訂閱頭信息SUBSCRIBE,那麼我是不是可以在這個攔截器ChannelInterceptorAdapter 中攔截每個人的訂閱信息,然後與資料庫的信息作比對,最後決定這個用戶是否可以訂閱這個頻道的信息呢,對的,這是我的想法,按照這樣的思路,做單聊不是迎刃而解了嗎。

那客戶端通過websocket發送的消息如何到達訂閱者手中呢,按照rabbitmq的規則,訂閱者屬於消費者,發送消息的一方屬於生產者,生產者通過websocket把消息發送到服務端,服務端通過轉發給消息代理(rabbitmq),消息代理負責存儲消息,管理髮送規則,推送消息給訂閱者,看下面的代碼

@MessageMapping(value = "/chat") @SendTo("/topic/group") public MsgEntity testWst(String message , @Header(value = "simpSessionAttributes") Map session){ UserEntity user = (UserEntity) session.get("user"); String username = user.getRandomName(); MsgEntity msg = new MsgEntity(); msg.setCreator(username); msg.setsTime(Calendar.getInstance()); msg.setMsgBody(message); return msg; }

@MessageMapping看起來跟springmvc方法特別像,它即可以用在類級別上也可以用在方法級別上

當發送者往『/chat』發送消息後,服務端接受到消息,再發送給「/topic/group」的訂閱者,@SendTo就是發送給誰,這裡需要注意的有,如果我們沒有配置消息代理,只使用了enableSimpleBroker("/topic","/queue")簡單消息代理,那麼就是直接發送到消息訂閱者,如果配置了消息代理,那還要通過消息代理,由它來轉發。

如果我們想在服務端隨時發送消息,而不是在客戶端發送(這樣的場景很常見,比如發送全局通知),可以使用SimpMessagingTemplate類,通過注入該bean,在合適的業務場景中發送消息。

Redis統計數據

直播間經常需要統計數據,比如實時在線人數,訪問量,貢獻排行榜,訂閱量。我選擇的方案是使用redis來計數,儘管這個demo可能不會太多人訪問,但是我的目的是學習如何使用redis

先看springboot中redis的配置

@Configuration public class RedisConfig extends CachingConfigurerSupport{ /** * 生成key的策略 * * @return */ @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } /** * 管理緩存 * * @param redisTemplate * @return */ @SuppressWarnings("rawtypes") @Bean public CacheManager cacheManager(RedisTemplate redisTemplate) { RedisCacheManager rcm = new RedisCacheManager(redisTemplate); //設置緩存過期時間 // rcm.setDefaultExpiration(60);//秒 //設置value的過期時間 Map map=new HashMap(); map.put("test",60L); rcm.setExpires(map); return rcm; } /** * RedisTemplate配置 * @param factory * @return */ @Bean public RedisTemplate redisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer);//如果key是String 需要配置一下StringSerializer,不然key會亂碼 /XX/XX template.afterPropertiesSet(); //template.setStringSerializer(); return template; } }

redis數據統計Dao的實現

@Repository public class StatDao { @Autowired RedisTemplate redisTemplate; public void pushOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().add("OnlineUser",userEntity); } public void popOnlineUser(UserEntity userEntity){ redisTemplate.opsForSet().remove("OnlineUser" ,userEntity); } public Set getAllUserOnline(){ return redisTemplate.opsForSet().members("OnlineUser"); } public void pushGuestHistory(Guest guest){ //最多存儲指定個數的訪客 if (redisTemplate.opsForList().size("Guest") == 200l){ redisTemplate.opsForList().rightPop("Guest"); } redisTemplate.opsForList().leftPush("Guest",guest); } public List getGuestHistory(){ return redisTemplate.opsForList().range("Guest",0,-1); } }

Dao層非常簡單,因為我們只需要統計在線人數和訪客。但是在線人數是實時更新的,既然我們使用了websocket實時數據更新就非常容易了,前面我們講過,通過信道攔截器可以攔截連接,訂閱,斷開連接等等事件信息,所以我們就可以當用戶連接時存儲在線用戶,通過websocket返回在線用戶信息。

由於這個項目有移動端和電腦端,所以需要根據請求代理UserAgent來判斷客戶端屬於哪一種類型。這個工具類在源碼上有。我就不貼了。

伺服器部署

說了這麼多即時通信,卻沒發現視頻直播。不要著急我們馬上進入視頻環節。文章開頭就說明了幾種媒體流協議,這裡不講解詳細的協議流程,只需要知道,我們是通過推流軟體採集視頻信息,如何採集也不是我們關注的。採集到信息後通過軟體來推送到指定的伺服器,如下圖

obs推流設置

yasea手機端推流設置

紅色部分是伺服器開放的獲取流介面。

Nginx-rtmp-module配置

視頻伺服器有很多,也支持很多媒體流協議。這裡我們選擇nginx-rtmp-module來做視頻服務,接下來我們需要在linux下安裝nginx,並安裝rtmp模塊。本人也是linux初學者,一步步摸索著把伺服器搭建好,聽說tomcat和nginx很配哦,所以作為免費開源的當然首選這兩個。

接下來需要在linux安裝一下軟體和服務。

Nginx以及Nginx-rtmp-module

Tomcat

Mysql

Redis

RabbitMQ

安裝步驟我就不說了,大家搜索一下啦,這裡貼一下nginx.conf文件配置

上面配置了location 指向/hls,別名是/yjdata/www/www/live/hls/,所以可以在前端直接通過域名+/hls/+文件名.m3u8獲取直播視頻。

關於nginx的配置還有很多,我也在學習當中。總而言之nginx非常強大。

總結

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 推酷 的精彩文章:

Banner設計好不好?看這5點就夠了
三星 Galaxy S8/S8+圖賞:不只有讓人「中毒」的「全視曲面屏」
Airy3D研發出新型3D圖像感測器,捕捉深度信息不再需要多個鏡頭
Google 為懶得扭頭看的用戶推出了新的 VR 視頻格式
谷歌VR新力作居然是個「殘廢品」,而且還將再一次引領行業潮流?

TAG:推酷 |

您可能感興趣

大眾將與蘋果一起開發自動自動駕駛車輛
傳育碧也在開發吃雞遊戲 將由《全境封鎖》開發商打造
微軟開發智能耳機 能夠自動暫停播放
蘋果牽手大眾,合作開發自動駕駛員工班車
傳蘋果將聯合大眾開發自動駕駛電動通勤車
用遊戲開發遊戲,玩家高手將《我的世界》變成開發工具
戴姆勒開發新電動自動駕駛汽車,可在貨運車和客車之間切換
完全自主獨立:華為正在開發自己的操作系統
谷歌在可穿戴領域動作頻頻 又將開發三款智能手錶?
這些動作能促進寶寶的大腦開發
「錸」真的有助於航空發動機的開發嗎?
現代軟體開發:銷售催產品,產品催開發,開發催測試
蘋果開發者大會時間敲定 阿里自動駕駛團隊浮出水面
六種方法讓自動化安全成為開發人員的得力助手
嘗試竭盡全力去開發自己的潛能
道路自動駕駛算什麼?路虎想要玩更難的,開發自動越野技術!
韓國計劃在當地開發導彈,用於未來自製的噴氣式飛機
戴姆勒開發模塊化電動自動駕駛汽車,能載人也能運貨
三星開發自動駕駛技術 欲做平台提供商
賣不動!「安卓之父」或停止開發智能手機並出售公司