grow/src/ws.ts
2025-04-24 11:33:04 -07:00

270 lines
9.3 KiB
TypeScript

import { MediaStream, MediaStreamTrack, nonstandard, RTCPeerConnection, RTCDataChannel } from '@roamhq/wrtc';
import { ChildProcessWithoutNullStreams, spawn } from 'child_process';
import * as ws from 'ws';
import { ISensors } from './io';
// Capture geometry shared by the FFmpeg arguments and the frames handed to WebRTC.
const WIDTH = 640; // frame width, in pixels
const HEIGHT = 480; // frame height, in pixels
// Size in bytes of one raw YUV420P frame: 1.5 bytes per pixel (3/2 keeps the arithmetic integral).
const FRAME_SIZE = (WIDTH * HEIGHT * 3) / 2;
/**
 * WebRTC streaming server with WebSocket signaling.
 *
 * On construction it spawns a single FFmpeg process that writes raw YUV420P
 * frames to an extra pipe (fd 3), wraps those frames in a WebRTC video track,
 * and opens a WebSocket server on `port`. Every WebSocket client gets its own
 * `RTCPeerConnection` carrying the shared video/audio tracks; when the client
 * opens a data channel, the current sensor snapshot is pushed over it as JSON
 * once per second.
 */
export default class VideoSocket {
  videoDevice: string;

  /**
   * @param port        TCP port for the WebSocket signaling server.
   * @param videoDevice Value passed to FFmpeg's `-i` option.
   * @param getSensors  Called once per second per open data channel; its
   *                    result is JSON-serialized and sent to the client.
   */
  public constructor(port: number, videoDevice: string, getSensors: () => ISensors) {
    this.videoDevice = videoDevice;
    const ffmpegProcess = this.startFFmpeg();
    const videoTrack = this.createVideoTrack(ffmpegProcess);
    const audioTrack = this.createAudioTrack(ffmpegProcess);

    ffmpegProcess.stdio[2].on('error', (err) => {
      console.log("stdio[2] error", err.toString());
    });
    // Keep a 'data' listener attached so FFmpeg's stderr pipe is always
    // drained; the log output itself is intentionally discarded.
    ffmpegProcess.stderr.on('data', () => {
      // console.error(`ffmpeg stderr data: ${data}`);
    });

    // WebSocket signaling server
    const wss = new ws.WebSocketServer({ port });
    wss.on('connection', (socket: ws.WebSocket) => {
      const peerConnection: RTCPeerConnection = this.createPeerConnection(videoTrack, audioTrack);

      // Every sensor timer started for this client, so that all of them can
      // be stopped when the signaling socket goes away.
      const sensorTimers = new Set<ReturnType<typeof setInterval>>();
      const stopAllSensorTimers = (): void => {
        sensorTimers.forEach((timer) => clearInterval(timer));
        sensorTimers.clear();
      };

      // The client creates the data channel; the server receives it here.
      peerConnection.ondatachannel = (event) => {
        const dataChannel = event.channel;
        let sensorTimer: ReturnType<typeof setInterval> | undefined;

        const stopSensorTimer = (): void => {
          if (sensorTimer === undefined) {
            return;
          }
          clearInterval(sensorTimer);
          sensorTimers.delete(sensorTimer);
          sensorTimer = undefined;
        };

        dataChannel.onopen = () => {
          console.log('📬 Server: Data channel opened');
          stopSensorTimer(); // never run two timers for the same channel
          sensorTimer = setInterval(() => {
            if (dataChannel.readyState !== 'open') {
              return;
            }
            try {
              dataChannel.send(JSON.stringify(getSensors()));
            } catch (err) {
              // A failing sensor read or send must not take the process down.
              console.error('Failed to send sensor data:', err);
            }
          }, 1000);
          sensorTimers.add(sensorTimer);
        };
        dataChannel.onmessage = () => {
          // Messages from the client are currently ignored.
        };
        dataChannel.onclose = () => {
          console.log("📴 Server: Data channel closed");
          stopSensorTimer();
        };
      };

      socket.on('message', async (message: Buffer) => {
        // This listener is async, so anything thrown here would surface as an
        // unhandled promise rejection; catch and log instead.
        try {
          const { type, data } = JSON.parse(message.toString());
          if (type === 'offer') {
            await peerConnection.setRemoteDescription(data);
            const answer = await peerConnection.createAnswer();
            await peerConnection.setLocalDescription(answer);
            socket.send(JSON.stringify({ type: 'answer', data: peerConnection.localDescription }));
          } else if (type === 'ice-candidate') {
            await peerConnection.addIceCandidate(data);
          }
        } catch (err) {
          console.error('Failed to handle signaling message:', err);
        }
      });

      // Without a listener, an 'error' emitted by the socket is thrown as an
      // uncaught exception.
      socket.on('error', (err) => {
        console.error('WebSocket error:', err);
      });

      peerConnection.oniceconnectionstatechange = () => {
        if (peerConnection.iceConnectionState === 'failed') {
          console.error('ICE connection failed');
        }
      };

      // Send ICE candidates to the client
      peerConnection.onicecandidate = ({ candidate }) => {
        if (candidate) {
          socket.send(JSON.stringify({ type: 'ice-candidate', data: candidate }));
        }
      };

      socket.on('close', () => {
        console.log('Client disconnected');
        stopAllSensorTimers();
        peerConnection.close();
      });
    });
  }

