dtstream.c File Reference

An implementation of the Frame Streams data transport protocol for the Unbound DNSTAP message logging facility. More...

#include "config.h"
#include "dnstap/dtstream.h"
#include "dnstap/dnstap_fstrm.h"
#include "util/config_file.h"
#include "util/ub_event.h"
#include "util/net_help.h"
#include "services/outside_network.h"
#include "sldns/sbuffer.h"
#include <fcntl.h>

Data Structures

struct  stop_flush_info
 structure to keep track of information during stop flush More...
 

Macros

#define DTIO_MESSAGES_PER_CALLBACK   100
 number of messages to process in one output callback
 
#define DTIO_RECONNECT_TIMEOUT_MIN   10
 the msec to wait for reconnect (if not immediate, the first attempt)
 
#define DTIO_RECONNECT_TIMEOUT_MAX   1000
 the msec to wait for reconnect max after backoff
 
#define DTIO_RECONNECT_TIMEOUT_SLOW   1000
 the msec to wait for reconnect slow, to stop busy spinning on reconnect
 
#define DTIO_MSG_FOR_WAKEUP   32
 number of messages before wakeup of thread
 
#define DTIO_RECV_FRAME_MAX_LEN   1000
 maximum length of received frame
 

Enumerations

enum  { DTIO_COMMAND_STOP = 0 , DTIO_COMMAND_WAKEUP = 1 }
 DTIO command channel commands. More...
 

Functions

static void dtio_open_output (struct dt_io_thread *dtio)
 open the output channel More...
 
static int dtio_add_output_event_write (struct dt_io_thread *dtio)
 add output event for read and write More...
 
static void dtio_reconnect_enable (struct dt_io_thread *dtio)
 start reconnection attempts More...
 
static void dtio_stop_flush_exit (struct stop_flush_info *info)
 stop from stop_flush event loop More...
 
static int dtio_control_start_send (struct dt_io_thread *dtio)
 setup a start control message
 
struct dt_msg_queue * dt_msg_queue_create (struct comm_base *base)
 Create new (empty) worker message queue. More...
 
static void dt_msg_queue_clear (struct dt_msg_queue *mq)
 clear the message list, caller must hold the lock
 
void dt_msg_queue_delete (struct dt_msg_queue *mq)
 Delete a worker message queue. More...
 
static void dtio_wakeup (struct dt_io_thread *dtio)
 make the dtio wake up by sending a wakeup command
 
void mq_wakeup_cb (void *arg)
 timer callback to wakeup dtio thread to process messages
 
static void dt_msg_queue_start_timer (struct dt_msg_queue *mq, int wakeupnow)
 start timer to wakeup dtio because there is content in the queue
 
void dt_msg_queue_submit (struct dt_msg_queue *mq, void *buf, size_t len)
 Submit a message to the queue. More...
 
struct dt_io_thread * dt_io_thread_create (void)
 Create IO thread. More...
 
void dt_io_thread_delete (struct dt_io_thread *dtio)
 Delete the IO thread structure. More...
 
int dt_io_thread_apply_cfg (struct dt_io_thread *dtio, struct config_file *cfg)
 Apply config to the dtio thread. More...
 
int dt_io_thread_register_queue (struct dt_io_thread *dtio, struct dt_msg_queue *mq)
 Register a msg queue to the io thread. More...
 
void dt_io_thread_unregister_queue (struct dt_io_thread *dtio, struct dt_msg_queue *mq)
 Unregister queue from io thread. More...
 
static int dt_msg_queue_pop (struct dt_msg_queue *mq, void **buf, size_t *len)
 pick a message from the queue, the routine locks and unlocks, returns true if there is a message
 
static int dtio_find_in_queue (struct dt_io_thread *dtio, struct dt_msg_queue *mq)
 find message in queue, false if no message, true if message to send
 
static int dtio_find_msg (struct dt_io_thread *dtio)
 find a new message to write, search message queues, false if none
 
void dtio_reconnect_timeout_cb (int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), void *arg)
 callback for the dnstap reconnect, to start reconnecting to output
 
