restructure

This commit is contained in:
david 2025-04-01 12:54:04 -07:00
parent 3610c37585
commit a83a5fa605
7 changed files with 342 additions and 233 deletions

38
src/http.ts Normal file
View File

@ -0,0 +1,38 @@
import * as http from "http";
import * as fs from "fs";
import * as path from "path";
export default class HttpServer {
private httpServer: http.Server;
private port: number;
private root: string;
public constructor(port: number, root : string) {
this.port = port;
this.root = root;
this.httpServer = http.createServer((req, res) => {
const filePath = path.join(root, req.url === "/" ? "index.html" : req.url || "");
fs.readFile(filePath, (err, data) => {
if (err) {
res.writeHead(404);
res.end("Not Found");
} else {
res.writeHead(200);
res.end(data);
}
});
});
}
public start() {
this.httpServer.listen(this.port, () => {
console.log(`Serving ${this.root} at http://localhost:${this.port}`);
});
}
}

View File

@ -1,174 +1,15 @@
import { MediaStream, MediaStreamTrack, nonstandard, RTCPeerConnection } from '@roamhq/wrtc';
import { ChildProcessWithoutNullStreams, spawn } from 'child_process';
import * as ws from 'ws';
import HttpServer from './http';
import TVWebSocket from './ws';
// Constants
const VIDEO_DEVICE = '/dev/dvb/adapter0/dvr0'; // Video source device
const WIDTH = 640; // Video width
const HEIGHT = 480; // Video height
const FRAME_SIZE = WIDTH * HEIGHT * 1.5; // YUV420p frame size (460800 bytes)
const HTTP_PORT = process.env.HTTP_PORT ? parseInt(process.env.HTTP_PORT,10) : 8080;
const WS_PORT = process.env.WS_PORT ? parseInt(process.env.WS_PORT, 10) : 3001;
const STATIC_ROOT = process.cwd() + "/dist/static";;
// Function to start FFmpeg and capture raw video
const startFFmpeg = (): ChildProcessWithoutNullStreams => {
const p = spawn('ffmpeg', [
'-loglevel', 'debug',
'-i', VIDEO_DEVICE,
const httpServer = new HttpServer(HTTP_PORT, STATIC_ROOT);
const tvWebSocket = new TVWebSocket(WS_PORT);
// Video
'-map', '0:v:0',
'-vf', `scale=${WIDTH}:${HEIGHT}`,
'-vcodec', 'rawvideo',
'-pix_fmt', 'yuv420p',
'-f', 'rawvideo',
'pipe:3',
// Audio
'-map', '0:a:0',
'-acodec', 'pcm_s16le',
'-ac', '1',
'-ar', '48000',
'-f', 's16le',
'pipe:4'
], {
stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe']
});
process.on('SIGINT', () => {
console.log('🔻 Server shutting down...');
p.kill('SIGINT');
process.exit(0);
});
process.on('SIGTERM', () => {
console.log('🔻 SIGTERM received');
p.kill('SIGTERM');
process.exit(0);
});
process.on('exit', () => {
p.kill('SIGHUP'); //this one
p.kill('SIGTERM');
});
return p;
}
let frameBuffer = Buffer.alloc(0);
const ffmpegProcess = startFFmpeg();
const videoSource = new nonstandard.RTCVideoSource();
const audioSource = new nonstandard.RTCAudioSource();
const createPeerConnection = async (): Promise<RTCPeerConnection> => {
const peerConnection = new RTCPeerConnection({ iceServers: [] });
const videoStream = ffmpegProcess.stdio[3]; // pipe:3
const audioStream = ffmpegProcess.stdio[4]; // pipe:4
// Start FFmpeg and pipe video frames to the source
videoStream.on('data', (chunk: Buffer) => {
frameBuffer = Buffer.concat([frameBuffer, chunk]);
while (frameBuffer.length >= FRAME_SIZE) {
const frameData = frameBuffer.slice(0, FRAME_SIZE);
frameBuffer = frameBuffer.slice(FRAME_SIZE);
const frame: nonstandard.RTCVideoFrame = {
width: WIDTH,
height: HEIGHT,
data: new Uint8Array(frameData),
}
videoSource.onFrame(frame);
}
});
videoStream.on('data', (data: Buffer) => {
// console.error('FFmpeg Error:', data.toString());
});
videoStream.on('exit', (code) => {
console.log(`FFmpeg exited with code ${code}`);
});
// --- AUDIO handling ---
const AUDIO_FRAME_SIZE = 480 * 2; // 480 samples * 2 bytes (s16le)
let audioBuffer = Buffer.alloc(0);
audioStream.on('data', (chunk: Buffer) => {
audioBuffer = Buffer.concat([audioBuffer, chunk]);
while (audioBuffer.length >= AUDIO_FRAME_SIZE) {
const frameData = audioBuffer.slice(0, AUDIO_FRAME_SIZE);
audioBuffer = audioBuffer.slice(AUDIO_FRAME_SIZE);
const samples = new Int16Array(480);
for (let i = 0; i < 480; i++) {
samples[i] = frameData.readInt16LE(i * 2);
}
audioSource.onData({
samples,
sampleRate: 48000,
bitsPerSample: 16,
channelCount: 1,
numberOfFrames: 480
});
}
});
audioStream.on('data', (data: Buffer) => {
// console.error('FFmpeg Error:', data.toString());
});
audioStream.on('exit', (code) => {
console.log(`FFmpeg exited with code ${code}`);
});
// Add the track to the PeerConnection
const videoTrack: MediaStreamTrack = videoSource.createTrack();
const audioTrack: MediaStreamTrack = audioSource.createTrack();
const stream = new MediaStream()
stream.addTrack(videoTrack)
stream.addTrack(audioTrack);
peerConnection.addTrack(videoTrack, stream);
peerConnection.addTrack(audioTrack, stream);
return peerConnection;
}
httpServer.start();
// WebSocket signaling server
const wss = new ws.WebSocketServer({ port: 8080 });
wss.on('connection', async (ws: ws.WebSocket) => {
const peerConnection: RTCPeerConnection = await createPeerConnection();
ws.on('message', async (message: Buffer) => {
const { type, data } = JSON.parse(message.toString());
if (type == 'offer') {
await peerConnection.setRemoteDescription(data);
const answer = await peerConnection.createAnswer();
await peerConnection.setLocalDescription(answer);
ws.send(JSON.stringify({ type: 'answer', data: peerConnection.localDescription }));
}
if (type === 'ice-candidate') {
await peerConnection.addIceCandidate(data);
}
});
peerConnection.oniceconnectionstatechange = () => {
if (peerConnection.iceConnectionState === 'failed') {
console.error('ICE connection failed');
}
};
// Send ICE candidates to the client
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
ws.send(JSON.stringify({ type: 'ice-candidate', data: candidate }));
}
};
ws.on('close', () => {
console.log('Client disconnected');
peerConnection.close();
});
});
console.log('WebRTC signaling server running on ws://localhost:8080');