  /**
   * Spawns FFmpeg reading from `this.videoDevice` and writing raw YUV420P
   * video to fd 3 (`pipe:3`). Also installs process-level handlers that
   * forward SIGINT/SIGTERM to FFmpeg and kill it when this process exits.
   *
   * @returns The spawned FFmpeg child process.
   */
  startFFmpeg = (): ChildProcessWithoutNullStreams => {
    // NOTE(review): '-framerate' and '-video_size' appear after '-i'; FFmpeg
    // may treat them as output options rather than capture options. Left
    // unchanged here; confirm against the target capture device.
    const p = spawn('ffmpeg', [
      // '-loglevel', 'debug',
      '-i', this.videoDevice,
      // Video
      '-map', '0:v:0',
      '-framerate', '24',
      '-video_size', `${WIDTH}x${HEIGHT}`,
      '-vf', `scale=${WIDTH}:${HEIGHT}:flags=fast_bilinear`,
      '-pix_fmt', 'yuv420p',
      '-f', 'rawvideo',
      '-threads', '1',
      // quality
      // '-fflags', '+discardcorrupt',
      // '-err_detect', 'ignore_err',
      // '-analyzeduration', '100M',
      // '-probesize', '100M',
      'pipe:3',
      // Audio
      // '-map', '0:a:0',
      // // '-acodec', 'pcm_s16le',
      // '-ac', '1',
      // '-ar', '48000',
      // '-f', 'alsa',
      // 'pipe:4'
    ], {
      // Slots: stdin, stdout, stderr, fd 3 (video). fd 4 (audio) is disabled.
      stdio: ['ignore', 'pipe', 'pipe', 'pipe'/*, 'pipe'*/]
    });

    // A failed spawn (e.g. ffmpeg not installed) is reported through 'error';
    // without a listener it would be thrown as an uncaught exception.
    p.on('error', (err) => {
      console.error('Failed to run ffmpeg:', err);
    });

    process.on('SIGINT', () => {
      console.log('🔻 Server shutting down...');
      p.kill('SIGINT');
      process.exit(0);
    });
    process.on('SIGTERM', () => {
      console.log('🔻 SIGTERM received');
      p.kill('SIGTERM');
      process.exit(0);
    });
    process.on('exit', () => {
      p.kill('SIGHUP'); //this one
      p.kill('SIGTERM');
    });
    return p;
  }

  /**
   * Builds a WebRTC video track fed by FFmpeg's fd 3 pipe. Incoming bytes are
   * accumulated and cut into `FRAME_SIZE`-byte frames, each of which is handed
   * to the video source as a `WIDTH`x`HEIGHT` frame.
   *
   * @param ffmpegProcess Process returned by `startFFmpeg`.
   * @returns The track created from the video source.
   * @throws Error if the process has no pipe on fd 3.
   */
  createVideoTrack = (ffmpegProcess: ChildProcessWithoutNullStreams) => {
    const videoSource = new nonstandard.RTCVideoSource();
    // fd 3 is configured as 'pipe' and FFmpeg writes to it, so this end is the
    // readable side; the generic stdio typing cannot express that.
    const videoStream = ffmpegProcess.stdio[3] as NodeJS.ReadableStream | null | undefined; // pipe:3
    if (!videoStream) {
      throw new Error('FFmpeg video pipe (fd 3) is not available');
    }

    let pending = Buffer.alloc(0);
    videoStream.on('data', (chunk: Buffer) => {
      pending = Buffer.concat([pending, chunk]);
      if (pending.length > FRAME_SIZE * 2) {
        console.warn('Video buffer overrun — possible freeze trigger');
      }

      // Emit every complete frame currently buffered; keep the remainder.
      let offset = 0;
      while (pending.length - offset >= FRAME_SIZE) {
        const frame: nonstandard.RTCVideoFrame = {
          width: WIDTH,
          height: HEIGHT,
          // Copy the bytes so the frame does not alias the rolling buffer.
          data: new Uint8Array(pending.subarray(offset, offset + FRAME_SIZE)),
        };
        videoSource.onFrame(frame);
        offset += FRAME_SIZE;
      }
      pending = pending.subarray(offset);
    });

    // 'exit' is emitted by the child process, not by its stdio streams.
    ffmpegProcess.on('exit', (code) => {
      console.log(`FFmpeg exited with code ${code}`);
    });
    return videoSource.createTrack();
  }

