vg
tools for working with variation graphs
vg::io::StreamMultiplexer Class Reference

#include <stream_multiplexer.hpp>

Public Member Functions

 StreamMultiplexer (ostream &backing, size_t max_threads)
 
 ~StreamMultiplexer ()
 
 StreamMultiplexer (const StreamMultiplexer &other)=delete
 
 StreamMultiplexer (StreamMultiplexer &&other)=delete
 
StreamMultiplexer & operator= (const StreamMultiplexer &other)=delete
 
StreamMultiplexer & operator= (StreamMultiplexer &&other)=delete
 
ostream & get_thread_stream (size_t thread_number)
 
void register_breakpoint (size_t thread_number)
 
bool want_breakpoint (size_t thread_number)
 
void register_barrier (size_t thread_number)
 
void discard_to_breakpoint (size_t thread_number)
 
void discard_bytes (size_t thread_number, size_t count)
 

Private Member Functions

bool ring_buffer_full (size_t thread_number) const
 
bool ring_buffer_empty (size_t thread_number) const
 
string & ring_buffer_push (size_t thread_number)
 
const string & ring_buffer_peek (size_t thread_number)
 
void ring_buffer_pop (size_t thread_number)
 
void writer_thread_function ()
 

Private Attributes

ostream & backing_stream
 Remember the backing stream we wrap.
 
vector< stringstream > thread_streams
 
vector< size_t > thread_breakpoint_cursors
 
vector< vector< string > > thread_queues
 
vector< size_t > thread_queue_empty_slots
 
vector< size_t > thread_queue_filled_slots
 
vector< size_t > thread_queue_byte_counts
 This tracks the number of bytes of data in each thread's queue.
 
vector< mutex > thread_queue_mutexes
 
atomic< bool > writer_stop
 When set to true, cause the writer thread to finish writing all queues and terminate.
 
thread writer_thread
 

Static Private Attributes

static const size_t MIN_QUEUE_ITEM_BYTES = 10 * 64 * 1024
 Don't deal with anything smaller than a few BGZF blocks.
 
static const size_t RING_BUFFER_SIZE = 10
 What is the number of slots in each queue ring buffer?
 

Detailed Description

Tool to allow multiple threads to write to streams that are multiplexed into an output stream, by breaking at allowed points.

Assumes an external source of thread numbering.

Constructor & Destructor Documentation

◆ StreamMultiplexer() [1/3]

vg::io::StreamMultiplexer::StreamMultiplexer ( ostream &  backing,
size_t  max_threads 
)

Make a new StreamMultiplexer sending output to the given output stream.

Needs to know the maximum number of threads that will use the multiplexer.

◆ ~StreamMultiplexer()

vg::io::StreamMultiplexer::~StreamMultiplexer ( )

Clean up and flush a StreamMultiplexer.

Assumes a final breakpoint on all streams.

◆ StreamMultiplexer() [2/3]

vg::io::StreamMultiplexer::StreamMultiplexer ( const StreamMultiplexer &  other)
delete

◆ StreamMultiplexer() [3/3]

vg::io::StreamMultiplexer::StreamMultiplexer ( StreamMultiplexer &&  other)
delete

Member Function Documentation

◆ discard_bytes()

void vg::io::StreamMultiplexer::discard_bytes ( size_t  thread_number,
size_t  count 
)

Cancel the writing of the last count bytes written since the last breakpoint for the given thread. If count is more than the number of bytes since the last breakpoint, rewinds only to the last breakpoint.

◆ discard_to_breakpoint()

void vg::io::StreamMultiplexer::discard_to_breakpoint ( size_t  thread_number)

Cancel the writing of and discard all data written since the previous breakpoint for the given thread.

◆ get_thread_stream()

ostream & vg::io::StreamMultiplexer::get_thread_stream ( size_t  thread_number)

Get the stream for the thread with the given number to write to. This stream will never change address for a given thread, but the object may be destroyed or recreated in place, or moved out of, during calls to register_breakpoint().

Note that using this function in an "untied" (i.e. not thread bound) OMP task is not supported!

◆ operator=() [1/2]

StreamMultiplexer& vg::io::StreamMultiplexer::operator= ( const StreamMultiplexer &  other)
delete

◆ operator=() [2/2]

StreamMultiplexer& vg::io::StreamMultiplexer::operator= ( StreamMultiplexer &&  other)
delete

◆ register_barrier()

void vg::io::StreamMultiplexer::register_barrier ( size_t  thread_number)

Send along whatever has been written for the given thread's stream to the output stream. Only returns once any subsequent write by another thread is guaranteed to appear later in the file than what has been written by the given thread so far. Implicitly also creates a breakpoint.

◆ register_breakpoint()

void vg::io::StreamMultiplexer::register_breakpoint ( size_t  thread_number)

This function must be called after batches of writes to the stream from get_thread_stream(), at points where it is legal, given the output format being constructed, to switch in the final output stream from one thread's output to another's.

It may adjust or replace the object that get_thread_stream() returned, but it must leave a stream at the same address.

Takes the thread number of the thread whose output we can break.

◆ ring_buffer_empty()

bool vg::io::StreamMultiplexer::ring_buffer_empty ( size_t  thread_number) const
private

Return whether the ring buffer for the given thread is empty. Lock on the thread's ring buffer must be held.

◆ ring_buffer_full()

bool vg::io::StreamMultiplexer::ring_buffer_full ( size_t  thread_number) const
private

Return whether the ring buffer for the given thread is full. Lock on the thread's ring buffer must be held.

◆ ring_buffer_peek()

const string & vg::io::StreamMultiplexer::ring_buffer_peek ( size_t  thread_number)
private

Assuming the ring buffer for the given thread is not empty, get a reference to the next thing that would be popped. Lock on the thread's ring buffer must be held.

◆ ring_buffer_pop()

void vg::io::StreamMultiplexer::ring_buffer_pop ( size_t  thread_number)
private

Assuming the ring buffer for the given thread is not empty, remove the thing that is visible via peek. Lock on the thread's ring buffer must be held.

◆ ring_buffer_push()

string & vg::io::StreamMultiplexer::ring_buffer_push ( size_t  thread_number)
private

Assuming the ring buffer for the given thread is not full, mark the next space as occupied and return a reference to it. Lock on the thread's ring buffer must be held.

◆ want_breakpoint()

bool vg::io::StreamMultiplexer::want_breakpoint ( size_t  thread_number)

Check if the multiplexer would like a breakpoint (i.e. whether a substantial amount of data has been written since the last breakpoint).

Writers can check this and conditionally flush their internal buffers to get to a good interleave-able state at regular intervals.
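The calling pattern described above can be sketched as follows. This is a hedged illustration rather than code from vg: `write_records` is a hypothetical helper, and `SketchMultiplexer` is a hypothetical single-threaded stand-in that only mimics the three documented member names (`get_thread_stream`, `want_breakpoint`, `register_breakpoint`) so the example compiles without the vg headers. Its `want_after_bytes` threshold is an assumption standing in for whatever "substantial" means internally.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: write each record, and offer a breakpoint after any
// complete record once the multiplexer says it would like one.
template <typename Multiplexer>
void write_records(Multiplexer& mux, size_t thread_number,
                   const std::vector<std::string>& records) {
    for (const std::string& record : records) {
        // The stream address is documented as stable; re-fetching it per
        // record is simply the conservative choice.
        std::ostream& out = mux.get_thread_stream(thread_number);
        out << record << '\n';
        if (mux.want_breakpoint(thread_number)) {
            mux.register_breakpoint(thread_number);
        }
    }
    // The destructor is documented to assume a final breakpoint.
    mux.register_breakpoint(thread_number);
}

// Hypothetical single-threaded stand-in, NOT the vg class.
struct SketchMultiplexer {
    std::ostream& backing;
    std::vector<std::ostringstream> streams;
    size_t want_after_bytes;  // assumed threshold, for illustration only
    size_t breakpoints = 0;   // counts register_breakpoint() calls

    SketchMultiplexer(std::ostream& backing, size_t max_threads,
                      size_t want_after_bytes)
        : backing(backing), streams(max_threads),
          want_after_bytes(want_after_bytes) {}

    std::ostream& get_thread_stream(size_t thread_number) {
        assert(thread_number < streams.size());
        return streams[thread_number];
    }

    bool want_breakpoint(size_t thread_number) {
        std::streamoff written = streams[thread_number].tellp();
        return static_cast<size_t>(written) >= want_after_bytes;
    }

    void register_breakpoint(size_t thread_number) {
        // The stand-in flushes immediately; the real class may defer.
        backing << streams[thread_number].str();
        streams[thread_number].str("");
        ++breakpoints;
    }
};
```

With the real class, the same `write_records` shape would presumably be called once per worker thread, each with its own thread number.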

◆ writer_thread_function()

void vg::io::StreamMultiplexer::writer_thread_function ( )
private

Function which is run as the writer thread. Empties queues as fast as it can.

After writing an item, the writer thread locks the queue again and pops it. Nobody else could have removed the item it was working on.
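One way to picture that peek, write, re-lock, pop sequence is the sketch below. It is an assumption-laden illustration, not the vg implementation: `SketchQueue` and `service_queue_once` are hypothetical names, and a `std::deque` stands in for the ring buffer because appending to a deque leaves references to existing elements valid.

```cpp
#include <cassert>
#include <deque>
#include <mutex>
#include <ostream>
#include <sstream>
#include <string>

// Hypothetical stand-in for one thread's queue and its mutex.
struct SketchQueue {
    std::mutex mutex;
    std::deque<std::string> items;  // stands in for the ring buffer
};

// Service one item from the queue, if any. Returns true if an item was written.
bool service_queue_once(SketchQueue& queue, std::ostream& backing) {
    const std::string* item = nullptr;
    {
        // Hold the lock only long enough to look at the front item.
        std::lock_guard<std::mutex> lock(queue.mutex);
        if (queue.items.empty()) {
            return false;
        }
        item = &queue.items.front();
    }

    // Do the slow write without the lock. This is safe only because the
    // producer appends at the back and this function is the sole consumer.
    backing << *item;

    {
        // Lock again and pop. Nobody else could have removed the item.
        std::lock_guard<std::mutex> lock(queue.mutex);
        assert(!queue.items.empty());
        queue.items.pop_front();
    }
    return true;
}
```

Keeping the write outside the critical section means a producer is only ever blocked for the brief peek and pop steps.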

Member Data Documentation

◆ backing_stream

ostream& vg::io::StreamMultiplexer::backing_stream
private

Remember the backing stream we wrap.

◆ MIN_QUEUE_ITEM_BYTES

const size_t vg::io::StreamMultiplexer::MIN_QUEUE_ITEM_BYTES = 10 * 64 * 1024
static private

Don't deal with anything smaller than a few BGZF blocks.

To prevent constant locking and unlocking of the queues, we want each item to be relatively substantial.

◆ RING_BUFFER_SIZE

const size_t vg::io::StreamMultiplexer::RING_BUFFER_SIZE = 10
static private

What is the number of slots in each queue ring buffer?

Don't allow more than a few items per ring buffer.

◆ thread_breakpoint_cursors

vector<size_t> vg::io::StreamMultiplexer::thread_breakpoint_cursors
private

Not every breakpoint results in the stream for a thread being cleared out and enqueued. We only actually use a breakpoint if we have enough data in the stream. But we still have to support discard_to_breakpoint. So we keep a cursor for where the most recent breakpoint was. The actual current position in each stream is tracked by the put pointer (seekp()/tellp()).
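The cursor idea can be illustrated with a plain `std::stringstream`. The sketch below is a guess at the general technique, not the vg code: `SketchThreadBuffer` and all of its members are hypothetical. Note that seeking the put pointer backwards does not truncate a stringstream, so the sketch treats only the bytes before the put pointer as live data.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>

// Hypothetical per-thread buffer: a stream plus a cursor for the most
// recent breakpoint. Not the vg implementation.
struct SketchThreadBuffer {
    std::stringstream stream;
    size_t breakpoint_cursor = 0;

    // Current write position, as tracked by the put pointer.
    size_t position() {
        return static_cast<size_t>(static_cast<std::streamoff>(stream.tellp()));
    }

    void seek_to(size_t target) {
        stream.seekp(static_cast<std::streamoff>(target), std::ios::beg);
    }

    // Remember where the breakpoint is without flushing anything.
    void mark_breakpoint() { breakpoint_cursor = position(); }

    // Drop everything written since the last breakpoint.
    void discard_to_breakpoint() { seek_to(breakpoint_cursor); }

    // Drop the last count bytes, but never rewind past the breakpoint.
    void discard_bytes(size_t count) {
        size_t current = position();
        assert(current >= breakpoint_cursor);
        size_t since_breakpoint = current - breakpoint_cursor;
        seek_to(current - std::min(count, since_breakpoint));
    }

    // Only bytes before the put pointer count as live data.
    std::string contents() { return stream.str().substr(0, position()); }
};
```

Later writes simply overwrite the discarded region, so no reallocation is needed to undo a partial record.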

◆ thread_queue_byte_counts

vector<size_t> vg::io::StreamMultiplexer::thread_queue_byte_counts
private

This tracks the number of bytes of data in each thread's queue.

◆ thread_queue_empty_slots

vector<size_t> vg::io::StreamMultiplexer::thread_queue_empty_slots
private

This is the number of the next slot in the ring buffer whose data can be overwritten.

◆ thread_queue_filled_slots

vector<size_t> vg::io::StreamMultiplexer::thread_queue_filled_slots
private

This is the number of the last used slot in the ring buffer. If it is equal to the corresponding entry in thread_queue_empty_slots, no slots are used.

◆ thread_queue_mutexes

vector<mutex> vg::io::StreamMultiplexer::thread_queue_mutexes
private

Access to each thread's queue and byte count is controlled by a mutex. The mutex only has to be held long enough to do a little moving, and can only ever be contended between two threads.

◆ thread_queues

vector<vector<string> > vg::io::StreamMultiplexer::thread_queues
private

When a thread reaches a breakpoint and its stringstream is big enough, its string data is copied into this queue at the back, and the stream is emptied.

We use a ring buffer, so that the writer thread never needs to deallocate anything. To prevent ambiguity, the ring buffer always contains at least 1 empty slot.
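A minimal sketch of a ring buffer that always keeps one slot empty, so that "full" and "empty" are distinguishable, might look like this. `SketchRing` and its members are hypothetical, and the interpretation of the two cursors (next writable slot, oldest occupied slot) is an assumption about the general technique rather than a statement of how vg stores them. Locking is omitted; the real class requires the queue's lock to be held around each operation.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one thread's queue; not the vg implementation.
struct SketchRing {
    std::vector<std::string> slots;
    size_t empty_slot = 0;   // next slot whose data can be overwritten
    size_t filled_slot = 0;  // oldest occupied slot, if any

    explicit SketchRing(size_t size) : slots(size) {}

    // Empty when both cursors coincide.
    bool empty() const { return empty_slot == filled_slot; }

    // Full when one more push would make the cursors coincide, which is
    // why one slot always stays unused.
    bool full() const { return (empty_slot + 1) % slots.size() == filled_slot; }

    // Mark the next slot as occupied and hand it back to be filled in.
    std::string& push() {
        assert(!full());
        std::string& slot = slots[empty_slot];
        empty_slot = (empty_slot + 1) % slots.size();
        return slot;
    }

    // Look at the item that the next pop() would remove.
    const std::string& peek() const {
        assert(!empty());
        return slots[filled_slot];
    }

    // Release the oldest item. The string is left in place, so the
    // consumer never deallocates anything.
    void pop() {
        assert(!empty());
        filled_slot = (filled_slot + 1) % slots.size();
    }
};
```

With `RING_BUFFER_SIZE = 10`, this scheme would hold at most 9 items per thread at a time.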

◆ thread_streams

vector<stringstream> vg::io::StreamMultiplexer::thread_streams
private

Each thread gets a slot in this vector for a stringstream it is supposed to be currently writing to.

◆ writer_stop

atomic<bool> vg::io::StreamMultiplexer::writer_stop
private

When set to true, cause the writer thread to finish writing all queues and terminate.

◆ writer_thread

thread vg::io::StreamMultiplexer::writer_thread
private

This thread is responsible for servicing all the queues and dumping the bytes to the real backing stream.