44
src/static/css/index.scss Normal file
View File

@ -0,0 +1,44 @@
$background-color: #c7c7c7;
$primary-color: #333;
$secondary-color: #d11414;
$text-color: #123455;
body {
font-family: sans-serif;
background-color: $background-color;
header {
nav {
ul {
display: flex;
list-style: none;
gap: 1em;
li {
}
}
a {
&:hover {}
}
}
}
main {
display: flex;
flex-direction: column;
text-align: center;
h1 {}
p {}
img {
width: 20em;
border-radius: 50%;
}
}
footer {
text-align: center;
// background: #eee;
padding: 1em;
}
}

19
src/static/index.html Normal file
View File

@ -0,0 +1,19 @@
<!DOCTYPE html>
<html>
<head>
<title>WebRTC Stream</title>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="css/index.css" />
</head>
<body>
<h1>Video streams</h1>
<h2>WebRTC</h2>
<video id="video" autoplay playsinline controls></video>
<script src="js/index.js"></script>
</body>
</html>

48
src/static/js/index.ts Normal file
View File

@ -0,0 +1,48 @@
const host = window.location.hostname
const ws = new WebSocket(`ws://${host}:3001`);
const pc = new RTCPeerConnection({ iceServers: [] });
const video = document.getElementById('video') as HTMLVideoElement;
pc.onconnectionstatechange = (event) => {
console.log("onconnectionstatechange ", event)
}
pc.ondatachannel = (event) => {
console.log("ondatachannel ", event)
}
pc.ontrack = (event) => {
console.log("Received track event", event.streams);
video.srcObject = event.streams[0];
};
pc.onicecandidate = ({ candidate }) => {
if (candidate) {
ws.send(JSON.stringify({ type: 'ice-candidate', data: candidate })); // Use 'candidate' instead of 'ice-candidate'
}
};
pc.onicegatheringstatechange = () => {
// console.log('ICE state:', pc.iceGatheringState);
};
ws.onopen = async () => {
pc.addTransceiver('video', { direction: 'recvonly' });
pc.addTransceiver('audio', { direction: 'recvonly' })
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
ws.send(JSON.stringify({ type: 'offer', data: offer }));
}
ws.onmessage = async (message) => {
const msg = JSON.parse(message.data);
if (msg.type === 'answer') {
await pc.setRemoteDescription(msg.data);
}
else if (msg.type === 'ice-candidate') {
await pc.addIceCandidate(msg.data);
}
};
;

