Przewodnik po Java ExecutorService

Przegląd

ExecutorService jest API JDK, które upraszcza wykonywanie zadań w trybie asynchronicznym. Ogólnie rzecz biorąc, ExecutorService automatycznie dostarcza pulę wątków i API do przypisywania do niej zadań.

Dalsza lektura:

Przewodnik po frameworku Fork/Join w Javie

Wprowadzenie do frameworka fork/join zaprezentowanego w Javie 7 oraz narzędzi pomagających przyspieszyć przetwarzanie równoległe poprzez próbę wykorzystania wszystkich dostępnych rdzeni procesora.
Czytaj więcej →

Overview of the java.util.concurrent

Odkrycie zawartości pakietu java.util.concurrent.
Czytaj więcej →

Przewodnik po java.util.concurrent.Locks

W tym artykule poznajemy różne implementacje interfejsu Lock oraz nowo wprowadzoną w Javie 9 klasę StampedLock.
Czytaj więcej →

Instytucja ExecutorService

2.1. Metody fabryczne klasy Executors

Najprostszym sposobem na utworzenie ExecutorService jest użycie jednej z metod fabrycznych klasy Executors.

Na przykład, poniższa linia kodu utworzy pulę wątków z 10 wątkami:

ExecutorService executor = Executors.newFixedThreadPool(10);

Istnieje kilka innych metod fabrycznych, aby utworzyć predefiniowaną usługę ExecutorService, która spełnia określone przypadki użycia. Aby znaleźć najlepszą metodę dla swoich potrzeb, należy zapoznać się z oficjalną dokumentacją Oracle.

2.2. Bezpośrednie tworzenie ExecutorService

Ponieważ ExecutorService jest interfejsem, można użyć instancji dowolnej jego implementacji. Istnieje kilka implementacji do wyboru w pakiecie java.util.concurrent, można też stworzyć własną.

Na przykład klasa ThreadPoolExecutor posiada kilka konstruktorów, których możemy użyć do skonfigurowania usługi executora i jego wewnętrznej puli:

ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());

Możesz zauważyć, że powyższy kod jest bardzo podobny do kodu źródłowego metody fabrycznej newSingleThreadExecutor(). Dla większości przypadków, szczegółowa ręczna konfiguracja nie jest konieczna.

Przypisywanie zadań do ExecutorService

ExecutorService może wykonywać zadania Runnable i Callable. Aby zachować prostotę, w tym artykule użyjemy dwóch prymitywnych zadań. Zauważ, że używamy tutaj wyrażeń lambda zamiast anonimowych klas wewnętrznych:

Runnable runnableTask = () -> { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { e.printStackTrace(); }};Callable<String> callableTask = () -> { TimeUnit.MILLISECONDS.sleep(300); return "Task's execution";};List<Callable<String>> callableTasks = new ArrayList<>();callableTasks.add(callableTask);callableTasks.add(callableTask);callableTasks.add(callableTask);

Możemy przypisać zadania do ExecutorService używając kilku metod, w tym execute(), która dziedziczy po interfejsie Executor, a także submit(), invokeAny() oraz invokeAll().

Metoda execute() jest pusta i nie daje żadnej możliwości uzyskania wyniku wykonania zadania ani sprawdzenia jego statusu (czy jest uruchomione):

executorService.execute(runnableTask);

submit() przekazuje zadanie Callable lub Runnable do ExecutorService i zwraca wynik typu Future:

Future<String> future = executorService.submit(callableTask);

invokeAny() przypisuje kolekcję zadań do usługi ExecutorService, powodując uruchomienie każdego z nich i zwraca wynik pomyślnego wykonania jednego zadania (jeśli nastąpiło pomyślne wykonanie):

String result = executorService.invokeAny(callableTasks);

invokeAll() przypisuje kolekcję zadań do usługi ExecutorService, powodując uruchomienie każdego z nich i zwraca wynik wykonania wszystkich zadań w postaci listy obiektów typu Future:

List<Future<String>> futures = executorService.invokeAll(callableTasks);

