Ultra-détaillé! SparkStreaming Article détaillé sur la façon d'intégrer Kafka! Peut être pratiqué avec le code

Source | bactéries Alice

Zebian | Carol

Photo de couverture | RPSC télécharger la Chine visuelle

Produit | RPSC (ID: CSDNnews)

Je crois que beaucoup de petits partenaires ont contacté SparkStreaming, la théorie ne dit pas trop, aujourd'hui vise principalement à mettre à l'intégration de Kafka est SparkStreaming tutoriel.

Ce document contient le code, les amis intéressés peuvent essayer de mains réplicats!

Kafka examen

Avant le début officiel, passons en revue une vague de Kafka.

  • Le concept de base illustration

courtier: Services d'installation de la machine Kafka est un courtier

producteur: producteur de message, responsable de l'écriture des données au courtier (push) consommation: Nouvelles des consommateurs, responsable de la prise de données (pull) de kafka traction, l'ancienne version de la nécessité des consommateurs à compter ZK, la nouvelle version ne nécessite pas sujet: Thème, est l'équivalent d'une catégorie de données, les données stockées dans différentes entreprises dans différents sujet - le sujet: Differentiated Services La réplication: Une copie des données stockées nombre de copies (pour assurer que les données ne sont pas perdues) - Copie: sécurité des données partition: Partition est une partition physique, une partition est un fichier, un sujet peut avoir 1 ~ n partitions, chaque partition a sa propre copie - Subdivision: lecture et écriture concurrente Groupe consommateurs: Les groupes de consommateurs, un sujet peut avoir plusieurs consommateurs / groupes de consommateurs en même temps, plus les consommateurs si un groupe de consommateurs, ils ne peuvent pas être données répétées de consommation - groupes de consommateurs: la vitesse d'augmentation des dépenses de consommation, la gestion unifiée pratique Remarque: Sujet peut être plus qu'un abonnement à la consommation ou d'un groupe, un consommateur / groupe peut également vous abonner aux sujets multiples Remarque: Lire les données ne peuvent être lues à partir du chef, les données d'écriture ne peuvent être écrites au leader, Suiveur viendra faire une copie des données Synchronisez Leader de là! ! !

  • Common Commandes

Démarrer kafka

/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties

arrêt kafka

/export/servers/kafka/bin/kafka-server-stop.sh

Afficher des informations sur le sujet

/export/servers/kafka/bin/kafka-topics.sh --list --zookeeper node01: 2181

rubrique Création

/export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01: 2181 --replication-facteur 3 --partitions 3 essais --topic

Afficher des informations sur un sujet

/export/servers/kafka/bin/kafka-topics.sh --describe --zookeeper node01: 2181 Test --topic

Supprimer le sujet

/export/servers/kafka/bin/kafka-topics.sh --zookeeper node01: 2181 Test --delete --topic

producteurs départ - les producteurs sont généralement utilisés pour tester la console

/export/servers/kafka/bin/kafka-console-producer.sh --broker liste node01: 9092 --topic spark_kafka

Début de consommateurs - les consommateurs sont généralement utilisés pour tester la console

/export/servers/kafka/bin/kafka-console-consumer.sh --zookeeper node01: 2181 --topic spark_kafka - de-début

Les consommateurs connectés à l'adresse Borker

/export/servers/kafka/bin/kafka-console-consumer.sh --bootstrap-serveur node01: 9092, node02: 9092, node03: 9092 --topic spark_kafka --from-début

L'intégration KAFKA deux modes expliqués

Il est également un visage de questions hot spot.

Développement, nous utilisons souvent SparkStreaming données en temps réel est lu et traité dans kafka, après la version de spark1.3, kafkaUtils qui fournit deux façons de créer DSTREAM de:

1, le mode de réception du récepteur:

  • KafkaUtils.createDstream (Sans développement, nous pouvons comprendre, mais l'entretien peut se demander).
  • run récepteur comme Exécuteur résident des tâches en attente dans les données, mais une faible efficacité du récepteur, la nécessité d'ouvrir plusieurs, puis fusionner manuellement les données (syndicat), puis traitées, beaucoup de problèmes

  • Récepteur machine qui a raccroché, les données peuvent être perdues, il est donc nécessaire d'ouvrir WAL (de WAL) pour assurer la sécurité des données, l'efficacité réduit!

  • Le récepteur est relié par l'intermédiaire zookeeper file d'attente Kafka, les appels API Kafka d'ordre supérieur, le décalage stockée dans le zookeeper, maintenu par le récepteur.

  • la consommation d'étincelle de temps afin d'assurer que les données ne sont pas perdues dans le Checkpoint gardera une copie de l'offset, les données peuvent sembler incohérent

  • Donc, peu importe de quel point de vue, le mode récepteur ne sont pas adaptés pour une utilisation dans le développement, il a été éliminé

2, connexion directe directe

  • KafkaUtils.createDirectStream (Utilisé dans le développement, nécessaires à la maîtrise)
  • Le mode direct est directement connecté aux partitions kafka d'acquisition de données, chaque donnée de partition en lecture directement à partir de la capacité grandement améliorée de parallèle
  • Direct Invoke API de bas niveau Kafka (API sous-jacente), offset leur propre stockage et la maintenance, par défaut par le poste de contrôle de maintenance Spark, éliminant ainsi incompatible avec ZK
  • Bien sûr, vous pouvez posséder la maintenance manuelle, l'existence mysql offset, en Redis
  • Il peut être utilisé dans le développement basé sur le mode direct et le mode direct par les caractéristiques de fonctionnement manuel + des données précises exactement une fois de plus

Résumé:

  • mode de réception Récepteur
  • Récepteur accepte le risque de multiples rendement élevé de données, mais il y a des données manquantes

  • Activer le journal (WAL) pour éviter la perte de données, mais la faible efficacité de l'écriture deux fois les données.

  • Zookeeper sauvegarde des données des consommateurs compensent la duplication possible.

  • Utiliser l'API de haut niveau

    • Direct Direct Connect
  • Récepteur non utilisé, directement à la lecture de kafka partition de données

  • Ne pas utiliser le mécanisme log (WAL)

  • Spark maintenir leur propre décalage

  • Utilisez API de bas niveau

  • Extended: A propos de la sémantique du message

    Remarque:

    le développement Kafka SparkStreaming et de l'intégration en deux versions: 0,8 et 0.10+

    Il récepteur version 0.8 et le mode direct (version 0.8 mais plus l'environnement de production ne sont pas pris en charge dans la version 0.8 après Spark2.3 a).

    0,10 Après avoir retenu que le mode direct (mode Reveiver ne convient pas pour les environnements de production), et la version 0.10 du changement API (plus puissant)

    conclusion:

    Notre apprentissage et le développement directement en utilisant la version 0.10 du modèle direct, mais sur la différence entre le récepteur direct et quand l'entrevue devait être en mesure de répondre en

    étincelle streaming kafka-0-8 (comprendre)

    1.Receiver

    récepteurs KafkaUtils.createDstream utilisés pour recevoir des données, l'utilisation des niveaux élevés de consommation Kafka api, compensée par le récepteur de l'entretien, les données reçues pour tous les récepteurs seront enregistrés dans Huissiers Spark, suivi par Spark travail en continu pour commencer le traitement de ces données, seront perdues par défaut, pour activer le journal WAL, il est synchronisé avec les données reçues stockées sur un système de fichiers distribué tels que HDFS. Assurez-vous que les données en cas d'erreur peuvent être récupérés. Bien que cette approche peut être combiné avec le mécanisme WAL pour assurer aucune perte et une grande fiabilité des données, mais l'efficacité a permis WAL sera plus faible, et ne peut pas garantir que les données sont traitées une fois et une seule fois, peut être traitée deux fois. Parce qu'il ne peut pas être synchronisés entre le Spark et ZooKeeper.

    (Approche maintenant intégrée officiellement n'est pas recommandé.)

    • préparations

    1) groupe Zookeeper Démarrer

    zkServer.sh début

    2) Démarrer kafka groupe

    kafka-server-start.sh /export/servers/kafka/config/server.properties

    3. Créer un sujet

    kafka-topics.sh --create --zookeeper node01: 21811 --partitions --replication facteur 3 --topic spark_kafka

    4. Envoyer un message à l'interpréteur de commandes par thème

    kafka-console-producer.sh --broker liste node01: 9092 --topic spark_kafka

    5. Ajouter la kafka dépendante pom

    < dépendance > < groupId > org.apache.spark < / GroupId > < artifactId > étincelle streaming kafka-0-8_2.11 < / ArtifactId > < version > 2.2.0 < / version > < / dépendance >
    • API

    Acquis par le récepteur dans le récepteur des données de sujet kafka peuvent fonctionner en parallèle récepteur plus lit le sujet kafak de données, ici 3

     val receiverDStream: immutable.IndexedSeq = (1 à 3) .map (x = >  { flux val: ReceiverInputDStream = KafkaUtils.createStream (ssc, zkQuorum, groupId, sujets) ruisseau })

    Si la WAL (spark.streaming.receiver.writeAheadLog.enable = true) peut être réglé pour activer le niveau de stockage (par défaut StorageLevel.MEMORY_AND_DISK_SER_2)

    code démontre

    org.apache.spark.streaming.dstream d'importation. {DSTREAM, ReceiverInputDStream} org.apache.spark.streaming.kafka.KafkaUtils d'importation org.apache.spark.streaming d'importation. {secondes, StreamingContext} org.apache.spark d'importation. {SparkConf, SparkContext} importation scala.collection.immutable objet SparkKafka { def principaux (args: Array): Unité = { 1 // Création StreamingContext config val: SparkConf = nouvelle SparkConf.setAppName ( "SparkStream"). setMaster ( "local") .set ( "spark.streaming.receiver.writeAheadLog.enable", "true") // ouvert WAL WAL pour assurer la fiabilité de la source de données val sc = new SparkContext (config) sc.setLogLevel ( "WARN") val ssc = new StreamingContext (sc, secondes (5)) ssc.checkpoint ( "./ kafka") // ============================================== // 2. Préparer les paramètres de configuration val zkQuorum = "node01: 2181, node02: 2181, node03: 2181" val groupId = "étincelle" val = sujets Plan ( "spark_kafka" - >  2) // 2 représentent chacun le sujet correspond à partition utilise deux fils de passer, RDD kafka des partitions et des partitions // ssc le sujet n'est pas le même, ce qui augmente le nombre de threads de consommation, ne pas augmenter le nombre de traitement parallèle des étincelles de données // 3. Kafka acquis par le récepteur de récepteur de données sujet peut fonctionner plus récepteur parallèle lit le sujet kafak de données, ici 3 val receiverDStream: immutable.IndexedSeq = (1 à 3) .map (x = >  { flux val: ReceiverInputDStream = KafkaUtils.createStream (ssc, zkQuorum, groupId, sujets) ruisseau }) // 4. méthode de l'Union utilisé, tout le réceptacle récepteur DSTREAM sont combinés pour produire val allDStream: DSTREAM = ssc.union (receiverDStream) // 5. Les données acquises sujet (String, String) Représentation de chaîne d'un nom de sujet et deuxième données représentatives de sujet Chaîne val données: DSTREAM = allDStream.map (_._ 2) // ============================================== //6.WordCount mots val: DSTREAM = data.flatMap (. _ split ( "")) val wordAndOne: DSTREAM = words.map ((_, 1)) Résultat val: DSTREAM = wordAndOne.reduceByKey (+ _ _) result.print ssc.start ssc.awaitTermination } }

    2.Direct

    mode direct vérifiera périodiquement le sujet de la partition correspondante kafka les dernières données de décalage traitées dans chaque lot, puis à l'intérieur d'une plage de décalage, plage de lecture API Spark en invoquant simple des données de consommation kafka .

    • carence directe ne repose pas les outils de surveillance de kafka de Zookeeper

    • La comparaison directe approche fondée sur le récepteur présente plusieurs avantages:

  • Simplifier parallèle

    Kafka besoin de créer plusieurs flux d'entrée, et leur union, sparkStreaming sera créé et le nombre de partitions kafka RDD nombre de partitions différentes, et de lire les données du kafka parallèle, étincelle RDD le nombre de partitions et les partitions kafka les données sont relation un à un.

  • efficace

    Récepteur atteindre zéro perte de données sont les données précédemment stockées dans WAL, les données sont copiées à nouveau, elle se traduira par des données copiées deux fois, le premier étant copié kafka, et une fois écrit au WAL. Le Direct ne pas utiliser ce WAL élimine problème.

  • Exactement une fois exactement la sémantique (-once-sémantique)

    les données du récepteur est lu par kafka kafka api de haut niveau pour compenser Zookeeper d'écriture, bien que cette méthode peut enregistrer les données dans le WAL assurer que les données ne sont pas perdues, mais l'écart peut être enregistré car le sparkStreaming et ZK décalage les données de cause à consommer plusieurs fois.

  • Direct-les exactement une fois sémantique (EOS) kafka obtenus par api bas niveau, seul le décalage est stocké dans le poste de contrôle ssc, ce qui élimine les incohérences et ssc ZK problèmes de décalage.

    • API
    KafkaUtils.createDirectStream (ssc, kafkaParams, sujets)

    code démontre

    importation kafka.serializer.StringDecoder org.apache.spark.streaming.dstream d'importation. {DSTREAM, InputDStream} org.apache.spark.streaming.kafka.KafkaUtils d'importation org.apache.spark.streaming d'importation. {secondes, StreamingContext} org.apache.spark d'importation. {SparkConf, SparkContext} SparkKafka2 objet { def principaux (args: Array): Unité = { 1 // Création StreamingContext config val: SparkConf = nouvelle SparkConf.setAppName ( "SparkStream"). setMaster ( "local") val sc = new SparkContext (config) sc.setLogLevel ( "WARN") val ssc = new StreamingContext (sc, secondes (5)) ssc.checkpoint ( "./ kafka") // ============================================== // 2. Préparer les paramètres de configuration val kafkaParams = Carte ( "metadata.broker.list" - >  "Node01: 9092, node02: 9092, node03: 9092", "group.id" - >  "Spark") sujets val = Set ( "spark_kafka") val allDStream: InputDStream = KafkaUtils.createDirectStream (ssc, kafkaParams, sujets) // 3. Le sujet des données acquises val données: DSTREAM = allDStream.map (_._ 2) // ============================================== // WordCount mots val: DSTREAM = data.flatMap (. _ split ( "")) val wordAndOne: DSTREAM = words.map ((_, 1)) Résultat val: DSTREAM = wordAndOne.reduceByKey (+ _ _) result.print ssc.start ssc.awaitTermination } }

    étincelle streaming kafka-0-10

    • explication

    étincelle streaming kafka-0-10 la version, il y a quelques modifications de l'API, un fonctionnement plus souple, utilisé dans le développement

    • pom.xml
    < ! - < dépendance > < groupId > org.apache.spark < / GroupId > < artifactId > étincelle streaming kafka-0-8_2.11 < / ArtifactId > < version > $ {} Spark.version < / version > < / dépendance > - > < dépendance > < groupId > org.apache.spark < / GroupId > < artifactId > étincelle streaming kafka-0-10_2.11 < / ArtifactId > < version > $ {} Spark.version < / version > < / dépendance >
    • API:

    • rubrique Création
    /export/servers/kafka/bin/kafka-topics.sh --create --zookeeper node01: 2181 --replication facteur 3 --partitions 3 --topic spark_kafka
    • Producteur début

    /export/servers/kafka/bin/kafka-console-producer.sh --broker liste node01: 9092, node01: 9092, node01: 9092 --topic spark_kafka
    • code démontre

    importation org.apache.kafka.clients.consumer.ConsumerRecord importation org.apache.kafka.common.serialization.StringDeserializer org.apache.spark.streaming.dstream d'importation. {DSTREAM, InputDStream} org.apache.spark.streaming.kafka010 d'importation. {ConsumerStrategies, KafkaUtils, LocationStrategies} org.apache.spark.streaming d'importation. {secondes, StreamingContext} org.apache.spark d'importation. {SparkConf, SparkContext} objet SparkKafkaDemo { def principaux (args: Array): Unité = { 1 // Création StreamingContext //spark.master devrait être défini comme local, n >  1 val = new SparkConf.setAppName conf ( "wc"). setMaster ( "local") val sc = new SparkContext (conf) sc.setLogLevel ( "WARN") val ssc = new StreamingContext (sc, secondes (5)) // 5 représente 5 secondes pour former un RDD de données de segmentation // prêt à se connecter les paramètres de Kafka val kafkaParams = Carte ( "Bootstrap.servers" - >  "Node01: 9092, node02: 9092, node03: 9092", "Key.deserializer" - >  classof, "Value.deserializer" - >  classof, "Group.id" - >  "SparkKafkaDemo", // plus tôt: Lorsque décalage a été soumis dans le cadre du district, du début des dépenses de décalage soumis, aucune compensation lorsqu'il est soumis, la consommation de zéro En aucun données de décalage lorsqu'il est soumis, un nouveau consommateur créé la partition, quand il est décalé soumis dans le cadre du district, par rapport aux dépenses de démarrage soumis offset: // dernière // none: sujet quand il y a district décalage a été soumis, le décalage par rapport au début du consommateur, tant qu'il est une partition n'existe décalage soumis pas, une exception est levée // cette dernière configuration automatiquement remis à zéro décalage du dernier offset, qui est, s'il y a une position décalée décalée par rapport au début de la consommation, il n'y a pas de décalage aux données de nouvelles dépenses de démarrage "Auto.offset.reset" - >  « Les dernières », // faux moyens fermés automatiquement soumis. Par l'étincelle pour vous aider à soumettre programmeur Checkpoint ou manuel d'entretien "Enable.auto.commit" - >  (Faux: java.lang.Boolean) ) sujets val = Array ( "spark_kafka") // 2. Kafak connexion de données acquises à l'aide KafkaUtil val recordDStream: InputDStream = KafkaUtils.createDirectStream (ssc, LocationStrategies.PreferConsistent, // emplacement stratégie Source fortement recommandé d'utiliser cette stratégie, faire spark Exécuteur et même la correspondance de Kafka Broker ConsumerStrategies.Subscribe (sujets, kafkaParams)) // stratégie de consommateur, le code source est fortement recommandé d'utiliser cette stratégie // 3. Valeur d'acquisition de données val lineDStream: DSTREAM = recordDStream.map (. _ valeur) // _ fait référence ConsumerRecord val wrodDStream: DSTREAM = lineDStream.flatMap // _ fait référence aux cheveux sur la valeur, à savoir, une ligne de données (_ split ( "").) val wordAndOneDStream: DSTREAM = wrodDStream.map ((_, 1)) Résultat val: DSTREAM = wordAndOneDStream.reduceByKey (+ _ _) result.print ssc.start // ouvert ssc.awaitTermination // attendre arrêt élégant } }

    Eh bien, le processus d'intégration SparkStreaming Kafka dans ce chapitre montrent pour expliquer et vous amène à revoir les bases d'une vague de Kafka, si utile pour vous, les mains mal à la main le point d'une « observation », il ~

    Cet article par les auteurs à partir RPSC Blog, le lien d'origine:

    https://blog.csdn.net/weixin_44318830/article/details/105612516

    chiffre d'affaires flambée de 30 ans: d'une communauté libre à une entreprise de plusieurs milliards de dollars

    comprendre l'une des plus grandes réalisations de l'IA: limitations convolution réseau de neurones

    GitHub a joué 10000 +, projet de haut niveau Apache ShardingSphere de The Open Road

    HKUST Académicien interrogation future Zheng Guangting, a révélé les dernières applications et la pratique de l'intelligence artificielle

    intelligents d'exploitation et d'entretien des défis en grande promotion: comment Ali résista les « doubles 11 chats fin »?

    Ethernet Place 2.0 Jeu de garde et mettre en uvre MPC

    très difficile pour vous d'écrire neuf questions face de MySQL, nous vous recommandons de la collection!

    08h00 demain soir, pour vous Secret « chirurgie de greffe de visage AI »: la technologie de base et de l'application des images vidéo fixes ou derrière la greffe du visage
    Précédent
    Cette minute de traitement d'un milliard de noeuds de calcul dans le graphique de Platon, maintenant comment?
    Prochain
    15 avril Changzhou transactions immobilières 348 ensembles de logements de seconde main un chiffre d'affaires total de 175 jeux
    Hardcover appartement chèque sac, le talent Xiamen ici béni! Plus un super avantages multi-investissement
    oie herbe | HyunA / Jennie / Lisa, avec qui vous l'argent à ongles sélectionner?
    Enseignez-vous la beauté La bonne façon d'ouvrir les cardigans en tricot, cliquez-moi
    Je suis allé porter la saison Mary Jane, s'il vous plaît ont joué Emma Roberts et Alexa Chung
    Arc-en-fart | Elsa original, les haricots frères et Ken sont les amateurs de bijoux vintage
    États-Unis pour vous enseigner | Lorsque la mise en feu de l'élément papillon, pas une fille est innocente
    Speak vraiment | Je veux essayer et essayer à nouveau Zhang Yixing, ce succès?
    Cravate Pour Son ouverture Shanghai Lujiazui L + MALL magasin phare, en passant l'esthétique de la mode actuelle de la vie urbaine
    Planter des oies graminées Pourquoi les accessoires pour cheveux sont-ils de plus en plus «bon marché»?
    DOUX MONSTER X Jennie nouvelle série de fantasy listés JENTLE HOME
    Acteur Lee maintenant, par leur rôle compléter « slash » Vie