Recently I have been playing with the Stream API introduced in Java 8, to have fun, learn something and stay in shape, because I’ve been on medical leave for a while. The program works with collections of numbers using different structures, such as LinkedHashMap, HashSet, Set, etc. The idea was to produce a sequence of numbers starting from a text file with highly disordered numerical data.

Once I had a first Java implementation that worked and processed the data the way I wanted, I thought it would be nice to generate as many sequences as I wanted and to filter across all the executions, writing to a single text file. After writing a bash script that uses a series of Unix commands such as tr, sort and uniq, I found that the CPU was terribly underused, at just 1%, which is totally unacceptable. So I decided to rewrite the Java part to use the multithreading capabilities that Java has had since the beginning of time, only this time with something a little newer than creating a Thread, launching it and waiting for it to finish.

I’m talking about the ExecutorService interface from the java.util.concurrent package, available since Java 5, together with its standard thread pool implementation, ThreadPoolExecutor, which has been in the JDK since Java 5 as well.

You can read the official documentation here.

Basically the strategy is the following: create a pool of threads to perform the heavy task, find out how many concurrent threads the pool can safely use without exhausting system resources or triggering exceptions, give each task a reasonable time to finish, and launch all the executions. As a result, CPU utilization goes from a measly 1% to 100%, with the consequent productivity gain.

Well, to find out the characteristics of the cpu, we can use something like this:

static int getNumberOfCPUCores() {
		String command = "";
		if (OSValidator.isMac()) {
			System.out.println("It looks like this is OsX...");
			command = "sysctl -n machdep.cpu.core_count";
		} else if (OSValidator.isUnix()) {
			System.out.println("It looks like this is Unix/Linux...");
			command = "lscpu";
		} else if (OSValidator.isWindows()) {
			System.out.println("It looks like this is Windows...");
			command = "cmd /C WMIC CPU Get /Format:List";
		}
		int numberOfCores = 0;
		int sockets = 0;
		Process process;
		try {
			if (OSValidator.isMac()) {
				String[] cmd = { "/bin/sh", "-c", command };
				process = Runtime.getRuntime().exec(cmd);
			} else {
				process = Runtime.getRuntime().exec(command);
			}
		} catch (IOException | IllegalArgumentException e) {
			// the command could not be started, or the OS was not recognized and the command is empty
			e.printStackTrace();
			return numberOfCores;
		}

		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
		String line;

		try {
			while ((line = reader.readLine()) != null) {
				// drop stray whitespace and carriage returns before parsing
				line = line.trim();
				if (OSValidator.isMac()) {
					numberOfCores = line.length() > 0 ? Integer.parseInt(line) : 0;
				} else if (OSValidator.isUnix()) {
					// lscpu prints "Label:   value", so the value is the last token
					if (line.contains("Core(s) per socket:")) {
						numberOfCores = Integer.parseInt(line.split("\\s+")[line.split("\\s+").length - 1]);
					}
					if (line.contains("Socket(s):")) {
						sockets = Integer.parseInt(line.split("\\s+")[line.split("\\s+").length - 1]);
					}
				} else if (OSValidator.isWindows()) {
					// WMIC prints "NumberOfCores=value"
					if (line.contains("NumberOfCores")) {
						numberOfCores = Integer.parseInt(line.split("=")[1].trim());
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		if (OSValidator.isUnix()) {
			return numberOfCores * sockets;
		}
		return numberOfCores;
}

The method relies on this small OSValidator helper class:

public class OSValidator {

	private static String OS = System.getProperty("os.name").toLowerCase();

	public static void main(String[] args) {

		System.out.println(OS);

		if (isWindows()) {
			System.out.println("This is Windows");
		} else if (isMac()) {
			System.out.println("This is Mac");
		} else if (isUnix()) {
			System.out.println("This is Unix or Linux");
		} else if (isSolaris()) {
			System.out.println("This is Solaris");
		} else {
			System.out.println("Your OS is not supported!!");
		}
	}

	public static boolean isWindows() {
		return (OS.indexOf("win") >= 0);
	}

	public static boolean isMac() {
		return (OS.indexOf("mac") >= 0);
	}

	public static boolean isUnix() {
		// use >= 0 for every check: "aix" starts at index 0 when os.name is "AIX"
		return (OS.indexOf("nix") >= 0 || OS.indexOf("nux") >= 0 || OS.indexOf("aix") >= 0);
	}

	public static boolean isSolaris() {
		return (OS.indexOf("sunos") >= 0);
	}

	public static String getOS() {
		if (isWindows()) {
			return "win";
		} else if (isMac()) {
			return "osx";
		} else if (isUnix()) {
			return "uni";
		} else if (isSolaris()) {
			return "sol";
		} else {
			return "err";
		}
	}
}
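
If you do not need the physical core count specifically, the JDK offers a portable shortcut. The following is a minimal sketch, not part of the project: it uses Runtime.getRuntime().availableProcessors(), which reports the processors available to the JVM and usually counts hardware threads rather than physical cores, so on a CPU with hardware multithreading the doubling is typically already included. The class name CoreCount and the helper poolSize are hypothetical names used only for illustration.

```java
public class CoreCount {

    // Processors visible to the JVM (usually hardware threads, not physical cores).
    static int logicalProcessors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Hypothetical sizing helper: never returns fewer than 1 thread.
    static int poolSize(int processors, int threadsPerProcessor) {
        return Math.max(1, processors * threadsPerProcessor);
    }

    public static void main(String[] args) {
        int processors = logicalProcessors();
        System.out.println("The JVM reports " + processors + " available processors.");
        System.out.println("Pool size with one thread each: " + poolSize(processors, 1));
    }
}
```

The trade-off is that this call needs no external process and no per-OS parsing, but it cannot tell you how many of those processors are physical cores.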

Once you have something like this, you can use it in your main class or wherever you need a thread pool. I found that it is safe to use twice as many threads as physical cores because my CPU has hardware multithreading.

int cores = Utils.getNumberOfCPUCores();
System.out.println("There are " + cores + " physical cores. I will use twice as many threads for the executor thread pool.");
ExecutorService executor = Executors.newFixedThreadPool(cores * 2);

So, once the thread pool is initialized, you can wrap the heavy task you want to speed up in a Runnable and submit it to the pool.

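// submit every task first so the pool can run them in parallel, then wait up to 5 seconds for each one
List<Future<?>> futures = new ArrayList<>();
for (int i = 1; i <= numThreadsToRun; i++) {
	futures.add(executor.submit(Utils.showSorteredValuesReversedOrder(pathToEM)));
}
for (Future<?> future : futures) {
	future.get(5, TimeUnit.SECONDS);
}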

shutdownThreads(executor);

private static void shutdownThreads(ExecutorService executor) {
	try {
		System.out.println("attempt to shutdown executor");
		executor.shutdown();
		executor.awaitTermination(5, TimeUnit.MINUTES);
	} catch (InterruptedException e) {
		System.err.println("tasks interrupted");
		System.exit(-1);
	} finally {
		if (!executor.isTerminated()) {
			System.err.println("cancel non-finished tasks");
		}
		executor.shutdownNow();
		System.out.println("shutdown finished");
	}
}
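
To try the pool in isolation, here is a minimal, self-contained sketch of a fixed thread pool where all tasks are submitted before any result is awaited, so they can actually run concurrently. It is not the project code: the class PoolSketch, the method runAll and the sleep-and-sum heavyTask are hypothetical stand-ins for the real work, and it uses Callable instead of Runnable only so that each task can return a value to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolSketch {

    // Hypothetical stand-in for the heavy task: pauses briefly, then sums 1..n.
    static Callable<Long> heavyTask(int n) {
        return () -> {
            Thread.sleep(100);
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        };
    }

    // Submits every task before waiting on any of them, then collects the results in submission order.
    static List<Long> runAll(int numTasks, int poolSize) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        List<Future<Long>> futures = new ArrayList<>();
        List<Long> results = new ArrayList<>();
        try {
            for (int i = 1; i <= numTasks; i++) {
                futures.add(executor.submit(heavyTask(i)));
            }
            for (Future<Long> future : futures) {
                // wait at most 5 seconds for each individual result
                results.add(future.get(5, TimeUnit.SECONDS));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("task failed or timed out", e);
        } finally {
            // stop the worker threads whether or not every task finished
            executor.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(runAll(4, 4)); // prints [1, 3, 6, 10]
    }
}
```

Calling get() immediately after each submit() would instead make the caller wait for one task before queuing the next, which keeps only one pool thread busy at a time.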

The numThreadsToRun variable can be as big as you want. As you can see, I wait a maximum of 5 seconds for each task, and the pool takes care of assigning a free thread to the next task. Next comes the heavy task. I’m showing only one part of it; the real code works with more data structures, but for readability I’ll show just one. The GitHub link is given at the end of the post.

static Runnable showSorteredValuesReversedOrder(String pathToEM)
			throws FileNotFoundException, InterruptedException {

		Runnable runnable = () -> {
			List<AnotherEMPojo> myListEMPojo;
			try {
				myListEMPojo = Utils.processHistoricInputFile(pathToEM);
				Set<Entry<Integer, List<Integer>>> mySetStar1 = myListEMPojo.stream().map(o -> o.getStar1())
						.collect(Collectors.groupingBy(obj -> obj)).entrySet();

				
				LinkedHashMap<Integer, Integer> lhmStar1 = Utils.calculateMostFrequentValuesReversedOrder(mySetStar1);

				Set<Entry<Integer, Integer>> setStars = new LinkedHashSet<Entry<Integer, Integer>>();

				Iterator<Entry<Integer, Integer>> itStar1 = lhmStar1.entrySet().iterator();
				
				while (itStar1.hasNext()) {
					Entry<Integer, Integer> it = itStar1.next();
					if (setStars.add(it)) {
						System.out.println("star " + it.getKey());
					}
				}

			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}

		};

		return runnable;
}

static LinkedHashMap<Integer, Integer> calculateMostFrequentValuesReversedOrder(
			Set<Entry<Integer, List<Integer>>> mySetwinner) {

		Iterator<Entry<Integer, List<Integer>>> iteratorSetWinner1 = mySetwinner.iterator();
		HashMap<Integer, Integer> aMap = new HashMap<Integer, Integer>();
		while (iteratorSetWinner1.hasNext()) {
			Entry<Integer, List<Integer>> it = iteratorSetWinner1.next();
			List<Integer> listValues = it.getValue();
			final Comparator<Integer> c = (p1, p2) -> Integer.compare(p1, p2);
			listValues.sort(c);
			aMap.put(it.getKey(), it.getValue().size());
		}
		// creating sorted map by value reversed order
		LinkedHashMap<Integer, Integer> aLinkedHM = aMap.entrySet().stream()
				.sorted(Collections.reverseOrder(comparingByValue()))
				.collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2, LinkedHashMap::new));
		return aLinkedHM;
}
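
The same kind of ranking (how often each number appears, most frequent first) can also be expressed in a single stream pipeline. This is only a sketch under simplifying assumptions: it takes a plain List<Integer> instead of the AnotherEMPojo objects, counts with Collectors.counting() (so the counts are Long rather than Integer), and breaks ties by key so the order is deterministic. The names FrequencySketch and mostFrequentFirst are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FrequencySketch {

    // Counts how often each value occurs and returns the counts, most frequent first.
    static LinkedHashMap<Integer, Long> mostFrequentFirst(List<Integer> values) {
        Map<Integer, Long> counts = values.stream()
                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
                // highest count first; equal counts are ordered by ascending key
                .sorted(Map.Entry.<Integer, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<Integer, Long>comparingByKey()))
                // LinkedHashMap keeps the sorted insertion order
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (e1, e2) -> e2, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        List<Integer> stars = Arrays.asList(2, 7, 2, 9, 7, 2);
        System.out.println(mostFrequentFirst(stars)); // prints {2=3, 7=2, 9=1}
    }
}
```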

static List<AnotherEMPojo> processHistoricInputFile(String inputFilePath) throws FileNotFoundException {
		List<AnotherEMPojo> inputList = new ArrayList<AnotherEMPojo>();
		// try-with-resources closes the reader even if reading or parsing fails
		try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(inputFilePath)))) {
			// skip the header of the csv
			inputList = br.lines().skip(1).map(mapToHistoricItem).collect(Collectors.toList());
		} catch (IOException e) {
			e.printStackTrace();
		}
		return inputList;
}

private static Function<String, AnotherEMPojo> mapToHistoricItem = (line) -> {
		String[] p = line.split(COMMA);// a CSV has comma separated lines
		AnotherEMPojo item = new AnotherEMPojo();
		item.setDateContest(p[0]);
		item.setWiner1(Integer.parseInt(p[1]));
		item.setWiner2(Integer.parseInt(p[2]));
		item.setWiner3(Integer.parseInt(p[3]));
		item.setWiner4(Integer.parseInt(p[4]));
		item.setWiner5(Integer.parseInt(p[5]));
		// 6 and 7 are null values...
		item.setStar1(Integer.parseInt(p[7]));
		item.setStar2(Integer.parseInt(p[8]));
		return item;
};

Earlier I mentioned a bash script; for the sake of clarity, I’m including its code too.

#!/bin/bash
# record the start time so the duration printed at the end is meaningful
start_time=$(date +%s)
type=$1
numIterations=$2
PATH_TO_Euromillones2004_2018=$3
PATH_TO_Primitiva=$4
clear && java -cp target/MyStreamjava8-1.0.1-RELEASE.jar com.aironman.ApiStreamTests "$type" "$numIterations" "$PATH_TO_Euromillones2004_2018" "$PATH_TO_Primitiva" >> "output.txt"

# Keep the winners and save them to a file
while read linea;
do 
	if [[ "${linea}" =~ "Winner1" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner2" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner3" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner4" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner5" ]] 
	then echo $linea;
	fi;
done < "output.txt" > "final_winners.txt"

(tr ' ' '\n' | sort | uniq -c | awk '{print "winner " $2 " appeared "$1 " times of '"$numIterations"'. Frequency is "$1*100 / '"$numIterations"'" %"}') < "final_winners.txt" > "final_output_winners.txt"

while read linea;
do 
	if [[ "${linea}" =~ "Star1" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Star2" ]] 
	then echo $linea;
	fi;
done < "output.txt" > "final_star.txt"
(tr ' ' '\n' | sort | uniq -c | awk '{print "star " $2 " appeared "$1 " times of '"$numIterations"'. Frequency is "$1*100 / '"$numIterations"'" %"}') < "final_star.txt" > "final_output_star.txt"

finish_time=$(date +%s)
echo "A general file named output.txt has been generated. "
echo "Showing winners."
cat "final_output_winners.txt"
echo "Showing stars."
cat "final_output_star.txt"
echo "Done. Time duration: $((finish_time - start_time)) secs."

That’s it. The code for the whole project is hosted on GitHub. Happy New Year 2019, be happy, and if you need me to clarify anything, let me know.
