优秀的编程知识分享平台

网站首页 > 技术文章 正文

使用 Angular 和 Spring Boot 进行 WebRTC 视频通话

nanyue 2024-07-26 16:02:17 技术文章 14 ℃

这个深入的教程演示了如何在 TypeScript 中使用 WebRTC 向 Angular/Spring Boot 项目添加视频通话。


WebRTC 视频通话已添加到AngularPwaMessenger项目中。后端支持由 JWT 令牌保护的 WebSocket 连接以启用 WebRTC 信号。当前的浏览器支持使用 WebRTC 进行视频通话。Angular 前端支持 WebRTC 调用,需要访问摄像头和麦克风。浏览器需要能够直接相互连接并使用服务器后端来做到这一点。这意味着阻止传入连接的家庭/公司网络会阻止创建视频通话。在智能手机上安装 PWA 确实有效,因为没有路由器/防火墙会阻止连接。对于开发,使用带有自签名证书的设置,可以在防火墙网络内进行测试。

WebRTC 文档

Mozilla 开发网络有WebRTC文档。WebRTC 协议记录在这里AngularPwaMessenger 后端为 ICE 协议提供 STUN 服务器实现。此处记录了信令和视频通话。图表/代码显示了视频通话连接的创建。

后端

支持 ICE 协议的 STUN 实现在WebSocketConfig类中配置。

爪哇

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
	private SignalingHandler socketHandler;
	private Environment environment;
	
	public WebSocketConfig(SignalingHandler socketHandler, 
           Environment environment) {
		this.socketHandler = socketHandler;
		this.environment = environment;
	}
	
	@Override
	public void registerWebSocketHandlers(WebSocketHandlerRegistry 
           registry) {
	   WebSocketHandlerRegistration handlerRegistration = 
              registry.addHandler(this.socketHandler, "/signalingsocket");
 	  if(List.of(this.environment.getActiveProfiles()).stream()
             .noneMatch(myProfile -> 
             myProfile.toLowerCase().contains("prod"))) {
		handlerRegistration.setAllowedOrigins("*");
	  }		
	}


Spring 具有在此实现中使用并使用@Configuration注释配置的 WebSocket 支持。@EnableWebSocket注释激活 Spring WebSocket 支持 。

SignalingHandler环境被注入到构造函数中。

该方法registerWebSocketHandlers(...)将 添加SignalingHandler到 path /signalingsocket。环境用于检查配置文件是否包含prod配置文件。对于开发, allowedOrigins(...)检查被禁用。

在类SocketHandler中实现SignalingHandler。消息在这里定义:

爪哇

...
private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
...
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message)
   throws InterruptedException, IOException {
   if (userRoleCheck(session)) {
      SenderReceiver senderReceiver = this.extractSenderReceiver(message);
      String sessionUsername = extractSessionUsername(session);
      for (WebSocketSession webSocketSession : this.sessions) {
         removeStaleSession(webSocketSession);
         String webSocketSessionUsername = 
         this.extractSessionUsername(webSocketSession);
         if (webSocketSession.isOpen() && 
            (checkSenderReceiver(senderReceiver, sessionUsername,
            webSocketSessionUsername) || 
            checkSenderLocalhostToken(senderReceiver, sessionUsername, 
            webSocketSessionUsername))) {		
            LOGGER.debug("Msg send with params: Msg sender: {}, Msg receiver: 
               {}, Session sender: {}, WebSocket receiver: {}",
               senderReceiver.sender, senderReceiver.receiver, 
               sessionUsername, webSocketSessionUsername);
            webSocketSession.sendMessage(message);
         }
      }
   } else {
      if (this.isTokenExpired(session)) {
         session.close();
      }
   }
}

...

@Override
public void afterConnectionEstablished(WebSocketSession session) 
   throws Exception {
   if (userRoleCheck(session)) {
      this.sessions.add(session);
   }
}

@Override
public void afterConnectionClosed(WebSocketSession session, 
   CloseStatus status) throws Exception {
   this.sessions.remove(session);
}


WebSocketSession列表CopyOnWriteArrayList用于确保在方法中添加或删除会话时支持并发写入afterConnectionEstablished(...)afterConnectionClosed(...)

