6. diel - Java server - Client dispatcher
V minulej lekcii, Java server - Správca spojenia , sme implementovali správcu spojenia. Dnes sa pozrieme na správu klientov, ktorí boli presunutí do čakacej fronty, pretože boli naplnené maximálnej kapacity servera.
Client dispatcher
Trieda bude mať jednoduchá úloha. Bude sa snažiť udržať aktívne spojenie medzi serverom a klientom. V definovanom intervale bude prechádzať jednotlivé klientov v čakacej fronte a odošle im (zatiaľ) jednoduchú textovú správu s informáciou o počte klientov vo fronte. V prípade, že sa správu nepodarí doručiť, ukončí sa spojenie s klientom a klient sa vyradí z čakacej fronty. Celý tento proces sa bude diať len, ak vo fronte budú nejakí klienti.
Funkcie
Funkcia som už opísal v odseku vyššie, teraz si ich prehľadne spíšeme v bodoch:
- vloženie klienta do fronty
- získanie klienta z frontu
- otázku, či je vo fronte klient
V balíčku core
založíme nový balíček s názvom
dispatcher
, v ktorom vytvoríme rozhraní
IClientDispatcher
. Rozhranie bude definovať
funkcie Dispatcher:
public interface IClientDispatcher extends IThreadControl { boolean hasClientInQueue(); Client getClientFromQueue(); boolean addClientToQueue(Client client); }
Rozhranie dedí od IThreadControl
, aby sme
mohli ovládať vlákno, v ktorom dispatcher pobeží.
Implementácia triedy
Triedu, ktorá bude implementovať rozhranie, nazveme jednoducho
ClientDispatcher
, necháme ju dediť od triedy
Thread
a implementovať rozhranie
IClientDispatcher
:
public class ClientDispatcher extends Thread implements IClientDispatcher
Nadefinujeme si jednu triedny konštantu, ktorá bude reprezentovať interval, v ktorom sa bude opakovať komunikácia s klientmi vo fronte:
private static final int SLEEP_TIME = 5000;
Nasledujú inštančné premenné:
private boolean interupt = false; private final Semaphore semaphore = new Semaphore(0); private final Queue<Client> waitingQueue = new ConcurrentLinkedQueue<>(); private final Collection<Client> clientsToRemove = new ArrayList<>(); private final int waitingQueueSize;
Premenná interrupt
bude riadiť vlákno. Kým bude mať hodnotu
false
, vlákno bude bežať. Semafor tu bude mať riadiacu funkciu.
Kým vo fronte nebudú žiadni klienti, vlákno bude na semafore čakať.
Akonáhle sa pripojený klient dostane do fronty, vlákno prejde cez semafor a
bude robiť svoju prácu. Po odobratí všetkých klientov z frontu sa vlákno
opäť uspia na semafore. Nasledujú dve kolekcie. Vo waitingQueue
sa budú uchovávať klienti a clientsToRemove
bude obsahovať
klientov, ktorí ukončili spojenie a je potrebné ich odstrániť z frontu.
Premenná waitingQueueSize
obsahuje maximálny počet klientov vo
fronte.
Konštruktor triedy bude vyžadovať jediný parameter. Bude to práve maximálny počet klientov, ktorí budú čakať v rade:
public ClientDispatcher(int waitingQueueSize) { this.waitingQueueSize = waitingQueueSize; }
Implementácia funkcií
Začneme implementovať metódy z rozhrania
IClientDispatcher
:
@Override public boolean hasClientInQueue() { return !waitingQueue.isEmpty(); } @Override public Client getClientFromQueue() { return waitingQueue.poll(); } @Override public boolean addClientToQueue(Client client) { if (waitingQueue.size() < waitingQueueSize) { waitingQueue.add(client); semaphore.release(); return true; } return false; }
Prvé dve metódy majú implementáciu jednoduchú a netreba ich komentovať.
U funkcie pre pridanie klienta do fronty musíme najskôr zistiť, či ak front
pojme ďalšieho klienta. Pokiaľ ho pojme, uvoľní sa semafor a vráti sa
true
, inak sa vráti false
a nič viac sa nestane.
Výkonný kód vlákna
Teraz implementujeme najdôležitejšie metódu, run()
, v ktorej
sa bude odohrávať všetka logika:
@Override public void run() { while(!interupt) { while(waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} } if (interupt) { clientsToRemove.addAll(waitingQueue); } else { final int count = waitingQueue.size(); waitingQueue.iterator().forEach(client -> { try { client.writer.write(("count: " + count + "\n").getBytes()); client.writer.flush(); } catch (IOException e) { clientsToRemove.add(client); } }); } waitingQueue.removeAll(clientsToRemove); for (Client client : clientsToRemove) { client.close(); } clientsToRemove.clear(); try { Thread.sleep(SLEEP_TIME); } catch (InterruptedException ignored) {} } }
Ako prvý sa nadefinuje nekonečná slučka, ktorá bude závislá na
premenné interupt
. Nasleduje ďalšia slučka, ktorá bude
závislá na semafore. Vždy je lepšie mať čakanie na semafore v slučke než
v jednej podmienke. Rozdiel medzi kódom:
if (waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} }
a kódom:
while (waitingQueue.isEmpty() && !interupt) { try { semaphore.acquire(); } catch (InterruptedException ignored) {} }
je síce iba v jednom slove (if
a
while
), ale významovo je rozdiel veľký. Počas
čakania na semafore sa môže vyvolať ona
InterruptedException
výnimka. Ak by sme mali
čakanie na semafore pomocou if
, tak by vlákno
začalo zbytočne vykonávať výkonný kód. Preto je dôležité čakať na
semafore v slučke. V slučke sa kontrolujú dve veci:
- obsadenosť fronty
- príznak interupt
Ak sa vlákno vzbudí na semafore a front bude prázdna, alebo príznak
interupt
bude false
, tak sa vlákno opäť uspí. Je
dôležité, aby bol príznak interupt
prítomný. Inak by sme
nemohli ukončiť vlákno pri vypínaní servera.
Keď opustíme čakanie vlákna na semafore, nasleduje vyhodnotenie, či ak sa bude vlákno ukončovať alebo nie. Pokiaľ má nastať ukončenie vlákna, tak sa všetci klienti z frontu vloží to kolekcia pre odstránenie klientov z frontu. V prípade štandardného priechodu sa pošle všetkým klientom jednoduchá správa. Ak sa správu nepodarí doručiť, vloží sa klient do kolekcie pre odstránenie klientov z frontu, pretože klient najskôr neudržal spojenie.
Vo finále sa ukončí spojenie so všetkými užívateľmi, ktorí boli v kolekcii na odstránenie klientov, a počká sa definovaný čas a celý cyklus začne od začiatku.
Ukončenie vlákna
Nakoniec implementujeme metódu shutdown()
, ktorú nám
predpisuje rozhranie IThreadControl
:
@Override public void shutdown() { interupt = true; semaphore.release(); try { join(); } catch (InterruptedException ignored) { } }
V tejto metóde urobíme tri veci:
- nastavíme príznak
interupt
natrue
- uvoľníme semafor
- počkáme, až sa vlákno ukončí
Uvoľnením semafore sa spustí vlákno Dispatcher. Vďaka tomu, že sme
nastavili príznak interupt
na true
,
pridajú sa všetci klienti z frontu na zoznam adeptov pre ukončenie spojenia.
Po ukončení spojenia s klientmi a odobratie z frontu sa už nesplní podmienka
v nekonečnej slučke a vlákno sa bezpečne ukončí.
Prepojenie logiky
V druhej časti dnešného článku použijeme client dispatcher v triede
ConnectionManager
. Pre začiatok pridáme novú
triednu konštantu typu IClientDispatcher
a do
konstruktoru triedy ConnectionManager
parameter
rovnakého typu, ktorým konštantu inicializujeme:
public ConnectionManager(IClientDispatcher clientDispatcher, ExecutorService pool, int maxClients) { this.clientDispatcher = clientDispatcher; this.pool = pool; this.maxClients = maxClients; }
Ďalej dokončíme implementáciu metódy
insertClientToListOrQueue()
. Upravíme
connectionClosedListener
tak, aby sa server
pokúsil získať z frontu čakajúceho klienta a pridal ho medzi aktívnych
klientov:
client.setConnectionClosedListener(() - > { clients.remove(client); if (clientDispatcher.hasClientInQueue()) { this.insertClientToListOrQueue(clientDispatcher.getClientFromQueue()); } });
Namiesto druhého TODO naimplementujeme vloženie klienta do fronty:
if (!clientDispatcher.addClientToQueue(client)) {
client.close();
}
Tu využijeme návratovej hodnoty metódy addClientToQueue()
,
ktorá vráti true
, ak klienta vloží do fronty. Ak je fronta
plná, vráti false
a ako reakciu na plnú front odpojíme
klienta.
Teraz je potrebné už len spustiť vlákno Dispatcher. O spustení sa
postaráme v metóde onServerStart()
triedy
ConnectionManager
, kde zavoláme:
clientDispatcher.start();
V metóde onServerStop()
ukončíme Dispatcher:
clientDispatcher.shutdown();
Nad Dispatcher zavoláme metódu shutdown()
, ktorou sa nastaví
príznak a prebudí vlákno. Po chvíli sa vlákno Dispatcher ukončí.
Nakoniec vytvoríme továreň na výrobu Dispatcher a zaregistrujeme ju.
Vytvoríme teda rozhranie
IClientDispatcherFactory
, ktoré bude mať jedinú
metódu getClientDispatcher()
, ktorá bude v parametri prijímať
maximálny počet klientov vo fronte.
public interface IClientDispatcherFactory { IClientDispatcher getClientDispatcher(int waitingQueueSize); }
Implementácia tohto rozhrania bude veľmi jednoduchá. Vytvoríme teda
triedu ClientDispatcherFactory
, ktorá bude toto
rozhranie implementovať a implementujeme jedinú metódu
getClientDispatcher()
:
public class ClientDispatcherFactory implements IClientDispatcherFactory { @Override public IClientDispatcher getClientDispatcher(int waitingQueueSize) { return new ClientDispatcher(waitingQueueSize); } }
Továreň zaregistrujeme v triede ServerModule
obvyklým
spôsobom:
bind(IClientDispatcherFactory.class).to(ClientDispatcherFactory.class);
Všetko je takmer v poriadku, až na továreň správcu spojenie
ConnectionManagerFactory
. Triede
ConnectionManager
sme zmenili signatúru
konstruktoru pridaním parametra typu
IClientDispatcher
. Vytvoríme teda v tejto
továrni novú inštančný konštantu typu
IClientDispatcherFactory
. Túto konštantu bude
dostávať továreň správcu spojenia v konstruktoru:
private final IClientDispatcherFactory clientDispatcherFactory; @Inject ConnectionManagerFactory(IClientDispatcherFactory clientDispatcherFactory) { this.clientDispatcherFactory = clientDispatcherFactory; }
Teraz nám nič nebráni upraviť metódu
getConnectionManager()
. Pre vytvorenie novej inštancie správcu
spojenie využijeme práve továreň client Dispatcher:
return new ConnectionManager(clientDispatcherFactory.getClientDispatcher( waitingQueueSize), pool, maxClients);
Tým by sme mali hotovú správu klientov čakajúcich vo fronte. Nabudúce, v lekcii Java server - Zapisovacia vlákno , vytvoríme vlákno, ktoré bude asynchrónne odosielať správy zo servera ku klientom.
Mal si s čímkoľvek problém? Stiahni si vzorovú aplikáciu nižšie a porovnaj ju so svojím projektom, chybu tak ľahko nájdeš.
Stiahnuť
Stiahnutím nasledujúceho súboru súhlasíš s licenčnými podmienkami
Stiahnuté 21x (156.7 kB)
Aplikácia je vrátane zdrojových kódov v jazyku Java