static void dtio_reconnect_del (struct dt_io_thread *dtio)
 remove dtio reconnect timer
 
static void dtio_reconnect_clear (struct dt_io_thread *dtio)
 clear the reconnect exponential backoff timer. More...
 
static void dtio_reconnect_slow (struct dt_io_thread *dtio, int msec)
 reconnect slowly, because we already know we have to wait for a bit
 
static void dtio_cur_msg_free (struct dt_io_thread *dtio)
 delete the current message in the dtio, and reset counters
 
static void dtio_read_frame_free (struct dt_frame_read_buf *rb)
 delete the buffer and counters used to read frame
 
static void dtio_del_output_event (struct dt_io_thread *dtio)
 del the output file descriptor event for listening
 
static void dtio_close_fd (struct dt_io_thread *dtio)
 close dtio socket and set it to -1
 
static void dtio_close_output (struct dt_io_thread *dtio)
 close and stop the output file descriptor event
 
static int dtio_check_nb_connect (struct dt_io_thread *dtio)
 check for pending nonblocking connect errors, returns 1 if it is okay. More...
 
static int dtio_write_buf (struct dt_io_thread *dtio, uint8_t *buf, size_t len)
 write buffer to output. More...
 
static int dtio_write_more_of_len (struct dt_io_thread *dtio)
 write more of the length, preceding the data frame. More...
 
static int dtio_write_more_of_data (struct dt_io_thread *dtio)
 write more of the data frame. More...
 
static int dtio_write_more (struct dt_io_thread *dtio)
 write more of the current message. More...
 
static ssize_t receive_bytes (struct dt_io_thread *dtio, void *buf, size_t len)
 Receive bytes from dtio->fd, store in buffer. More...
 
static int dtio_check_close (struct dt_io_thread *dtio)
 check if the output fd has been closed, it returns false if the stream is closed.
 
static int dtio_read_accept_frame (struct dt_io_thread *dtio)
 Read accept frame. More...
 
static int dtio_add_output_event_read (struct dt_io_thread *dtio)
 add the output file descriptor event for listening, read only
 
static void dtio_sleep (struct dt_io_thread *dtio)
 put the dtio thread to sleep
 
void dtio_output_cb (int ATTR_UNUSED(fd), short bits, void *arg)
 callback for the dnstap events, to write to the output
 
void dtio_cmd_cb (int fd, short ATTR_UNUSED(bits), void *arg)
 callback for the dnstap commandpipe, to stop the dnstap IO
 
static void dtio_setup_base (struct dt_io_thread *dtio, time_t *secs, struct timeval *now)
 setup the event base for the dnstap io thread
 
static void dtio_setup_cmd (struct dt_io_thread *dtio)
 setup the cmd event for dnstap io
 
static void dtio_setup_reconnect (struct dt_io_thread *dtio)
 setup the reconnect event for dnstap io
 
static int dtio_control_stop_send (struct stop_flush_info *info)
 send the stop control, return true if completed the frame.
 
void dtio_stop_timer_cb (int ATTR_UNUSED(fd), short ATTR_UNUSED(bits), void *arg)
 
void dtio_stop_ev_cb (int ATTR_UNUSED(fd), short bits, void *arg)
 
static void dtio_control_stop_flush (struct dt_io_thread *dtio)
 flush at end, last packet and stop control
 
static void dtio_desetup (struct dt_io_thread *dtio)
 perform desetup and free stuff when the dnstap io thread exits
 
static int dtio_control_ready_send (struct dt_io_thread *dtio)
 setup a ready control message
 
static int dtio_open_output_local (struct dt_io_thread *dtio)
 open the output file descriptor for af_local
 
static int dtio_open_output_tcp (struct dt_io_thread *dtio)
 open the output file descriptor for af_inet and af_inet6
 
static int dtio_setup_ssl (struct dt_io_thread *dtio)
 setup the SSL structure for new connection
 
static void dtio_setup_on_base (struct dt_io_thread *dtio)
 perform the setup of the writer thread on the established event_base
 
static void * dnstap_io (void *arg)
 the IO thread function for the DNSTAP IO
 