handelTextMessage(...)方法用于处理 WebSocket 的消息。

登录用户的 JWT 令牌作为 URL 参数添加到 WebSocket 路径 ( /signalingsocket?token=abc...)。从会话中提取 JWT 令牌,解析令牌,检查令牌中的角色。

该方法extractSenderReceiver(...)从 JSON 消息中提取发送方/接收方记录。这是使用字符串函数完成的,以避免解析整个消息。

该方法extractSessionUsername(...)从会话的 JWT 令牌中获取用户名。

注册的会话被迭代:

  • 该方法removeStaleSession(...)关闭 JWT 令牌已超时的会话。
  • 该方法extractSessionUsername(...)获取会话的 JWT 令牌用户名。
  • 检查会话是否打开并且消息发送者/接收者与会话的发送者/接收者匹配。匹配会话发送消息。

如果 JWT 令牌的 userRole 不是User,则检查 JWT 令牌是否已超时以关闭此类会话。

前端

主要部件

Angular 前端使用 WebRTC 提供视频通话。如果有视频通话到达, main.component.ts会显示voice.component.ts 。该方法syncMsgs()在登录后间隔调用。

...
private offerMsgSub: Subscription;
...
private async syncMsgs(): Promise<void> {
   if ( this.ownContact && this.netConnectionService.connetionStatus && 
      !this.jwttokenService.localLogin ) {
      ...
      const result = await 
         this.voiceService.connect(this.jwttokenService.jwtToken);
      if(!!result) {
         this.webrtcService.addIncominMessageHandler();
         this.webrtcService.senderId = this.ownContact.name;
         this.webrtcService.receiverId = this?.selectedContact?.name;
         this.offerMsgSub = this.webrtcService.offerMsgSubject
           .pipe(filter(offerMsg => !!offerMsg.receiverId && 
              !!offerMsg.senderId)).subscribe(offerMsg => {
              this.selFeature = MyFeature.phone;
           });
      }
   }
}


offerMsgSub订阅在ngOnDestroy()方法 中被取消订阅。

connect(...)方法使用 JWT 令牌创建一个新的 WebSocket 连接,如果不存在,则返回 true。创建的连接会自动重新连接。初次连接后,结果为假。成功登录后创建连接,因为令牌用于保护 WebSocket 连接。

检查初始 WebSocket 连接后,初始化 WebRTCService。

addIncominMessageHandler()服务添加回调以接收 WebRTC 消息。

ThesenderId和 thereceiverId设置为唯一的名称。

然后offerMsgSubject订阅 WebRTCService 以在收到报价时显示 voice.component.ts。使用管道过滤带有 setreceiverId和的消息senderId

语音服务

voice.service.ts创建并维护与后端的 WebSocket 连接:

public async connect(jwtToken: string): Promise<boolean> {
   this.webSocketConnectionRequested = true;
   if (!this.socket$ || this.socket$.closed) {
      return Promise.resolve<WebSocketSubject<any>>
         (this.getNewWebSocket(jwtToken)).then<boolean>(mySocket => {
         this.socket$ = mySocket;
         // Called whenever there is a message from the server
         this.socket$.pipe(takeUntil(this.ngUnsubscribeMsg))
            .subscribe(msg => {
                   console.log('Received message of type: ' + msg.type);
                   this.messagesSubject.next(msg);
         });
         return true;
       });
    }
    return Promise.resolve(false);
  }

public disconnect(): void {
   this.webSocketConnectionRequested = false;
   this.ngUnsubscribeMsg.next();
   this.ngUnsubscribeMsg.unsubscribe();
}

public sendMessage(msg: VoiceMsg): void {
   console.log('sending message: ' + msg.type + 
      ' sid: '+msg.senderId +' remoteId: '+msg.receiverId);
   this.socket$.next(msg);
}

private getNewWebSocket(jwtToken: string): WebSocketSubject<any> {
   return webSocket({
      url: `${this.wsEndpoint}?token=${encodeURI(jwtToken)}`,
      openObserver: {
        next: () => {
          console.log('[DataService]: connection ok');
        }
      },
      closeObserver: {
        next: () => {
          console.log('[DataService]: connection closed');
          this.socket$ = undefined;
          if(!!this.webSocketConnectionRequested) {
            this.connect(jwtToken);
          }
        }
      }
   });
}