  /**
   * Builds a WebRTC audio track. Audio capture is currently disabled (FFmpeg
   * is not given a `pipe:4` output), so no data is ever fed to the source.
   *
   * @param ffmpegProcess Process returned by `startFFmpeg`; unused while audio
   *                      capture is disabled.
   * @returns The track created from the audio source.
   */
  createAudioTrack = (ffmpegProcess: ChildProcessWithoutNullStreams) => {
    const audioSource = new nonstandard.RTCAudioSource();
    // --- AUDIO handling (disabled) ---
    // Reference implementation to enable together with the 'pipe:4' output in
    // startFFmpeg:
    /*
    let audioBuffer = Buffer.alloc(0);
    const audioStream = ffmpegProcess.stdio[4]; // pipe:4
    const AUDIO_FRAME_SIZE = 480 * 2; // 480 samples * 2 bytes (s16le)
    audioStream.on('data', (chunk: Buffer) => {
      audioBuffer = Buffer.concat([audioBuffer, chunk]);
      while (audioBuffer.length >= AUDIO_FRAME_SIZE) {
        const frameData = audioBuffer.subarray(0, AUDIO_FRAME_SIZE);
        audioBuffer = audioBuffer.subarray(AUDIO_FRAME_SIZE);
        const samples = new Int16Array(480);
        for (let i = 0; i < 480; i++) {
          samples[i] = frameData.readInt16LE(i * 2);
        }
        audioSource.onData({
          samples,
          sampleRate: 48000,
          bitsPerSample: 16,
          channelCount: 1,
          numberOfFrames: 480
        });
      }
    });
    */
    return audioSource.createTrack();
  }

  /**
   * Creates a server-initiated `sensors` data channel on `peerConnection` and
   * sends the JSON-serialized result of `getSensors` over it once per second
   * while the channel is open.
   *
   * @param peerConnection Connection to create the channel on.
   * @param getSensors     Supplies the value to serialize on every tick.
   * @returns The same `peerConnection` that was passed in.
   */
  createDataChannel = (peerConnection: RTCPeerConnection, getSensors: () => unknown) => {
    const dataChannel = peerConnection.createDataChannel('sensors');
    console.log("create data channel");

    let sensorTimer: ReturnType<typeof setInterval> | undefined;
    const stopSensorTimer = (): void => {
      if (sensorTimer !== undefined) {
        clearInterval(sensorTimer);
        sensorTimer = undefined;
      }
    };

    dataChannel.onopen = () => {
      console.log('✅ Data channel is open');
      stopSensorTimer();
      sensorTimer = setInterval(() => {
        // Sending on a channel that is no longer open throws.
        if (dataChannel.readyState !== 'open') {
          return;
        }
        try {
          dataChannel.send(JSON.stringify(getSensors()));
        } catch (err) {
          console.error('Failed to send sensor data:', err);
        }
      }, 1000);
    };
    dataChannel.onerror = (error) => {
      console.error('❌ DataChannel error:', error);
    };
    dataChannel.onclose = () => {
      console.log('❎ DataChannel closed');
      stopSensorTimer();
    };
    return peerConnection;
  }

  /**
   * Creates a peer connection with no ICE servers configured and attaches the
   * given video and audio tracks, grouped in a single `MediaStream`.
   *
   * @param videoTrack Track produced by `createVideoTrack`.
   * @param audioTrack Track produced by `createAudioTrack`.
   * @returns The configured peer connection.
   */
  createPeerConnection = (videoTrack: MediaStreamTrack, audioTrack: MediaStreamTrack): RTCPeerConnection => {
    const peerConnection = new RTCPeerConnection({
      iceServers: [
        // {
        //   urls: 'stun:192.168.0.3:3478'
        // },
        // {
        //   urls: 'turn:192.168.0.3:3478?transport=udp',
        //   username: 'webrtcuser',
        //   credential: 'webrtccred'
        // }
      ]
    });
    const stream = new MediaStream();
    stream.addTrack(videoTrack);
    stream.addTrack(audioTrack);
    peerConnection.addTrack(videoTrack, stream);
    peerConnection.addTrack(audioTrack, stream);
    return peerConnection;
  }
}