185
src/ws.ts Normal file
View File

@ -0,0 +1,185 @@
import { MediaStream, MediaStreamTrack, nonstandard, RTCPeerConnection } from '@roamhq/wrtc';
import { ChildProcessWithoutNullStreams, spawn } from 'child_process';
import * as ws from 'ws';
// Constants
const VIDEO_DEVICE = '/dev/dvb/adapter0/dvr0'; // Video source device
const WIDTH = 640; // Video width
const HEIGHT = 480; // Video height
const FRAME_SIZE = WIDTH * HEIGHT * 1.5; // YUV420p frame size (460800 bytes)
export default class TVWebSocket {
public constructor(port: number) {
const ffmpegProcess = this.startFFmpeg();
const videoTrack = this.createVideoTrack(ffmpegProcess);
const audioTrack = this.createAudioTrack(ffmpegProcess);
// WebSocket signaling server
const wss = new ws.WebSocketServer({ port });
wss.on('connection', async (ws: ws.WebSocket) => {
const peerConnection: RTCPeerConnection = this.createPeerConnection(videoTrack, audioTrack);
ws.on('message', async (message: Buffer) => {
const { type, data } = JSON.parse(message.toString());
if (type == 'offer') {
await peerConnection.setRemoteDescription(data);
const answer = await peerConnection.createAnswer();
await peerConnection.setLocalDescription(answer);
ws.send(JSON.stringify({ type: 'answer', data: peerConnection.localDescription }));
}
if (type === 'ice-candidate') {
await peerConnection.addIceCandidate(data);
}
});
peerConnection.oniceconnectionstatechange = () => {
if (peerConnection.iceConnectionState === 'failed') {
console.error('ICE connection failed');
}
};
// Send ICE candidates to the client
peerConnection.onicecandidate = ({ candidate }) => {
if (candidate) {
ws.send(JSON.stringify({ type: 'ice-candidate', data: candidate }));
}
};
ws.on('close', () => {
console.log('Client disconnected');
peerConnection.close();
});
});
}
// Function to start FFmpeg and capture raw video
startFFmpeg = (): ChildProcessWithoutNullStreams => {
const p = spawn('ffmpeg', [
'-loglevel', 'debug',
'-i', VIDEO_DEVICE,
// Video
'-map', '0:v:0',
'-vf', `scale=${WIDTH}:${HEIGHT}`,
'-vcodec', 'rawvideo',
'-pix_fmt', 'yuv420p',
'-f', 'rawvideo',
'pipe:3',
// Audio
'-map', '0:a:0',
'-acodec', 'pcm_s16le',
'-ac', '1',
'-ar', '48000',
'-f', 's16le',
'pipe:4'
], {
stdio: ['ignore', 'pipe', 'pipe', 'pipe', 'pipe']
});
process.on('SIGINT', () => {
console.log('🔻 Server shutting down...');
p.kill('SIGINT');
process.exit(0);
});
process.on('SIGTERM', () => {
console.log('🔻 SIGTERM received');
p.kill('SIGTERM');
process.exit(0);
});
process.on('exit', () => {
p.kill('SIGHUP'); //this one
p.kill('SIGTERM');
});
return p;
}
createVideoTrack = (ffmpegProcess: ChildProcessWithoutNullStreams) => {
let videoBuffer = Buffer.alloc(0);
const videoSource = new nonstandard.RTCVideoSource();
const videoStream = ffmpegProcess.stdio[3]; // pipe:3
// Start FFmpeg and pipe video frames to the source
videoStream.on('data', (chunk: Buffer) => {
videoBuffer = Buffer.concat([videoBuffer, chunk]);
while (videoBuffer.length >= FRAME_SIZE) {
const frameData = videoBuffer.slice(0, FRAME_SIZE);
videoBuffer = videoBuffer.slice(FRAME_SIZE);
const frame: nonstandard.RTCVideoFrame = {
width: WIDTH,
height: HEIGHT,
data: new Uint8Array(frameData),
}
videoSource.onFrame(frame);
}
});
videoStream.on('data', (data: Buffer) => {
// console.error('FFmpeg Error:', data.toString());
});
videoStream.on('exit', (code) => {
console.log(`FFmpeg exited with code ${code}`);
});
return videoSource.createTrack();
}
createAudioTrack = (ffmpegProcess: ChildProcessWithoutNullStreams) => {
let audioBuffer = Buffer.alloc(0);
const audioSource = new nonstandard.RTCAudioSource();
const audioStream = ffmpegProcess.stdio[4]; // pipe:4
// --- AUDIO handling ---
const AUDIO_FRAME_SIZE = 480 * 2; // 480 samples * 2 bytes (s16le)
audioStream.on('data', (chunk: Buffer) => {
audioBuffer = Buffer.concat([audioBuffer, chunk]);
while (audioBuffer.length >= AUDIO_FRAME_SIZE) {
const frameData = audioBuffer.slice(0, AUDIO_FRAME_SIZE);
audioBuffer = audioBuffer.slice(AUDIO_FRAME_SIZE);
const samples = new Int16Array(480);
for (let i = 0; i < 480; i++) {
samples[i] = frameData.readInt16LE(i * 2);
}
audioSource.onData({
samples,
sampleRate: 48000,
bitsPerSample: 16,
channelCount: 1,
numberOfFrames: 480
});
}
});
audioStream.on('data', (data: Buffer) => {
// console.error('FFmpeg Error:', data.toString());
});
audioStream.on('exit', (code) => {
console.log(`FFmpeg exited with code ${code}`);
});
return audioSource.createTrack();
}
createPeerConnection = (videoTrack: MediaStreamTrack, audioTrack: MediaStreamTrack): RTCPeerConnection => {
const peerConnection = new RTCPeerConnection({ iceServers: [] });
const stream = new MediaStream()
stream.addTrack(videoTrack)
stream.addTrack(audioTrack);
peerConnection.addTrack(videoTrack, stream);
peerConnection.addTrack(audioTrack, stream);
return peerConnection;
}
}

