/*
 * Advanced Message Queuing Protocol (AMQP) 0-9-1
 * Copyright (c) 2020 Andriy Gelman
 *
 * This file is part of FFmpeg.
 *
 * FFmpeg is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * FFmpeg is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with FFmpeg; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */
#include <limits.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#include <amqp.h>
#include <amqp_tcp_socket.h>
#include <sys/time.h>

#include "avformat.h"
#include "libavutil/mem.h"
#include "libavutil/opt.h"
#include "network.h"
#include "url.h"
#include "urldecode.h"
32 typedef struct AMQPContext
{
34 amqp_connection_state_t conn
;
35 amqp_socket_t
*socket
;
37 const char *routing_key
;
39 int64_t connection_timeout
;
40 int pkt_size_overflow
;
/*
 * Size of the fixed buffers that hold the URL components and the
 * broker-assigned queue name.
 * NOTE(review): 1024 is assumed here; the guard keeps an existing
 * definition authoritative -- confirm the intended value.
 */
#ifndef STR_LEN
#define STR_LEN           1024
#endif
/* The single AMQP channel every operation in this file runs on. */
#define DEFAULT_CHANNEL   1

/* Shorthands used by the AVOption table. */
#define OFFSET(x) offsetof(AMQPContext, x)
#define D AV_OPT_FLAG_DECODING_PARAM
#define E AV_OPT_FLAG_ENCODING_PARAM
50 static const AVOption options
[] = {
51 { "pkt_size", "Maximum send/read packet size", OFFSET(pkt_size
), AV_OPT_TYPE_INT
, { .i64
= 131072 }, 4096, INT_MAX
, .flags
= D
| E
},
52 { "exchange", "Exchange to send/read packets", OFFSET(exchange
), AV_OPT_TYPE_STRING
, { .str
= "amq.direct" }, 0, 0, .flags
= D
| E
},
53 { "routing_key", "Key to filter streams", OFFSET(routing_key
), AV_OPT_TYPE_STRING
, { .str
= "amqp" }, 0, 0, .flags
= D
| E
},
54 { "connection_timeout", "Initial connection timeout", OFFSET(connection_timeout
), AV_OPT_TYPE_DURATION
, { .i64
= -1 }, -1, INT64_MAX
, .flags
= D
| E
},
55 { "delivery_mode", "Delivery mode", OFFSET(delivery_mode
), AV_OPT_TYPE_INT
, { .i64
= AMQP_DELIVERY_PERSISTENT
}, 1, 2, .flags
= E
, .unit
= "delivery_mode"},
56 { "persistent", "Persistent delivery mode", 0, AV_OPT_TYPE_CONST
, { .i64
= AMQP_DELIVERY_PERSISTENT
}, 0, 0, E
, .unit
= "delivery_mode" },
57 { "non-persistent", "Non-persistent delivery mode", 0, AV_OPT_TYPE_CONST
, { .i64
= AMQP_DELIVERY_NONPERSISTENT
}, 0, 0, E
, .unit
= "delivery_mode" },
61 static int amqp_proto_open(URLContext
*h
, const char *uri
, int flags
)
64 char hostname
[STR_LEN
], credentials
[STR_LEN
], path
[STR_LEN
];
66 const char *user
, *password
= NULL
, *vhost
;
67 const char *user_decoded
, *password_decoded
, *vhost_decoded
;
69 amqp_rpc_reply_t broker_reply
;
70 struct timeval tval
= { 0 };
72 AMQPContext
*s
= h
->priv_data
;
75 h
->max_packet_size
= s
->pkt_size
;
77 av_url_split(NULL
, 0, credentials
, sizeof(credentials
),
78 hostname
, sizeof(hostname
), &port
, path
, sizeof(path
), uri
);
83 if (hostname
[0] == '\0' || port
<= 0 || port
> 65535 ) {
84 av_log(h
, AV_LOG_ERROR
, "Invalid hostname/port\n");
85 return AVERROR(EINVAL
);
88 p
= strchr(credentials
, ':');
94 if (!password
|| *password
== '\0')
97 password_decoded
= ff_urldecode(password
, 0);
98 if (!password_decoded
)
99 return AVERROR(ENOMEM
);
105 user_decoded
= ff_urldecode(user
, 0);
107 av_freep(&password_decoded
);
108 return AVERROR(ENOMEM
);
111 /* skip query for now */
112 p
= strchr(path
, '?');
120 vhost
++; /* skip leading '/' */
122 vhost_decoded
= ff_urldecode(vhost
, 0);
123 if (!vhost_decoded
) {
124 av_freep(&user_decoded
);
125 av_freep(&password_decoded
);
126 return AVERROR(ENOMEM
);
129 s
->conn
= amqp_new_connection();
131 av_freep(&vhost_decoded
);
132 av_freep(&user_decoded
);
133 av_freep(&password_decoded
);
134 av_log(h
, AV_LOG_ERROR
, "Error creating connection\n");
135 return AVERROR_EXTERNAL
;
138 s
->socket
= amqp_tcp_socket_new(s
->conn
);
140 av_log(h
, AV_LOG_ERROR
, "Error creating socket\n");
141 goto destroy_connection
;
144 if (s
->connection_timeout
< 0)
145 s
->connection_timeout
= (h
->rw_timeout
> 0 ? h
->rw_timeout
: 5000000);
147 tval
.tv_sec
= s
->connection_timeout
/ 1000000;
148 tval
.tv_usec
= s
->connection_timeout
% 1000000;
149 ret
= amqp_socket_open_noblock(s
->socket
, hostname
, port
, &tval
);
152 av_log(h
, AV_LOG_ERROR
, "Error connecting to server: %s\n",
153 amqp_error_string2(ret
));
154 goto destroy_connection
;
157 broker_reply
= amqp_login(s
->conn
, vhost_decoded
, 0, s
->pkt_size
, 0,
158 AMQP_SASL_METHOD_PLAIN
, user_decoded
, password_decoded
);
160 if (broker_reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
161 av_log(h
, AV_LOG_ERROR
, "Error login\n");
162 server_msg
= AMQP_ACCESS_REFUSED
;
163 goto close_connection
;
166 amqp_channel_open(s
->conn
, DEFAULT_CHANNEL
);
167 broker_reply
= amqp_get_rpc_reply(s
->conn
);
169 if (broker_reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
170 av_log(h
, AV_LOG_ERROR
, "Error set channel\n");
171 server_msg
= AMQP_CHANNEL_ERROR
;
172 goto close_connection
;
175 if (h
->flags
& AVIO_FLAG_READ
) {
176 amqp_bytes_t queuename
;
177 char queuename_buff
[STR_LEN
];
178 amqp_queue_declare_ok_t
*r
;
180 r
= amqp_queue_declare(s
->conn
, DEFAULT_CHANNEL
, amqp_empty_bytes
,
181 0, 0, 0, 1, amqp_empty_table
);
182 broker_reply
= amqp_get_rpc_reply(s
->conn
);
183 if (!r
|| broker_reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
184 av_log(h
, AV_LOG_ERROR
, "Error declare queue\n");
185 server_msg
= AMQP_RESOURCE_ERROR
;
189 /* store queuename */
190 queuename
.bytes
= queuename_buff
;
191 queuename
.len
= FFMIN(r
->queue
.len
, STR_LEN
);
192 memcpy(queuename
.bytes
, r
->queue
.bytes
, queuename
.len
);
194 amqp_queue_bind(s
->conn
, DEFAULT_CHANNEL
, queuename
,
195 amqp_cstring_bytes(s
->exchange
),
196 amqp_cstring_bytes(s
->routing_key
), amqp_empty_table
);
198 broker_reply
= amqp_get_rpc_reply(s
->conn
);
199 if (broker_reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
200 av_log(h
, AV_LOG_ERROR
, "Queue bind error\n");
201 server_msg
= AMQP_INTERNAL_ERROR
;
205 amqp_basic_consume(s
->conn
, DEFAULT_CHANNEL
, queuename
, amqp_empty_bytes
,
206 0, 1, 0, amqp_empty_table
);
208 broker_reply
= amqp_get_rpc_reply(s
->conn
);
209 if (broker_reply
.reply_type
!= AMQP_RESPONSE_NORMAL
) {
210 av_log(h
, AV_LOG_ERROR
, "Set consume error\n");
211 server_msg
= AMQP_INTERNAL_ERROR
;
216 av_freep(&vhost_decoded
);
217 av_freep(&user_decoded
);
218 av_freep(&password_decoded
);
222 amqp_channel_close(s
->conn
, DEFAULT_CHANNEL
, server_msg
);
224 amqp_connection_close(s
->conn
, server_msg
);
226 amqp_destroy_connection(s
->conn
);
228 av_freep(&vhost_decoded
);
229 av_freep(&user_decoded
);
230 av_freep(&password_decoded
);
231 return AVERROR_EXTERNAL
;
234 static int amqp_proto_write(URLContext
*h
, const unsigned char *buf
, int size
)
237 AMQPContext
*s
= h
->priv_data
;
238 int fd
= amqp_socket_get_sockfd(s
->socket
);
240 amqp_bytes_t message
= { size
, (void *)buf
};
241 amqp_basic_properties_t props
;
243 ret
= ff_network_wait_fd_timeout(fd
, 1, h
->rw_timeout
, &h
->interrupt_callback
);
247 props
._flags
= AMQP_BASIC_CONTENT_TYPE_FLAG
| AMQP_BASIC_DELIVERY_MODE_FLAG
;
248 props
.content_type
= amqp_cstring_bytes("octet/stream");
249 props
.delivery_mode
= s
->delivery_mode
;
251 ret
= amqp_basic_publish(s
->conn
, DEFAULT_CHANNEL
, amqp_cstring_bytes(s
->exchange
),
252 amqp_cstring_bytes(s
->routing_key
), 0, 0,
256 av_log(h
, AV_LOG_ERROR
, "Error publish: %s\n", amqp_error_string2(ret
));
257 return AVERROR_EXTERNAL
;
263 static int amqp_proto_read(URLContext
*h
, unsigned char *buf
, int size
)
265 AMQPContext
*s
= h
->priv_data
;
266 int fd
= amqp_socket_get_sockfd(s
->socket
);
269 amqp_rpc_reply_t broker_reply
;
270 amqp_envelope_t envelope
;
272 ret
= ff_network_wait_fd_timeout(fd
, 0, h
->rw_timeout
, &h
->interrupt_callback
);
276 amqp_maybe_release_buffers(s
->conn
);
277 broker_reply
= amqp_consume_message(s
->conn
, &envelope
, NULL
, 0);
279 if (broker_reply
.reply_type
!= AMQP_RESPONSE_NORMAL
)
280 return AVERROR_EXTERNAL
;
282 if (envelope
.message
.body
.len
> size
) {
283 s
->pkt_size_overflow
= FFMAX(s
->pkt_size_overflow
, envelope
.message
.body
.len
);
284 av_log(h
, AV_LOG_WARNING
, "Message exceeds space in the buffer. "
285 "Message will be truncated. Setting -pkt_size %d "
286 "may resolve this issue.\n", s
->pkt_size_overflow
);
288 size
= FFMIN(size
, envelope
.message
.body
.len
);
290 memcpy(buf
, envelope
.message
.body
.bytes
, size
);
291 amqp_destroy_envelope(&envelope
);
296 static int amqp_proto_close(URLContext
*h
)
298 AMQPContext
*s
= h
->priv_data
;
299 amqp_channel_close(s
->conn
, DEFAULT_CHANNEL
, AMQP_REPLY_SUCCESS
);
300 amqp_connection_close(s
->conn
, AMQP_REPLY_SUCCESS
);
301 amqp_destroy_connection(s
->conn
);
306 static const AVClass amqp_context_class
= {
307 .class_name
= "amqp",
308 .item_name
= av_default_item_name
,
310 .version
= LIBAVUTIL_VERSION_INT
,
313 const URLProtocol ff_libamqp_protocol
= {
315 .url_close
= amqp_proto_close
,
316 .url_open
= amqp_proto_open
,
317 .url_read
= amqp_proto_read
,
318 .url_write
= amqp_proto_write
,
319 .priv_data_size
= sizeof(AMQPContext
),
320 .priv_data_class
= &amqp_context_class
,
321 .flags
= URL_PROTOCOL_FLAG_NETWORK
,