🔝 Retour au Sommaire
La programmation concurrente permet à votre programme d'exécuter plusieurs tâches en même temps (ou presque). C'est particulièrement utile pour :
- Effectuer plusieurs opérations d'entrée/sortie simultanément (téléchargements, requêtes réseau)
- Exploiter plusieurs cœurs de processeur pour des calculs intensifs
- Améliorer la réactivité de vos applications
Python propose deux approches principales pour la concurrence : Threading et Multiprocessing.
Les threads sont des "fils d'exécution" qui partagent le même espace mémoire au sein d'un même processus.
Analogie : Imaginez une cuisine avec plusieurs cuisiniers qui partagent les mêmes ustensiles, le même réfrigérateur et le même plan de travail.
Caractéristiques :
- Partage la mémoire entre les threads
- Plus léger et rapide à créer
- Limité par le GIL (Global Interpreter Lock) en Python
- Idéal pour les tâches I/O (entrées/sorties) : lecture de fichiers, requêtes réseau, etc.
Le multiprocessing crée plusieurs processus indépendants, chacun avec sa propre mémoire.
Analogie : Imaginez plusieurs cuisines séparées, chacune avec son propre équipement et ses propres ressources.
Caractéristiques :
- Chaque processus a sa propre mémoire
- Plus lourd à créer et gérer
- Contourne le GIL et peut utiliser plusieurs cœurs CPU
- Idéal pour les calculs intensifs (CPU-bound)
Le GIL est un verrou dans CPython (l'implémentation standard de Python) qui empêche plusieurs threads d'exécuter du code Python en même temps.
Conséquence : Pour les calculs intensifs, les threads ne sont pas vraiment parallèles en Python. C'est pourquoi on utilise le multiprocessing pour les tâches CPU-bound.
Exception : Les opérations I/O libèrent le GIL, donc les threads sont efficaces pour ces tâches.
Note (Python 3.13+) : un build free-threaded (sans GIL) existe depuis Python 3.13 — expérimental en 3.13, officiellement supporté en 3.14, mais toujours optionnel (voir le README du chapitre). Sauf si vous avez explicitement installé ce build particulier, le GIL décrit ici s'applique, et tout ce qui suit reste valable.
Le module threading permet de créer et gérer des threads facilement.
import threading
import time
def afficher_nombres():
"""Fonction qui affiche des nombres"""
for i in range(5):
print(f"Nombre: {i}")
time.sleep(1)
# Créer un thread
thread = threading.Thread(target=afficher_nombres)
# Démarrer le thread
thread.start()
# Le programme principal continue son exécution
print("Le thread a été lancé!")
# Attendre que le thread se termine
thread.join()
print("Le thread est terminé") Explication :
Thread(target=...)crée un nouveau thread qui exécutera la fonction spécifiéestart()lance l'exécution du threadjoin()attend que le thread se termine avant de continuer
import threading
import time
def telecharger_fichier(nom_fichier):
"""Simule le téléchargement d'un fichier"""
print(f"Début du téléchargement de {nom_fichier}")
time.sleep(2) # Simule le temps de téléchargement
print(f"Téléchargement de {nom_fichier} terminé")
# Créer plusieurs threads
fichiers = ["image1.jpg", "image2.jpg", "image3.jpg"]
threads = []
for fichier in fichiers:
thread = threading.Thread(target=telecharger_fichier, args=(fichier,))
threads.append(thread)
thread.start()
# Attendre que tous les threads se terminent
for thread in threads:
thread.join()
print("Tous les téléchargements sont terminés")Avantage : Les trois "téléchargements" se font en parallèle, économisant du temps par rapport à une exécution séquentielle.
import threading
import time
class MonThread(threading.Thread):
def __init__(self, nom, duree):
super().__init__()
self.nom = nom
self.duree = duree
def run(self):
"""Méthode exécutée quand le thread démarre"""
print(f"{self.nom} commence")
time.sleep(self.duree)
print(f"{self.nom} termine")
# Créer et lancer des threads
thread1 = MonThread("Thread-1", 2)
thread2 = MonThread("Thread-2", 3)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print("Tous les threads sont terminés")Lorsque plusieurs threads accèdent aux mêmes données, il faut utiliser un Lock pour éviter les conflits.
import threading
# Variable partagée
compteur = 0
# Verrou pour protéger l'accès
lock = threading.Lock()
def incrementer():
global compteur
for _ in range(100000):
# Acquérir le verrou avant de modifier la variable
with lock:
compteur += 1
# Créer plusieurs threads
threads = []
for _ in range(5):
thread = threading.Thread(target=incrementer)
threads.append(thread)
thread.start()
# Attendre tous les threads
for thread in threads:
thread.join()
print(f"Valeur finale du compteur: {compteur}")
# Sans lock, le résultat serait imprévisible
# Avec lock, on obtient toujours 500000Point important : Le with lock: garantit qu'un seul thread à la fois peut modifier le compteur.
Le module multiprocessing permet de créer des processus indépendants, idéal pour les calculs intensifs.
import multiprocessing
import time
def calculer_carre(nombre):
"""Calcule le carré d'un nombre"""
print(f"Calcul du carré de {nombre}")
time.sleep(1)
resultat = nombre ** 2
print(f"Résultat: {resultat}")
if __name__ == '__main__':
# Créer un processus
processus = multiprocessing.Process(target=calculer_carre, args=(5,))
# Démarrer le processus
processus.start()
print("Le processus a été lancé!")
# Attendre que le processus se termine
processus.join()
print("Le processus est terminé")Note importante : Le if __name__ == '__main__': est indispensable dès qu'on crée des processus. Selon la méthode de démarrage, l'interpréteur réimporte le module principal dans chaque processus enfant ; sans ce garde-fou, le code de création des processus se réexécuterait dans l'enfant, provoquant une cascade infinie de processus (ou une erreur). C'est le cas sous Windows et macOS (méthode spawn), et désormais sous Linux à partir de Python 3.14 (la méthode par défaut passe de fork à forkserver). En pratique : placez toujours votre code multiprocessing sous ce garde.
La classe Pool permet de distribuer facilement des tâches sur plusieurs processus.
import multiprocessing
def calculer_cube(nombre):
"""Calcule le cube d'un nombre"""
return nombre ** 3
if __name__ == '__main__':
nombres = [1, 2, 3, 4, 5, 6, 7, 8]
# Créer un pool de 4 processus
with multiprocessing.Pool(processes=4) as pool:
resultats = pool.map(calculer_cube, nombres)
print(f"Nombres: {nombres}")
print(f"Cubes: {resultats}")Explication :
Pool(processes=4)crée 4 processus workersmap()distribue automatiquement les calculs entre les processus- C'est l'équivalent parallèle de la fonction
map()standard
import multiprocessing
import time
def calcul_intensif(n):
"""Fonction qui effectue un calcul coûteux"""
total = 0
for i in range(n):
total += i ** 2
return total
def execution_sequentielle(nombres):
"""Exécution séquentielle"""
debut = time.perf_counter() # perf_counter : horloge monotone pour mesurer une durée
resultats = [calcul_intensif(n) for n in nombres]
duree = time.perf_counter() - debut
print(f"Séquentiel: {duree:.2f} secondes")
return resultats
def execution_parallele(nombres):
"""Exécution parallèle"""
debut = time.perf_counter()
with multiprocessing.Pool() as pool:
resultats = pool.map(calcul_intensif, nombres)
duree = time.perf_counter() - debut
print(f"Parallèle: {duree:.2f} secondes")
return resultats
if __name__ == '__main__':
nombres = [5000000] * 8 # 8 calculs identiques
print("Calculs intensifs:")
execution_sequentielle(nombres)
execution_parallele(nombres)Résultat typique : L'exécution parallèle est significativement plus rapide sur un CPU multi-cœurs.
Les processus ne partagent pas la mémoire, il faut utiliser des mécanismes de communication comme Queue.
import multiprocessing
import time
def producteur(queue, nombre_items):
"""Produit des données et les met dans la queue"""
for i in range(nombre_items):
item = f"Item-{i}"
queue.put(item)
print(f"Produit: {item}")
time.sleep(0.5)
queue.put(None) # Signal de fin
def consommateur(queue):
"""Consomme les données de la queue"""
while True:
item = queue.get()
if item is None:
break
print(f"Consommé: {item}")
time.sleep(1)
if __name__ == '__main__':
# Créer une queue partagée
queue = multiprocessing.Queue()
# Créer les processus
proc_producteur = multiprocessing.Process(target=producteur, args=(queue, 5))
proc_consommateur = multiprocessing.Process(target=consommateur, args=(queue,))
# Démarrer les processus
proc_producteur.start()
proc_consommateur.start()
# Attendre la fin
proc_producteur.join()
proc_consommateur.join()
print("Communication terminée")Gérer manuellement des threads ou des processus (création, start(), join(), collecte des résultats) devient vite fastidieux. Le module concurrent.futures offre une interface de haut niveau qui fonctionne à l'identique pour les threads et les processus : il suffit de changer une seule classe.
ThreadPoolExecutor: un pool de threads (pour les tâches I/O-bound)ProcessPoolExecutor: un pool de processus (pour les tâches CPU-bound)
Les deux exposent exactement la même API. C'est aujourd'hui la façon recommandée d'écrire du code concurrent en Python pour la plupart des besoins courants.
from concurrent.futures import ThreadPoolExecutor
def traiter(n):
return n * n
with ThreadPoolExecutor(max_workers=4) as executor:
resultats = list(executor.map(traiter, [1, 2, 3, 4, 5]))
print(resultats) # [1, 4, 9, 16, 25]Pour passer aux processus (tâche CPU-bound), il suffit de remplacer ThreadPoolExecutor par ProcessPoolExecutor — le reste du code est identique :
from concurrent.futures import ProcessPoolExecutor
def traiter(n):
return n * n
if __name__ == '__main__': # requis : voir la note sur multiprocessing
with ProcessPoolExecutor() as executor:
resultats = list(executor.map(traiter, [1, 2, 3, 4, 5]))
print(resultats)submit() lance une tâche et renvoie immédiatement un objet Future : une promesse de résultat que l'on récupère plus tard avec .result().
from concurrent.futures import ThreadPoolExecutor
def telecharger(url):
# ... travail I/O (requête réseau) ...
return f"contenu de {url}"
urls = ["a.com", "b.com", "c.com"]
with ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre les tâches : un Future par tâche, associé à son URL
futures = {executor.submit(telecharger, url): url for url in urls}
# .result() attend la fin de la tâche et renvoie sa valeur
for future, url in futures.items():
print(f"{url} -> {future.result()}")as_completed() renvoie les Future dès qu'ils se terminent, sans attendre les plus lents. Idéal pour afficher une progression.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def tache(n):
time.sleep(n) # les tâches courtes finissent en premier
return f"tâche {n} terminée"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(tache, duree) for duree in (3, 1, 2)]
for future in as_completed(futures):
print(future.result()) # ordre d'arrivée : 1, puis 2, puis 3Gestion des erreurs : une exception levée dans une tâche n'est pas perdue. Elle est re-levée au moment de l'appel à
future.result(), ce qui permet de l'entourer d'untry/except. C'est un avantage majeur sur la gestion manuelle des threads, où les exceptions passent souvent inaperçues.
Choisir le bon pool :
ThreadPoolExecutorpour l'I/O-bound (le GIL est libéré pendant l'attente),ProcessPoolExecutorpour le CPU-bound (vrai parallélisme sur plusieurs cœurs). C'est la même distinction que Threading vs Multiprocessing, mais avec une API bien plus simple.
Et en Python 3.14 : un troisième exécuteur,
InterpreterPoolExecutor, rejoint la famille. Il répartit le travail sur des sous-interpréteurs (chacun avec son propre GIL), pour un vrai parallélisme CPU plus léger que le multiprocessing — avec la même API. Voir le README du chapitre.
✅ Opérations I/O (Entrées/Sorties)
- Téléchargement de fichiers
- Requêtes réseau/API
- Lecture/écriture de fichiers
- Connexions bases de données
✅ Tâches qui attendent beaucoup
- Serveurs web qui gèrent plusieurs connexions
- Applications avec interface graphique
Exemple typique : Un scraper web qui télécharge 100 pages simultanément.
✅ Calculs intensifs (CPU-bound)
- Traitement d'images
- Calculs mathématiques complexes
- Analyse de données volumineuses
- Encodage vidéo/audio
✅ Besoin de vraie parallélisation
- Exploitation de plusieurs cœurs CPU
- Calculs scientifiques
Exemple typique : Traiter 1000 images pour appliquer des filtres.
| Critère | Threading | Multiprocessing |
|---|---|---|
| Mémoire | Partagée | Séparée |
| Création | Rapide | Plus lent |
| GIL | Limité par le GIL | Contourne le GIL |
| Idéal pour | I/O-bound | CPU-bound |
| Communication | Simple (mémoire partagée) | Plus complexe (Queue, Pipe) |
| Overhead | Faible | Plus élevé |
| Multi-cœurs | Non exploité pour calculs | Pleinement exploité |
Avec multiprocessing, c'est essentiel pour éviter des erreurs avec les méthodes de démarrage spawn et forkserver (Windows, macOS, et Linux à partir de Python 3.14) :
if __name__ == '__main__':
# Code multiprocessing ici
passPréférez with pour gérer automatiquement la fermeture des ressources :
# Bon
with multiprocessing.Pool() as pool:
resultats = pool.map(fonction, donnees)
# Moins bon (il faut fermer manuellement)
pool = multiprocessing.Pool()
resultats = pool.map(fonction, donnees)
pool.close()
pool.join() Ne créez pas un thread/processus par élément. Utilisez un Pool avec un nombre raisonnable :
# Bon - pool de 4 processus pour traiter 1000 items
with multiprocessing.Pool(processes=4) as pool:
resultats = pool.map(fonction, mes_1000_items)
# Mauvais - créer 1000 processus
for item in mes_1000_items:
Process(target=fonction, args=(item,)).start() # ❌En threading, utilisez toujours des verrous pour les variables partagées :
lock = threading.Lock()
def modifier_variable_partagee():
with lock:
# Modification sécurisée
variable_globale += 1Les exceptions dans les threads/processus peuvent être silencieuses. Gérez-les :
import threading
def fonction_avec_erreur():
try:
# Code qui peut échouer
resultat = 1 / 0
except Exception as e:
print(f"Erreur dans le thread: {e}")
thread = threading.Thread(target=fonction_avec_erreur)
thread.start()
thread.join() Voici un exemple réaliste qui combine les concepts vus :
import threading
import time
class TelechargeParallele:
"""Gestionnaire de téléchargements parallèles"""
def __init__(self, max_threads: int = 5):
self.max_threads = max_threads
self.resultats: list[dict] = []
self.lock = threading.Lock()
def telecharger_fichier(self, url: str):
"""Simule le téléchargement d'un fichier"""
print(f"[Début] Téléchargement de {url}")
# Simulation du téléchargement
duree = 2 # Secondes
time.sleep(duree)
# Sauvegarder le résultat de manière sécurisée
with self.lock:
self.resultats.append({
'url': url,
'statut': 'succès',
'duree': duree
})
print(f"[Terminé] {url}")
def telecharger_liste(self, urls: list[str]):
"""Télécharge une liste d'URLs en parallèle"""
threads = []
print(f"Démarrage de {len(urls)} téléchargements...")
debut = time.perf_counter()
# Créer et démarrer les threads
for url in urls:
thread = threading.Thread(target=self.telecharger_fichier, args=(url,))
threads.append(thread)
thread.start()
# Limiter le nombre de threads simultanés
if len(threads) >= self.max_threads:
threads[0].join()
threads.pop(0)
# Attendre tous les threads restants
for thread in threads:
thread.join()
duree_totale = time.perf_counter() - debut
print(f"\n✓ Tous les téléchargements terminés en {duree_totale:.2f}s")
return self.resultats
# Utilisation
if __name__ == '__main__':
urls = [
"http://example.com/fichier1.pdf",
"http://example.com/fichier2.pdf",
"http://example.com/fichier3.pdf",
"http://example.com/fichier4.pdf",
"http://example.com/fichier5.pdf",
]
telechargeur = TelechargeParallele(max_threads=3)
resultats = telechargeur.telecharger_liste(urls)
print(f"\nRésumé: {len(resultats)} fichiers téléchargés")- Threading = Idéal pour les tâches I/O (réseau, fichiers)
- Multiprocessing = Idéal pour les calculs intensifs (CPU)
- Le GIL limite le threading pour les calculs, pas pour l'I/O
- Utilisez Lock pour protéger les données partagées en threading
- Utilisez Queue pour la communication entre processus
- Pool simplifie la parallélisation de listes de tâches
concurrent.futures(ThreadPoolExecutor/ProcessPoolExecutor) est l'API de haut niveau recommandée : le même code fonctionne pour les threads et les processus- Toujours mesurer les performances avant/après parallélisation
Dans la section suivante (8.2), nous explorerons la programmation asynchrone avec asyncio, une alternative moderne au threading pour les opérations I/O.
Pour aller plus loin :
- Documentation officielle :
threadingetmultiprocessing - Approfondissez
concurrent.futures: passage deThreadPoolExecutoràProcessPoolExecutor, paramètrechunksizedemap(), annulation deFuture - Apprenez
asynciopour une approche asynchrone moderne