Skip to content

Commit

Permalink
Update publish.py
Browse files Browse the repository at this point in the history
  • Loading branch information
steveseguin authored Aug 30, 2024
1 parent 7cb651a commit e86526a
Showing 1 changed file with 84 additions and 40 deletions.
124 changes: 84 additions & 40 deletions publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import re
import traceback
import subprocess
import struct
try:
import hashlib
from urllib.parse import urlparse
Expand Down Expand Up @@ -311,6 +312,9 @@ def __init__(self, params):
self.salt = ""
self.aom = params.aom
self.av1 = params.av1
self.socketout = params.socketout
self.socketport = params.socketport
self.socket = None

try:
if self.password:
Expand Down Expand Up @@ -448,7 +452,65 @@ async def sendMessageAsync(self, msg): # send message to wss
except Exception as e:
printwarn(get_exception_info(E))


def setup_socket(self):
import socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(('0.0.0.0', int(self.socketport)))

def on_new_socket_sample(self, sink):
sample = sink.emit("pull-sample")
if sample:
buffer = sample.get_buffer()
caps = sample.get_caps()
height = caps.get_structure(0).get_value("height")
width = caps.get_structure(0).get_value("width")

_, map_info = buffer.map(Gst.MapFlags.READ)
frame_data = map_info.data

# Send frame size
self.socket.sendto(struct.pack('!III', width, height, len(frame_data)), ('127.0.0.1', int(self.socketport)))
print("Sending")
# Send frame data in chunks
chunk_size = 65507 # Maximum safe UDP packet size
for i in range(0, len(frame_data), chunk_size):
chunk = frame_data[i:i+chunk_size]
self.socket.sendto(chunk, ('127.0.0.1', int(self.socketport)))

buffer.unmap(map_info)
return Gst.FlowReturn.OK

def new_sample(self, sink):
if self.processing:
return False
self.processing = True
try :
sample = sink.emit("pull-sample")
if sample:
buffer = sample.get_buffer()
caps = sample.get_caps()
height = int(caps.get_structure(0).get_int("height").value)
width = int(caps.get_structure(0).get_int("width").value)
frame_data = buffer.extract_dup(0, buffer.get_size())
np_frame_data = np.frombuffer(frame_data, dtype=np.uint8).reshape(height, width, 3)
print(np.shape(np_frame_data), np_frame_data[0,0,:])

frame_shape = (720 * 1280 * 3)
frame_buffer = np.ndarray(frame_shape+5, dtype=np.uint8, buffer=self.shared_memory.buf)
frame_buffer[5:5+width*height*3] = np_frame_data.flatten(order='K') # K means order as how ordered in memory
frame_buffer[0] = width/255
frame_buffer[1] = width%255
frame_buffer[2] = height/255
frame_buffer[3] = height%255
frame_buffer[4] = self.counter%255
self.counter+=1
self.trigger_socket.sendto(b"update", ("127.0.0.1", 12345))

except Exception as E:
printwarn(get_exception_info(E))

self.processing = False
return False

def on_incoming_stream(self, _, pad):
try:
Expand Down Expand Up @@ -625,6 +687,17 @@ def on_incoming_stream(self, _, pad):
if self.ndiout:
# I'm handling this on elsewhere now
pass
elif self.socketout:
print("SOCKET VIDEO OUT")
out = Gst.parse_bin_from_description(
"queue ! rtph264depay ! h264parse ! avdec_h264 ! videoconvert ! video/x-raw,format=BGR ! appsink name=appsink emit-signals=true", True)
self.pipe.add(out)
out.sync_state_with_parent()
sink = out.get_static_pad('sink')
pad.link(sink)

appsink = self.pipe.get_by_name('appsink')
appsink.connect("new-sample", self.on_new_socket_sample)
elif self.view:
print("DISPLAY OUTPUT MODE BEING SETUP")

Expand Down Expand Up @@ -783,7 +856,7 @@ def on_incoming_stream(self, _, pad):
print(self.shared_memory)
appsink = self.pipe.get_by_name('appsink')
appsink.set_property("emit-signals", True)
appsink.connect("new-sample", new_sample)
appsink.connect("new-sample", self.new_sample)

elif "audio" in name:
if self.noaudio:
Expand Down Expand Up @@ -1228,43 +1301,6 @@ def on_stats(promise, abin, data):
except Exception as E:
printwarn(get_exception_info(E))

def new_sample(sink):
if self.processing:
return False
self.processing = True
try :
sample = sink.emit("pull-sample")
if sample:
buffer = sample.get_buffer()
caps = sample.get_caps()
height = int(caps.get_structure(0).get_int("height").value)
width = int(caps.get_structure(0).get_int("width").value)
frame_data = buffer.extract_dup(0, buffer.get_size())
np_frame_data = np.frombuffer(frame_data, dtype=np.uint8).reshape(height, width, 3)
print(np.shape(np_frame_data), np_frame_data[0,0,:])

frame_shape = (720 * 1280 * 3)
frame_buffer = np.ndarray(frame_shape+5, dtype=np.uint8, buffer=self.shared_memory.buf)
frame_buffer[5:5+width*height*3] = np_frame_data.flatten(order='K') # K means order as how ordered in memory
frame_buffer[0] = width/255
frame_buffer[1] = width%255
frame_buffer[2] = height/255
frame_buffer[3] = height%255
frame_buffer[4] = self.counter%255
self.counter+=1
self.trigger_socket.sendto(b"update", ("127.0.0.1", 12345))

except Exception as E:
printwarn(get_exception_info(E))

self.processing = False
return False

def on_frame_probe(pad, info):
buf = info.get_buffer()
print(f'[{buf.pts / Gst.SECOND:6.2f}]')
return Gst.PadProbeReturn.OK

print("creating a new webrtc bin")

started = True
Expand Down Expand Up @@ -1806,7 +1842,9 @@ async def main():
parser.add_argument('--audio-pipeline', type=str, default=None, help='Custom GStreamer audio source pipeline')
parser.add_argument('--timestamp', action='store_true', help='Add a timestamp to the video output, if possible')
parser.add_argument('--clockstamp', action='store_true', help='Add a clock overlay to the video output, if possible')

parser.add_argument('--socketport', type=str, default=12345, help='Output video frames to a socket; specify the port number')
parser.add_argument('--socketout', type=str, help='Output video frames to a socket; specify the stream ID')

args = parser.parse_args()

Gst.init(None)
Expand Down Expand Up @@ -1864,6 +1902,8 @@ async def main():

elif args.fdsink:
args.streamin = args.fdsink
elif args.socketout:
args.streamin = args.socketout
elif args.framebuffer:
if not np:
print("You must install Numpy for this to work.\npip3 install numpy")
Expand Down Expand Up @@ -2494,6 +2534,10 @@ async def main():
args.pipeline = PIPELINE_DESC

c = WebRTCClient(args)

if args.socketout:
c.setup_socket()

while True:
try:
await c.connect()
Expand Down

0 comments on commit e86526a

Please sign in to comment.