Zanim przejdziemy dalej, musimy omówić jeszcze dwa elementy: zamykanie ExecutorService i radzenie sobie z typami zwrotnymi Future.

Zamykanie ExecutorService

Ogólnie, ExecutorService nie zostanie automatycznie zniszczony, gdy nie będzie żadnego zadania do przetworzenia. Pozostanie przy życiu i będzie czekać na nowe zadania do wykonania.

W niektórych przypadkach jest to bardzo pomocne, np. gdy aplikacja musi przetwarzać zadania, które pojawiają się nieregularnie lub ilość zadań nie jest znana w czasie kompilacji.

Z drugiej strony, aplikacja może osiągnąć swój koniec, ale nie zostać zatrzymana, ponieważ oczekująca usługa ExecutorService spowoduje, że JVM będzie nadal działać.

Aby poprawnie zamknąć usługę ExecutorService, mamy API shutdown() i shutdownNow().

Metoda shutdown() nie powoduje natychmiastowego zniszczenia usługi ExecutorService. Sprawi ona, że ExecutorService przestanie przyjmować nowe zadania i wyłączy się po tym, jak wszystkie działające wątki zakończą swoją bieżącą pracę:

executorService.shutdown();

Metoda shutdownNow() próbuje natychmiast zniszczyć ExecutorService, ale nie gwarantuje, że wszystkie uruchomione wątki zostaną zatrzymane w tym samym czasie:

List<Runnable> notExecutedTasks = executorService.shutDownNow();

Ta metoda zwraca listę zadań, które czekają na przetworzenie. Do dewelopera należy decyzja, co zrobić z tymi zadaniami.

Jednym z dobrych sposobów na zamknięcie usługi ExecutorService (który jest również zalecany przez Oracle) jest użycie obu tych metod w połączeniu z metodą awaitTermination():

executorService.shutdown();try { if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow();}

Przy takim podejściu, usługa ExecutorService najpierw przestanie przyjmować nowe zadania, a następnie będzie czekać do określonego czasu na zakończenie wszystkich zadań. Jeśli ten czas upłynie, wykonanie zostanie natychmiast zatrzymane.

Interfejs Future

Metody submit() i invokeAll() zwracają obiekt lub kolekcję obiektów typu Future, co pozwala nam uzyskać wynik wykonania zadania lub sprawdzić status zadania (czy jest uruchomione).

Interfejs Future udostępnia specjalną blokującą metodę get(), która zwraca rzeczywisty wynik wykonania zadania typu Callable lub null w przypadku zadania typu Runnable:

Wywołanie metody get(), gdy zadanie jest jeszcze uruchomione, spowoduje zablokowanie wykonania do momentu, gdy zadanie zostanie poprawnie wykonane i wynik będzie dostępny.

Przy bardzo długim blokowaniu spowodowanym przez metodę get(), wydajność aplikacji może ulec pogorszeniu. Jeśli dane wynikowe nie są kluczowe, można uniknąć tego problemu poprzez użycie timeoutów:

String result = future.get(200, TimeUnit.MILLISECONDS);

Jeśli czas wykonania jest dłuższy niż określony (w tym przypadku 200 milisekund), zostanie rzucony wyjątek TimeoutException.

Możemy użyć metody isDone(), aby sprawdzić, czy przypisane zadanie zostało już przetworzone, czy nie.

Interfejs Future umożliwia również anulowanie wykonania zadania za pomocą metody cancel() oraz sprawdzenie anulowania za pomocą metody isCancelled():

boolean canceled = future.cancel(true);boolean isCancelled = future.isCancelled();

Interfejs ScheduledExecutorService

Serwis ScheduledExecutorService uruchamia zadania po pewnym predefiniowanym opóźnieniu i/lub cyklicznie.

Ponownie, najlepszym sposobem na zainicjowanie ScheduledExecutorService jest użycie metod fabrycznych klasy Executors.

W tym rozdziale, użyjemy ScheduledExecutorService z jednym wątkiem:

ScheduledExecutorService executorService = Executors .newSingleThreadScheduledExecutor();

Aby zaplanować wykonanie pojedynczego zadania po ustalonym opóźnieniu, użyj metody scheduled() klasy ScheduledExecutorService.

