Add libselect to use select instead of polling all file descriptors

This commit is contained in:
Andreas Eversberg
2023-01-20 18:13:11 +01:00
parent ffaed13648
commit 9f662d309f
35 changed files with 408 additions and 131 deletions

View File

@@ -28,6 +28,7 @@
#include <arpa/inet.h>
#include "../libdebug/debug.h"
#include "../libtimer/timer.h"
#include "../libselect/select.h"
#include "endpoint.h"
#define RTP_VERSION 2
@@ -132,6 +133,22 @@ static int rtp_receive(int sock, uint8_t **payload_p, int *payload_len_p, uint8_
return 0;
}
static int rtcp_receive(int sock)
{
static uint8_t data[2048];
int len;
len = read(sock, data, sizeof(data));
if (len < 0) {
if (errno == EAGAIN)
return -EAGAIN;
PDEBUG(DCC, DEBUG_DEBUG, "Read errno = %d (%s)\n", errno, strerror(errno));
return -EIO;
}
return 0;
}
static void rtp_send(int sock, uint8_t *payload, int payload_len, uint8_t marker, uint8_t pt, uint16_t sequence, uint32_t timestamp, uint32_t ssrc)
{
struct rtp_hdr *rtph;
@@ -157,6 +174,9 @@ static void rtp_send(int sock, uint8_t *payload, int payload_len, uint8_t marker
PDEBUG(DCC, DEBUG_DEBUG, "Write errno = %d (%s)\n", errno, strerror(errno));
}
static int rtp_listen_cb(struct osmo_fd *ofd, unsigned int when);
static int rtcp_listen_cb(struct osmo_fd *ofd, unsigned int when);
/* open and bind RTP
* set local port to what we bound
*/
@@ -221,15 +241,23 @@ socket_error:
osmo_cc_rtp_close(media);
return -EIO;
}
media->rtp_socket = rc;
media->rtp_ofd.fd = rc;
media->rtp_ofd.cb = rtp_listen_cb;
media->rtp_ofd.data = media;
media->rtp_ofd.when = OSMO_FD_READ;
osmo_fd_register(&media->rtp_ofd);
rc = socket(domain, SOCK_DGRAM, IPPROTO_UDP);
if (rc < 0)
goto socket_error;
media->rtcp_socket = rc;
media->rtcp_ofd.fd = rc;
media->rtcp_ofd.cb = rtcp_listen_cb;
media->rtcp_ofd.data = media;
media->rtcp_ofd.when = OSMO_FD_READ;
osmo_fd_register(&media->rtcp_ofd);
/* bind sockets */
*sport = htons(conf->rtp_port_next);
rc = bind(media->rtp_socket, (struct sockaddr *)&sa, slen);
rc = bind(media->rtp_ofd.fd, (struct sockaddr *)&sa, slen);
if (rc < 0) {
bind_error:
osmo_cc_rtp_close(media);
@@ -241,18 +269,18 @@ bind_error:
continue;
}
*sport = htons(conf->rtp_port_next + 1);
rc = bind(media->rtcp_socket, (struct sockaddr *)&sa, slen);
rc = bind(media->rtcp_ofd.fd, (struct sockaddr *)&sa, slen);
if (rc < 0)
goto bind_error;
media->description.port_local = conf->rtp_port_next;
conf->rtp_port_next = (conf->rtp_port_next + 2 > conf->rtp_port_to) ? conf->rtp_port_from : conf->rtp_port_next + 2;
/* set nonblocking io */
flags = fcntl(media->rtp_socket, F_GETFL);
/* set nonblocking io, to prevent write to block */
flags = fcntl(media->rtp_ofd.fd, F_GETFL);
flags |= O_NONBLOCK;
fcntl(media->rtp_socket, F_SETFL, flags);
flags = fcntl(media->rtcp_socket, F_GETFL);
fcntl(media->rtp_ofd.fd, F_SETFL, flags);
flags = fcntl(media->rtcp_ofd.fd, F_GETFL);
flags |= O_NONBLOCK;
fcntl(media->rtcp_socket, F_SETFL, flags);
fcntl(media->rtcp_ofd.fd, F_SETFL, flags);
break;
}
@@ -305,7 +333,7 @@ pton_error:
}
*sport = htons(media->description.port_remote);
rc = connect(media->rtp_socket, (struct sockaddr *)&sa, slen);
rc = connect(media->rtp_ofd.fd, (struct sockaddr *)&sa, slen);
if (rc < 0) {
connect_error:
PDEBUG(DCC, DEBUG_NOTICE, "Cannot connect to address '%s'.\n", media->connection_data_remote.address);
@@ -313,7 +341,7 @@ connect_error:
return -EIO;
}
*sport = htons(media->description.port_remote + 1);
rc = connect(media->rtcp_socket, (struct sockaddr *)&sa, slen);
rc = connect(media->rtcp_ofd.fd, (struct sockaddr *)&sa, slen);
if (rc < 0)
goto connect_error;
@@ -326,7 +354,7 @@ void osmo_cc_rtp_send(osmo_cc_session_codec_t *codec, uint8_t *data, int len, ui
uint8_t *payload = NULL;
int payload_len = 0;
if (!codec || !codec->media->rtp_socket)
if (!codec || !codec->media->rtp_ofd.fd)
return;
if (codec->encoder)
@@ -336,7 +364,7 @@ void osmo_cc_rtp_send(osmo_cc_session_codec_t *codec, uint8_t *data, int len, ui
payload_len = len;
}
rtp_send(codec->media->rtp_socket, payload, payload_len, marker, codec->payload_type_remote, codec->media->tx_sequence, codec->media->tx_timestamp, codec->media->tx_ssrc);
rtp_send(codec->media->rtp_ofd.fd, payload, payload_len, marker, codec->payload_type_remote, codec->media->tx_sequence, codec->media->tx_timestamp, codec->media->tx_ssrc);
codec->media->tx_sequence += inc_sequence;
codec->media->tx_timestamp += inc_timestamp;
@@ -344,9 +372,9 @@ void osmo_cc_rtp_send(osmo_cc_session_codec_t *codec, uint8_t *data, int len, ui
free(payload);
}
/* receive rtp data for given media, return < 0, if there is nothing this time */
int osmo_cc_rtp_receive(osmo_cc_session_media_t *media, void *priv)
static int rtp_listen_cb(struct osmo_fd *ofd, unsigned int when)
{
osmo_cc_session_media_t *media = ofd->data;
int rc;
uint8_t *payload = NULL;
int payload_len = 0;
@@ -356,49 +384,63 @@ int osmo_cc_rtp_receive(osmo_cc_session_media_t *media, void *priv)
uint8_t *data;
int len;
if (!media || media->rtp_socket <= 0)
return -EIO;
if (when & OSMO_FD_READ) {
rc = rtp_receive(media->rtp_ofd.fd, &payload, &payload_len, &marker, &payload_type, &media->rx_sequence, &media->rx_timestamp, &media->rx_ssrc);
if (rc < 0)
return rc;
rc = rtp_receive(media->rtp_socket, &payload, &payload_len, &marker, &payload_type, &media->rx_sequence, &media->rx_timestamp, &media->rx_ssrc);
if (rc < 0)
return rc;
/* search for codec */
for (codec = media->codec_list; codec; codec = codec->next) {
if (codec->payload_type_local == payload_type)
break;
}
if (!codec) {
PDEBUG(DCC, DEBUG_NOTICE, "Received RTP frame for unknown codec (payload_type = %d).\n", payload_type);
return 0;
}
/* search for codec */
for (codec = media->codec_list; codec; codec = codec->next) {
if (codec->payload_type_local == payload_type)
break;
}
if (!codec) {
PDEBUG(DCC, DEBUG_NOTICE, "Received RTP frame for unknown codec (payload_type = %d).\n", payload_type);
return 0;
if (codec->decoder)
codec->decoder(payload, payload_len, &data, &len, media->session->priv);
else {
data = payload;
len = payload_len;
}
if (codec->media->receive)
codec->media->receiver(codec, marker, media->rx_sequence, media->rx_timestamp, media->rx_ssrc, data, len);
if (codec->decoder)
free(data);
}
if (codec->decoder)
codec->decoder(payload, payload_len, &data, &len, priv);
else {
data = payload;
len = payload_len;
return 0;
}
static int rtcp_listen_cb(struct osmo_fd *ofd, unsigned int when)
{
osmo_cc_session_media_t *media = ofd->data;
int rc;
if (when & OSMO_FD_READ) {
rc = rtcp_receive(media->rtp_ofd.fd);
if (rc < 0)
return rc;
}
if (codec->media->receive)
codec->media->receiver(codec, marker, media->rx_sequence, media->rx_timestamp, media->rx_ssrc, data, len);
if (codec->decoder)
free(data);
return 0;
}
void osmo_cc_rtp_close(osmo_cc_session_media_t *media)
{
if (media->rtp_socket) {
close(media->rtp_socket);
media->rtp_socket = 0;
if (media->rtp_ofd.fd) {
osmo_fd_unregister(&media->rtp_ofd);
close(media->rtp_ofd.fd);
media->rtp_ofd.fd = 0;
}
if (media->rtcp_socket) {
close(media->rtcp_socket);
media->rtcp_socket = 0;
if (media->rtcp_ofd.fd) {
osmo_fd_unregister(&media->rtcp_ofd);
close(media->rtcp_ofd.fd);
media->rtcp_ofd.fd = 0;
}
}