connect(...)方法将webSocketConnectionRequested属性设置为 true 以启用自动重新连接并检查 WebSocket 是 false 还是关闭。然后,创建一个新的 WebSocket 连接。

该方法getNewWebSocket(..)创建一个新的webSocket(...). 参数是带有端点的 URL 和 URI 编码的 JWT 令牌。openObserver记录套接字创建并将closeObserver套接字设置为未定义并检查是否激活了重新连接。

connect(...)方法继续将提供的 WebSocket 设置为socket$属性。该socket$属性获取一个用于取消订阅的管道takeUntil并订阅套接字以将消息发送到messagesSubject. 然后true被退回。

disconnect()方法将属性webSocketConnectionRequested设置false为禁用自动重新连接。ReplaySubject向管道ngUnsubscribeMsg发送信号以takeUntil取消订阅,然后取消订阅。

该方法sendMessage(...)使用 WebSocket 发送和记录消息。

语音组件

voice.component.ts 使用 video/audio HTML 标签播放远端视频和音频,并显示本地视频:

@Component({
  selector: 'app-voice',
  templateUrl: './voice.component.html',
  styleUrls: ['./voice.component.scss']
})
export class VoiceComponent implements OnInit, OnDestroy, AfterViewInit {
  @ViewChild('local_video') localVideo: ElementRef;
  @ViewChild('remote_video') remoteVideo: ElementRef;

  @Input()
  receiver: Contact;
  @Input()
  sender: Contact;

  localVideoActivated = false;
  remoteMuted = false;
  localMuted = false;
  onLocalhost: boolean;
  inCall = false;

  private localhostReceiver = '';
  private componentSubscribtions: Subscription[] = [];

  constructor(private voiceService: VoiceService, 
     private webrtcService: WebrtcService) {
     this.onLocalhost = this.voiceService.localhostCheck();
   }

   public ngAfterViewInit(): void {
      this.componentSubscribtions.push(this.webrtcService.offerMsgSubject
	   .pipe(filter(offerMsg => !!offerMsg.senderId && 
              !!offerMsg.receiverId), debounceTime(500))
	   .subscribe(offerMsg => this.handleOfferMessage(offerMsg)));
      this.componentSubscribtions.push(this.webrtcService.hangupMsgSubject
         .pipe(debounceTime(500))
  	 .subscribe(hangupMsg => this.handleHangupMessage(hangupMsg)));
      this.componentSubscribtions.push(this.webrtcService
         .remoteStreamSubject.subscribe(remoteStream => 
         this.handleRemoteStream(remoteStream)));
   }

   public ngOnDestroy(): void {
      this.componentSubscribtions.forEach(mySub => mySub.unsubscribe());
   }

  public ngOnInit(): void {
     this.localhostReceiver = this.sender.name + 
        this.voiceService.localHostToken;
     this.requestMediaDevices();
  }


@ViewChild注释通过 id 创建对视频 HTML 标记的引用。

组件获取receiver/sender对象作为输入参数。

该属性componentSubscribtions是方法中未订阅的订阅数组ngOnDestroy()

构造函数获取VoiceService用于WebSocket 连接的WebRTCService,以及注入WebRTC 协议的WebRTCService。

ngOnInit()方法调用requestMediaDevices()以访问可用Mediadevices并停止任何当前使用。

该方法ngAfterViewInit()使用 RxJS 管道对消息进行去抖动、订阅offerMsgSubject和处理方法中的消息handleOfferMessage(...)。挂断消息像报价消息一样处理,并在handleHangupMessage(...)方法中进行处理。消息的remoteStream处理方式与没有管道的报价消息一样,并在handleRemoteStream(...)方法中进行处理。

voice.component.ts 调用处理如下:

public async call(): Promise<void> {
   const peerConnectionContainer = this.webrtcService.createPeerConnection();
   this.voiceService.peerConnections
      .set(peerConnectionContainer.senderId, peerConnectionContainer);

    if (!this.localVideoActivated) {
      this.startLocalVideo();
    }

    this.webrtcService.localStream.getTracks().forEach(myTrack =>
       peerConnectionContainer.rtcPeerConnection
       .addTrack(myTrack, this.webrtcService.localStream));

    try {
      const offer = await this.voiceService.peerConnections
         .get(peerConnectionContainer.senderId)
         .rtcPeerConnection.createOffer(offerOptions);
      // Establish the offer as the local peer's current description.
      await peerConnectionContainer.rtcPeerConnection
         .setLocalDescription(new RTCSessionDescription(offer));

      this.inCall = true;

      this.voiceService.sendMessage({type: VoiceMsgType.offer, 
         senderId: peerConnectionContainer.senderId, 
         receiverId: peerConnectionContainer.receiverId, data: offer});
    } catch (err) {
      this.handleGetUserMediaError(err, peerConnectionContainer.senderId);
    }
  }
}