Dwie metody scheduled() pozwalają na wykonywanie zadań typu Runnable lub Callable:

Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);

Metoda scheduleAtFixedRate() pozwala nam uruchamiać zadanie cyklicznie po ustalonym opóźnieniu. Powyższy kod opóźnia przez jedną sekundę przed wykonaniem callableTask.

Poniższy blok kodu uruchomi zadanie po początkowym opóźnieniu 100 milisekund. A następnie będzie uruchamiał to samo zadanie co 450 milisekund:

Future<String> resultFuture = service .scheduleAtFixedRate(runnableTask, 100, 450, TimeUnit.MILLISECONDS);

Jeśli procesor potrzebuje więcej czasu na wykonanie przypisanego zadania niż parametr period metody scheduleAtFixedRate(), ScheduledExecutorService poczeka aż bieżące zadanie zostanie zakończone przed rozpoczęciem kolejnego.

Jeżeli konieczne jest wprowadzenie opóźnienia o stałej długości pomiędzy iteracjami zadania, należy użyć metody scheduleWithFixedDelay().

Na przykład, poniższy kod zagwarantuje 150-milisekundową przerwę pomiędzy zakończeniem bieżącego wykonania a rozpoczęciem kolejnego:

service.scheduleWithFixedDelay(task, 100, 150, TimeUnit.MILLISECONDS);

Zgodnie z umowami metod scheduleAtFixedRate() i scheduleWithFixedDelay(), okresowe wykonywanie zadania zakończy się w momencie zakończenia działania usługi ExecutorService lub jeśli podczas wykonywania zadania zostanie rzucony wyjątek.

ExecutorService vs Fork/Join

Po wydaniu Javy 7 wielu programistów zdecydowało się na zastąpienie frameworka ExecutorService frameworkiem fork/join.

Nie zawsze jest to jednak słuszna decyzja. Pomimo prostoty i częstego wzrostu wydajności związanego z fork/join, zmniejsza on kontrolę dewelopera nad współbieżnym wykonywaniem.

ExecutorService daje deweloperowi możliwość kontrolowania liczby generowanych wątków i ziarnistości zadań, które powinny być uruchamiane przez oddzielne wątki. Najlepszym przypadkiem użycia ExecutorService jest przetwarzanie niezależnych zadań, takich jak transakcje lub żądania według schematu „jeden wątek dla jednego zadania.”

Z kolei, zgodnie z dokumentacją Oracle, fork/join został zaprojektowany do przyspieszenia pracy, która może być rekurencyjnie podzielona na mniejsze części.

Podsumowanie

Pomimo względnej prostoty ExecutorService, istnieje kilka często spotykanych pułapek.

Podsumujmy je:

Podtrzymywanie przy życiu nieużywanej ExecutorService: Zobacz szczegółowe wyjaśnienie w sekcji 4, jak zamknąć usługę ExecutorService.

Nieprawidłowa pojemność puli wątków podczas korzystania z puli wątków o stałej długości: Bardzo ważne jest określenie, ilu wątków będzie potrzebowała aplikacja do wydajnego wykonywania zadań. Zbyt duża pula wątków spowoduje niepotrzebny narzut tylko po to, aby utworzyć wątki, które w większości będą w trybie oczekiwania. Zbyt mała liczba wątków może sprawić, że aplikacja będzie wydawała się nieresponsywna z powodu długiego oczekiwania na zadania w kolejce.

Wywołanie metody get() Future po anulowaniu zadania: Próba uzyskania wyniku już anulowanego zadania wywołuje wyjątek CancellationException.

Niespodziewanie długie blokowanie za pomocą metody get() metody Future: Powinniśmy używać timeoutów, aby uniknąć nieoczekiwanego oczekiwania.

Jak zawsze, kod do tego artykułu jest dostępny w repozytorium GitHub.

Zacznij przygodę ze Spring 5 i Spring Boot 2, dzięki kursowi Learn Spring:

>> CHECK OUT THE COURSE

Zapoznaj się z kursem Spring 5 i Spring Boot 2.

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *