这个深入的教程演示了如何在 TypeScript 中使用 WebRTC 向 Angular/Spring Boot 项目添加视频通话。
WebRTC 视频通话已添加到AngularPwaMessenger项目中。后端支持由 JWT 令牌保护的 WebSocket 连接以启用 WebRTC 信号。当前的浏览器支持使用 WebRTC 进行视频通话。Angular 前端支持 WebRTC 调用,需要访问摄像头和麦克风。浏览器需要能够直接相互连接并使用服务器后端来做到这一点。这意味着阻止传入连接的家庭/公司网络会阻止创建视频通话。在智能手机上安装 PWA 确实有效,因为没有路由器/防火墙会阻止连接。对于开发,使用带有自签名证书的设置,可以在防火墙网络内进行测试。
WebRTC 文档
Mozilla 开发网络有WebRTC文档。WebRTC 协议记录在这里AngularPwaMessenger 后端为 ICE 协议提供 STUN 服务器实现。此处记录了信令和视频通话。图表/代码显示了视频通话连接的创建。
后端
支持 ICE 协议的 STUN 实现在WebSocketConfig类中配置。
爪哇
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer{
private SignalingHandler socketHandler;
private Environment environment;
public WebSocketConfig(SignalingHandler socketHandler,
Environment environment) {
this.socketHandler = socketHandler;
this.environment = environment;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry
registry) {
WebSocketHandlerRegistration handlerRegistration =
registry.addHandler(this.socketHandler, "/signalingsocket");
if(List.of(this.environment.getActiveProfiles()).stream()
.noneMatch(myProfile ->
myProfile.toLowerCase().contains("prod"))) {
handlerRegistration.setAllowedOrigins("*");
}
}
Spring 具有在此实现中使用并使用@Configuration注释配置的 WebSocket 支持。@EnableWebSocket注释激活 Spring WebSocket 支持 。
和SignalingHandler环境被注入到构造函数中。
该方法registerWebSocketHandlers(...)将 添加SignalingHandler到 path /signalingsocket。环境用于检查配置文件是否包含prod配置文件。对于开发, allowedOrigins(...)检查被禁用。
在类SocketHandler中实现SignalingHandler。消息在这里定义:
爪哇
...
private List<WebSocketSession> sessions = new CopyOnWriteArrayList<>();
...
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message)
throws InterruptedException, IOException {
if (userRoleCheck(session)) {
SenderReceiver senderReceiver = this.extractSenderReceiver(message);
String sessionUsername = extractSessionUsername(session);
for (WebSocketSession webSocketSession : this.sessions) {
removeStaleSession(webSocketSession);
String webSocketSessionUsername =
this.extractSessionUsername(webSocketSession);
if (webSocketSession.isOpen() &&
(checkSenderReceiver(senderReceiver, sessionUsername,
webSocketSessionUsername) ||
checkSenderLocalhostToken(senderReceiver, sessionUsername,
webSocketSessionUsername))) {
LOGGER.debug("Msg send with params: Msg sender: {}, Msg receiver:
{}, Session sender: {}, WebSocket receiver: {}",
senderReceiver.sender, senderReceiver.receiver,
sessionUsername, webSocketSessionUsername);
webSocketSession.sendMessage(message);
}
}
} else {
if (this.isTokenExpired(session)) {
session.close();
}
}
}
...
@Override
public void afterConnectionEstablished(WebSocketSession session)
throws Exception {
if (userRoleCheck(session)) {
this.sessions.add(session);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session,
CloseStatus status) throws Exception {
this.sessions.remove(session);
}
该WebSocketSession列表CopyOnWriteArrayList用于确保在方法中添加或删除会话时支持并发写入afterConnectionEstablished(...)和afterConnectionClosed(...)。
该handelTextMessage(...)方法用于处理 WebSocket 的消息。
登录用户的 JWT 令牌作为 URL 参数添加到 WebSocket 路径 ( /signalingsocket?token=abc...)。从会话中提取 JWT 令牌,解析令牌,检查令牌中的角色。
该方法extractSenderReceiver(...)从 JSON 消息中提取发送方/接收方记录。这是使用字符串函数完成的,以避免解析整个消息。
该方法extractSessionUsername(...)从会话的 JWT 令牌中获取用户名。
注册的会话被迭代:
- 该方法removeStaleSession(...)关闭 JWT 令牌已超时的会话。
- 该方法extractSessionUsername(...)获取会话的 JWT 令牌用户名。
- 检查会话是否打开并且消息发送者/接收者与会话的发送者/接收者匹配。匹配会话发送消息。
如果 JWT 令牌的 userRole 不是User,则检查 JWT 令牌是否已超时以关闭此类会话。
前端
主要部件
Angular 前端使用 WebRTC 提供视频通话。如果有视频通话到达, main.component.ts会显示voice.component.ts 。该方法syncMsgs()在登录后间隔调用。
...
private offerMsgSub: Subscription;
...
private async syncMsgs(): Promise<void> {
if ( this.ownContact && this.netConnectionService.connetionStatus &&
!this.jwttokenService.localLogin ) {
...
const result = await
this.voiceService.connect(this.jwttokenService.jwtToken);
if(!!result) {
this.webrtcService.addIncominMessageHandler();
this.webrtcService.senderId = this.ownContact.name;
this.webrtcService.receiverId = this?.selectedContact?.name;
this.offerMsgSub = this.webrtcService.offerMsgSubject
.pipe(filter(offerMsg => !!offerMsg.receiverId &&
!!offerMsg.senderId)).subscribe(offerMsg => {
this.selFeature = MyFeature.phone;
});
}
}
}
offerMsgSub订阅在ngOnDestroy()方法 中被取消订阅。
该connect(...)方法使用 JWT 令牌创建一个新的 WebSocket 连接,如果不存在,则返回 true。创建的连接会自动重新连接。初次连接后,结果为假。成功登录后创建连接,因为令牌用于保护 WebSocket 连接。
检查初始 WebSocket 连接后,初始化 WebRTCService。
为addIncominMessageHandler()服务添加回调以接收 WebRTC 消息。
ThesenderId和 thereceiverId设置为唯一的名称。
然后offerMsgSubject订阅 WebRTCService 以在收到报价时显示 voice.component.ts。使用管道过滤带有 setreceiverId和的消息senderId。
语音服务
voice.service.ts创建并维护与后端的 WebSocket 连接:
public async connect(jwtToken: string): Promise<boolean> {
this.webSocketConnectionRequested = true;
if (!this.socket$ || this.socket$.closed) {
return Promise.resolve<WebSocketSubject<any>>
(this.getNewWebSocket(jwtToken)).then<boolean>(mySocket => {
this.socket$ = mySocket;
// Called whenever there is a message from the server
this.socket$.pipe(takeUntil(this.ngUnsubscribeMsg))
.subscribe(msg => {
console.log('Received message of type: ' + msg.type);
this.messagesSubject.next(msg);
});
return true;
});
}
return Promise.resolve(false);
}
public disconnect(): void {
this.webSocketConnectionRequested = false;
this.ngUnsubscribeMsg.next();
this.ngUnsubscribeMsg.unsubscribe();
}
public sendMessage(msg: VoiceMsg): void {
console.log('sending message: ' + msg.type +
' sid: '+msg.senderId +' remoteId: '+msg.receiverId);
this.socket$.next(msg);
}
private getNewWebSocket(jwtToken: string): WebSocketSubject<any> {
return webSocket({
url: `${this.wsEndpoint}?token=${encodeURI(jwtToken)}`,
openObserver: {
next: () => {
console.log('[DataService]: connection ok');
}
},
closeObserver: {
next: () => {
console.log('[DataService]: connection closed');
this.socket$ = undefined;
if(!!this.webSocketConnectionRequested) {
this.connect(jwtToken);
}
}
}
});
}
该connect(...)方法将webSocketConnectionRequested属性设置为 true 以启用自动重新连接并检查 WebSocket 是 false 还是关闭。然后,创建一个新的 WebSocket 连接。
该方法getNewWebSocket(..)创建一个新的webSocket(...). 参数是带有端点的 URL 和 URI 编码的 JWT 令牌。openObserver记录套接字创建并将closeObserver套接字设置为未定义并检查是否激活了重新连接。
该connect(...)方法继续将提供的 WebSocket 设置为socket$属性。该socket$属性获取一个用于取消订阅的管道takeUntil并订阅套接字以将消息发送到messagesSubject. 然后true被退回。
该disconnect()方法将属性webSocketConnectionRequested设置false为禁用自动重新连接。ReplaySubject向管道ngUnsubscribeMsg发送信号以takeUntil取消订阅,然后取消订阅。
该方法sendMessage(...)使用 WebSocket 发送和记录消息。
语音组件
voice.component.ts 使用 video/audio HTML 标签播放远端视频和音频,并显示本地视频:
@Component({
selector: 'app-voice',
templateUrl: './voice.component.html',
styleUrls: ['./voice.component.scss']
})
export class VoiceComponent implements OnInit, OnDestroy, AfterViewInit {
@ViewChild('local_video') localVideo: ElementRef;
@ViewChild('remote_video') remoteVideo: ElementRef;
@Input()
receiver: Contact;
@Input()
sender: Contact;
localVideoActivated = false;
remoteMuted = false;
localMuted = false;
onLocalhost: boolean;
inCall = false;
private localhostReceiver = '';
private componentSubscribtions: Subscription[] = [];
constructor(private voiceService: VoiceService,
private webrtcService: WebrtcService) {
this.onLocalhost = this.voiceService.localhostCheck();
}
public ngAfterViewInit(): void {
this.componentSubscribtions.push(this.webrtcService.offerMsgSubject
.pipe(filter(offerMsg => !!offerMsg.senderId &&
!!offerMsg.receiverId), debounceTime(500))
.subscribe(offerMsg => this.handleOfferMessage(offerMsg)));
this.componentSubscribtions.push(this.webrtcService.hangupMsgSubject
.pipe(debounceTime(500))
.subscribe(hangupMsg => this.handleHangupMessage(hangupMsg)));
this.componentSubscribtions.push(this.webrtcService
.remoteStreamSubject.subscribe(remoteStream =>
this.handleRemoteStream(remoteStream)));
}
public ngOnDestroy(): void {
this.componentSubscribtions.forEach(mySub => mySub.unsubscribe());
}
public ngOnInit(): void {
this.localhostReceiver = this.sender.name +
this.voiceService.localHostToken;
this.requestMediaDevices();
}
@ViewChild注释通过 id 创建对视频 HTML 标记的引用。
组件获取receiver/sender对象作为输入参数。
该属性componentSubscribtions是方法中未订阅的订阅数组ngOnDestroy()。
构造函数获取VoiceService用于WebSocket 连接的WebRTCService,以及注入WebRTC 协议的WebRTCService。
该ngOnInit()方法调用requestMediaDevices()以访问可用Mediadevices并停止任何当前使用。
该方法ngAfterViewInit()使用 RxJS 管道对消息进行去抖动、订阅offerMsgSubject和处理方法中的消息handleOfferMessage(...)。挂断消息像报价消息一样处理,并在handleHangupMessage(...)方法中进行处理。消息的remoteStream处理方式与没有管道的报价消息一样,并在handleRemoteStream(...)方法中进行处理。
voice.component.ts 调用处理如下:
public async call(): Promise<void> {
const peerConnectionContainer = this.webrtcService.createPeerConnection();
this.voiceService.peerConnections
.set(peerConnectionContainer.senderId, peerConnectionContainer);
if (!this.localVideoActivated) {
this.startLocalVideo();
}
this.webrtcService.localStream.getTracks().forEach(myTrack =>
peerConnectionContainer.rtcPeerConnection
.addTrack(myTrack, this.webrtcService.localStream));
try {
const offer = await this.voiceService.peerConnections
.get(peerConnectionContainer.senderId)
.rtcPeerConnection.createOffer(offerOptions);
// Establish the offer as the local peer's current description.
await peerConnectionContainer.rtcPeerConnection
.setLocalDescription(new RTCSessionDescription(offer));
this.inCall = true;
this.voiceService.sendMessage({type: VoiceMsgType.offer,
senderId: peerConnectionContainer.senderId,
receiverId: peerConnectionContainer.receiverId, data: offer});
} catch (err) {
this.handleGetUserMediaError(err, peerConnectionContainer.senderId);
}
}
}
private handleRemoteStream(remoteStream: MediaStream): void {
console.log('remote mediastream handled: ' + remoteStream.id);
if(!!this.remoteVideo.nativeElement.srcObject) {
remoteStream.getTracks().forEach(myTrack =>
this.remoteVideo.nativeElement.srcObject.addTracks(myTrack));
} else {
this.remoteVideo.nativeElement.srcObject = remoteStream;
}
this.remoteMuted = false;
}
public hangUp(): void {
this.voiceService.sendMessage({type: VoiceMsgType.hangup,
senderId: this.sender.name, receiverId: this.onLocalhost ?
this.localhostReceiver : this.receiver.name, data: ''});
this.closeVideoCall();
}
private closeVideoCall(): void {
console.log('Closing call');
this.voiceService.peerConnections.forEach((container, sid) => {
console.log('--> Closing the peer connection');
container.rtcPeerConnection.ontrack = null;
container.rtcPeerConnection.onicecandidate = null;
container.rtcPeerConnection.oniceconnectionstatechange = null;
container.rtcPeerConnection.onsignalingstatechange = null;
// Stop all transceivers on the connection
container.rtcPeerConnection.getTransceivers().forEach(transceiver => {
transceiver.stop();
});
// Close the peer connection
container.rtcPeerConnection.close();
});
this.voiceService.peerConnections.clear();
this.voiceService.pendingCandidates.clear();
this.stopLocalVideo();
this.remoteMuted = true;
this.remoteVideo.nativeElement.srcObject = null;
this.inCall = false;
}
要开始视频通话,请使用该call()方法。它使用 WebRTCService 创建一个 PeerConnectionContainer,用于存储创建调用所需的所有信息。PeerConnectionContainer 被添加到peerConnectionsVoiceService 中。该方法startLocalVideo()初始化本地视频。然后将 的曲目localStream添加到peerConnectionContainer.
然后使用创建报价,RtcConnection并LocalDescription在连接中创建/设置。报价是通过 JSON 中的 WebSocket 与 VoiceService 一起发送的,如下例所示:
{
type: VoiceMsgType.offer, //Messagetype
senderId: peerConnectionContainer.senderId, //the userId of the current user
receiverId: peerConnectionContainer.receiverId, //the userId of the receiver
data: offer //the offer that has been created with the RtcPeerConnection
}
该handleRemoteStream(...)方法用于将 Answer Message 的 remoteStream 设置为 remoteVideo 对象的源对象并取消静音。
该hangup()方法使用 VoiceService 发送挂断消息并调用closeVideoCall()迭代 peerConnections 并关闭收发器/连接并删除回调。然后连接和未决候选被清除,本地/远程视频停止。
结论:STUN 服务和前端
Spring 对 WebSocket 连接有很好的支持,可以通过 STUN 服务器提供对 WebRTC 信令的支持。Angular 前端的这一部分是视频通话的集成,这是由于 Angular 组件/服务得到了很好的支持。
带信令的 WebRTC 协议
现在讨论 AngularPwaMessenger 项目的 WebRTC/Signaling 实现。
WebRTC 协议
应用程序需要支持的信令在之前链接的 MDN 文章中进行了描述。基本上是这样的:
首先,发送者需要创建一个offer消息并通过STUN服务器发送给接收者。接收方必须处理报价并为报价创建一个答案,然后通过 STUN 服务器将其发送给发送方。然后建立连接。
发送方需要为协议协商创建一个 ICE 候选消息,并通过 STUN 服务器发送。接收方必须处理 ICE 候选消息并创建响应消息以通过 STUN 服务器发送给发送方。
WebRTC服务
webrtc.service.ts管理发送者和接收者的消息传递。它创建连接并提供回调来处理消息。
服务设置
要初始化 webrtc.service.ts,addIncomingMessageHandler()需要调用该方法:
public async addIncomingMessageHandler(): Promise<void> {
console.log('Message Handler added');
await this.requestMediaDevices();
this.voiceService.messages.subscribe(
msg => {
console.log('Received message: ' + msg.type);
// console.log(msg);
switch (msg.type) {
case VoiceMsgType.offer:
this.handleOfferMessage(msg);
break;
case VoiceMsgType.answer:
this.handleAnswerMessage(msg);
break;
case VoiceMsgType.hangup:
this.handleHangupMessage(msg);
break;
case VoiceMsgType.iceCandidate:
this.handleICECandidateMessage(msg);
break;
default:
console.log('unknown message of type ' + msg.type);
}
},
error => console.log(error)
);
}
该方法requestMediaDevices()初始化localStream。
订阅 VoiceService 的消息以处理通过 WebSocket 从接收方返回的消息。具有VoiceMsg类型枚举以显示消息类型。交换机调用消息处理程序来处理消息类型并记录未知类型的消息。
public createPeerConnection(): RTCPeerConnectionContainer {
console.log('creating PeerConnection...');
const peerConnection = new RTCPeerConnection(environment.RTCPeerConfiguration);
//const senderId = window.crypto.randomUUID();
const senderId = this.senderId;
const receiverId = this.onLocalhost ? this.localhostReceiver : this.receiverId;
peerConnection.onicecandidate = this.handleICECandidateEvent;
peerConnection.oniceconnectionstatechange =
this.handleICEConnectionStateChangeEvent;
peerConnection.onsignalingstatechange = this.handleSignalingStateChangeEvent;
peerConnection.ontrack = this.handleTrackEvent;
const container = new RTCPeerConnectionContainer(senderId,
receiverId, peerConnection);
return container;
}
该方法使用、和来createPeerConnection()创建一个。获取设置为处理连接事件的回调函数。RTCPeerConnectionContainersenderIdreceiverIdpeerConnectionpeerConnection
private handleOfferMessage(msg: VoiceMsg): void {
console.log('handle incoming offer sid:: '+msg.senderId);
const peerConnectionContainer = this.createPeerConnection();
peerConnectionContainer.receiverId = msg.senderId;
peerConnectionContainer.senderId = this.onLocalhost ? this.localhostReceiver :
peerConnectionContainer.senderId;
this.voiceService.peerConnections.set(peerConnectionContainer.senderId,
peerConnectionContainer);
this.localStream.getTracks().forEach(myTrack => !!peerConnectionContainer
&& peerConnectionContainer?.rtcPeerConnection?.addTrack(myTrack,
this.localStream));
this.voiceService.peerConnections.get(peerConnectionContainer.senderId)
.rtcPeerConnection.setRemoteDescription(new RTCSessionDescription(msg.data))
.then(() =>
// Build SDP for answer message
this.voiceService.peerConnections.get(peerConnectionContainer.senderId)
.rtcPeerConnection.createAnswer()
).then((answer) =>
// Set local SDP
this.voiceService.peerConnections.get(peerConnectionContainer.senderId)
.rtcPeerConnection.setLocalDescription(answer).then(() => answer)
).then(answer => {
// Send local SDP to remote part
this.voiceService.sendMessage({type: VoiceMsgType.answer, senderId:
peerConnectionContainer.senderId,
receiverId: peerConnectionContainer.receiverId, data: answer} as VoiceMsg);
this.offerMsgSubject.next(msg);
}).catch(e => this.reportError(e));
}
为了处理方法的提议,创建handleOfferMessage(..)了一个新方法PeerConnetionContainer。它可以设置receiverId并senderId创建响应。将PeerConnectionContainer添加到VoiceService.peerConnections地图中。
然后将 localStream 的轨道添加到PeerConnectionContainer.rtcPeerConnection.
然后,使用以下步骤创建应答消息:
- 添加报价消息的PeerConnectionContainer.rtcPeerConnectionremoteDescription。
- PeerConnectionContainer.rtcPeerConnection用于创建答案对象。
- 答案对象作为 localDescription 添加到PeerConnectionContainer.rtcPeerConnection.
- VoiceService 用于通过 WebSocket 发送 answer 消息,并将 offer 消息添加到offerMsgSubject以使前端能够对 offer 做出反应。
private handleAnswerMessage(msg: VoiceMsg): void {
console.log('handle incoming answer sid: ' +msg.receiverId);
if(this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
.signalingState !== 'stable') {
this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
.setRemoteDescription(new RTCSessionDescription(msg.data))
.then(() => console.log('answer handled'));
}
}
private handleHangupMessage(msg: VoiceMsg): void {
console.log(msg);
this.hangupMsgSubject.next(msg);
}
该方法handleAnswerMessage(...)获取答案消息并检查 是否不稳定以忽略重复项rtcPeerConnection.signalingState。PeerConnectionContainer然后PeerConnectionContainer.rtcPeerConnection根据应答消息添加了 remoteDescription。
该方法handleHangupMessage(...)将消息添加到hangupMsgSubject以使前端能够做出反应。
private handleICECandidateMessage(msg: VoiceMsg): void {
console.log('ICECandidateMessage sid: '+msg.senderId+' remoteId: '+msg.receiverId);
if (!!this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
?.currentRemoteDescription) {
this.voiceService.peerConnections.get(msg.receiverId).rtcPeerConnection
.addIceCandidate(new RTCIceCandidate(msg.data)).catch(this.reportError);
} else {
if (!this.voiceService.pendingCandidates.get(msg.receiverId)) {
this.voiceService.pendingCandidates.set(msg.receiverId,
[] as RTCIceCandidateInit[]);
}
this.voiceService.pendingCandidates.get(msg.receiverId).push(msg.data);
}
}
为了处理 ICECandidate 消息,检查方法handleICECandidateMessage(...)和rtcPeerConnection,然后将新的 ICECandidate 添加到rtcPeerConnection. 如果检查失败,则将 ICECandidate 消息添加到pendingCandidatesreceiverId 的映射中的数组中。
private handleICECandidateEvent = (event: RTCPeerConnectionIceEvent) => {
if (event.candidate && this.voiceService.peerConnections
.get(this.getEventSid(event))?.receiverId) {
this.voiceService.sendMessage({
type: VoiceMsgType.iceCandidate,
senderId: this.getEventSid(event),
receiverId: this.voiceService.peerConnections
.get(this.getEventSid(event)).receiverId,
data: event.candidate
});
}
此函数处理由回调RTCPeerConnectionIceEvent生成的 s 。RtcPeerConnection.onicecandidate消息VoiceService通过 WebSocket 发送。senderId 和receiverId 是根据RtcConnection 设置的。event.candidate是 RtcConnection 生成的消息数据。
其他回调正在检查是否应该关闭连接。
本地测试
要使用本地网络上的 2 个设备测试语音呼叫,需要使用ssl配置文件启动 Spring Boot 应用程序。该配置文件使用“ resources/testCert/server.p12 ”文件中的 SSL 证书。创建证书的步骤可以在addIngress.sh脚本中找到。运行后端的服务器的 DNS 名称必须放在cert.conf和server.conf中。必须将 server.p12 复制到资源中,并且必须将 rootCa.pem 作为用于测试的浏览器中的权限导入。
结论
接收到的 WebSocket 消息和 WebRTCConnection 回调的处理封装在此服务中。消息处理使用 TypeScript 来帮助处理属性和方法参数。
谢谢大家阅读,喜欢的朋友请关注点赞转发,带你了解最新技术趋势。