use strict;
use Time::HiRes qw(gettimeofday tv_interval sleep);
my $Start_time = [gettimeofday];
use File::Temp;
use Term::ANSIScreen qw|:color :cursor :screen|;
use Win32::Console::ANSI qw|Cursor|;
use threads;
use threads::shared;
use Thread::Queue;
use constant FATAL => "\aFATAL ERROR: ";
use vars qw|$RecordPtrLen|;
#===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
# merge-sort_threaded.pl: Stable threaded merge sort of a text file with tab separated values.
#===============================================================================================================================
#           Usage : perl merge-sort_threaded.pl INFILE OUTFILE MAXRECORDS
#       Arguments : INFILE     = path of the data text file to be sorted
#                   OUTFILE    = path of sorted data file
#                   MAXRECORDS = maximum number of records per initial file
#     Input Files : See Arguments.
#    Output Files : See Arguments.
# Temporary Files : Filename templates are mergeXXXXX.dat
#         Remarks : See http://www.webpraxis.ab.ca/merge/merge-sort_threaded.shtml for details.
#         History : v1.0.0 - February 26, 2009 - Original release.
#===============================================================================================================================
# 0) DEFINE SORT CHARACTERISTICS:
my $MakeCompositeKey	= sub{	my $record		= shift;								#anonymous sub for creating a composite key
								my $recordPtr	= shift;
								sprintf( "%5.5d\t%s", ( split /\t/, $record )[0,8] ) .
								sprintf( "\t%${RecordPtrLen}.${RecordPtrLen}lu", $recordPtr );
						     };
#-------------------------------------------------------------------------------------------------------------------------------
# 1) INITIALIZE:
$| = 1; 																				#set STDOUT buffer to auto-flush
cls();																					#clear screen
print colored ['black on white'], "$0\n\n\n", color 'reset';							#display program name

my $InFile			= shift || die FATAL, 'Input file not specified';					#get path of input file
my $OutFile			= shift || die FATAL, 'Output file not specified';					#get path of output file
my $MaxRecordCount	= shift || die FATAL, 'Max number of records not specified';		#maximum number of records per initial file
$RecordPtrLen		= length( -s $InFile );												#establish maximum length of a record pointer

print " Problem Details:\n\n", 															#echo problem details
	  "\t Input File: $InFile\n",
	  "\tOutput File: $OutFile\n\n",
	  '-' x 80, "\n\n";
#-------------------------------------------------------------------------------------------------------------------------------
# 2) CREATE FILE QUEUE & LAUNCH THREAD FOR MERGING:
my $CursorY		:shared;																#cursor row coordinate
my $KeysCreated :shared;																#boolean flag indicating all keys created
my $FileQueue	= Thread::Queue->new();													#create file queue of record keys
my $MergeThread	= threads->create( \&merge );											#launch thread for merging keys in sorted order
#-------------------------------------------------------------------------------------------------------------------------------
# 3) CREATE FILES OF COMPOSITE RECORD KEYS AND POPULATE MERGE FILE QUEUE:
my $recordStart = 0;																	#init file pointer for start of a data record
my $recordCount;																		#record counter
my $noFiles;
my @keys;																				#array of composite record keys

print " Process Details:\n\n";															#inform user of processing start
$CursorY = ( Cursor() )[1];  															#record cursor row position
print	locate( $CursorY,   3 ), 'File Queue: ',
		locate( $CursorY+2, 3 ), '      Main: ',
		locate( $CursorY+4, 3 ), '    Thread: ', colored['bold white'], 'Enqueued ',
		locate( $CursorY+5, 3 ), '            ', colored['bold white'], 'Dequeued ',
		locate( $CursorY+6, 3 ), '            ', colored['bold white'], 'Dequeued ';

open( INFILE, $InFile ) || die FATAL, "Can't open data file '$InFile' - $!";			#open the data file for read
while( my $record = <INFILE> ) {														#repeat for each input data record
	push @keys,	&$MakeCompositeKey( $record, $recordStart );							# create & store composite key
	$recordStart = tell;																# store start of next record

	next unless ( ++$recordCount == $MaxRecordCount ) or eof;							# next record unless enough records or end of file
	my $tmpKeys		= new File::Temp(	TEMPLATE	=> 'mergeXXXXX',					# create temp file for the keys
										DIR			=> $ENV{TEMP},
										SUFFIX		=> '.dat',
										UNLINK		=> 0
									)
						|| die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";
	print( $tmpKeys $keys[$_], "\n" ) for sort { $keys[$a] cmp $keys[$b] } 0..$#keys;	# output lexically ascending composite keys
	close( $tmpKeys );																	# close temp file for the keys
	++$noFiles;																			# update file counter
	$FileQueue->enqueue( $tmpKeys->filename );											# add keys file to file queue
	print	locate( $CursorY,   15 ), clline,											# inform user
			colored ['bold yellow'], $FileQueue->pending,
			locate( $CursorY+2, 15 ), clline,
			colored['bold white'], 'Enqueued ',
			colored ['bold green'], "$tmpKeys";
	@keys			= ();																# clear array for the keys
	$recordCount	= 0;																# reset record count
}																						#until all input records processed
die locate( $CursorY+8, 3 ),															#abort if not enough files
    FATAL, "Specified max number of records too large!" unless $noFiles > 1;
$KeysCreated = 1;																		#flag end of keys creation
#-------------------------------------------------------------------------------------------------------------------------------
# 4) READ SORTED KEYS & OUTPUT CORRESPONDING DATA RECORDS:
print	locate( $CursorY+2, 15 ), clline,												#inform user
		colored ['bold red'], 'Waiting...';