int dt_io_thread_start (struct dt_io_thread *dtio, void *event_base_nothr, int numworkers)
 Start the io thread. More...
 
void dt_io_thread_stop (struct dt_io_thread *dtio)
 Stop the io thread. More...
 

Variables

enum { ... }  dtio_channel_command
 DTIO command channel commands.
 

Detailed Description

An implementation of the Frame Streams data transport protocol for the Unbound DNSTAP message logging facility.

Enumeration Type Documentation

◆ anonymous enum

anonymous enum

DTIO command channel commands.

Enumerator
DTIO_COMMAND_STOP 

DTIO command channel stop.

DTIO_COMMAND_WAKEUP 

DTIO command channel wakeup.

Function Documentation

◆ dtio_open_output()

◆ dtio_add_output_event_write()

static int dtio_add_output_event_write ( struct dt_io_thread * dtio)
static

add output event for read and write

add the output file descriptor event for listening, read and write

References dtio_close_output(), dt_io_thread::event, dt_io_thread::event_added, dt_io_thread::event_added_is_write, log_err(), UB_EV_WRITE, ub_event_add(), ub_event_add_bits(), and ub_event_del().

Referenced by dtio_setup_on_base().

◆ dtio_reconnect_enable()

◆ dtio_stop_flush_exit()

static void dtio_stop_flush_exit ( struct stop_flush_info * info)
static

stop from stop_flush event loop

exit the stop flush base

References stop_flush_info::base, log_err(), ub_event_base_loopexit(), and stop_flush_info::want_to_exit_flush.

Referenced by dtio_control_stop_send().

◆ dt_msg_queue_create()

struct dt_msg_queue* dt_msg_queue_create ( struct comm_base * base)

Create new (empty) worker message queue.

Limit set to default on max.

Parameters
base: event base for wakeup timer.
Returns
NULL on malloc failure or a new queue (not locked).

References comm_timer_create(), dt_msg_queue::lock, dt_msg_queue::maxsize, mq_wakeup_cb(), and dt_msg_queue::wakeup_timer.

◆ dt_msg_queue_delete()

void dt_msg_queue_delete ( struct dt_msg_queue * mq)

Delete a worker message queue.

It has to be unlinked from access, so it can be deleted without lock worries. The queue is emptied (deleted).

Parameters
mq: message queue.

References comm_timer_delete(), dt_msg_queue_clear(), dt_msg_queue::lock, and dt_msg_queue::wakeup_timer.

◆ dt_msg_queue_submit()

void dt_msg_queue_submit ( struct dt_msg_queue * mq,
void *  buf,
size_t  len 
)

Submit a message to the queue.

The queue is locked by the routine, the message is inserted, and then the queue is unlocked so the message can be picked up by the writer thread.

Parameters
mq: message queue.
buf: buffer with message (dnstap contents). The buffer must have been malloced by caller. It is linked in the queue, and is free()d after use. If the routine fails the buffer is freed as well (and nothing happens, the item could not be logged).
len: length of buffer.

References dt_msg_entry::buf, dt_msg_queue::cursize, dt_msg_queue_start_timer(), dt_msg_queue::dtio, DTIO_MSG_FOR_WAKEUP, dt_io_thread::event_added_is_write, dt_msg_queue::first, dt_msg_entry::len, dt_msg_queue::lock, log_err(), dt_msg_queue::maxsize, dt_msg_queue::msgcount, dt_msg_entry::next, and entry::next.

◆ dt_io_thread_create()

struct dt_io_thread* dt_io_thread_create ( void  )

Create IO thread.

Returns
new io thread object (not yet started), or NULL on malloc failure.

References dt_io_thread::wakeup_timer_enabled, and dt_io_thread::wakeup_timer_lock.

◆ dt_io_thread_delete()

void dt_io_thread_delete ( struct dt_io_thread * dtio)

◆ dt_io_thread_apply_cfg()

◆ dt_io_thread_register_queue()

int dt_io_thread_register_queue ( struct dt_io_thread * dtio,
struct dt_msg_queue * mq
)

Register a msg queue to the io thread.

