5. Implementazione Multi Threading

In questa lezione vedremo come velocizzare in modo significativo il nostro port scanner grazie ai thread.

Vedremo inoltre come creare una barra di avanzamento in modo da poter mostrare agli utenti che stanno utilizzando il nostro scanner a che punto è il processo di scansione.

Quando si vuole velocizzare un processo ci sono strade differenti che si possono intraprendere sulla base del problema che si sta cercando di risolvere: spesso dobbiamo svolgere lavori che sono limitati da un punto di vista input/output perché mentre leggiamo un file o aspettiamo che venga risolta una richiesta di rete la nostra potenza di calcolo viene sprecata, oppure abbiamo dei problemi che fanno uso intensivo della CPU. A questo si aggiunge anche il fatto che talvolta abbiamo bisogno di condividere gli stessi dati.

Avremo quindi una classe di problemi che trarrà vantaggio da una parallelizzazione multiprocesso e un’altra classe di problemi per cui ha senso utilizzare i thread.

Semplificando possiamo dire che un processo è un programma in esecuzione, mentre un thread è un segmento di processo: possiamo infatti avere più thread all’interno del medesimo processo.


Come velocizzare la scansione delle porte con il modulo multiprocessing

Utilizziamo quindi i thread per velocizzare la scansione delle nostre porte. Apriamo il file utils.py e importiamo ThreadPool da multiprocessing.pool. Per quelli di voi che vogliono esplorare delle alternative rispetto a ciò che faremo, diciamo che è possibile importare ThreadPoolExecuter da concurrent.futures.

Importiamo anche os e creiamo una nuova funzione chiamata threadpool_executer() in cui definiamo il numero di thread da utilizzare nella variabile number_of_workers:

import json
import os
from multiprocessing.pool import ThreadPool


def extract_json_data(filename):
    # ...

def threadpool_executer(function, iterable, iterable_length):
    number_of_workers = os.cpu_count()
    print(f"\nRunning using {number_of_workers} workers.\n")

Importiamo anche la Console di rich per migliorare l’aspetto grafico:

from rich.console import Console

console = Console()

Per fare in modo che la nostra funzione venga eseguita per ciascun elemento dell’iterabile utilizziamo l’istruzione with:

def threadpool_executer(function, iterable, iterable_length):
    number_of_workers = os.cpu_count()
    print(f"\nRunning using {number_of_workers} workers.\n")
    with ThreadPool(number_of_workers) as pool:
        pool.imap(function, iterable)

Per tenere traccia del livello di avanzamento della scansione utilizziamo la funzione enumerate:

def threadpool_executer(function, iterable, iterable_length):
    # ...
    with ThreadPool(number_of_workers) as pool:
        for loop_index, _ in enumerate(pool.imap(function, iterable), 1):
            progress = (loop_index / iterable_length) * 100
			print(progress)

Importiamo quindi la funzione threadpool_executer() in pscan.py:

from utils import extract_json_data, threadpool_executer

Modifichiamo il metodo run() in modo da implementarla e cancelliamo tutto il codice che non ci serve più:

def run(self):
	threadpool_executer(
		self.scan_port, self.ports_info.keys(), len(self.ports_info.keys())
	)
	self.show_completion_message()

Adesso se avviamo il codice vedremo che l’analisi delle porte aperte è molto più veloce. Il livello di avanzamento ci mostra decisamente troppe cifre decimali dopo la virgola, riduciamole a una sola in questo modo:

def threadpool_executer(function, iterable, iterable_length):
    # ...
    with ThreadPool(number_of_workers) as pool:
        for loop_index, _ in enumerate(pool.imap(function, iterable), 1):
            progress = "%.1f" % (loop_index / iterable_length) * 100
			print(progress)


Implementiamo una barra di avanzamento per la scansione

Ogni funzione dovrebbe occuparsi di una sola task, quindi separiamo la barra di avanzamento da threadpool_executer() e implementiamola in una nuova funzione chiamata display_progress():

def display_progress(iteration, total):
    progress = "%.1f" % (loop_index / iterable_length) * 100
	print(progress)


def threadpool_executer(function, iterable, iterable_length):
    number_of_workers = os.cpu_count()
    print(f"\nRunning using {number_of_workers} workers.\n")
    with ThreadPool(number_of_workers) as pool:
        for loop_index, _ in enumerate(pool.imap(function, iterable), 1):
			display_progress(loop_index, iterable_length)

Vogliamo mostrare una barra di avanzamento senza andare a capo ogni volta che lo stato della barra deve essere aggiornato. Per fare ciò iniziamo impostando a 45 caratteri la larghezza massima della nostra barra tramite la variabile bar_max_width, calcoliamo lo stato di avanzamento attuale della barra assegnandolo alla variabile bar_current_width e facciamo in modo che l’avanzamente venga mostrato tramite l’aggiunta di "█". Utilizziamo console.print per colorare l’output e tramite l’istruzione if facciamo in modo che se abbiamo concluso la scansione delle porte lasciamo uno spazio con un print vuoto.

def display_progress(iteration, total):
    bar_max_width = 45  # chars
    bar_current_width = bar_max_width * iteration // total
    bar = "█" * bar_current_width + "-" * (bar_max_width - bar_current_width)
    progress = "%.1f" % (iteration / total * 100)
    console.print(f"|{bar}| {progress} %", style="bold green")
    if iteration == total:
        print()

Se adesso eseguiamo il codice vediamo che la barra di avanzamento funziona, ma ne viene mostrata una aggiuntiva ogni volta che la percentuale di completamento viene aggiornata. Per fare in modo che la barra mostrata sia una sola implementiamo il carriage return\r, che consente al cursore di tornare all’inizio:

def display_progress(iteration, total):
    # ...
    console.print(f"|{bar}| {progress} %", end="\r", style="bold green")
    # ...

Adesso il nostro programma è concluso!

Potete trovare il codice completo di PScan nel repo GitHub.