private handleRemoteStream(remoteStream: MediaStream): void {
  console.log('remote mediastream handled: ' + remoteStream.id);
  if(!!this.remoteVideo.nativeElement.srcObject) {
    remoteStream.getTracks().forEach(myTrack =>
	  this.remoteVideo.nativeElement.srcObject.addTracks(myTrack));
  } else {
    this.remoteVideo.nativeElement.srcObject = remoteStream;
  }
  this.remoteMuted = false;
}

public hangUp(): void {
  this.voiceService.sendMessage({type: VoiceMsgType.hangup, 
     senderId: this.sender.name, receiverId: this.onLocalhost ? 
        this.localhostReceiver : this.receiver.name, data: ''});
  this.closeVideoCall();
}

private closeVideoCall(): void {
  console.log('Closing call');

  this.voiceService.peerConnections.forEach((container, sid) => {
    console.log('--> Closing the peer connection');

    container.rtcPeerConnection.ontrack = null;
    container.rtcPeerConnection.onicecandidate = null;
    container.rtcPeerConnection.oniceconnectionstatechange = null;
    container.rtcPeerConnection.onsignalingstatechange = null;

    // Stop all transceivers on the connection
    container.rtcPeerConnection.getTransceivers().forEach(transceiver => {
      transceiver.stop();
    });

    // Close the peer connection
    container.rtcPeerConnection.close();
  });
  this.voiceService.peerConnections.clear();
  this.voiceService.pendingCandidates.clear();
  this.stopLocalVideo();
  this.remoteMuted = true;
  this.remoteVideo.nativeElement.srcObject = null;
  this.inCall = false;
} 


要开始视频通话,请使用该call()方法。它使用 WebRTCService 创建一个 PeerConnectionContainer,用于存储创建调用所需的所有信息。PeerConnectionContainer 被添加到peerConnectionsVoiceService 中。该方法startLocalVideo()初始化本地视频。然后将 的曲目localStream添加到peerConnectionContainer.

然后使用创建报价,RtcConnectionLocalDescription在连接中创建/设置。报价是通过 JSON 中的 WebSocket 与 VoiceService 一起发送的,如下例所示:

{
 type: VoiceMsgType.offer, //Messagetype
 senderId: peerConnectionContainer.senderId, //the userId of the current user
 receiverId: peerConnectionContainer.receiverId, //the userId of the receiver
 data: offer //the offer that has been created with the RtcPeerConnection
}


handleRemoteStream(...)方法用于将 Answer Message 的 remoteStream 设置为 remoteVideo 对象的源对象并取消静音。

hangup()方法使用 VoiceService 发送挂断消息并调用closeVideoCall()迭代 peerConnections 并关闭收发器/连接并删除回调。然后连接和未决候选被清除,本地/远程视频停止。

结论:STUN 服务和前端

Spring 对 WebSocket 连接有很好的支持,可以通过 STUN 服务器提供对 WebRTC 信令的支持。Angular 前端的这一部分是视频通话的集成,这是由于 Angular 组件/服务得到了很好的支持。

带信令的 WebRTC 协议

现在讨论 AngularPwaMessenger 项目的 WebRTC/Signaling 实现。

WebRTC 协议

应用程序需要支持的信令在之前链接的 MDN 文章中进行了描述。基本上是这样的:

首先,发送者需要创建一个offer消息并通过STUN服务器发送给接收者。接收方必须处理报价并为报价创建一个答案,然后通过 STUN 服务器将其发送给发送方。然后建立连接。

发送方需要为协议协商创建一个 ICE 候选消息,并通过 STUN 服务器发送。接收方必须处理 ICE 候选消息并创建响应消息以通过 STUN 服务器发送给发送方。

WebRTC服务

webrtc.service.ts管理发送者和接收者的消息传递。它创建连接并提供回调来处理消息。

服务设置

要初始化 webrtc.service.ts,addIncomingMessageHandler()需要调用该方法:

public async addIncomingMessageHandler(): Promise<void> {
	console.log('Message Handler added');
	await this.requestMediaDevices();
    this.voiceService.messages.subscribe(
      msg => {
        console.log('Received message: ' + msg.type);
        // console.log(msg);
        switch (msg.type) {
          case VoiceMsgType.offer:
            this.handleOfferMessage(msg);
            break;
          case VoiceMsgType.answer:
            this.handleAnswerMessage(msg);
            break;
          case VoiceMsgType.hangup:
            this.handleHangupMessage(msg);
            break;
          case VoiceMsgType.iceCandidate:
            this.handleICECandidateMessage(msg);
            break;
          default:
            console.log('unknown message of type ' + msg.type);
        }
      },
      error => console.log(error)
    );
  }


该方法requestMediaDevices()初始化localStream。

订阅 VoiceService 的消息以处理通过 WebSocket 从接收方返回的消息。具有VoiceMsg类型枚举以显示消息类型。交换机调用消息处理程序来处理消息类型并记录未知类型的消息。

 public createPeerConnection(): RTCPeerConnectionContainer {
    console.log('creating PeerConnection...');
    const peerConnection = new RTCPeerConnection(environment.RTCPeerConfiguration);
    //const senderId = window.crypto.randomUUID();
    const senderId = this.senderId;
    const receiverId = this.onLocalhost ? this.localhostReceiver : this.receiverId;

    peerConnection.onicecandidate = this.handleICECandidateEvent;
    peerConnection.oniceconnectionstatechange =  
       this.handleICEConnectionStateChangeEvent;
    peerConnection.onsignalingstatechange = this.handleSignalingStateChangeEvent;
    peerConnection.ontrack = this.handleTrackEvent;
    const container = new RTCPeerConnectionContainer(senderId, 
       receiverId, peerConnection);
    return container;
  }


该方法使用、和来createPeerConnection()创建一个。获取设置为处理连接事件的回调函数。RTCPeerConnectionContainersenderIdreceiverIdpeerConnectionpeerConnection

private handleOfferMessage(msg: VoiceMsg): void {
   console.log('handle incoming offer sid:: '+msg.senderId);
   const peerConnectionContainer = this.createPeerConnection();
   peerConnectionContainer.receiverId = msg.senderId;
   peerConnectionContainer.senderId = this.onLocalhost ? this.localhostReceiver : 
      peerConnectionContainer.senderId;
   this.voiceService.peerConnections.set(peerConnectionContainer.senderId, 
      peerConnectionContainer);

   this.localStream.getTracks().forEach(myTrack => !!peerConnectionContainer
      && peerConnectionContainer?.rtcPeerConnection?.addTrack(myTrack, 
      this.localStream));

   this.voiceService.peerConnections.get(peerConnectionContainer.senderId)
      .rtcPeerConnection.setRemoteDescription(new RTCSessionDescription(msg.data))
      .then(() =>
         // Build SDP for answer message      
         this.voiceService.peerConnections.get(peerConnectionContainer.senderId)
            .rtcPeerConnection.createAnswer()        
      ).then((answer) =>
        // Set local SDP
         this.voiceService.peerConnections.get(peerConnectionContainer.senderId)
            .rtcPeerConnection.setLocalDescription(answer).then(() => answer)
      ).then(answer => {
        // Send local SDP to remote part
         this.voiceService.sendMessage({type: VoiceMsgType.answer, senderId:  
            peerConnectionContainer.senderId,
            receiverId: peerConnectionContainer.receiverId, data: answer} as VoiceMsg);
         this.offerMsgSubject.next(msg);
      }).catch(e => this.reportError(e));
}