View File

@ -1,66 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<title>WebRTC Stream</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/dashjs/4.7.1/dash.all.min.js"></script>
</head>
<body>
<h1>Video streams</h1>
<h2>WebRTC</h2>
<video id="video" autoplay playsinline controls></video>
<script>
const ws = new WebSocket('ws://localhost:8080');
const pc = new RTCPeerConnection({ iceServers: [] });
const video = document.getElementById('video');
pc.onconnectionstatechange = (event) => {
console.log("onconnectionstatechange ", event)
}
pc.ondatachannel = (event) => {
console.log("ondatachannel ", event)
}
pc.ontrack = (event) => {
console.log("Received track event", event.streams);
video.srcObject = event.streams[0];
};
pc.onicecandidate = ({ candidate }) => {
if (candidate) {
ws.send(JSON.stringify({ type: 'ice-candidate', data: candidate })); // Use 'candidate' instead of 'ice-candidate'
}
};
pc.onicegatheringstatechange = () => {
// console.log('ICE state:', pc.iceGatheringState);
};
ws.onopen = async () => {
pc.addTransceiver('video', { direction: 'recvonly' });
pc.addTransceiver('audio', { direction: 'recvonly' })
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
ws.send(JSON.stringify({ type: 'offer', data: offer }));
}
ws.onmessage = async (message) => {
const msg = JSON.parse(message.data);
if (msg.type === 'answer') {
await pc.setRemoteDescription(msg.data);
}
else if (msg.type === 'ice-candidate') {
await pc.addIceCandidate(msg.data);
}
};
;
</script>
</body>
</html>