my $sortedKeysFile = $MergeThread->join();												#get filename of resulting sorted keys
print	locate( $CursorY+2, 15 ), clline,												#inform user
		colored ['bold red'], "Creating sorted output file...";
open( OUTFILE, ">$OutFile" )															#open output file for write
 || die FATAL, "Can't create output file '$OutFile' - $!";
open( KEYS, $sortedKeysFile )															#open keys file for read
 || die FATAL, "Can't read last temporary file '$sortedKeysFile' - $!";
while( <KEYS> ) {																		#repeat for each key
	seek INFILE, ( split /\t/ )[2], 0;													# position file pointer for INFILE
	print OUTFILE scalar <INFILE>;														# output data record
}																						#until all keys processed
close( OUTFILE );																		#close sorted data file
close( INFILE );																		#close input data file
close( KEYS );																			#close keys file
unlink $sortedKeysFile;																	#delete keys file
print	locate( $CursorY+2, 15 ), clline,												#inform user
		colored ['bold red'], "\aDone!",
		locate( $CursorY+8, 3 ),
		colored ['bold white'], 
		"Elapsed time: ", tv_interval( $Start_time ), "\n";								#report total elapsed time
exit;
#===== SUBROUTINES =============================================================================================================
#            Usage : &merge()
#          Purpose : Merge enqueued sorted key files. Returns name of last remaining enqueued file.
#        Arguments : None.
#  Temporary Files : Filename templates are mergeXXXXX.dat
#        Externals : $FileQueue
# Shared Externals : $CursorY, $KeysCreated
#             Subs : None.
#          Remarks : None.
#          History : v1.0.0 - February 26, 2009 - Original release.

sub merge {																				#begin sub
	my $sourceKeys1;																	# filename for 1st keys file to be merged
	my $sourceKeys2;																	# filename for 2nd keys file to be merged
	my $tmpKeys;																		# filename for merged keys file

	do {
		$sourceKeys1 = $FileQueue->dequeue();											# get 1st filename
		$sourceKeys2 = $FileQueue->dequeue();											# get 2nd filename
		print	locate( $CursorY+5, 24 ), clline,										# inform user
				colored ['bold yellow'], "$sourceKeys1",
				locate( $CursorY+6, 24 ), clline,
				colored ['bold yellow'], "$sourceKeys2",
				locate( $CursorY,   15 ), clline,
				colored ['bold yellow'], $FileQueue->pending;
		open( SOURCEKEYS1,		$sourceKeys1    )										#  open 1st keys file for read
		 || die FATAL, "Can't read temporary file '$sourceKeys1' - $!";
		open( SOURCEKEYS2,		$sourceKeys2    )										#  open 2nd keys file for read
		 || die FATAL, "Can't read temporary file '$sourceKeys2' - $!";

		$tmpKeys	= new File::Temp(	TEMPLATE	=> 'mergeXXXXX',					#  create temp file for the merged keys
										DIR			=> $ENV{TEMP},
										SUFFIX		=> '.dat',
										UNLINK		=> 0
									)
						|| die FATAL, "Can't create temporary file in Directory '$ENV{TEMP}' - $!";

		my ( $key1, $key2 ) = ( undef, undef );											#  clear the key from each file
		until(	( !defined $key1 && eof SOURCEKEYS1 ) ||								#  repeat
				( !defined $key2 && eof SOURCEKEYS2 )
			 ) {
			$key1	= <SOURCEKEYS1> unless defined $key1;								#   get the next key in 1st file
			$key2	= <SOURCEKEYS2> unless defined $key2;								#   get the next key in 2nd file
			if( $key1 lt $key2 ) {														#   case of 1st key less than 2nd one
				print $tmpKeys	$key1;													#    add key from 1st file to new temp key file
				undef $key1;															#    clear the current key from 1st file
			} else {																	#   case of 2nd key less than 1st one
				print $tmpKeys	$key2;													#    add key from 2nd file to new temp key file
				undef $key2;															#    clear the current key from 2nd file
			}																			#   end case of keys ordering
		}																				#  until no more keys to process from either file
		unless( !defined $key1 && eof SOURCEKEYS1 ) {									#  if the 1st file has some unprocessed keys
			print $tmpKeys $key1 if defined $key1;										#   add any unprocessed read key to new temp file
			print $tmpKeys	<SOURCEKEYS1>;												#   add any unread keys to new temp file
		} else {																		#  else the 2nd file has some unprocessed keys
			print $tmpKeys $key2 if defined $key2;										#   add any unprocessed read key to new temp file		
			print $tmpKeys	<SOURCEKEYS2>;												#   add any unread keys to new temp file
		}																				#  end if-else
		close SOURCEKEYS1;																#  close the 1st file
		close SOURCEKEYS2;																#  close the 2nd file
		close $tmpKeys;																	#  close the new temp file

		unlink $sourceKeys1, $sourceKeys2;												#  delete source temp files
		$FileQueue->enqueue( $tmpKeys->filename );
		print	locate( $CursorY,   15 ), clline,										#  inform user
				colored ['bold yellow'], $FileQueue->pending,
				locate( $CursorY+4, 24 ), clline,
				colored ['bold green'], "$tmpKeys",
				locate( $CursorY+5, 24 ), clline,
				locate( $CursorY+6, 24 ), clline;
	} until ($KeysCreated == 1) && ($FileQueue->pending == 1);							# until only 1 file left in queue
	$tmpKeys->filename;																	# return filepath of resulting file
}																						#end sub merge
#===== Copyright 2009, Webpraxis Consulting Ltd. - ALL RIGHTS RESERVED - Email: webpraxis@gmail.com ============================
# end of merge-sort_threaded.pl