It will be polled to see if there are messages and those then get removed and sent, when the thread is running.

Parameters
dtio: the io thread.
mq: message queue to register.
Returns
false on failure (malloc failure).

References dt_msg_queue::dtio, dt_io_thread::io_list, dt_io_thread::io_list_iter, dt_msg_queue::lock, dt_io_list_item::next, and dt_io_list_item::queue.

◆ dt_io_thread_unregister_queue()

void dt_io_thread_unregister_queue ( struct dt_io_thread * dtio,
struct dt_msg_queue * mq
)

Unregister queue from io thread.

Parameters
dtio: the io thread.
mq: message queue.

References dt_msg_queue::dtio, dt_io_thread::io_list, dt_io_thread::io_list_iter, dt_msg_queue::lock, dt_io_list_item::next, and dt_io_list_item::queue.

◆ dtio_reconnect_clear()

static void dtio_reconnect_clear ( struct dt_io_thread * dtio)
static

clear the reconnect exponential backoff timer.

We have successfully connected so we can try again with short timeouts.

References dtio_reconnect_del(), and dt_io_thread::reconnect_timeout.

Referenced by dtio_check_nb_connect(), and dtio_setup_reconnect().

◆ dtio_check_nb_connect()

static int dtio_check_nb_connect ( struct dt_io_thread * dtio)
static

◆ dtio_write_buf()

static int dtio_write_buf ( struct dt_io_thread * dtio,
uint8_t *  buf,
size_t  len 
)
static

write buffer to output.

returns number of bytes written, 0 if nothing happened, try again later, or -1 if the channel is to be closed.

References dt_io_thread::event, dt_io_thread::fd, log_err(), sock_strerror(), dt_io_thread::ssl, dt_io_thread::stop_flush_event, UB_EV_WRITE, and ub_winsock_tcp_wouldblock().

Referenced by dtio_control_stop_send(), dtio_write_more_of_data(), and dtio_write_more_of_len().

◆ dtio_write_more_of_len()

static int dtio_write_more_of_len ( struct dt_io_thread * dtio)
static

write more of the length, preceding the data frame.

return true if message is done, false if incomplete.

References dt_io_thread::cur_msg_len, dt_io_thread::cur_msg_len_done, dtio_close_output(), dtio_del_output_event(), dtio_write_buf(), and dt_io_thread::ssl.

Referenced by dtio_write_more().

◆ dtio_write_more_of_data()

static int dtio_write_more_of_data ( struct dt_io_thread * dtio)
static

write more of the data frame.

return true if message is done, false if incomplete.

References dt_io_thread::cur_msg, dt_io_thread::cur_msg_done, dt_io_thread::cur_msg_len, dtio_close_output(), dtio_del_output_event(), and dtio_write_buf().

Referenced by dtio_write_more().

◆ dtio_write_more()

static int dtio_write_more ( struct dt_io_thread * dtio)
static

write more of the current message.

false if incomplete, true if the message is done

References dt_io_thread::cur_msg_done, dt_io_thread::cur_msg_len, dt_io_thread::cur_msg_len_done, dtio_write_more_of_data(), and dtio_write_more_of_len().

◆ receive_bytes()

static ssize_t receive_bytes ( struct dt_io_thread * dtio,
void *  buf,
size_t  len 
)
static

◆ dtio_read_accept_frame()

◆ dt_io_thread_start()

int dt_io_thread_start ( struct dt_io_thread * dtio,
void *  event_base_nothr,
int  numworkers 
)

Start the io thread.

Parameters
dtio: the io thread.
event_base_nothr: the event base to attach the events to, in case we are running without threads. With threads, this is ignored and a thread is started to process the dnstap log messages.
numworkers: number of worker threads. The dnstap io thread is that number +1 as the threadnumber (in logs).
Returns
false on failure.

References dt_io_thread::commandpipe, dnstap_io(), dtio_setup_on_base(), dt_io_thread::event_base, log_err(), dt_io_thread::started, dt_io_thread::threadnum, and dt_io_thread::tid.

◆ dt_io_thread_stop()

void dt_io_thread_stop ( struct dt_io_thread * dtio)