18. diel - Java chat - Klient - Spojenie so serverom 2. časť
V minulej lekcii, Java chat - Klient - Spojenie so serverom 1. časť , sme si okrem iného vytvorili rozhranie pre klientsky komunikátor. V dnešnom Java tutoriálu ho implementujeme.
Klientský komunikátor
V balíčku service
vytvoríme novú triedu
ClientCommunicationService
a necháme ju implementovať rozhranie
IClientCommunicationService
:
public class ClientCommunicationService implements IClientCommunicationService {}
Premenné a konštanty
Do triedy pridáme nasledujúce premenné a konštanty:
private final ObjectProperty<Socket> socket = new SimpleObjectProperty<>(this, "socket", null); private final ReadOnlyObjectWrapper<ConnectionState> connectionState = new ReadOnlyObjectWrapper<>(this, "connectionState", ConnectionState.DISCONNECTED); private final HashMap<String, List<OnDataReceivedListener>> listeners = new HashMap<>(); private final StringProperty host = new SimpleStringProperty(this, "host", null); private final IntegerProperty port = new SimpleIntegerProperty(this, "port", -1); private final StringProperty connectedServerName = new SimpleStringProperty(this, "connectedServerName", null); private final ObjectProperty<ServerStatus> serverStatus = new SimpleObjectProperty<>(this, "serverStatus", ServerStatus.EMPTY); private final Queue<Request> requests = new LinkedBlockingQueue<>(); private ReaderThread readerThread; private WriterThread writerThread;
Všetky konštanty sú samopopisující, takže k ich popisu nebudem dávať
žiadny komentár. Za zmienku stojí konštanta socket
, ktorá je
zabalená do triedy ObjectProperty
. Tým získavame možnosť
pozorovať zmenu hodnoty. Zaujímavá je taky fronta requests
,
pomocou ktorej budeme realizovať komunikáciu typu
request-responce. Triedu Request
vytvoríme
neskôr. Premenné readerThread
a writerThread
budú
obsahovať čítacie a zapisovacie vlákno. Tieto premenné inicializujeme až
pri pokuse o vytvorenie nového spojenia.
Konštruktor
Konštruktor triedy nebude vyžadovať žiadny parameter. V konstruktoru sa nastaví listener na socket a vytvorí sa binding na názov servera, ktorý sa bude mať formát: "názov: port":
public ClientCommunicationService() { socket.addListener(this::socketListener); connectedServerName.bind(Bindings.createStringBinding(() -> String.format("%s:%d", host.get(), port.get()), host, port, connectionState)); }
Poslucháč zmeny stavu socketu
Vytvoríme privátne metódu socketListener()
, ktorú sme
registrovali v konstruktoru. V tejto metóde budeme inicializovať / rušiť
čítacie / zapisovacie vlákno:
private void socketListener(ObservableValue<? extends Socket> observableValue, Socket oldSocket, Socket newSocket) { if (newSocket == null) { readerThread = null; writerThread = null; unregisterMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener); return; } try { readerThread = new ReaderThread(newSocket.getInputStream(), listener, this::disconnect); writerThread = new WriterThread(newSocket.getOutputStream(), this::disconnect); readerThread.start(); writerThread.start(); registerMessageObserver(ServerStatusMessage.MESSAGE_TYPE, this.serverStatusListener); } catch (IOException e) { System.out.println("Vyskytl se problém při vytváření komunikace se serverem."); } }
Metóda sa skladá z dvoch častí. V hornej časti sa spracováva prípad,
kedy bolo z nejakého dôvodu spojenia ukončené a je potrebné odstrániť
inštancia starého čítacieho / zapisovacieho vlákna. Vo zvyšku metódy sa
predpokladá, že premenná newSocket
obsahuje nový validný
socket, ktorý sa vytvoril na základe novo vytvoreného spojenia. Vytvorí sa
nové inštancie čítacieho a zapisovacieho vlákna a metódou
start()
sa spustí. Metódu
(un)registerMessageObserver()
si vysvetlíme, až ju budeme
implementovať.
Delegácia spracovanie prijatých správ
Do triedy pridáme ďalšie inštančný konštantu, ktorá bude obsahovať anonymné funkciu starajúci sa o rozoslanie prijatých správ zaregistrovaným pozorovateľom:
private final OnDataReceivedListener listener = message -> { if (message.isResponce()) { final Request poll = requests.poll(); if (poll != null) { poll.onResponce(message); } return; } final List<OnDataReceivedListener> listenerList = listeners.get(message.getType()); if (listenerList == null) { return; } for (OnDataReceivedListener listener : listenerList) { listener.onDataReceived(message); } };
Na začiatku metódy sa pozrieme, či je prijatá správa odpoveď na nejaký
požiadavku. Ak áno, vyzdvihne sa a zavolá sa obsluha správy z frontu
requests
. Pokiaľ sa jedná o bežnú správu, získame z mapy
listeners
všetky Listener a postupne im oznámime, že majú
spracovať prijatú správu.
Prihlásenie / odhlásenie odberu správ
Ďalšie metódy, ktoré musíme podľa rozhranie implementovať, sú metódy
pre prihlásenie a odhlásenie odberu správ. Tieto metódy budú modifikovať
mapu listeners
:
@Override public synchronized void registerMessageObserver(String messageType, OnDataReceivedListener listener) { List<OnDataReceivedListener> listenerList = listeners.computeIfAbsent(messageType, k -> new ArrayList<>()); listenerList.add(listener); } @Override public synchronized void unregisterMessageObserver(String messageType, OnDataReceivedListener listener) { List<OnDataReceivedListener> listenerList = listeners.get(messageType); if (listenerList == null) { return; } listenerList.remove(listener); }
Pri registrácii poslucháča využijeme metódu
computeIfAbsent()
, ktorá sa pozrie do mapy a ak na zadanom
kľúči neexistuje hodnota, tak ju vytvoria.
Nadviazanie spojenia
Konečne sa dostávame k najdôležitejším metódam celého komunikátora.
Začneme implementáciu metódy connect()
, kde po prvýkrát
použijeme triedu CompletableFuture
:
@Override public CompletableFuture <Void> connect(String host, int port) { if (isConnected()) { throw new RuntimeException("Spojení je již vytvořeno."); } changeState(ConnectionState.CONNECTING); return CompletableFuture.supplyAsync(() -> { final Socket socket = new Socket(); try { socket.connect(new InetSocketAddress(host, port), 3000); return socket; } catch (IOException e) { return null; } }, ThreadPool.COMMON_EXECUTOR) .thenApplyAsync(socket -> { this.socket.set(socket); if (socket != null) { this.host.set(host); this.port.set(port); } else { changeState(ConnectionState.DISCONNECTED); this.host.set(null); this.port.set(-1); } if (socket == null) { throw new RuntimeException("Spojení se nepodařilo vytvořit."); } return null; }, ThreadPool.JAVAFX_EXECUTOR); }
Metóda je opäť rozdelená do logických celkov. V prvej časti sa
pozrieme, či ak sme už pripojenie na server. Ak sme pripojení, vyhodíme
výnimku. RuntimeException
nebudeme musieť ani ošetrovať, len sa
nám zapíše do konzoly. Dôležité je, že aplikácia sa neukončí. Metódou
changeState()
informujeme okolie, že sa pokúšame pripojiť k
serveru. V druhej časti metódy vytvoríme budúcnosť, v ktorej sa pokúsime
vytvoriť samotné spojenie so serverom volaním metódy
socket.connect
. Konštantou ThreadPool.COMMON_EXECUTOR
nastavíme, že pripojenie sa má vykonať v samostatnom vlákne. Ak sa
úspešne spojíme so serverom, vrátime socket. Metódou
thenApplyAsync()
"transformujeme" socket na výsledok. V tretej
časti uložíme socket volaním príkazu this.socket.set(socket)
.
Tým sa mimojiné zavolá changeListener
a vytvorí / odstráni sa
čítacie a zapisovacie vlákno. Celá tretia časť sa musí odohrať v JavaFX
vlákne, pretože na niektoré pozorovateľné konštanty neskôr navešiame
grafické komponenty, ktoré ako som už hovoril minule, sa môžu aktualizovať
iba v JavaFX vlákne, inak sa vyhodí výnimka.
Ukončenie spojenia
Spojenie budeme ukončovať metódou disconnect()
. Metóda bude
mať za úlohu riadne ukončiť čítacie a zapisovacie vlákno:
public CompletableFuture<Boolean> disconnect() { if (!isConnected()) { return CompletableFuture.completedFuture(false); } return CompletableFuture.supplyAsync(() -> { try { socket.get().close(); readerThread.shutdown(); writerThread.shutdown(); } catch (IOException e) { e.printStackTrace(); return false; } return true; }, ThreadPool.COMMON_EXECUTOR) .thenApplyAsync(success -> { if (success) { this.socket.set(null); changeState(ConnectionState.DISCONNECTED); } return success; }, ThreadPool.JAVAFX_EXECUTOR); }
Ak sa úspešne ukončí spojenie, tak sa pomocou príkazu
this.socket.set(null)
vymaže čítacie a zapisovacie vlákno a
komunikátor sa dostane do stavu DISCONNECTED
.
Odosielanie správ
Správy budeme odosielať dvojakého typu:
- bez čakania na výsledok
- s čakaním na výsledok
Metóda bez čakania na výsledok bude veľmi jednoduchá. Vezme správu, zašle ju zapisovacímu vláknu ao nič viac sa nestará:
public synchronized void sendMessage(IMessage message) { if (writerThread != null) { writerThread.addMessageToQueue(message); } }
Odoslanie správy s čakaním na výsledok má jeden problém, ktorý musíme vyriešiť. Tým problémom je ono čakanie na odpoveď zo servera:
public synchronized CompletableFuture<IMessage> sendMessageFuture(IMessage message) { return CompletableFuture.supplyAsync(() -> { sendMessage(message); return null; }) .thenCompose(ignored -> { Request request = new Request(); requests.add(request); return request.getFuture(); }); }
Metóda, ktorá bude odosielať správu s odpoveďou, bude vracať
budúcnosť, v ktorej odpoveď príde. Najskôr sa štandardne odošle správa a
potom sa zavolá nová metóda thenCompose()
. Táto metóda vlastne
hovorí, že výsledok budúcnosti získame z inej inštancie triedy
CompletableFuture
. Túto inú inštanciu získame volaním metódy
getFuture()
nad inštancií triedy Request
, ktorú si
za okamih nadefinujeme.
Request-response správy
Vytvoríme podpornú triedu, ktorá nám zabezpečí čakanie na odpoveď zo
servera. Vytvoríme teda novú triedu Request
a všetko by naraz
malo začať dávať zmysel:
class Request { private final Object lock = new Object(); private boolean waiting = true; private IMessage responce; CompletableFuture<IMessage> getFuture() { return CompletableFuture.supplyAsync(() -> { while (waiting) { synchronized (lock) { try { lock.wait(); } catch (InterruptedException ignored) {} } } return responce; }); } void onResponce(IMessage message) { this.responce = message; waiting = false; synchronized (lock) { lock.notify(); } } }
Trieda má iba dve metódy: getFuture()
a
onResponce()
. Prvá metóda vytvorí budúcnosť, v ktorej sa
vlákno uspí volaním metódy wait()
. Jediný, kto môže túto
budúcnosť prebudiť, je metóda onResponce()
, ktorá je
zavolaná, keď je prijatá odpoveď zo servera. Týmto jednoduchým trikom
vytvoríme dojem komunikácie request-response.
Na koniec len doplním implementáciu zvyšných metód, ktoré vyžaduje rozhranie:
@Override public ConnectionState getConnectionState() { return connectionState.get(); } @Override public ReadOnlyObjectProperty<ConnectionState> connectionStateProperty() { return connectionState.getReadOnlyProperty(); } @Override public String getConnectedServerName() { return connectedServerName.get(); }
To by pre túto dlhšiu lekciu všetko. Nabudúce, v lekcii Java chat - Klient - Spojenie so serverom 3. časť , už nadviažeme spojenie so serverom.
Mal si s čímkoľvek problém? Stiahni si vzorovú aplikáciu nižšie a porovnaj ju so svojím projektom, chybu tak ľahko nájdeš.
Stiahnuť
Stiahnutím nasledujúceho súboru súhlasíš s licenčnými podmienkami
Stiahnuté 14x (120.98 kB)
Aplikácia je vrátane zdrojových kódov v jazyku Java