vg
tools for working with variation graphs
vg::StreamSorter< Message > Class Template Reference

#include <stream_sorter.hpp>

vg::StreamSorter< Message > inherits from vg::Progressive.

Public Member Functions

 StreamSorter (bool show_progress=false)
 
void stream_sort (istream &stream_in, ostream &stream_out, StreamIndex< Message > *index_to=nullptr)
 
void easy_sort (istream &stream_in, ostream &stream_out, StreamIndex< Message > *index_to=nullptr)
 
void sort (vector< Message > &msgs) const
 Sort a vector of messages, in place.
 
bool less_than (const Message &a, const Message &b) const
 Return true if, of Messages a and b, a must come before b, and false otherwise.
 
Position get_min_position (const Message &msg) const
 
bool less_than (const Position &a, const Position &b) const
 
- Public Member Functions inherited from vg::Progressive
void preload_progress (const string &message)
 
void create_progress (const string &message, long count)
 
void create_progress (long count)
 
void update_progress (long i)
 
void increment_progress ()
 
void destroy_progress (void)
 

Private Types

using cursor_t = vg::io::ProtobufIterator< Message >
 
using emitter_t = vg::io::ProtobufEmitter< Message >
 

Private Member Functions

void open_all (const vector< string > &filenames, list< ifstream > &streams, list< cursor_t > &cursors)
 
void streaming_merge (list< cursor_t > &cursors, emitter_t &emitter, size_t expected_messages=0)
 
vector< string > streaming_merge (const vector< string > &temp_names_in, unordered_map< string, size_t > *messages_per_file=nullptr)
 

Private Attributes

size_t max_buf_size = (512 * 1024 * 1024)
 
size_t max_fan_in
 

Additional Inherited Members

- Public Attributes inherited from vg::Progressive
bool show_progress = false
 

Detailed Description

template<typename Message>
class vg::StreamSorter< Message >

Provides the ability to sort a stream of Protobuf Messages, either "dumbly" (entirely in memory) or by streaming into temporary files. For Alignments, paired Alignments will not necessarily end up next to each other, so when sorting by position, set the position cross-references first if you want to be able to find the pairs afterward.

Sorting is done on Positions. A PositionScanner scans any VG Protobuf message that contains Positions or node IDs, emitting the Positions as-is and the node IDs wrapped in Positions. This lets one sorting implementation on Positions work even for messages like Graphs, where node IDs that are not part of any Position are important.
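A minimal sketch of the scanning idea, using hypothetical stand-in types (`Position`, `ToyMessage`) rather than vg's Protobuf classes. It assumes a bare node ID is wrapped as a forward-strand Position at offset 0; the wrapping vg actually uses may differ.

```cpp
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical stand-in for a vg Position: node ID, strand, and offset.
struct Position {
    int64_t node_id = 0;
    bool is_reverse = false;
    int64_t offset = 0;
};

// Hypothetical message that carries both full Positions and bare node IDs
// (for example, a graph-like record that only mentions nodes).
struct ToyMessage {
    std::vector<Position> positions;
    std::vector<int64_t> node_ids;
};

// Visit every Position in the message. Real Positions pass through unchanged;
// bare node IDs are wrapped in a Position (forward strand, offset 0 is an
// assumption of this sketch). The visitor returns false to stop early.
void scan_positions(const ToyMessage& msg,
                    const std::function<bool(const Position&)>& visit) {
    for (const Position& pos : msg.positions) {
        if (!visit(pos)) return;
    }
    for (int64_t id : msg.node_ids) {
        Position wrapped;
        wrapped.node_id = id;  // offset 0, forward strand (assumption)
        if (!visit(wrapped)) return;
    }
}
```

A caller can, for example, collect every emitted Position into a vector, or return false from the visitor to stop scanning early.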

Member Typedef Documentation

◆ cursor_t

template<typename Message >
using vg::StreamSorter< Message >::cursor_t = vg::io::ProtobufIterator<Message>
private

◆ emitter_t

template<typename Message >
using vg::StreamSorter< Message >::emitter_t = vg::io::ProtobufEmitter<Message>
private

Constructor & Destructor Documentation

◆ StreamSorter()

template<typename Message >
vg::StreamSorter< Message >::StreamSorter ( bool  show_progress = false)

Create a stream sorter, showing sort progress on standard error if show_progress is true.

Member Function Documentation

◆ easy_sort()

template<typename Message >
void vg::StreamSorter< Message >::easy_sort ( istream &  stream_in,
ostream &  stream_out,
StreamIndex< Message > *  index_to = nullptr 
)

Sort a stream of VPKG-format Protobuf data, loading it all into memory and doing a single giant sort operation. Optionally index the sorted file into the given index.
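The in-memory strategy can be sketched as follows. This is an illustration only: the records are hypothetical newline-delimited text lines instead of VPKG-format Protobuf, `easy_sort_lines` is a made-up name, and the caller-supplied `less` stands in for less_than().

```cpp
#include <algorithm>
#include <istream>
#include <ostream>
#include <string>
#include <vector>

// Load every record into memory, do one std::sort, then write the result.
// Records are text lines here, not VPKG-format Protobuf messages.
template <typename Less>
void easy_sort_lines(std::istream& in, std::ostream& out, Less less) {
    std::vector<std::string> records;
    std::string line;
    while (std::getline(in, line)) {
        records.push_back(line);  // memory use grows with the whole input
    }
    std::sort(records.begin(), records.end(), less);
    for (const std::string& r : records) {
        out << r << '\n';
    }
}
```

Peak memory grows with the size of the whole input, which is the cost stream_sort() avoids by working in bounded chunks.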

◆ get_min_position()

template<typename Message >
Position vg::StreamSorter< Message >::get_min_position ( const Message &  msg) const

Determine the minimum Position visited by a Message. The minimum Position has the lowest node ID visited by the message, the lowest offset visited on that node ID as its offset, and its orientation set to false if the forward strand is visited, or true if only the reverse strand is visited.
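A minimal sketch of the minimum-Position rule, under the assumption that the minimum is simply the smallest visited Position under the node ID, strand, offset ordering used by less_than(). `Position` and `min_position` are hypothetical stand-ins, and the visited Positions are passed in as a plain vector. Offsets on different strands are measured from different ends of a node, so this sketch only compares offsets within the chosen strand.

```cpp
#include <cstdint>
#include <optional>
#include <tuple>
#include <vector>

// Hypothetical stand-in for a vg Position.
struct Position {
    int64_t node_id;
    bool is_reverse;
    int64_t offset;
};

// Order by node ID, then strand (forward first, since false < true), then offset.
bool position_less(const Position& a, const Position& b) {
    return std::tie(a.node_id, a.is_reverse, a.offset) <
           std::tie(b.node_id, b.is_reverse, b.offset);
}

// Smallest visited Position under the ordering above (an assumption of this
// sketch). Returns std::nullopt if the message visits nothing.
std::optional<Position> min_position(const std::vector<Position>& visited) {
    std::optional<Position> best;
    for (const Position& p : visited) {
        if (!best || position_less(p, *best)) best = p;
    }
    return best;
}
```

With visits on node 3 at forward offsets 8 and 2 and reverse offset 0, the result is node 3, forward strand, offset 2: the forward strand wins because it sorts first, and 2 is the lowest offset on that strand.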

◆ less_than() [1/2]

template<typename Message >
bool vg::StreamSorter< Message >::less_than ( const Message &  a,
const Message &  b 
) const

Return true if, of Messages a and b, a must come before b, and false otherwise.

◆ less_than() [2/2]

template<typename Message >
bool vg::StreamSorter< Message >::less_than ( const Position &  a,
const Position &  b 
) const

Return true if position a is less than position b in our sort, and false otherwise. Positions are ordered first by node ID, then by strand (forward first), and then by offset along that strand. Positions are not ordered by their forward-strand base, because mapping a reverse-strand offset onto the forward strand requires the node's length, and the sort must work without knowing the graph's node lengths.
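The ordering can be expressed as a lexicographic comparison with `std::tie`. In this sketch `Position` is a hypothetical plain struct, and `forward_base` is a made-up helper (assuming offsets count bases from the start of the strand being read) that shows why the forward-strand base cannot serve as the sort key: computing it needs `node_length`.

```cpp
#include <cstdint>
#include <tuple>

// Hypothetical stand-in for a vg Position.
struct Position {
    int64_t node_id;
    bool is_reverse;
    int64_t offset;
};

// Sort key: node ID, then strand (forward before reverse), then offset along
// that strand. None of these fields needs the node's length.
bool less_than(const Position& a, const Position& b) {
    return std::tie(a.node_id, a.is_reverse, a.offset) <
           std::tie(b.node_id, b.is_reverse, b.offset);
}

// Hypothetical helper: mapping a reverse-strand offset onto the forward strand
// depends on node_length, which the sorter may not know.
int64_t forward_base(const Position& p, int64_t node_length) {
    return p.is_reverse ? node_length - 1 - p.offset : p.offset;
}
```

The same reverse-strand Position lands on forward base 9 of a 10-base node but on forward base 3 of a 4-base node, so only the strand-relative key is usable without the graph.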

◆ open_all()

template<typename Message >
void vg::StreamSorter< Message >::open_all ( const vector< string > &  filenames,
list< ifstream > &  streams,
list< cursor_t > &  cursors 
)
private

Open all the given input files, keeping the streams and cursors in the given lists. Lists are used because none of these objects may move after creation, and a std::list never relocates its existing elements when it grows.
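The pointer-stability argument can be sketched with standard containers. Here `ToyCursor` is a hypothetical cursor that stores a pointer to its stream, and `std::istringstream` stands in for `std::ifstream` so the example needs no files on disk.

```cpp
#include <istream>
#include <list>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical cursor that keeps a pointer to the stream it reads from, so the
// stream must stay at a fixed address for the cursor's whole lifetime.
struct ToyCursor {
    std::istream* in;
    explicit ToyCursor(std::istream& s) : in(&s) {}
    bool next(std::string& line) { return static_cast<bool>(std::getline(*in, line)); }
};

// Open one stream and one cursor per input. std::list::emplace_back never
// relocates existing elements, so earlier cursors keep valid stream pointers.
// A std::vector could reallocate on growth and leave those pointers dangling.
void open_all(const std::vector<std::string>& contents,
              std::list<std::istringstream>& streams,
              std::list<ToyCursor>& cursors) {
    for (const std::string& text : contents) {
        streams.emplace_back(text);
        cursors.emplace_back(streams.back());
    }
}
```

Because each `std::list` node is allocated separately, opening more inputs later never invalidates the stream pointer held by an earlier cursor.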

◆ sort()

template<typename Message >
void vg::StreamSorter< Message >::sort ( vector< Message > &  msgs) const

Sort a vector of messages, in place.

◆ stream_sort()

template<typename Message >
void vg::StreamSorter< Message >::stream_sort ( istream &  stream_in,
ostream &  stream_out,
StreamIndex< Message > *  index_to = nullptr 
)

Sort a stream of VPKG-format Protobuf data, using temporary files, limiting the number of simultaneously open input files and the size of in-memory data. Optionally index the sorted file into the given index.
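The first phase of a streaming (external) sort can be sketched as follows. It is an illustration under stated assumptions, not vg's code: records are strings whose `size()` stands in for their serialized size, `make_sorted_runs` is a hypothetical name, and the sorted runs are kept in memory where stream_sort() would write them to temporary files.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Cut the input into chunks whose total size stays within max_buf_size bytes,
// sort each chunk in memory, and collect it as a sorted "run".
std::vector<std::vector<std::string>> make_sorted_runs(
        const std::vector<std::string>& records, std::size_t max_buf_size) {
    std::vector<std::vector<std::string>> runs;
    std::vector<std::string> buffer;
    std::size_t buffered_bytes = 0;
    auto flush = [&]() {
        if (buffer.empty()) return;
        std::sort(buffer.begin(), buffer.end());
        runs.push_back(std::move(buffer));
        buffer.clear();  // reset the moved-from buffer to a known empty state
        buffered_bytes = 0;
    };
    for (const std::string& r : records) {
        // Flush first if this record would push the chunk over budget.
        if (!buffer.empty() && buffered_bytes + r.size() > max_buf_size) flush();
        buffer.push_back(r);
        buffered_bytes += r.size();
    }
    flush();
    return runs;
}
```

A record larger than the budget still gets a chunk of its own, so the loop always makes progress.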

◆ streaming_merge() [1/2]

template<typename Message >
vector< string > vg::StreamSorter< Message >::streaming_merge ( const vector< string > &  temp_names_in,
unordered_map< string, size_t > *  messages_per_file = nullptr 
)
private

Merge all the given temp input files into one or more temp output files, opening no more than max_fan_in input files at a time. The input files, which must have been created by temp_file::create(), will be deleted.

If messages_per_file is specified, it will be used to show progress bars, and will be updated for newly-created files.
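When there are more temp files than max_fan_in, one round of merging is not enough. This hypothetical helper only counts files, without doing any I/O, to show how repeated merges in groups of at most max_fan_in converge on a single file. It assumes max_fan_in is at least 2.

```cpp
#include <cstddef>
#include <vector>

// While more than one file remains, merge them in groups of at most
// max_fan_in, producing one output per group. Returns the number of files
// left after each pass. Assumes max_fan_in >= 2.
std::vector<std::size_t> plan_merge_passes(std::size_t file_count,
                                           std::size_t max_fan_in) {
    std::vector<std::size_t> remaining_after_pass;
    while (file_count > 1) {
        // Ceiling division: a partial final group still yields one file.
        file_count = (file_count + max_fan_in - 1) / max_fan_in;
        remaining_after_pass.push_back(file_count);
    }
    return remaining_after_pass;
}
```

For example, 932 sorted runs with a fan-in of 100 take two rounds: 932 files become 10, and 10 become 1.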

◆ streaming_merge() [2/2]

template<typename Message >
void vg::StreamSorter< Message >::streaming_merge ( list< cursor_t > &  cursors,
emitter_t &  emitter,
size_t  expected_messages = 0 
)
private

Merge all the messages from the given list of cursors into the given emitter. The total expected number of messages can be passed for progress bar purposes.
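Merging sorted cursors is a classic k-way merge. A minimal sketch, assuming each input is an already-sorted `std::vector` standing in for a cursor and a callback `emit` standing in for the emitter. Progress reporting via expected_messages is omitted. A min-heap keyed on each input's current head makes each emitted message cost O(log k) for k inputs.

```cpp
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

template <typename T, typename Less, typename Emit>
void k_way_merge(const std::vector<std::vector<T>>& inputs, Less less, Emit emit) {
    // Heap entry: (input index, position within that input).
    using Entry = std::pair<std::size_t, std::size_t>;
    // priority_queue is a max-heap, so invert the comparison to pop the smallest.
    auto heap_cmp = [&](const Entry& a, const Entry& b) {
        return less(inputs[b.first][b.second], inputs[a.first][a.second]);
    };
    std::priority_queue<Entry, std::vector<Entry>, decltype(heap_cmp)> heap(heap_cmp);
    for (std::size_t i = 0; i < inputs.size(); ++i) {
        if (!inputs[i].empty()) heap.push({i, 0});  // seed with each input's head
    }
    while (!heap.empty()) {
        Entry top = heap.top();
        heap.pop();
        emit(inputs[top.first][top.second]);
        // Advance the input we just consumed from, if it has more items.
        if (top.second + 1 < inputs[top.first].size()) {
            heap.push({top.first, top.second + 1});
        }
    }
}
```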

Member Data Documentation

◆ max_buf_size

template<typename Message >
size_t vg::StreamSorter< Message >::max_buf_size = (512 * 1024 * 1024)
private

What's the maximum size of messages, in serialized, uncompressed bytes, to load into memory for a single temp file chunk during the streaming sort? The default is 512 MiB. For reference, a whole-genome GAM file is about 500 GB of uncompressed data.
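As a worked example, assuming "500 GB" means 500 × 10^9 bytes: since no chunk may exceed max_buf_size, such a file would be split into at least 932 temp files before merging begins.

```latex
\begin{aligned}
\texttt{max\_buf\_size} &= 512 \times 1024 \times 1024 = 536\,870\,912\ \text{bytes} = 512\ \text{MiB},\\
N_{\text{chunks}} &\ge \left\lceil \frac{500 \times 10^{9}\ \text{bytes}}{536\,870\,912\ \text{bytes}} \right\rceil = \lceil 931.32\ldots \rceil = 932.
\end{aligned}
```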

◆ max_fan_in

template<typename Message >
size_t vg::StreamSorter< Message >::max_fan_in
private

What's the max fan-in when combining temp files during the streaming sort? It is computed from the operating system's maximum file descriptor limit.
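On POSIX systems the descriptor limit can be read with getrlimit(RLIMIT_NOFILE). The sketch below is a hypothetical heuristic, not vg's actual formula: it subtracts an assumed reserve of descriptors for the standard streams and output files, and clamps the result between 2 and an assumed cap.

```cpp
#include <sys/resource.h>
#include <algorithm>
#include <cstddef>

// Hypothetical heuristic: derive a merge fan-in from the soft RLIMIT_NOFILE
// limit. The reserve and cap defaults are illustrative assumptions.
std::size_t compute_max_fan_in(std::size_t reserved = 16, std::size_t cap = 4096) {
    struct rlimit lim;
    // Used if the query fails or the limit is unlimited.
    std::size_t soft = cap + reserved;
    if (getrlimit(RLIMIT_NOFILE, &lim) == 0 && lim.rlim_cur != RLIM_INFINITY) {
        soft = static_cast<std::size_t>(lim.rlim_cur);
    }
    std::size_t usable = soft > reserved ? soft - reserved : 0;
    // Merging needs at least two inputs at a time to make progress.
    return std::clamp<std::size_t>(usable, 2, cap);
}
```

If the soft limit is unlimited or cannot be read, the sketch falls back to the cap.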

