An implementation of the Frame Streams data transport protocol for the Unbound DNSTAP message logging facility. More...
#include "config.h"
#include "dnstap/dtstream.h"
#include "dnstap/dnstap_fstrm.h"
#include "util/config_file.h"
#include "util/ub_event.h"
#include "util/net_help.h"
#include "services/outside_network.h"
#include "sldns/sbuffer.h"
#include <fcntl.h>
Data Structures | |
struct | stop_flush_info |
structure to keep track of information during stop flush More... | |
Macros | |
#define | DTIO_MESSAGES_PER_CALLBACK 100 |
number of messages to process in one output callback | |
#define | DTIO_RECONNECT_TIMEOUT_MIN 10 |
the msec to wait before the first reconnect attempt (if it is not immediate) | |
#define | DTIO_RECONNECT_TIMEOUT_MAX 1000 |
the maximum msec to wait for reconnect, after backoff | |
#define | DTIO_RECONNECT_TIMEOUT_SLOW 1000 |
the msec to wait for a slow reconnect, to stop busy spinning on reconnect | |
#define | DTIO_MSG_FOR_WAKEUP 32 |
number of messages before wakeup of thread | |
#define | DTIO_RECV_FRAME_MAX_LEN 1000 |
maximum length of received frame | |
Enumerations | |
enum | { DTIO_COMMAND_STOP = 0 , DTIO_COMMAND_WAKEUP = 1 } |
DTIO command channel commands. More... | |
Functions | |
static void | dtio_open_output (struct dt_io_thread *dtio) |
open the output channel More... | |
static int | dtio_add_output_event_write (struct dt_io_thread *dtio) |
add output event for read and write More... | |
static void | dtio_reconnect_enable (struct dt_io_thread *dtio) |
start reconnection attempts More... | |
static void | dtio_stop_flush_exit (struct stop_flush_info *info) |
stop from stop_flush event loop More... | |
static int | dtio_control_start_send (struct dt_io_thread *dtio) |
setup a start control message | |
struct dt_msg_queue * | dt_msg_queue_create (struct comm_base *base) |
Create new (empty) worker message queue. More... | |
static void | dt_msg_queue_clear (struct dt_msg_queue *mq) |
clear the message list, caller must hold the lock | |
void | dt_msg_queue_delete (struct dt_msg_queue *mq) |
Delete a worker message queue. More... | |
static void | dtio_wakeup (struct dt_io_thread *dtio) |
make the dtio wake up by sending a wakeup command | |
void | mq_wakeup_cb (void *arg) |
timer callback to wakeup dtio thread to process messages | |
static void | dt_msg_queue_start_timer (struct dt_msg_queue *mq, int wakeupnow) |
start timer to wakeup dtio because there is content in the queue | |
void | dt_msg_queue_submit (struct dt_msg_queue *mq, void *buf, size_t len) |
Submit a message to the queue. More... | |
struct dt_io_thread * | dt_io_thread_create (void) |
Create IO thread. More... | |
void | dt_io_thread_delete (struct dt_io_thread *dtio) |
Delete the IO thread structure. More... | |
int | dt_io_thread_apply_cfg (struct dt_io_thread *dtio, struct config_file *cfg) |
Apply config to the dtio thread. More... | |
int | dt_io_thread_register_queue (struct dt_io_thread *dtio, struct dt_msg_queue *mq) |
Register a msg queue to the io thread. More... | |
void | dt_io_thread_unregister_queue (struct dt_io_thread *dtio, struct dt_msg_queue *mq) |
Unregister queue from io thread. More... | |
static int | dt_msg_queue_pop (struct dt_msg_queue *mq, void **buf, size_t *len) |
pick a message from the queue; the routine does its own locking and unlocking, and returns true if there is a message | |
static int | dtio_find_in_queue (struct dt_io_thread *dtio, struct dt_msg_queue *mq) |
find message in queue, false if no message, true if message to send | |
static int | dtio_find_msg (struct dt_io_thread *dtio) |
find a new message to write, search message queues, false if none | |
void | dtio_reconnect_timeout_cb (int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), void *arg) |
callback for the dnstap reconnect, to start reconnecting to output | |
static void | dtio_reconnect_del (struct dt_io_thread *dtio) |
remove dtio reconnect timer | |
static void | dtio_reconnect_clear (struct dt_io_thread *dtio) |
clear the reconnect exponential backoff timer. More... | |
static void | dtio_reconnect_slow (struct dt_io_thread *dtio, int msec) |
reconnect slowly, because we already know we have to wait for a bit | |
static void | dtio_cur_msg_free (struct dt_io_thread *dtio) |
delete the current message in the dtio, and reset counters | |
static void | dtio_read_frame_free (struct dt_frame_read_buf *rb) |
delete the buffer and counters used to read frame | |
static void | dtio_del_output_event (struct dt_io_thread *dtio) |
delete the output file descriptor event for listening | |
static void | dtio_close_fd (struct dt_io_thread *dtio) |
close dtio socket and set it to -1 | |
static void | dtio_close_output (struct dt_io_thread *dtio) |
close and stop the output file descriptor event | |
static int | dtio_check_nb_connect (struct dt_io_thread *dtio) |
check for pending nonblocking connect errors, returns 1 if it is okay. More... | |
static int | dtio_write_buf (struct dt_io_thread *dtio, uint8_t *buf, size_t len) |
write buffer to output. More... | |
static int | dtio_write_more_of_len (struct dt_io_thread *dtio) |
write more of the length, preceding the data frame. More... | |
static int | dtio_write_more_of_data (struct dt_io_thread *dtio) |
write more of the data frame. More... | |
static int | dtio_write_more (struct dt_io_thread *dtio) |
write more of the current message. More... | |
static ssize_t | receive_bytes (struct dt_io_thread *dtio, void *buf, size_t len) |
Receive bytes from dtio->fd, store in buffer. More... | |
static int | dtio_check_close (struct dt_io_thread *dtio) |
check if the output fd has been closed; returns false if the stream is closed. | |
static int | dtio_read_accept_frame (struct dt_io_thread *dtio) |
Read accept frame. More... | |
static int | dtio_add_output_event_read (struct dt_io_thread *dtio) |
add the output file descriptor event for listening, read only | |
static void | dtio_sleep (struct dt_io_thread *dtio) |
put the dtio thread to sleep | |
void | dtio_output_cb (int ATTR_UNUSED(fd), short bits, void *arg) |
callback for the dnstap events, to write to the output | |
void | dtio_cmd_cb (int fd, short ATTR_UNUSED(bits), void *arg) |
callback for the dnstap commandpipe, to stop the dnstap IO | |
static void | dtio_setup_base (struct dt_io_thread *dtio, time_t *secs, struct timeval *now) |
setup the event base for the dnstap io thread | |
static void | dtio_setup_cmd (struct dt_io_thread *dtio) |
setup the cmd event for dnstap io | |
static void | dtio_setup_reconnect (struct dt_io_thread *dtio) |
setup the reconnect event for dnstap io | |
static int | dtio_control_stop_send (struct stop_flush_info *info) |
send the stop control, return true if completed the frame. | |
void | dtio_stop_timer_cb (int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), void *arg) |
void | dtio_stop_ev_cb (int ATTR_UNUSED(fd), short bits, void *arg) |
static void | dtio_control_stop_flush (struct dt_io_thread *dtio) |
flush at end, last packet and stop control | |
static void | dtio_desetup (struct dt_io_thread *dtio) |
perform desetup and free stuff when the dnstap io thread exits | |
static int | dtio_control_ready_send (struct dt_io_thread *dtio) |
setup a ready control message | |
static int | dtio_open_output_local (struct dt_io_thread *dtio) |
open the output file descriptor for af_local | |
static int | dtio_open_output_tcp (struct dt_io_thread *dtio) |
open the output file descriptor for af_inet and af_inet6 | |
static int | dtio_setup_ssl (struct dt_io_thread *dtio) |
setup the SSL structure for new connection | |
static void | dtio_setup_on_base (struct dt_io_thread *dtio) |
perform the setup of the writer thread on the established event_base | |
static void * | dnstap_io (void *arg) |
the IO thread function for the DNSTAP IO | |
int | dt_io_thread_start (struct dt_io_thread *dtio, void *event_base_nothr, int numworkers) |
Start the io thread. More... | |
void | dt_io_thread_stop (struct dt_io_thread *dtio) |
Stop the io thread. More... | |
Variables | |
enum { ... } | dtio_channel_command |
DTIO command channel commands. | |
An implementation of the Frame Streams data transport protocol for the Unbound DNSTAP message logging facility.
anonymous enum |
|
static |
open the output channel
open the output file descriptor
References dt_io_thread::check_nb_connect, dtio_close_fd(), dtio_control_ready_send(), dtio_control_start_send(), dtio_open_output_local(), dtio_open_output_tcp(), dtio_output_cb(), dtio_reconnect_enable(), dtio_setup_ssl(), dt_io_thread::event, dt_io_thread::event_base, dt_io_thread::fd, dt_io_thread::is_bidirectional, log_err(), dt_io_thread::ssl, UB_EV_PERSIST, UB_EV_READ, UB_EV_WRITE, ub_event_free(), ub_event_new(), dt_io_thread::upstream_is_tcp, dt_io_thread::upstream_is_tls, and dt_io_thread::upstream_is_unix.
Referenced by dtio_setup_on_base().
|
static |
add output event for read and write
add the output file descriptor event for listening, read and write
References dtio_close_output(), dt_io_thread::event, dt_io_thread::event_added, dt_io_thread::event_added_is_write, log_err(), UB_EV_WRITE, ub_event_add(), ub_event_add_bits(), and ub_event_del().
Referenced by dtio_setup_on_base().
|
static |
start reconnection attempts
attempt to reconnect to the output, after a timeout
References dtio_reconnect_timeout_cb(), DTIO_RECONNECT_TIMEOUT_MAX, DTIO_RECONNECT_TIMEOUT_MIN, dt_io_thread::event_base, log_err(), dt_io_thread::reconnect_is_added, dt_io_thread::reconnect_timeout, dt_io_thread::reconnect_timer, ub_timer_add(), VERB_ALGO, verbose(), and dt_io_thread::want_to_exit.
Referenced by dtio_close_output(), dtio_open_output(), and dtio_reconnect_slow().
|
static |
stop from stop_flush event loop
exit the stop flush base
References stop_flush_info::base, log_err(), ub_event_base_loopexit(), and stop_flush_info::want_to_exit_flush.
Referenced by dtio_control_stop_send().
struct dt_msg_queue* dt_msg_queue_create | ( | struct comm_base * | base | ) |
Create new (empty) worker message queue.
The maximum size limit is set to its default value.
base | event base for wakeup timer. |
References comm_timer_create(), dt_msg_queue::lock, dt_msg_queue::maxsize, mq_wakeup_cb(), and dt_msg_queue::wakeup_timer.
void dt_msg_queue_delete | ( | struct dt_msg_queue * | mq | ) |
Delete a worker message queue.
It has to be unlinked from access, so it can be deleted without lock worries. The queue is emptied (deleted).
mq | message queue. |
References comm_timer_delete(), dt_msg_queue_clear(), dt_msg_queue::lock, and dt_msg_queue::wakeup_timer.
void dt_msg_queue_submit | ( | struct dt_msg_queue * | mq, |
void * | buf, | ||
size_t | len | ||
) |
Submit a message to the queue.
The queue is locked by the routine, the message is inserted, and then the queue is unlocked so the message can be picked up by the writer thread.
mq | message queue. |
buf | buffer with message (dnstap contents). The buffer must have been malloced by caller. It is linked in the queue, and is free()d after use. If the routine fails the buffer is freed as well (and nothing happens, the item could not be logged). |
len | length of buffer. |
References dt_msg_entry::buf, dt_msg_queue::cursize, dt_msg_queue_start_timer(), dt_msg_queue::dtio, DTIO_MSG_FOR_WAKEUP, dt_io_thread::event_added_is_write, dt_msg_queue::first, dt_msg_entry::len, dt_msg_queue::lock, log_err(), dt_msg_queue::maxsize, dt_msg_queue::msgcount, dt_msg_entry::next, and entry::next.
struct dt_io_thread* dt_io_thread_create | ( | void | ) |
Create IO thread.
References dt_io_thread::wakeup_timer_enabled, and dt_io_thread::wakeup_timer_lock.
void dt_io_thread_delete | ( | struct dt_io_thread * | dtio | ) |
Delete the IO thread structure.
dtio | the io thread that is deleted. It must not be running. |
References dt_io_thread::client_cert_file, dt_io_thread::client_key_file, dt_io_thread::io_list, dt_io_thread::ip_str, dt_io_list_item::next, dt_io_thread::socket_path, dt_io_thread::ssl_ctx, dt_io_thread::tls_server_name, and dt_io_thread::wakeup_timer_lock.
int dt_io_thread_apply_cfg | ( | struct dt_io_thread * | dtio, |
struct config_file * | cfg | ||
) |
Apply config to the dtio thread.
dtio | io thread, not yet started. |
cfg | config file struct. |
References check_auth_name_for_ssl(), config_file::chrootdir, dt_io_thread::client_cert_file, dt_io_thread::client_key_file, connect_sslctx_create(), config_file::dnstap, config_file::dnstap_bidirectional, config_file::dnstap_ip, config_file::dnstap_socket_path, config_file::dnstap_tls, config_file::dnstap_tls_cert_bundle, config_file::dnstap_tls_client_cert_file, config_file::dnstap_tls_client_key_file, config_file::dnstap_tls_server_name, dt_io_thread::ip_str, dt_io_thread::is_bidirectional, log_err(), log_warn(), dt_io_thread::socket_path, dt_io_thread::ssl_ctx, config_file::tls_cert_bundle, dt_io_thread::tls_server_name, dt_io_thread::tls_use_sni, config_file::tls_use_sni, config_file::tls_win_cert, dt_io_thread::upstream_is_tcp, dt_io_thread::upstream_is_tls, dt_io_thread::upstream_is_unix, and dt_io_thread::use_client_certs.
int dt_io_thread_register_queue | ( | struct dt_io_thread * | dtio, |
struct dt_msg_queue * | mq | ||
) |
Register a msg queue to the io thread.
It will be polled to see if there are messages and those then get removed and sent, when the thread is running.
dtio | the io thread. |
mq | message queue to register. |
References dt_msg_queue::dtio, dt_io_thread::io_list, dt_io_thread::io_list_iter, dt_msg_queue::lock, dt_io_list_item::next, and dt_io_list_item::queue.
void dt_io_thread_unregister_queue | ( | struct dt_io_thread * | dtio, |
struct dt_msg_queue * | mq | ||
) |
Unregister queue from io thread.
dtio | the io thread. |
mq | message queue. |
References dt_msg_queue::dtio, dt_io_thread::io_list, dt_io_thread::io_list_iter, dt_msg_queue::lock, dt_io_list_item::next, and dt_io_list_item::queue.
|
static |
clear the reconnect exponential backoff timer.
We have successfully connected so we can try again with short timeouts.
References dtio_reconnect_del(), and dt_io_thread::reconnect_timeout.
Referenced by dtio_check_nb_connect(), and dtio_setup_reconnect().
|
static |
check for pending nonblocking connect errors, returns 1 if it is okay.
-1 on error (close it), 0 to try later
References dt_io_thread::check_nb_connect, dtio_reconnect_clear(), error(), dt_io_thread::event, dt_io_thread::fd, dt_io_thread::ip_str, log_err(), sock_strerror(), dt_io_thread::socket_path, dt_io_thread::stop_flush_event, UB_EV_WRITE, ub_winsock_tcp_wouldblock(), VERB_DETAIL, and verbose().
|
static |
write buffer to output.
Returns the number of bytes written, 0 if nothing happened (try again later), or -1 if the channel is to be closed.
References dt_io_thread::event, dt_io_thread::fd, log_err(), sock_strerror(), dt_io_thread::ssl, dt_io_thread::stop_flush_event, UB_EV_WRITE, and ub_winsock_tcp_wouldblock().
Referenced by dtio_control_stop_send(), dtio_write_more_of_data(), and dtio_write_more_of_len().
|
static |
write more of the length, preceding the data frame.
return true if message is done, false if incomplete.
References dt_io_thread::cur_msg_len, dt_io_thread::cur_msg_len_done, dtio_close_output(), dtio_del_output_event(), dtio_write_buf(), and dt_io_thread::ssl.
Referenced by dtio_write_more().
|
static |
write more of the data frame.
return true if message is done, false if incomplete.
References dt_io_thread::cur_msg, dt_io_thread::cur_msg_done, dt_io_thread::cur_msg_len, dtio_close_output(), dtio_del_output_event(), and dtio_write_buf().
Referenced by dtio_write_more().
|
static |
write more of the current message.
false if incomplete, true if the message is done
References dt_io_thread::cur_msg_done, dt_io_thread::cur_msg_len, dt_io_thread::cur_msg_len_done, dtio_write_more_of_data(), and dtio_write_more_of_len().
|
static |
Receive bytes from dtio->fd, store in buffer.
Returns 0: closed, -1: continue, >0: number of bytes read into buffer
References DTIO_RECONNECT_TIMEOUT_MIN, dt_io_thread::event, dt_io_thread::fd, dt_io_thread::ip_str, log_err(), MSG_DONTWAIT, dt_io_thread::reconnect_timeout, dt_io_thread::socket_path, dt_io_thread::stop_flush_event, UB_EV_READ, ub_winsock_tcp_wouldblock(), VERB_DETAIL, verbose(), and verbosity.
Referenced by dtio_check_close(), and dtio_read_accept_frame().
|
static |
Read accept frame.
Returns -1: continue reading, 0: closed, 1: valid accept received.
References dt_frame_read_buf::buf, dt_frame_read_buf::buf_cap, dt_frame_read_buf::buf_count, dt_frame_read_buf::control_frame, DTIO_RECV_FRAME_MAX_LEN, dt_frame_read_buf::frame_len, dt_frame_read_buf::frame_len_done, log_err(), dt_io_thread::read_frame, receive_bytes(), dt_io_thread::ssl, VERB_OPS, and verbose().
int dt_io_thread_start | ( | struct dt_io_thread * | dtio, |
void * | event_base_nothr, | ||
int | numworkers | ||
) |
Start the io thread.
dtio | the io thread. |
event_base_nothr | the event base to attach the events to, in case we are running without threads. With threads, this is ignored and a thread is started to process the dnstap log messages. |
numworkers | number of worker threads. The dnstap io thread uses that number + 1 as its thread number (in logs). |
References dt_io_thread::commandpipe, dnstap_io(), dtio_setup_on_base(), dt_io_thread::event_base, log_err(), dt_io_thread::started, dt_io_thread::threadnum, and dt_io_thread::tid.
void dt_io_thread_stop | ( | struct dt_io_thread * | dtio | ) |
Stop the io thread.
dtio | the io thread. |
References dt_io_thread::commandpipe, DTIO_COMMAND_STOP, dtio_desetup(), log_err(), sock_strerror(), dt_io_thread::started, dt_io_thread::tid, VERB_ALGO, verbose(), and dt_io_thread::want_to_exit.
Referenced by worker_delete().