2 * UDP prototype streaming system
3 * Copyright (c) 2000, 2001, 2002 Fabrice Bellard
5 * This file is part of FFmpeg.
7 * FFmpeg is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2.1 of the License, or (at your option) any later version.
12 * FFmpeg is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with FFmpeg; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
27 #define _DEFAULT_SOURCE
28 #define _BSD_SOURCE /* Needed for using struct ip_mreq with recent glibc */
31 #include "avio_internal.h"
32 #include "libavutil/avassert.h"
33 #include "libavutil/parseutils.h"
34 #include "libavutil/fifo.h"
35 #include "libavutil/intreadwrite.h"
36 #include "libavutil/avstring.h"
37 #include "libavutil/opt.h"
38 #include "libavutil/log.h"
39 #include "libavutil/time.h"
42 #include "os_support.h"
47 #include "TargetConditionals.h"
53 /* On many Linux systems, udplite.h is missing but the kernel supports UDP-Lite.
54 * So, we provide a fallback here.
56 #define UDPLITE_SEND_CSCOV 10
57 #define UDPLITE_RECV_CSCOV 11
60 #ifndef IPPROTO_UDPLITE
61 #define IPPROTO_UDPLITE 136
65 #undef HAVE_PTHREAD_CANCEL
66 #define HAVE_PTHREAD_CANCEL 1
69 #if HAVE_PTHREAD_CANCEL
70 #include "libavutil/thread.h"
73 #ifndef IPV6_ADD_MEMBERSHIP
74 #define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
75 #define IPV6_DROP_MEMBERSHIP IPV6_LEAVE_GROUP
78 #define UDP_TX_BUF_SIZE 32768
79 #define UDP_RX_BUF_SIZE 393216
80 #define UDP_MAX_PKT_SIZE 65536
81 #define UDP_HEADER_SIZE 8
// Private state of one UDP / UDP-Lite connection. Every handler in this file
// obtains it from URLContext.priv_data.
83 typedef struct UDPContext
{
// Remote (destination) address; filled in by ff_udp_set_remote_url().
95 struct sockaddr_storage dest_addr
;
99 /* Circular Buffer variables for use in UDP receive code */
// Size of the FIFO; the fifo_size option is multiplied by 188 in udp_open().
100 int circular_buffer_size
;
// Error recorded by the worker thread and reported later by udp_read() / udp_write().
102 int circular_buffer_error
;
103 int64_t bitrate
; /* number of bits to send per second */
106 #if HAVE_PTHREAD_CANCEL
// Worker thread running circular_buffer_task_rx or circular_buffer_task_tx.
107 pthread_t circular_buffer_thread
;
// Taken around every FIFO and circular_buffer_error access in this file.
108 pthread_mutex_t mutex
;
// Scratch buffer for one datagram: 4 bytes of length prefix plus the payload.
112 uint8_t tmp
[UDP_MAX_PKT_SIZE
+4];
// Local address chosen in udp_open(), kept for the multicast join / leave calls.
116 struct sockaddr_storage local_addr_storage
;
// Include / exclude source address lists parsed from the sources and block options.
119 IPSourceFilters filters
;
122 #define OFFSET(x) offsetof(UDPContext, x)
123 #define D AV_OPT_FLAG_DECODING_PARAM
124 #define E AV_OPT_FLAG_ENCODING_PARAM
// AVOption table mapping option names onto UDPContext fields (via OFFSET).
// D marks an option as a decoding parameter, E as an encoding parameter.
// Several entries are aliases that write the same field: localport and
// local_port, reuse and reuse_socket.
125 static const AVOption options
[] = {
// -1 means: not set; udp_open() then picks UDP_TX_BUF_SIZE or UDP_RX_BUF_SIZE.
126 { "buffer_size", "System data size (in bytes)", OFFSET(buffer_size
), AV_OPT_TYPE_INT
, { .i64
= -1 }, -1, INT_MAX
, .flags
= D
|E
},
127 { "bitrate", "Bits to send per second", OFFSET(bitrate
), AV_OPT_TYPE_INT64
, { .i64
= 0 }, 0, INT64_MAX
, .flags
= E
},
128 { "burst_bits", "Max length of bursts in bits (when using bitrate)", OFFSET(burst_bits
), AV_OPT_TYPE_INT64
, { .i64
= 0 }, 0, INT64_MAX
, .flags
= E
},
129 { "localport", "Local port", OFFSET(local_port
), AV_OPT_TYPE_INT
, { .i64
= -1 }, -1, INT_MAX
, D
|E
},
130 { "local_port", "Local port", OFFSET(local_port
), AV_OPT_TYPE_INT
, { .i64
= -1 }, -1, INT_MAX
, .flags
= D
|E
},
131 { "localaddr", "Local address", OFFSET(localaddr
), AV_OPT_TYPE_STRING
, { .str
= NULL
}, .flags
= D
|E
},
// A non-zero coverage makes udp_socket_create() open an IPPROTO_UDPLITE socket.
132 { "udplite_coverage", "choose UDPLite head size which should be validated by checksum", OFFSET(udplite_coverage
), AV_OPT_TYPE_INT
, {.i64
= 0}, 0, INT_MAX
, D
|E
},
133 { "pkt_size", "Maximum UDP packet size", OFFSET(pkt_size
), AV_OPT_TYPE_INT
, { .i64
= 1472 }, -1, INT_MAX
, .flags
= D
|E
},
// -1 means: not specified; udp_open() then enables reuse only for multicast.
134 { "reuse", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket
), AV_OPT_TYPE_BOOL
, { .i64
= -1 }, -1, 1, D
|E
},
135 { "reuse_socket", "explicitly allow reusing UDP sockets", OFFSET(reuse_socket
), AV_OPT_TYPE_BOOL
, { .i64
= -1 }, -1, 1, .flags
= D
|E
},
136 { "broadcast", "explicitly allow or disallow broadcast destination", OFFSET(is_broadcast
), AV_OPT_TYPE_BOOL
, { .i64
= 0 }, 0, 1, E
},
137 { "ttl", "Time to live (multicast only)", OFFSET(ttl
), AV_OPT_TYPE_INT
, { .i64
= 16 }, 0, INT_MAX
, E
},
138 { "connect", "set if connect() should be called on socket", OFFSET(is_connected
), AV_OPT_TYPE_BOOL
, { .i64
= 0 }, 0, 1, .flags
= D
|E
},
// Counted in 188-byte packets; udp_open() converts the value to bytes.
139 { "fifo_size", "set the UDP receiving circular buffer size, expressed as a number of packets with size of 188 bytes", OFFSET(circular_buffer_size
), AV_OPT_TYPE_INT
, {.i64
= 7*4096}, 0, INT_MAX
, D
},
140 { "overrun_nonfatal", "survive in case of UDP receiving circular buffer overrun", OFFSET(overrun_nonfatal
), AV_OPT_TYPE_BOOL
, {.i64
= 0}, 0, 1, D
},
// Copied into URLContext.rw_timeout by udp_open().
141 { "timeout", "set raise error timeout (only in read mode)", OFFSET(timeout
), AV_OPT_TYPE_INT
, { .i64
= 0 }, 0, INT_MAX
, D
},
142 { "sources", "Source list", OFFSET(sources
), AV_OPT_TYPE_STRING
, { .str
= NULL
}, .flags
= D
|E
},
143 { "block", "Block list", OFFSET(block
), AV_OPT_TYPE_STRING
, { .str
= NULL
}, .flags
= D
|E
},
// AVClass for the udp protocol; ff_udp_protocol.priv_data_class points at it.
// Uses the default item-name callback and the libavutil version stamp.
147 static const AVClass udp_class
= {
149 .item_name
= av_default_item_name
,
151 .version
= LIBAVUTIL_VERSION_INT
,
// AVClass for the udplite protocol; ff_udplite_protocol.priv_data_class points
// at it. Same callbacks as udp_class, with the class name udplite.
154 static const AVClass udplite_context_class
= {
155 .class_name
= "udplite",
156 .item_name
= av_default_item_name
,
158 .version
= LIBAVUTIL_VERSION_INT
,
// Apply the multicast time-to-live mcastTTL to socket sockfd.
//
// The socket option is chosen from the family of addr:
//   AF_INET  -> IPPROTO_IP / IP_MULTICAST_TTL
//   AF_INET6 -> IPPROTO_IPV6 / IPV6_MULTICAST_HOPS
// Each branch is compiled only when the platform defines the option.
// A failing setsockopt() is logged with ff_log_net_error(); the caller in
// udp_open() treats a negative return value as failure.
161 static int udp_set_multicast_ttl(int sockfd
, int mcastTTL
,
162 struct sockaddr
*addr
)
164 #ifdef IP_MULTICAST_TTL
165 if (addr
->sa_family
== AF_INET
) {
166 if (setsockopt(sockfd
, IPPROTO_IP
, IP_MULTICAST_TTL
, &mcastTTL
, sizeof(mcastTTL
)) < 0) {
167 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IP_MULTICAST_TTL)");
172 #if defined(IPPROTO_IPV6) && defined(IPV6_MULTICAST_HOPS)
173 if (addr
->sa_family
== AF_INET6
) {
174 if (setsockopt(sockfd
, IPPROTO_IPV6
, IPV6_MULTICAST_HOPS
, &mcastTTL
, sizeof(mcastTTL
)) < 0) {
175 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IPV6_MULTICAST_HOPS)");
// Join the multicast group given by addr on socket sockfd.
//
// IPv4: fills an ip_mreq with the group address from addr and issues
// IP_ADD_MEMBERSHIP. The interface is taken either from local_addr or set to
// INADDR_ANY.
// NOTE(review): presumably local_addr is used only when it is non-NULL - confirm.
// IPv6: fills an ipv6_mreq with the group address and interface index 0, then
// issues IPV6_ADD_MEMBERSHIP.
// A failing setsockopt() is logged; udp_open() treats a negative return value
// as failure.
183 static int udp_join_multicast_group(int sockfd
, struct sockaddr
*addr
,struct sockaddr
*local_addr
)
185 #ifdef IP_ADD_MEMBERSHIP
186 if (addr
->sa_family
== AF_INET
) {
189 mreq
.imr_multiaddr
.s_addr
= ((struct sockaddr_in
*)addr
)->sin_addr
.s_addr
;
191 mreq
.imr_interface
= ((struct sockaddr_in
*)local_addr
)->sin_addr
;
193 mreq
.imr_interface
.s_addr
= INADDR_ANY
;
194 if (setsockopt(sockfd
, IPPROTO_IP
, IP_ADD_MEMBERSHIP
, (const void *)&mreq
, sizeof(mreq
)) < 0) {
195 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IP_ADD_MEMBERSHIP)");
200 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
201 if (addr
->sa_family
== AF_INET6
) {
202 struct ipv6_mreq mreq6
;
204 memcpy(&mreq6
.ipv6mr_multiaddr
, &(((struct sockaddr_in6
*)addr
)->sin6_addr
), sizeof(struct in6_addr
));
205 //TODO: Interface index should be looked up from local_addr
206 mreq6
.ipv6mr_interface
= 0;
207 if (setsockopt(sockfd
, IPPROTO_IPV6
, IPV6_ADD_MEMBERSHIP
, &mreq6
, sizeof(mreq6
)) < 0) {
208 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IPV6_ADD_MEMBERSHIP)");
// Leave the multicast group given by addr on socket sockfd.
//
// Mirror image of udp_join_multicast_group(): builds the same ip_mreq (IPv4)
// or ipv6_mreq (IPv6, interface index 0) and issues IP_DROP_MEMBERSHIP or
// IPV6_DROP_MEMBERSHIP. A failing setsockopt() is logged. udp_close() calls
// this function and ignores the return value.
216 static int udp_leave_multicast_group(int sockfd
, struct sockaddr
*addr
,struct sockaddr
*local_addr
)
218 #ifdef IP_DROP_MEMBERSHIP
219 if (addr
->sa_family
== AF_INET
) {
222 mreq
.imr_multiaddr
.s_addr
= ((struct sockaddr_in
*)addr
)->sin_addr
.s_addr
;
224 mreq
.imr_interface
= ((struct sockaddr_in
*)local_addr
)->sin_addr
;
226 mreq
.imr_interface
.s_addr
= INADDR_ANY
;
227 if (setsockopt(sockfd
, IPPROTO_IP
, IP_DROP_MEMBERSHIP
, (const void *)&mreq
, sizeof(mreq
)) < 0) {
228 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IP_DROP_MEMBERSHIP)");
233 #if HAVE_STRUCT_IPV6_MREQ && defined(IPPROTO_IPV6)
234 if (addr
->sa_family
== AF_INET6
) {
235 struct ipv6_mreq mreq6
;
237 memcpy(&mreq6
.ipv6mr_multiaddr
, &(((struct sockaddr_in6
*)addr
)->sin6_addr
), sizeof(struct in6_addr
));
238 //TODO: Interface index should be looked up from local_addr
239 mreq6
.ipv6mr_interface
= 0;
240 if (setsockopt(sockfd
, IPPROTO_IPV6
, IPV6_DROP_MEMBERSHIP
, &mreq6
, sizeof(mreq6
)) < 0) {
241 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(IPV6_DROP_MEMBERSHIP)");
// Install source-specific multicast filters on sockfd for group addr.
//
// sources / nb_sources is the list of source addresses; include selects
// between joining the group for each source (non-zero) and blocking each
// source (zero).
//
// Non-IPv4 groups use the protocol-independent group_source_req interface
// (MCAST_JOIN_SOURCE_GROUP / MCAST_BLOCK_SOURCE) when the platform provides
// it; otherwise an error is logged and AVERROR(EINVAL) is returned.
// IPv4 groups use ip_mreq_source with IP_ADD_SOURCE_MEMBERSHIP /
// IP_BLOCK_SOURCE; every source must itself be AF_INET, else AVERROR(EINVAL).
// AVERROR(ENOSYS) is returned when no usable interface is compiled in.
// A failing setsockopt() is logged and ff_neterrno() is returned.
249 static int udp_set_multicast_sources(URLContext
*h
,
250 int sockfd
, struct sockaddr
*addr
,
251 int addr_len
, struct sockaddr_storage
*local_addr
,
252 struct sockaddr_storage
*sources
,
253 int nb_sources
, int include
)
256 if (addr
->sa_family
!= AF_INET
) {
257 #if HAVE_STRUCT_GROUP_SOURCE_REQ && defined(MCAST_BLOCK_SOURCE)
258 /* For IPv4 prefer the old approach, as that alone works reliably on
259 * Windows and it also supports supplying the interface based on its
262 for (i
= 0; i
< nb_sources
; i
++) {
263 struct group_source_req mreqs
;
// NOTE(review): this code sits under the sa_family != AF_INET check above, so
// the IPPROTO_IP arm of the ternary looks unreachable - confirm.
264 int level
= addr
->sa_family
== AF_INET
? IPPROTO_IP
: IPPROTO_IPV6
;
266 //TODO: Interface index should be looked up from local_addr
267 mreqs
.gsr_interface
= 0;
268 memcpy(&mreqs
.gsr_group
, addr
, addr_len
);
269 memcpy(&mreqs
.gsr_source
, &sources
[i
], sizeof(*sources
));
271 if (setsockopt(sockfd
, level
,
272 include
? MCAST_JOIN_SOURCE_GROUP
: MCAST_BLOCK_SOURCE
,
273 (const void *)&mreqs
, sizeof(mreqs
)) < 0) {
// NOTE(review): these two log calls pass NULL as context while the IPv4 path
// below passes h - possibly an inconsistency.
275 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(MCAST_JOIN_SOURCE_GROUP)");
277 ff_log_net_error(NULL
, AV_LOG_ERROR
, "setsockopt(MCAST_BLOCK_SOURCE)");
278 return ff_neterrno();
283 av_log(h
, AV_LOG_ERROR
,
284 "Setting multicast sources only supported for IPv4\n");
285 return AVERROR(EINVAL
);
288 #if HAVE_STRUCT_IP_MREQ_SOURCE && defined(IP_BLOCK_SOURCE)
289 for (i
= 0; i
< nb_sources
; i
++) {
290 struct ip_mreq_source mreqs
;
291 if (sources
[i
].ss_family
!= AF_INET
) {
292 av_log(h
, AV_LOG_ERROR
, "Source/block address %d is of incorrect protocol family\n", i
+ 1);
293 return AVERROR(EINVAL
);
296 mreqs
.imr_multiaddr
.s_addr
= ((struct sockaddr_in
*)addr
)->sin_addr
.s_addr
;
298 mreqs
.imr_interface
= ((struct sockaddr_in
*)local_addr
)->sin_addr
;
300 mreqs
.imr_interface
.s_addr
= INADDR_ANY
;
301 mreqs
.imr_sourceaddr
.s_addr
= ((struct sockaddr_in
*)&sources
[i
])->sin_addr
.s_addr
;
303 if (setsockopt(sockfd
, IPPROTO_IP
,
304 include
? IP_ADD_SOURCE_MEMBERSHIP
: IP_BLOCK_SOURCE
,
305 (const void *)&mreqs
, sizeof(mreqs
)) < 0) {
307 ff_log_net_error(h
, AV_LOG_ERROR
, "setsockopt(IP_ADD_SOURCE_MEMBERSHIP)");
309 ff_log_net_error(h
, AV_LOG_ERROR
, "setsockopt(IP_BLOCK_SOURCE)");
310 return ff_neterrno();
314 return AVERROR(ENOSYS
);
// Resolve hostname and port into a socket address and store it in addr.
//
// Resolution goes through ff_ip_resolve_host() with SOCK_DGRAM and AF_UNSPEC;
// the first result is copied into addr. Returns AVERROR(EIO) when resolution
// fails. ff_udp_set_remote_url() stores the return value as the address
// length and treats a negative value as an error.
318 static int udp_set_url(URLContext
*h
,
319 struct sockaddr_storage
*addr
,
320 const char *hostname
, int port
)
322 struct addrinfo
*res0
;
325 res0
= ff_ip_resolve_host(h
, hostname
, port
, SOCK_DGRAM
, AF_UNSPEC
, 0);
326 if (!res0
) return AVERROR(EIO
);
327 memcpy(addr
, res0
->ai_addr
, res0
->ai_addrlen
);
328 addr_len
= res0
->ai_addrlen
;
// Create the datagram socket for connection h.
//
// The local address is resolved with ff_ip_resolve_host() using AI_PASSIVE;
// an empty or NULL localaddr is passed on as NULL. If the destination address
// already has a family, resolution is restricted to that family.
// Each candidate is tried in order until ff_socket() succeeds; a non-zero
// udplite_coverage selects IPPROTO_UDPLITE, otherwise protocol 0 is used.
// The address of the winning candidate is copied to addr and its length to
// *addr_len. udp_open() stores the return value as the socket descriptor.
334 static int udp_socket_create(URLContext
*h
, struct sockaddr_storage
*addr
,
335 socklen_t
*addr_len
, const char *localaddr
)
337 UDPContext
*s
= h
->priv_data
;
339 struct addrinfo
*res0
, *res
;
340 int family
= AF_UNSPEC
;
342 if (((struct sockaddr
*) &s
->dest_addr
)->sa_family
)
343 family
= ((struct sockaddr
*) &s
->dest_addr
)->sa_family
;
344 res0
= ff_ip_resolve_host(h
, (localaddr
&& localaddr
[0]) ? localaddr
: NULL
,
346 SOCK_DGRAM
, family
, AI_PASSIVE
);
349 for (res
= res0
; res
; res
=res
->ai_next
) {
350 if (s
->udplite_coverage
)
351 udp_fd
= ff_socket(res
->ai_family
, SOCK_DGRAM
, IPPROTO_UDPLITE
);
353 udp_fd
= ff_socket(res
->ai_family
, SOCK_DGRAM
, 0);
354 if (udp_fd
!= -1) break;
355 ff_log_net_error(NULL
, AV_LOG_ERROR
, "socket");
361 memcpy(addr
, res
->ai_addr
, res
->ai_addrlen
);
362 *addr_len
= res
->ai_addrlen
;
// Return the port number contained in socket address addr.
//
// The port is obtained as a decimal string from getnameinfo() with
// NI_NUMERICSERV and converted with strtol(). A getnameinfo() failure is
// logged with its gai_strerror() text.
376 static int udp_port(struct sockaddr_storage
*addr
, int addr_len
)
// Three characters per byte of int plus a terminator: room for any decimal int.
378 char sbuf
[sizeof(int)*3+1];
381 if ((error
= getnameinfo((struct sockaddr
*)addr
, addr_len
, NULL
, 0, sbuf
, sizeof(sbuf
), NI_NUMERICSERV
)) != 0) {
382 av_log(NULL
, AV_LOG_ERROR
, "getnameinfo: %s\n", gai_strerror(error
));
386 return strtol(sbuf
, NULL
, 10);
391 * If no filename is given to av_open_input_file because you want to
392 * get the local port first, then you must call this function to set
393 * the remote server address.
395 * url syntax: udp://host:port[?option=val...]
396 * option: 'ttl=n' : set the ttl value (for multicast only)
397 * 'localport=n' : set the local port
398 * 'pkt_size=n' : set max packet size
399 * 'reuse=1' : enable reusing the socket
400 * 'overrun_nonfatal=1': survive in case of circular buffer overrun
402 * @param h media file context
403 * @param uri of the remote server
404 * @return zero if no error.
// Set (or change) the remote address of UDP connection h from uri.
//
// Steps visible here:
//  1. host and port are split out of uri with av_url_split();
//  2. udp_set_url() resolves them into s->dest_addr, and its return value
//     becomes s->dest_addr_len (negative means failure);
//  3. s->is_multicast is recomputed from the new destination;
//  4. if the query string carries a connect tag and the socket was not
//     connected before, connect() is called on the existing socket and a
//     failure is logged.
406 int ff_udp_set_remote_url(URLContext
*h
, const char *uri
)
408 UDPContext
*s
= h
->priv_data
;
409 char hostname
[256], buf
[10];
413 av_url_split(NULL
, 0, NULL
, 0, hostname
, sizeof(hostname
), &port
, NULL
, 0, uri
);
415 /* set the destination address */
416 s
->dest_addr_len
= udp_set_url(h
, &s
->dest_addr
, hostname
, port
);
417 if (s
->dest_addr_len
< 0) {
420 s
->is_multicast
= ff_is_multicast_address((struct sockaddr
*) &s
->dest_addr
);
// Locate the start of the query string, where the option tags live.
421 p
= strchr(uri
, '?');
423 if (av_find_info_tag(buf
, sizeof(buf
), "connect", p
)) {
424 int was_connected
= s
->is_connected
;
425 s
->is_connected
= strtol(buf
, NULL
, 10);
// Only connect on a transition from not connected to connected.
426 if (s
->is_connected
&& !was_connected
) {
427 if (connect(s
->udp_fd
, (struct sockaddr
*) &s
->dest_addr
,
430 ff_log_net_error(h
, AV_LOG_ERROR
, "connect");
441 * Return the local port used by the UDP connection
442 * @param h media file context
443 * @return the local port number
// Return the local port stored in the UDP context of h. udp_open() fills this
// field from getsockname() after binding the socket.
445 int ff_udp_get_local_port(URLContext
*h
)
447 UDPContext
*s
= h
->priv_data
;
448 return s
->local_port
;
452 * Return the udp file handle for select() usage to wait for several RTP
453 * streams at the same time.
454 * @param h media file context
// url_get_file_handle callback of both protocols in this file.
// NOTE(review): presumably returns the socket descriptor held in the context;
// the return statement is not shown here - confirm.
456 static int udp_get_file_handle(URLContext
*h
)
458 UDPContext
*s
= h
->priv_data
;
462 #if HAVE_PTHREAD_CANCEL
// Body of the receive worker thread (started by udp_open() for input).
//
// The thread keeps cancellation disabled and the mutex held, except around
// the blocking recvfrom() call, so it can only be cancelled while waiting for
// a datagram. The socket is first switched to blocking mode.
// Each datagram is received into s->tmp + 4, checked against the source
// filters, prefixed with its length as a little-endian 32-bit value at s->tmp,
// and appended to the FIFO, after which the condition variable is signalled.
// If the FIFO lacks room for len + 4 bytes this is an overrun: with
// overrun_nonfatal only a warning is logged, otherwise the error is logged
// and AVERROR(EIO) is stored in circular_buffer_error.
// A receive error other than EAGAIN / EINTR is stored in
// circular_buffer_error as well.
// On the way out the condition variable is signalled and the mutex released.
463 static void *circular_buffer_task_rx( void *_URLContext
)
465 URLContext
*h
= _URLContext
;
466 UDPContext
*s
= h
->priv_data
;
469 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, &old_cancelstate
);
470 pthread_mutex_lock(&s
->mutex
);
// Mode 0 requests a blocking socket, since this thread waits in recvfrom().
471 if (ff_socket_nonblock(s
->udp_fd
, 0) < 0) {
472 av_log(h
, AV_LOG_ERROR
, "Failed to set blocking mode");
473 s
->circular_buffer_error
= AVERROR(EIO
);
478 struct sockaddr_storage addr
;
479 socklen_t addr_len
= sizeof(addr
);
481 pthread_mutex_unlock(&s
->mutex
);
482 /* Blocking operations are always cancellation points;
483 see "General Information" / "Thread Cancelation Overview"
485 pthread_setcancelstate(PTHREAD_CANCEL_ENABLE
, &old_cancelstate
);
486 len
= recvfrom(s
->udp_fd
, s
->tmp
+4, sizeof(s
->tmp
)-4, 0, (struct sockaddr
*)&addr
, &addr_len
);
487 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE
, &old_cancelstate
);
488 pthread_mutex_lock(&s
->mutex
);
490 if (ff_neterrno() != AVERROR(EAGAIN
) && ff_neterrno() != AVERROR(EINTR
)) {
491 s
->circular_buffer_error
= ff_neterrno();
496 if (ff_ip_check_source_lists(&addr
, &s
->filters
))
// Length prefix that udp_read() later decodes with AV_RL32.
498 AV_WL32(s
->tmp
, len
);
500 if(av_fifo_space(s
->fifo
) < len
+ 4) {
502 if (s
->overrun_nonfatal
) {
503 av_log(h
, AV_LOG_WARNING
, "Circular buffer overrun. "
504 "Surviving due to overrun_nonfatal option\n");
507 av_log(h
, AV_LOG_ERROR
, "Circular buffer overrun. "
508 "To avoid, increase fifo_size URL option. "
509 "To survive in such case, use overrun_nonfatal option\n");
510 s
->circular_buffer_error
= AVERROR(EIO
);
514 av_fifo_generic_write(s
->fifo
, s
->tmp
, len
+4, NULL
);
515 pthread_cond_signal(&s
->cond
);
519 pthread_cond_signal(&s
->cond
);
520 pthread_mutex_unlock(&s
->mutex
);
// Body of the paced-transmit worker thread (started by udp_open() for output
// when both bitrate and the FIFO size are set).
//
// Packets queued by udp_write() are taken from the FIFO (4-byte length prefix,
// then payload) into s->tmp and sent with sendto() or, on a connected socket,
// send(). Before sending, the thread compares the current time with
// target_timestamp, which advances by sent_bits * 1000000 / bitrate, so the
// long-run output rate follows the bitrate option; burst_interval (from
// burst_bits) bounds how far the sender may fall behind before the schedule
// is reset. A send error other than EAGAIN / EINTR is stored in
// circular_buffer_error under the mutex.
// NOTE(review): the divisions by s->bitrate in the loop are safe only because
// udp_open() starts this thread when bitrate is non-zero - confirm no other
// caller exists.
524 static void *circular_buffer_task_tx( void *_URLContext
)
526 URLContext
*h
= _URLContext
;
527 UDPContext
*s
= h
->priv_data
;
528 int64_t target_timestamp
= av_gettime_relative();
529 int64_t start_timestamp
= av_gettime_relative();
530 int64_t sent_bits
= 0;
// Time in microseconds needed to send burst_bits at the configured bitrate.
531 int64_t burst_interval
= s
->bitrate
? (s
->burst_bits
* 1000000 / s
->bitrate
) : 0;
// Time in microseconds needed to send one maximum-size packet, plus one.
532 int64_t max_delay
= s
->bitrate
? ((int64_t)h
->max_packet_size
* 8 * 1000000 / s
->bitrate
+ 1) : 0;
534 pthread_mutex_lock(&s
->mutex
);
536 if (ff_socket_nonblock(s
->udp_fd
, 0) < 0) {
537 av_log(h
, AV_LOG_ERROR
, "Failed to set blocking mode");
538 s
->circular_buffer_error
= AVERROR(EIO
);
548 len
=av_fifo_size(s
->fifo
);
// NOTE(review): POSIX pthread_cond_wait returns 0 or a positive error number,
// so the comparison with < 0 below can never be true.
553 if (pthread_cond_wait(&s
->cond
, &s
->mutex
) < 0) {
556 len
=av_fifo_size(s
->fifo
);
559 av_fifo_generic_read(s
->fifo
, tmp
, 4, NULL
);
562 av_assert0(len
>= 0);
563 av_assert0(len
<= sizeof(s
->tmp
));
565 av_fifo_generic_read(s
->fifo
, s
->tmp
, len
, NULL
);
567 pthread_mutex_unlock(&s
->mutex
);
570 timestamp
= av_gettime_relative();
571 if (timestamp
< target_timestamp
) {
572 int64_t delay
= target_timestamp
- timestamp
;
573 if (delay
> max_delay
) {
575 start_timestamp
= timestamp
+ delay
;
580 if (timestamp
- burst_interval
> target_timestamp
) {
581 start_timestamp
= timestamp
- burst_interval
;
585 sent_bits
+= len
* 8;
586 target_timestamp
= start_timestamp
+ sent_bits
* 1000000 / s
->bitrate
;
593 if (!s
->is_connected
) {
594 ret
= sendto (s
->udp_fd
, p
, len
, 0,
595 (struct sockaddr
*) &s
->dest_addr
,
598 ret
= send(s
->udp_fd
, p
, len
, 0);
604 if (ret
!= AVERROR(EAGAIN
) && ret
!= AVERROR(EINTR
)) {
605 pthread_mutex_lock(&s
->mutex
);
606 s
->circular_buffer_error
= ret
;
607 pthread_mutex_unlock(&s
->mutex
);
613 pthread_mutex_lock(&s
->mutex
);
617 pthread_mutex_unlock(&s
->mutex
);
624 /* put it in UDP context */
625 /* return non zero if error */
// url_open callback: parse options, create and configure the socket, and
// optionally start the FIFO worker thread.
//
// Order of work visible here:
//  1. Choose the default socket buffer size (TX or RX constant) when
//     buffer_size is negative, and parse the sources / block option strings.
//  2. Parse the query string of uri; each tag found overrides the matching
//     AVOption value. Tags that need the worker thread produce a warning when
//     HAVE_PTHREAD_CANCEL is 0.
//  3. Convert the FIFO size from 188-byte packets to bytes and set
//     max_packet_size and rw_timeout on h.
//  4. Resolve the destination with ff_udp_set_remote_url(); an empty host is
//     only accepted when reading.
//  5. Create the socket and apply SO_REUSEADDR, SO_BROADCAST, the UDP-Lite
//     checksum coverage and IP_TOS (dscp) as requested.
//  6. Bind (multicast readers try the group address first), read back the
//     local port, then apply multicast TTL, group membership and source
//     filters.
//  7. Size the send or receive buffer, make the socket non-blocking, connect
//     it if requested.
//  8. With pthread support, allocate the FIFO, mutex and condition variable
//     and start the rx or tx task; the tail of the function destroys them
//     again on the failure path.
626 static int udp_open(URLContext
*h
, const char *uri
, int flags
)
628 char hostname
[1024], localaddr
[1024] = "";
629 int port
, udp_fd
= -1, tmp
, bind_ret
= -1, dscp
= -1;
630 UDPContext
*s
= h
->priv_data
;
634 struct sockaddr_storage my_addr
;
// Anything not opened for reading is treated as output.
639 is_output
= !(flags
& AVIO_FLAG_READ
);
640 if (s
->buffer_size
< 0)
641 s
->buffer_size
= is_output
? UDP_TX_BUF_SIZE
: UDP_RX_BUF_SIZE
;
644 if (ff_ip_parse_sources(h
, s
->sources
, &s
->filters
) < 0)
649 if (ff_ip_parse_blocks(h
, s
->block
, &s
->filters
) < 0)
654 h
->max_packet_size
= s
->pkt_size
;
// Options given in the query string of the URL override the AVOption values.
656 p
= strchr(uri
, '?');
658 if (av_find_info_tag(buf
, sizeof(buf
), "reuse", p
)) {
660 s
->reuse_socket
= strtol(buf
, &endptr
, 10);
661 /* assume if no digits were found it is a request to enable it */
665 if (av_find_info_tag(buf
, sizeof(buf
), "overrun_nonfatal", p
)) {
667 s
->overrun_nonfatal
= strtol(buf
, &endptr
, 10);
668 /* assume if no digits were found it is a request to enable it */
670 s
->overrun_nonfatal
= 1;
671 if (!HAVE_PTHREAD_CANCEL
)
672 av_log(h
, AV_LOG_WARNING
,
673 "'overrun_nonfatal' option was set but it is not supported "
674 "on this build (pthread support is required)\n");
676 if (av_find_info_tag(buf
, sizeof(buf
), "ttl", p
)) {
677 s
->ttl
= strtol(buf
, NULL
, 10);
679 if (av_find_info_tag(buf
, sizeof(buf
), "udplite_coverage", p
)) {
680 s
->udplite_coverage
= strtol(buf
, NULL
, 10);
682 if (av_find_info_tag(buf
, sizeof(buf
), "localport", p
)) {
683 s
->local_port
= strtol(buf
, NULL
, 10);
685 if (av_find_info_tag(buf
, sizeof(buf
), "pkt_size", p
)) {
686 s
->pkt_size
= strtol(buf
, NULL
, 10);
688 if (av_find_info_tag(buf
, sizeof(buf
), "buffer_size", p
)) {
689 s
->buffer_size
= strtol(buf
, NULL
, 10);
691 if (av_find_info_tag(buf
, sizeof(buf
), "connect", p
)) {
692 s
->is_connected
= strtol(buf
, NULL
, 10);
694 if (av_find_info_tag(buf
, sizeof(buf
), "dscp", p
)) {
695 dscp
= strtol(buf
, NULL
, 10);
697 if (av_find_info_tag(buf
, sizeof(buf
), "fifo_size", p
)) {
698 s
->circular_buffer_size
= strtol(buf
, NULL
, 10);
699 if (!HAVE_PTHREAD_CANCEL
)
700 av_log(h
, AV_LOG_WARNING
,
701 "'circular_buffer_size' option was set but it is not supported "
702 "on this build (pthread support is required)\n");
704 if (av_find_info_tag(buf
, sizeof(buf
), "bitrate", p
)) {
705 s
->bitrate
= strtoll(buf
, NULL
, 10);
706 if (!HAVE_PTHREAD_CANCEL
)
707 av_log(h
, AV_LOG_WARNING
,
708 "'bitrate' option was set but it is not supported "
709 "on this build (pthread support is required)\n");
711 if (av_find_info_tag(buf
, sizeof(buf
), "burst_bits", p
)) {
712 s
->burst_bits
= strtoll(buf
, NULL
, 10);
714 if (av_find_info_tag(buf
, sizeof(buf
), "localaddr", p
)) {
715 av_strlcpy(localaddr
, buf
, sizeof(localaddr
));
717 if (av_find_info_tag(buf
, sizeof(buf
), "sources", p
)) {
718 if (ff_ip_parse_sources(h
, buf
, &s
->filters
) < 0)
721 if (av_find_info_tag(buf
, sizeof(buf
), "block", p
)) {
722 if (ff_ip_parse_blocks(h
, buf
, &s
->filters
) < 0)
725 if (!is_output
&& av_find_info_tag(buf
, sizeof(buf
), "timeout", p
))
726 s
->timeout
= strtol(buf
, NULL
, 10);
727 if (is_output
&& av_find_info_tag(buf
, sizeof(buf
), "broadcast", p
))
728 s
->is_broadcast
= strtol(buf
, NULL
, 10);
730 /* handling needed to support options picking from both AVOption and URL */
// The option counts 188-byte packets; from here on the field holds bytes.
// NOTE(review): the option range goes up to INT_MAX, so this int
// multiplication can overflow for large values - confirm whether it is
// clamped elsewhere.
731 s
->circular_buffer_size
*= 188;
732 if (flags
& AVIO_FLAG_WRITE
) {
733 h
->max_packet_size
= s
->pkt_size
;
735 h
->max_packet_size
= UDP_MAX_PKT_SIZE
;
737 h
->rw_timeout
= s
->timeout
;
739 /* fill the dest addr */
740 av_url_split(NULL
, 0, NULL
, 0, hostname
, sizeof(hostname
), &port
, NULL
, 0, uri
);
742 /* XXX: fix av_url_split */
743 if (hostname
[0] == '\0' || hostname
[0] == '?') {
744 /* only accepts null hostname if input */
745 if (!(flags
& AVIO_FLAG_READ
))
748 if (ff_udp_set_remote_url(h
, uri
) < 0)
// A reader with no explicit local port, or any multicast reader, listens on
// the port given in the URL.
752 if ((s
->is_multicast
|| s
->local_port
<= 0) && (h
->flags
& AVIO_FLAG_READ
))
753 s
->local_port
= port
;
756 udp_fd
= udp_socket_create(h
, &my_addr
, &len
, localaddr
);
758 udp_fd
= udp_socket_create(h
, &my_addr
, &len
, s
->localaddr
);
762 s
->local_addr_storage
=my_addr
; //store for future multicast join
764 /* Follow the requested reuse option, unless it's multicast in which
765 * case enable reuse unless explicitly disabled.
767 if (s
->reuse_socket
> 0 || (s
->is_multicast
&& s
->reuse_socket
< 0)) {
769 if (setsockopt (udp_fd
, SOL_SOCKET
, SO_REUSEADDR
, &(s
->reuse_socket
), sizeof(s
->reuse_socket
)) != 0)
773 if (s
->is_broadcast
) {
775 if (setsockopt (udp_fd
, SOL_SOCKET
, SO_BROADCAST
, &(s
->is_broadcast
), sizeof(s
->is_broadcast
)) != 0)
780 /* Set the checksum coverage for UDP-Lite (RFC 3828) for sending and receiving.
781 * The receiver coverage has to be less than or equal to the sender coverage.
782 * Otherwise, the receiver will drop all packets.
784 if (s
->udplite_coverage
) {
785 if (setsockopt (udp_fd
, IPPROTO_UDPLITE
, UDPLITE_SEND_CSCOV
, &(s
->udplite_coverage
), sizeof(s
->udplite_coverage
)) != 0)
786 av_log(h
, AV_LOG_WARNING
, "socket option UDPLITE_SEND_CSCOV not available");
788 if (setsockopt (udp_fd
, IPPROTO_UDPLITE
, UDPLITE_RECV_CSCOV
, &(s
->udplite_coverage
), sizeof(s
->udplite_coverage
)) != 0)
789 av_log(h
, AV_LOG_WARNING
, "socket option UDPLITE_RECV_CSCOV not available");
794 if (setsockopt (udp_fd
, IPPROTO_IP
, IP_TOS
, &dscp
, sizeof(dscp
)) != 0)
798 /* If multicast, try binding the multicast address first, to avoid
799 * receiving UDP packets from other sources aimed at the same UDP
800 * port. This fails on windows. This makes sending to the same address
801 * using sendto() fail, so only do it if we're opened in read-only mode. */
802 if (s
->is_multicast
&& (h
->flags
& AVIO_FLAG_READ
)) {
803 bind_ret
= bind(udp_fd
,(struct sockaddr
*)&s
->dest_addr
, len
);
805 /* bind to the local address if not multicast or if the multicast
807 /* the bind is needed to give a port to the socket now */
808 if (bind_ret
< 0 && bind(udp_fd
,(struct sockaddr
*)&my_addr
, len
) < 0) {
809 ff_log_net_error(h
, AV_LOG_ERROR
, "bind failed");
// Read back the address actually bound so local_port reflects the real port.
// NOTE(review): the getsockname() result is not checked here.
813 len
= sizeof(my_addr
);
814 getsockname(udp_fd
, (struct sockaddr
*)&my_addr
, &len
);
815 s
->local_port
= udp_port(&my_addr
, len
);
817 if (s
->is_multicast
) {
818 if (h
->flags
& AVIO_FLAG_WRITE
) {
820 if (udp_set_multicast_ttl(udp_fd
, s
->ttl
, (struct sockaddr
*)&s
->dest_addr
) < 0)
823 if (h
->flags
& AVIO_FLAG_READ
) {
// With an include list the group is joined per source; otherwise a plain join.
825 if (s
->filters
.nb_include_addrs
) {
826 if (udp_set_multicast_sources(h
, udp_fd
,
827 (struct sockaddr
*)&s
->dest_addr
,
828 s
->dest_addr_len
, &s
->local_addr_storage
,
829 s
->filters
.include_addrs
,
830 s
->filters
.nb_include_addrs
, 1) < 0)
833 if (udp_join_multicast_group(udp_fd
, (struct sockaddr
*)&s
->dest_addr
,(struct sockaddr
*)&s
->local_addr_storage
) < 0)
836 if (s
->filters
.nb_exclude_addrs
) {
837 if (udp_set_multicast_sources(h
, udp_fd
,
838 (struct sockaddr
*)&s
->dest_addr
,
839 s
->dest_addr_len
, &s
->local_addr_storage
,
840 s
->filters
.exclude_addrs
,
841 s
->filters
.nb_exclude_addrs
, 0) < 0)
848 /* limit the tx buf size to limit latency */
849 tmp
= s
->buffer_size
;
850 if (setsockopt(udp_fd
, SOL_SOCKET
, SO_SNDBUF
, &tmp
, sizeof(tmp
)) < 0) {
851 ff_log_net_error(h
, AV_LOG_ERROR
, "setsockopt(SO_SNDBUF)");
855 /* set udp recv buffer size to the requested value (default UDP_RX_BUF_SIZE) */
856 tmp
= s
->buffer_size
;
857 if (setsockopt(udp_fd
, SOL_SOCKET
, SO_RCVBUF
, &tmp
, sizeof(tmp
)) < 0) {
// NOTE(review): the log label below says SO_RECVBUF while the option being
// set is SO_RCVBUF.
858 ff_log_net_error(h
, AV_LOG_WARNING
, "setsockopt(SO_RECVBUF)");
// Query the size the kernel really applied and warn if it is smaller.
861 if (getsockopt(udp_fd
, SOL_SOCKET
, SO_RCVBUF
, &tmp
, &len
) < 0) {
862 ff_log_net_error(h
, AV_LOG_WARNING
, "getsockopt(SO_RCVBUF)");
864 av_log(h
, AV_LOG_DEBUG
, "end receive buffer size reported is %d\n", tmp
);
865 if(tmp
< s
->buffer_size
)
866 av_log(h
, AV_LOG_WARNING
, "attempted to set receive buffer to size %d but it only ended up set as %d\n", s
->buffer_size
, tmp
);
869 /* make the socket non-blocking */
870 ff_socket_nonblock(udp_fd
, 1);
872 if (s
->is_connected
) {
873 if (connect(udp_fd
, (struct sockaddr
*) &s
->dest_addr
, s
->dest_addr_len
)) {
874 ff_log_net_error(h
, AV_LOG_ERROR
, "connect");
881 #if HAVE_PTHREAD_CANCEL
883 Create thread in case of:
884 1. Input and circular_buffer_size is set
885 2. Output and bitrate and circular_buffer_size is set
888 if (is_output
&& s
->bitrate
&& !s
->circular_buffer_size
) {
889 /* Warn user in case of 'circular_buffer_size' is not set */
890 av_log(h
, AV_LOG_WARNING
,"'bitrate' option was set but 'circular_buffer_size' is not, but required\n");
893 if ((!is_output
&& s
->circular_buffer_size
) || (is_output
&& s
->bitrate
&& s
->circular_buffer_size
)) {
896 /* start the task going */
897 s
->fifo
= av_fifo_alloc(s
->circular_buffer_size
);
898 ret
= pthread_mutex_init(&s
->mutex
, NULL
);
900 av_log(h
, AV_LOG_ERROR
, "pthread_mutex_init failed : %s\n", strerror(ret
));
903 ret
= pthread_cond_init(&s
->cond
, NULL
);
905 av_log(h
, AV_LOG_ERROR
, "pthread_cond_init failed : %s\n", strerror(ret
));
908 ret
= pthread_create(&s
->circular_buffer_thread
, NULL
, is_output
?circular_buffer_task_tx
:circular_buffer_task_rx
, h
);
910 av_log(h
, AV_LOG_ERROR
, "pthread_create failed : %s\n", strerror(ret
));
913 s
->thread_started
= 1;
918 #if HAVE_PTHREAD_CANCEL
920 pthread_cond_destroy(&s
->cond
);
922 pthread_mutex_destroy(&s
->mutex
);
927 av_fifo_freep(&s
->fifo
);
928 ff_ip_reset_filters(&s
->filters
);
// url_open callback of the udplite protocol.
//
// Presets the checksum coverage to UDP_HEADER_SIZE and then defers to
// udp_open(). A non-zero coverage is what makes udp_socket_create() request
// an IPPROTO_UDPLITE socket; a udplite_coverage tag in the URL can still
// override the preset inside udp_open().
932 static int udplite_open(URLContext
*h
, const char *uri
, int flags
)
934 UDPContext
*s
= h
->priv_data
;
936 // set default checksum coverage
937 s
->udplite_coverage
= UDP_HEADER_SIZE
;
939 return udp_open(h
, uri
, flags
);
// url_read callback: deliver one datagram into buf (at most size bytes).
//
// FIFO path (pthread builds): under the mutex, if the FIFO holds data the
// 4-byte length prefix is read first, then the payload is copied to buf. When
// the datagram is larger than the caller buffer a warning is logged and the
// remainder is drained from the FIFO. With an empty FIFO the function reports
// a stored worker-thread error, returns AVERROR(EAGAIN) in non-blocking mode,
// or waits on the condition variable for up to 100000 microseconds, mapping
// ETIMEDOUT to EAGAIN.
// Direct path: unless non-blocking, wait for the socket to become readable,
// then recvfrom(); a datagram rejected by the source filters yields
// AVERROR(EINTR).
942 static int udp_read(URLContext
*h
, uint8_t *buf
, int size
)
944 UDPContext
*s
= h
->priv_data
;
946 struct sockaddr_storage addr
;
947 socklen_t addr_len
= sizeof(addr
);
948 #if HAVE_PTHREAD_CANCEL
949 int avail
, nonblock
= h
->flags
& AVIO_FLAG_NONBLOCK
;
952 pthread_mutex_lock(&s
->mutex
);
954 avail
= av_fifo_size(s
->fifo
);
955 if (avail
) { // >=size) {
958 av_fifo_generic_read(s
->fifo
, tmp
, 4, NULL
);
961 av_log(h
, AV_LOG_WARNING
, "Part of datagram lost due to insufficient buffer size\n");
965 av_fifo_generic_read(s
->fifo
, buf
, avail
, NULL
);
// Skip whatever part of the datagram did not fit into buf.
966 av_fifo_drain(s
->fifo
, AV_RL32(tmp
) - avail
);
967 pthread_mutex_unlock(&s
->mutex
);
969 } else if(s
->circular_buffer_error
){
970 int err
= s
->circular_buffer_error
;
971 pthread_mutex_unlock(&s
->mutex
);
973 } else if(nonblock
) {
974 pthread_mutex_unlock(&s
->mutex
);
975 return AVERROR(EAGAIN
);
978 /* FIXME: using the monotonic clock would be better,
979 but it does not exist on all supported platforms. */
// Absolute deadline 100000 microseconds from now, split into sec / nsec.
980 int64_t t
= av_gettime() + 100000;
981 struct timespec tv
= { .tv_sec
= t
/ 1000000,
982 .tv_nsec
= (t
% 1000000) * 1000 };
983 int err
= pthread_cond_timedwait(&s
->cond
, &s
->mutex
, &tv
);
985 pthread_mutex_unlock(&s
->mutex
);
986 return AVERROR(err
== ETIMEDOUT
? EAGAIN
: err
);
994 if (!(h
->flags
& AVIO_FLAG_NONBLOCK
)) {
995 ret
= ff_network_wait_fd(s
->udp_fd
, 0);
999 ret
= recvfrom(s
->udp_fd
, buf
, size
, 0, (struct sockaddr
*)&addr
, &addr_len
);
1001 return ff_neterrno();
1002 if (ff_ip_check_source_lists(&addr
, &s
->filters
))
1003 return AVERROR(EINTR
);
// url_write callback: send buf (size bytes) as one datagram.
//
// FIFO path (pthread builds): under the mutex, a previously stored
// transmit-thread error is read out first. If the FIFO has less than
// size + 4 bytes free the call returns AVERROR(ENOMEM). Otherwise a 4-byte
// length prefix and the payload are appended and the condition variable is
// signalled to wake the tx task.
// Direct path: unless non-blocking, wait for the socket to become writable,
// then sendto() the stored destination or send() on a connected socket; a
// negative result is converted with ff_neterrno().
1007 static int udp_write(URLContext
*h
, const uint8_t *buf
, int size
)
1009 UDPContext
*s
= h
->priv_data
;
1012 #if HAVE_PTHREAD_CANCEL
1016 pthread_mutex_lock(&s
->mutex
);
1019 Return error if last tx failed.
1020 Here we can't know on which packet error was, but it needs to know that error exists.
1022 if (s
->circular_buffer_error
<0) {
1023 int err
=s
->circular_buffer_error
;
1024 pthread_mutex_unlock(&s
->mutex
);
1028 if(av_fifo_space(s
->fifo
) < size
+ 4) {
1029 /* What about a partial packet tx ? */
1030 pthread_mutex_unlock(&s
->mutex
);
1031 return AVERROR(ENOMEM
);
1034 av_fifo_generic_write(s
->fifo
, tmp
, 4, NULL
); /* size of packet */
1035 av_fifo_generic_write(s
->fifo
, (uint8_t *)buf
, size
, NULL
); /* the data */
1036 pthread_cond_signal(&s
->cond
);
1037 pthread_mutex_unlock(&s
->mutex
);
1041 if (!(h
->flags
& AVIO_FLAG_NONBLOCK
)) {
1042 ret
= ff_network_wait_fd(s
->udp_fd
, 1);
1047 if (!s
->is_connected
) {
1048 ret
= sendto (s
->udp_fd
, buf
, size
, 0,
1049 (struct sockaddr
*) &s
->dest_addr
,
1052 ret
= send(s
->udp_fd
, buf
, size
, 0);
1054 return ret
< 0 ? ff_neterrno() : ret
;
// url_close callback: stop the worker thread, leave the multicast group and
// release the socket and buffers.
//
// Writers: the tx thread is woken through the condition variable (under the
// mutex) so it can finish; it is not cancelled.
// Readers: the rx thread is cancelled with pthread_cancel(); one code path
// instead shuts down the receive side of the socket and aborts pending I/O,
// for platforms where recvfrom() is not a cancellation point.
// In both cases the thread is joined, then mutex and condition variable are
// destroyed. Finally the socket is closed, the FIFO freed and the source
// filters reset.
1057 static int udp_close(URLContext
*h
)
1059 UDPContext
*s
= h
->priv_data
;
1061 #if HAVE_PTHREAD_CANCEL
1062 // Request close once writing is finished
1063 if (s
->thread_started
&& !(h
->flags
& AVIO_FLAG_READ
)) {
1064 pthread_mutex_lock(&s
->mutex
);
1066 pthread_cond_signal(&s
->cond
);
1067 pthread_mutex_unlock(&s
->mutex
);
// Only readers join a multicast group in udp_open(), so only readers leave it.
1071 if (s
->is_multicast
&& (h
->flags
& AVIO_FLAG_READ
))
1072 udp_leave_multicast_group(s
->udp_fd
, (struct sockaddr
*)&s
->dest_addr
,(struct sockaddr
*)&s
->local_addr_storage
);
1073 #if HAVE_PTHREAD_CANCEL
1074 if (s
->thread_started
) {
1076 // Cancel only read, as write has been signaled as success to the user
1077 if (h
->flags
& AVIO_FLAG_READ
) {
1079 /* recvfrom() is not a cancellation point for win32, so we shutdown
1080 * the socket and abort pending IO, subsequent recvfrom() calls
1081 * will fail with WSAESHUTDOWN causing the thread to exit. */
1082 shutdown(s
->udp_fd
, SD_RECEIVE
);
1083 CancelIoEx((HANDLE
)(SOCKET
)s
->udp_fd
, NULL
);
1085 pthread_cancel(s
->circular_buffer_thread
);
1088 ret
= pthread_join(s
->circular_buffer_thread
, NULL
);
1090 av_log(h
, AV_LOG_ERROR
, "pthread_join(): %s\n", strerror(ret
));
1091 pthread_mutex_destroy(&s
->mutex
);
1092 pthread_cond_destroy(&s
->cond
);
1095 closesocket(s
->udp_fd
);
1096 av_fifo_freep(&s
->fifo
);
1097 ff_ip_reset_filters(&s
->filters
);
// Protocol descriptor for plain UDP: wires the open / read / write / close /
// file-handle callbacks defined in this file, sizes the private data as one
// UDPContext, attaches udp_class and flags the protocol as a network one.
1101 const URLProtocol ff_udp_protocol
= {
1103 .url_open
= udp_open
,
1104 .url_read
= udp_read
,
1105 .url_write
= udp_write
,
1106 .url_close
= udp_close
,
1107 .url_get_file_handle
= udp_get_file_handle
,
1108 .priv_data_size
= sizeof(UDPContext
),
1109 .priv_data_class
= &udp_class
,
1110 .flags
= URL_PROTOCOL_FLAG_NETWORK
,
// Protocol descriptor for UDP-Lite. Identical to ff_udp_protocol except that
// it opens through udplite_open() (which presets the checksum coverage) and
// attaches udplite_context_class.
1113 const URLProtocol ff_udplite_protocol
= {
1115 .url_open
= udplite_open
,
1116 .url_read
= udp_read
,
1117 .url_write
= udp_write
,
1118 .url_close
= udp_close
,
1119 .url_get_file_handle
= udp_get_file_handle
,
1120 .priv_data_size
= sizeof(UDPContext
),
1121 .priv_data_class
= &udplite_context_class
,
1122 .flags
= URL_PROTOCOL_FLAG_NETWORK
,