为了处理方法的提议,创建handleOfferMessage(..)了一个新方法PeerConnetionContainer。它可以设置receiverIdsenderId创建响应。将PeerConnectionContainer添加到VoiceService.peerConnections地图中。

然后将 localStream 的轨道添加到PeerConnectionContainer.rtcPeerConnection.

然后,使用以下步骤创建应答消息:

  • 添加报价消息的PeerConnectionContainer.rtcPeerConnectionremoteDescription。
  • PeerConnectionContainer.rtcPeerConnection用于创建答案对象。
  • 答案对象作为 localDescription 添加到PeerConnectionContainer.rtcPeerConnection.
  • VoiceService 用于通过 WebSocket 发送 answer 消息,并将 offer 消息添加到offerMsgSubject以使前端能够对 offer 做出反应。
private handleAnswerMessage(msg: VoiceMsg): void {
   console.log('handle incoming answer sid: ' +msg.receiverId);
   if(this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
      .signalingState !== 'stable') {
   this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
      .setRemoteDescription(new RTCSessionDescription(msg.data))
      .then(() => console.log('answer handled'));
   }
}

private handleHangupMessage(msg: VoiceMsg): void {
   console.log(msg);
   this.hangupMsgSubject.next(msg);
}


该方法handleAnswerMessage(...)获取答案消息并检查 是否不稳定以忽略重复项rtcPeerConnection.signalingStatePeerConnectionContainer然后PeerConnectionContainer.rtcPeerConnection根据应答消息添加了 remoteDescription。

该方法handleHangupMessage(...)将消息添加到hangupMsgSubject以使前端能够做出反应。


private handleICECandidateMessage(msg: VoiceMsg): void {
   console.log('ICECandidateMessage sid: '+msg.senderId+' remoteId: '+msg.receiverId);
   if (!!this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
      ?.currentRemoteDescription) {
      this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
         .addIceCandidate(new RTCIceCandidate(msg.data)).catch(this.reportError);
    } else {
      if (!this.voiceService.pendingCandidates.get(msg.receiverId)) {
         this.voiceService.pendingCandidates.set(msg.receiverId, 
            [] as RTCIceCandidateInit[]);
      }
      this.voiceService.pendingCandidates.get(msg.receiverId).push(msg.data);
   }
}


为了处理 ICECandidate 消息,检查方法handleICECandidateMessage(...)rtcPeerConnection,然后将新的 ICECandidate 添加到rtcPeerConnection. 如果检查失败,则将 ICECandidate 消息添加到pendingCandidatesreceiverId 的映射中的数组中。

private handleICECandidateEvent = (event: RTCPeerConnectionIceEvent) => {
   if (event.candidate && this.voiceService.peerConnections
      .get(this.getEventSid(event))?.receiverId) {      
      this.voiceService.sendMessage({
        type: VoiceMsgType.iceCandidate,
        senderId: this.getEventSid(event),
        receiverId: this.voiceService.peerConnections
           .get(this.getEventSid(event)).receiverId,
        data: event.candidate
      });
   }


此函数处理由回调RTCPeerConnectionIceEvent生成的 s 。RtcPeerConnection.onicecandidate消息VoiceService通过 WebSocket 发送。senderId 和receiverId 是根据RtcConnection 设置的。event.candidate是 RtcConnection 生成的消息数据。

其他回调正在检查是否应该关闭连接。

本地测试

要使用本地网络上的 2 个设备测试语音呼叫,需要使用ssl配置文件启动 Spring Boot 应用程序。该配置文件使用“ resources/testCert/server.p12 ”文件中的 SSL 证书。创建证书的步骤可以在addIngress.sh脚本中找到。运行后端的服务器的 DNS 名称必须放在cert.conf和server.conf中。必须将 server.p12 复制到资源中,并且必须将 rootCa.pem 作为用于测试的浏览器中的权限导入。

结论

接收到的 WebSocket 消息和 WebRTCConnection 回调的处理封装在此服务中。消息处理使用 TypeScript 来帮助处理属性和方法参数。

谢谢大家阅读,喜欢的朋友请关注点赞转发,带你了解最新技术趋势。



最近发表
标签列表