7. diel - Java server - Zapisovacia vlákno
V minulej lekcii, Java server - Client dispatcher , sme sa zaoberali čakacie frontom s klientmi. Dnes implementujeme odosielanie správ klientovi pomocou samostatného vlákna. Dosiahneme tak pocitu lepšie odozvy servera.
Zapisovacie vlákno
Až doteraz sme posielali správy zo servera vo vlákne, ktoré práve spracovávalo prichádzajúce dáta. Tomu sa hovorí synchrónne spracovania. Problém v tomto prípade je, že sa jedná o I / O operáciu, čo je časovo náročná činnosť. Oveľa lepšie riešenie je spracovávať všetky tieto činnosti v samostatnom vlákne. My si za týmto účelom vytvoríme vlákno, ktoré bude mať jedinú úlohu: odoslať správy zo servera klientovi.
V balíčku core
vytvoríme nový balík writer
, v
ktorom budeme vytvárať triedy týkajúce sa zapisovacieho vlákna. Opäť
začneme vytvorením rozhrania, ktorým zadefinujeme metódy. Vytvoríme
rozhranie IWriterThread
a necháme ho dediť od
rozhrania IThreadControl
. Rozhranie bude mať
jedinú metódu sendMessage()
. Metóda bude prijímať dva
parametre: ObjectOutputStream
, do ktorého sa
správa zapíše, a Object
, ktorý je samotná
správa:
public interface IWriterThread extends IThreadControl { void sendMessage(ObjectOutputStream writer, Object message); }
Ďalej vytvoríme triedu WriterThread
, ktorá
bude dediť z triedy Thread
a implementovať
vyššie definované rozhranie:
public class WriterThread extends Thread implements IWriterThread { }
V triede si zadefinujeme dve inštančný konštanty a dve premenné:
private final Semaphore semaphore = new Semaphore(0); private final Queue<QueueTuple> messageQueue = new ConcurrentLinkedQueue<>(); private boolean working = false; private boolean interrupt = false;
Princíp tu prítomného semafore je rovnaký ako v predchádzajúcej lekcii.
Vlákno bude na semafore spať, kým nedostane signál, že má pracovať.
Fronta správ messageQueue
obsahuje správy, ktoré sa budú
posielať. Zatiaľ je typová na neznámy typ QueueTuple
, ktorý si
vzápätí vytvoríme. Premenná working
hovorí, či ak vlákno
pracuje, alebo spí na semafore. Premenná interrupt
má rovnaký
účel ako v predchádzajúcich lekciách, kedy sme vytvárali triedy, ktoré
dedia od triedy Thread
.
Konštruktor triedy necháme tentoraz prázdny. Len za účelom lepšej
orientácie dáme vláknu lepší názov. V konstruktoru zavoláme konštruktor
triedy Thread
s jedným preťažením, ktoré ako
parameter berie názov vlákna:
public WriterThread() { super("WriterThread"); }
Metódy vlákna
Rozhranie nám hovorí, že musíme implementovať dve metódy:
sendMessage()
a shutdown()
. Pozrieme sa na prvý z
nich. Odoslanie správy nebude nič iné, než pridanie správy do frontu správ
a v prípade, že vlákno nepracuje, tak ho prebudíme a nastavíme príznak
working
na hodnotu true
:
@Override public void sendMessage(ObjectOutputStream writer, Object message) { messageQueue.add(new QueueTuple(outputStream, message)); if (!working) { working = true; semaphore.release(); } }
Metóda shutdown()
bude rovnaká ako v minulej lekcii:
@Override public void shutdown() { interrupt = true; semaphore.release(); try { join(); } catch (InterruptedException ignored) {} }
Nastavíme príznak interrupt
na hodnotu
true
, uvoľníme semafor a počkáme, až sa
zapisovacej vlákno ukončí.
Výkonný kód vlákna
Teraz implementujeme samotný kód, ktorý sa postará o odoslaní správ v
našom vlákne. Metóda run()
vyzerá nasledovne:
@Override public void run() { while(!interrupt) { while(messageQueue.isEmpty() && !interrupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} } working = true; while(!messageQueue.isEmpty()) { final QueueTuple entry = messageQueue.poll(); try { entry.writer.writeLine(entry.message); entry.writer.flush(); } catch (IOException e) { e.printStackTrace(); } } working = false; } }
Kód je veľmi podobný tomu, čo sme implementovali u client Dispatcher.
Máme tu nekonečnú slučku závisiaci na premenné interrupt
.
Nasleduje ďalšia nekonečná slučka pre čakanie vlákna na semafore. Prečo
je čakanie vlákna v slučke sme rozoberali opäť v minulej lekcii. V ďalšej
slučke sa iteruje cez front správ a postupne sa odošlú všetky správy,
ktoré sa nahromadili. Keď vo fronte nie sú žiadne správy, ide vlákno
opäť odpočívať na semafor.
Zostáva už len nadefinovať triedu
QueueTuple
. Ide o jednoduchú prepravku, ktorá
bude obsahovať dve konštanty. Nie je náhoda, že tieto konštanty sa
získajú v metóde sendMessage()
. Kód celej triedy je tu:
private static final class QueueTuple { final Object message; final ObjectOutputStream writer; private QueueTuple(ObjectOutputStream writer, Object message) { this.message = message; this.writer = writer; } }
Triedu umiestnite do triedy WriterThread
,
pretože nikde inde ju používať nebudeme.
Prepojenie logiky
V triede ConnectionManager
vytvoríme
inštančný konštantu zapisovacieho vlákna, ktorú budeme inicializovať v
konštruktory z parametra:
private final IWriterThread writerThread; public ConnectionManager(IClientDispatcher clientDispatcher, IWriterThread writerThread, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.writerThread = writerThread; this.pool = pool; this.maxClients = maxClients; }
Spustenie vlákna vykonáme v metóde onServerStart()
v
správcovi spojení hneď po zapnutí client Dispatcher:
writerThread.start();
Ukončenie vlákna vykonáme opäť v metóde onServerStop()
po
vypnutí client Dispatcher:
writerThread.shutdown();
Ďalej musíme upraviť triedu Client
, aby mala
prístup k zapisovacímu vláknu. Vytvoríme teda aj v triede
Client
inštančný konštantu zapisovacieho
vlákna, ale budeme ju inicializovať v konstruktoru, ktorý ju získa ako
parameter:
Client(Socket socket, IWriterThread writerThread) throws IOException { this.socket = socket; writer = new ObjectOutputStream(socket.getOutputStream()); this.writerThread = writerThread; }
Vytvorenie inštancie triedy Client
pri
prichádzajúcom spojení nechám na láskavom čitateľovi.
Nakoniec musíme upraviť továreň na výrobu správcu spojenie, pretože
sme opäť upravili konštruktor triedy
ConnectionManager
. V továrni pridáme novú
inštančný konštantu typu IWriterThread
, ktorú
budeme inicializovať v konštruktory pomocou parametra:
private final IWriterThread writerThread; @Inject public ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory, IWriterThread writerThread) { this.clientDispatcherFactory = clientDispatcherFactory; this.writerThread = writerThread; }
Vytvorenie inštancie by už nemalo robiť problémy:
return new ConnectionManager(clientDispatcherFactory.getClientDispatcher(waitingQueueSize), writerThread, pool, maxClients);
To by bolo z dnešnej lekcie všetko. Funkčnosť sme nezmenili, iba vylepšili odozvu servera. Nabudúce, v lekcii Java server - Komunikačný protokol , navrhneme komunikačný protokol.
Mal si s čímkoľvek problém? Stiahni si vzorovú aplikáciu nižšie a porovnaj ju so svojím projektom, chybu tak ľahko nájdeš.
Stiahnuť
Stiahnutím nasledujúceho súboru súhlasíš s licenčnými podmienkami
Stiahnuté 19x (138.86 kB)
Aplikácia je vrátane zdrojových kódov v jazyku Java