Recientemente he estado programando con la api stream que apareció con la api java8 para divertirme, aprender algo y mantener la forma, debido a que llevo un tiempo en una baja médica. El programa consiste en tratar con colecciones de números usando diferentes estructuras, como LinkedHashMap, Hashset, Set, etc… La idea era poder mostrar una secuencia de de números partiendo de un fichero de texto con datos numéricos altamente desordenados.

Una vez conseguida una primera implementación java que funciona y me trata los datos como yo quería, encuentro que estaría bien tener poder generar tantas secuencias como uno quiera y además quiero filtrar de entre todas ejecuciones escribiendo a un único fichero de texto. El caso es que después de escribir un script bash que hace uso de una serie de comandos unix como tr, sort, uniq, me encuentro con la cpu está siendo terriblemente infrautilizada, apenas un 1% de uso, algo totalmente inaceptable, por lo que me decido a reescribir la parte java para usar las capacidades de paralelismo y multihilo que vienen con java desde el principio de los tiempos, solo que esta vez voy a usar algo más nuevo que un Thread, lanzarlo y esperar a que termine su ejecución, aunque no mucho más.

Hablo de usar la interfaz ExecutorService descrita en el paquete java.util.Concurrent desde los tiempos de java 5, usando una implementación descrita por primera vez en el jdk8, ThreadPoolExecutor.

Puedes leer en la documentación oficial aquí.

Básicamente la estrategia es la siguiente, instancias una caché de hilos asíncronos encargados de realizar la tarea pesada, averiguamos cuantos hilos concurrentes podemos usar en nuestra caché de manera segura sin sacrificar todos los recursos del sistema de manera que no aparezca ninguna excepción, asignamos un tiempo prudencial a cada hilo de ejecución para que termine y lanzamos todas las ejecuciones. Como resultado, ganamos mucha utilización de la cpu, pasando de un mísero 1% a un 100% con la consiguiente ganancia de productividad.

Existen muchas apis para crear hilos, como soy un romántico y hace tiempo que no las usaba, decidí usar java.util.Threads. Sí, la primera iteración de esta tecnología 😉

Bueno, al lio, para averiguar las características de la cpu, podemos usar algo asi:

static int getNumberOfCPUCores() {
		String command = "";
		if (OSValidator.isMac()) {
			System.out.println("It looks like this is OsX...");
			command = "sysctl -n machdep.cpu.core_count";
		} else if (OSValidator.isUnix()) {
			System.out.println("It looks like this is Unix/Linux...");
			command = "lscpu";
		} else if (OSValidator.isWindows()) {
			System.out.println("It looks like this is Windows...");
			command = "cmd /C WMIC CPU Get /Format:List";
		}
		Process process = null;
		int numberOfCores = 0;
		int sockets = 0;
		try {
			if (OSValidator.isMac()) {
				String[] cmd = { "/bin/sh", "-c", command };
				process = Runtime.getRuntime().exec(cmd);
			} else {
				process = Runtime.getRuntime().exec(command);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}

		BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
		String line;

		try {
			while ((line = reader.readLine()) != null) {
				if (OSValidator.isMac()) {
					numberOfCores = line.length() > 0 ? Integer.parseInt(line) : 0;
				} else if (OSValidator.isUnix()) {
					if (line.contains("Core(s) per socket:")) {
						numberOfCores = Integer.parseInt(line.split("\\s+")[line.split("\\s+").length - 1]);
					}
					if (line.contains("Socket(s):")) {
						sockets = Integer.parseInt(line.split("\\s+")[line.split("\\s+").length - 1]);
					}
				} else if (OSValidator.isWindows()) {
					if (line.contains("NumberOfCores")) {
						numberOfCores = Integer.parseInt(line.split("=")[1]);
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		if (OSValidator.isUnix()) {
			return numberOfCores * sockets;
		}
		return numberOfCores;
}
public class OSValidator {

	private static String OS = System.getProperty("os.name").toLowerCase();

	public static void main(String[] args) {

		System.out.println(OS);

		if (isWindows()) {
			System.out.println("This is Windows");
		} else if (isMac()) {
			System.out.println("This is Mac");
		} else if (isUnix()) {
			System.out.println("This is Unix or Linux");
		} else if (isSolaris()) {
			System.out.println("This is Solaris");
		} else {
			System.out.println("Your OS is not support!!");
		}
	}

	public static boolean isWindows() {
		return (OS.indexOf("win") >= 0);
	}

	public static boolean isMac() {
		return (OS.indexOf("mac") >= 0);
	}

	public static boolean isUnix() {
		return (OS.indexOf("nix") >= 0 || OS.indexOf("nux") >= 0 || OS.indexOf("aix") > 0);
	}

	public static boolean isSolaris() {
		return (OS.indexOf("sunos") >= 0);
	}

	public static String getOS() {
		if (isWindows()) {
			return "win";
		} else if (isMac()) {
			return "osx";
		} else if (isUnix()) {
			return "uni";
		} else if (isSolaris()) {
			return "sol";
		} else {
			return "err";
		}
	}
}

Una vez que tienes algo asi, puedes usar en tu clase main o donde creas conveniente una caché de hilos. Encontré que es seguro usar el doble de hilos debido a que mi cpu tiene capacidades Multihilo,

int cores = Utils.getNumberOfCPUCores();
System.out.println("There are " + cores + " physical cores. I will use the double for the executor thread pool.");
ExecutorService executor = Executors.newFixedThreadPool(cores * 2);

Por lo que, una vez que tienes la caché de hilos inicializada, podemos empaquetar como Runnable la tarea pesada que quieres acelerar y luego invocar el comienzo de la caché.


for (int i = 1; i <= numThreadsToRun; i++) {
	Future<?> future = executor.submit(Utils.showSorteredValuesReversedOrder(pathToEM));
	future.get(5, TimeUnit.SECONDS);			
}

shutdownThreads(executor);

private static void shutdownThreads(ExecutorService executor) {
try {
   System.out.println("attempt to shutdown executor");
   executor.shutdown();
   executor.awaitTermination(5, TimeUnit.MINUTES);
} catch (InterruptedException e) {
   System.err.println("tasks interrupted");
   System.exit(-1);
} finally {
   if (!executor.isTerminated()) {
       System.err.println("cancel non-finished tasks");
}
executor.shutdownNow();
System.out.println("shutdown finished");
}
}

La variable numThreadsToRun puede ser tan grande como quieras, como puedes ver, voy a esperar un máximo de 5 segundos para cada tarea para que la caché se ocupe de asignar otro hilo para la siguiente tarea. A continuación, la tarea pesada. Estoy escribiendo solo una parte, en realidad tiene que trabajar con más estructuras de datos, pero por razones de legibilidad, voy a mostrar una sola. Más abajo os doy el enlace de github.

static Runnable showSorteredValuesReversedOrder(String pathToEM)
			throws FileNotFoundException, InterruptedException {

		Runnable runnable = () -> {
			List<AnotherEMPojo> myListEMPojo;
			try {
				myListEMPojo = Utils.processHistoricInputFile(pathToEM);
				Set<Entry<Integer, List<Integer>>> mySetStar1 = myListEMPojo.stream().map(o -> o.getStar1())
						.collect(Collectors.groupingBy(obj -> obj)).entrySet();

				
				LinkedHashMap<Integer, Integer> lhmStar1 = Utils.calculateMostFrequentValuesReversedOrder(mySetStar1);

				Set<Entry<Integer, Integer>> setStars = new LinkedHashSet<Entry<Integer, Integer>>();

				Iterator<Entry<Integer, Integer>> itStar1 = lhmStar1.entrySet().iterator();
				
				while (itStar1.hasNext()) {
					Entry<Integer, Integer> it = itStar1.next();
					if (setStars.add(it)) {System.out.println("star " + it.getKey());
					}

				}

			} catch (FileNotFoundException e) {
				e.printStackTrace();
			}

		};

		return runnable;
}

static LinkedHashMap<Integer, Integer> calculateMostFrequentValuesReversedOrder(
			Set<Entry<Integer, List<Integer>>> mySetwinner) {

		Iterator<Entry<Integer, List<Integer>>> iteratorSetWinner1 = mySetwinner.iterator();
		HashMap<Integer, Integer> aMap = new HashMap<Integer, Integer>();
		while (iteratorSetWinner1.hasNext()) {
			Entry<Integer, List<Integer>> it = iteratorSetWinner1.next();
			List<Integer> listValues = it.getValue();
			final Comparator<Integer> c = (p1, p2) -> Integer.compare(p1, p2);
			listValues.sort(c);
			aMap.put(it.getKey(), it.getValue().size());
		}
		// creating sorted map by value reversed order
		LinkedHashMap<Integer, Integer> aLinkedHM = aMap.entrySet().stream()
				.sorted(Collections.reverseOrder(comparingByValue()))
				.collect(toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e2, LinkedHashMap::new));
		return aLinkedHM;
}

static List<AnotherEMPojo> processHistoricInputFile(String inputFilePath) throws FileNotFoundException {
		List<AnotherEMPojo> inputList = new ArrayList<AnotherEMPojo>();
		try {
			File inputF = new File(inputFilePath);
			InputStream inputFS = new FileInputStream(inputF);
			BufferedReader br = new BufferedReader(new InputStreamReader(inputFS));
			// skip the header of the csv
			inputList = br.lines().skip(1).map(mapToHistoricItem).collect(Collectors.toList());
			br.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return inputList;
}

private static Function<String, AnotherEMPojo> mapToHistoricItem = (line) -> {
		String[] p = line.split(COMMA);// a CSV has comma separated lines
		AnotherEMPojo item = new AnotherEMPojo();
		item.setDateContest(p[0]);
		item.setWiner1(Integer.parseInt(p[1]));
		item.setWiner2(Integer.parseInt(p[2]));
		item.setWiner3(Integer.parseInt(p[3]));
		item.setWiner4(Integer.parseInt(p[4]));
		item.setWiner5(Integer.parseInt(p[5]));
		// 6 and 7 are null values...
		item.setStar1(Integer.parseInt(p[7]));
		item.setStar2(Integer.parseInt(p[8]));
		return item;
};

Antes hablaba sobre un script bash, por razones de claridad, voy a incluir su código también.

type=$1
numIterations=$2
PATH_TO_Euromillones2004_2018=$3
PATH_TO_Primitiva=$4
clear && java -cp target/MyStreamjava8-1.0.1-RELEASE.jar com.aironman.ApiStreamTests "$type" "$numIterations" "$PATH_TO_Euromillones2004_2018" "$PATH_TO_Primitiva" >> "output.txt"

# Me quedo con los ganadores y las guardo en un fichero
while read linea;
do 
	if [[ "${linea}" =~ "Winner1" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner2" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner3" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner4" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Winner5" ]] 
	then echo $linea;
	fi;
done < "output.txt" > "final_winners.txt"

(tr ' ' '\n' | sort | uniq -c | awk '{print "winner " $2 " appeared "$1 " times of '"$numIterations"'. Frequency is "$1*100 / '"$numIterations"'" %"}') < "final_winners.txt" > "final_output_winners.txt"

while read linea;
do 
	if [[ "${linea}" =~ "Star1" ]] 
	then echo $linea;
	fi;
	if [[ "${linea}" =~ "Star2" ]] 
	then echo $linea;
	fi;
done < "output.txt" > "final_star.txt"
(tr ' ' '\n' | sort | uniq -c | awk '{print "star " $2 " appeared "$1 " times of '"$numIterations"'. Frequency is "$1*100 / '"$numIterations"'" %"}') < "final_star.txt" > "final_output_star.txt"

finish_time=$(date +%s)
echo "A general file named output.txt has been generated. "
echo "Showing winners."
cat "final_output_winners.txt"
echo "Showing stars."
cat "final_output_star.txt"
echo "Done. Time duration: $((finish_time - start_time)) secs."

Eso es todo, el código de todo el proyecto está alojado en GitHub. Feliz entrada de año 2019, sean felices y cualquier cosa que necesiten que aclare, háganmelo saber.

Advertisement

2 thoughts on “Acerca de como mejorar el rendimiento de las aplicaciones backend usando capacidades multihilo y paralelismo

  1. You could certainly see your skills within the article you write. The world hopes for more passionate writers like you who aren’t afraid to mention how they believe. All the time go after your heart.|

    Like

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s