Home php c# sql c c++ javascript python java go android git linux asp.net django .net node.js ios xcode cocoa iphone mysql tomcat mongodb bash objective-c scala visual-studio apache elasticsearch jar eclipse jquery ruby-on-rails ruby rubygems android-studio spring lua sqlite emacs ubuntu perl docker swift amazon-web-services svn html ajax xml java-ee maven intellij-idea rvm macos unix css ipad postgresql css3 json windows-server vue.js typescript oracle hibernate internet-explorer github tensorflow laravel symfony redis html5 google-app-engine nginx firefox sqlalchemy lucene erlang flask vim solr webview facebook zend-framework virtualenv nosql ide twitter safari flutter bundle phonegap centos Register | Login | Edit Tags | New Questions | 繁体 | 简体


10 questions online user: 32

0
votes
answers
10 views
+10

Spark與AVI兼容BigQuery

1

我試圖在Hive中創建一個外部表,並使用存儲在Avro格式的Google存儲中的相同數據在BigQuery中創建另一個表。Spark與AVI兼容BigQuery

我使用的是Dataproc集羣星火2.2.0,星火的Avro 4.0.0和2.1.1蜂房

有Avro的版本/包之間的差異一樣,但如果我創建使用蜂巢表和然後我使用Spark編寫文件,我可以在Hive中看到它們。

但是對於BigQuery是不同的,它能夠讀取Hive Avro文件而不是Spark Avro文件。

錯誤:

The Apache Avro library failed to parse the header with the follwing error: Invalid namespace: .someField 

搜索一些關於錯誤,問題是,星火Avro的文件是從蜂巢/ BigQuery的Avro的文件不同。

我不知道如何解決這個問題,也許在Spark中使用不同的Avro包,但我還沒有找到哪一個與所有系統兼容。

我也想避免棘手的解決方案,如創建蜂巢臨時表和另一個使用insert into ... select * from ...我會寫很多數據的創建,我想避免這種解決方案

任何幫助將是讚賞。謝謝

+0

錯誤是「Invalid namespace:.someField」。 「.someField」是正確的全名嗎? http://avro.apache.org/docs/current/spec.html#names –

+0

這是另一個名字,但它正是其中一個字段的名稱。實際上,是一個Struct的字段數組的名稱。似乎Avro版本之間的架構定義有些不同。 –

沙发
0
1

錯誤消息由BigQuery使用的C++ Avro庫拋出。 Hive可能使用Java Avro庫。 C++庫不喜歡以「。」開頭的命名空間。

這是從庫中的代碼:

if (! ns_.empty() && (ns_[0] == '.' || ns_[ns_.size() - 1] == '.' || std::find_if(ns_.begin(), ns_.end(), invalidChar1) != ns_.end())) { 
    throw Exception("Invalid namespace: " + ns_); 
} 
+0

很高興知道,謝謝。但問題仍然存在,你知道是否有某種方法可以使Spark Avro與BigQuery Avro兼容? –

+0

你可以將命名空間更改爲不以「。」開頭嗎?然後Spark和BigQuery都應該可以讀取它。 –

+0

我想但我不能,我的領域沒有「。」起初,Spark的Avro正在寫這篇文章。「 –

板凳
0
0

想知道如果你找到了答案。

我看到了同樣的事情,我試圖將數據加載到bigquery表中。庫首先將數據以avro格式加載到GCS中。該模式也有一個結構數組,並且命名空間存在一個.

+0

我沒有找到它。目前我正在以JSON格式寫入數據,但我想在某些時候將其更改爲AVRO。 –

0
votes
answers
10 views
+10

pyspark:按值將單個RDD分割爲多個RDD

1

我無法找到答案。 如果我有RDDpyspark:按值將單個RDD分割爲多個RDD

rdd = sc.parallelize([('a', [1,2,3]), ('b',[4,5,6])]) 

其中每個值都是一個列表。 有沒有方法可以拆分RDD,使得它成爲

sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b'6)]) 

現在在哪裏每個值是列表的元素之一,搭配的關鍵。 我大致知道解決方案。我們可以先collect()

a = rdd.collect() 

,並重新分配一個RDD作爲

rdd2 = sc.parallelize([x for x in a]) 

但如果RDD巨大,collect()會非常耗時。我們必須按比例考慮它。有沒有分散的方式來做到這一點? (如使用lambda function ..等)謝謝!

沙发
0
2

它是flatMap任務:

rdd.flatMap(lambda x: [(x[0], v) for v in x[1]]).collect() 
# [('a', 1), ('a', 2), ('a', 3), ('b', 4), ('b', 5), ('b', 6)] 

這裏lambda函數需要從原來的RDD一個鍵值對和地圖的關鍵,每個單獨的值:

lamb = lambda x: [(x[0], v) for v in x[1]] 

lamb(('a', [1,2,3])) 
# [('a', 1), ('a', 2), ('a', 3)] 

flatMap將該操作映射到每個鍵值對並將結果展平。

+1

謝謝!這正是我需要的! – Hsiang

0
votes
answers
9 views
+10

如果在htacess文件中出現錯誤,Apache php fpm默認配置

0

在Apache服務器中,我有幾個安裝在FPM模式下的PHP版本。所有的正常工作與此CONF:如果在htacess文件中出現錯誤,Apache php fpm默認配置

<Directory /var/www> 
      Options Indexes FollowSymLinks Includes ExecCGI 
      AllowOverride All 
      Require all granted 
      SetHandler php7 
    </Directory> 

<IfModule mod_fastcgi.c> 
    #PHP 5 
     Action php5 /php5 
     Alias /php5 /usr/lib/cgi-bin/php5 
     FastCgiExternalServer /usr/lib/cgi-bin/php5 -socket /var/run/php/php5.6-fpm.sock -pass-header Authorization 
    #PHP 7 
     Action php7 /php7 
     Alias /php7 /usr/lib/cgi-bin/php7 
     FastCgiExternalServer /usr/lib/cgi-bin/php7 -socket /var/run/php/php7.2-fpm.sock -pass-header Authorization 
</IfModule> 

,當我在.htaccess文件中使用SetHandler,它的作品太:

SetHandler php5 

但是,如果用戶做出的.htaccess一些錯誤,比如這裏:

SetHandler php5.6 

瀏覽器返回的PHP代碼(安全破裂時):

<?php phpinfo(); ?> 

如何正確管理apache.conf的安全性,以及 如果用戶在htaccess文件中發生錯誤,只加載PHP默認版本?

我嘗試這個配置,但默認情況下只做PHP7版本。 在這裏,用戶無法改變的.htaccess文件什麼:

<FilesMatch ".+.ph(p[345]?|t|tml)$"> 
SetHandler php7 
</FilesMatch> 

感謝的求助。

沙发
0
0

現在我在默認的PHP配置中找到SetEnv設置的解決方案。 需要的阿帕奇2.4 MOD:CGI代理

a2enmod proxy_fcgi 
a2enmod macro 

工作Ubuntu服務器配置16.04(前phpfpm.conf文件。):

SetEnv PHP_VER 5 

<IfModule proxy_fcgi_module> 

    ProxyErrorOverride on 

    <FilesMatch ".ph(p[2-6]?|tml)$"> 

     <Macro PHPver $ver $cgipath> 
       <If "env('PHP_VER') == '$ver'"> 
       SetHandler '$cgipath' 
       </If> 
     </Macro> 

     #PHP VERSIONS WITH SOCKET OR HOST : 
     Use PHPver 7 'proxy:unix:/var/run/php/php7.2-fpm.sock|fcgi://localhost' 
     Use PHPver 5 'proxy:unix:/var/run/php/php5.6-fpm.sock|fcgi://localhost' 
     Use PHPver 5.4 'proxy:fcgi://localhost:9054' 
     Use PHPver 5.5 'proxy:fcgi://localhost:9055' 
     Use PHPver 5.6 'proxy:unix:/var/run/php/php5.6-fpm.sock|fcgi://localhost' 
     Use PHPver 7.0 'proxy:unix:/var/run/php/php7.0-fpm.sock|fcgi://localhost' 
     Use PHPver 7.1 'proxy:unix:/var/run/php/php7.1-fpm.sock|fcgi://localhost' 
     Use PHPver 7.2 'proxy:unix:/var/run/php/php7.2-fpm.sock|fcgi://localhost' 
     #ELSE IF (default version) : 
     SetHandler 'proxy:fcgi://localhost:9054' 

    </FilesMatch> 

</IfModule> 

現在我們可以正確地撥打.htaccess文件中的PHP版本,通過:

SetEnv PHP_VER 7.0 
0
votes
answers
8 views
+10

有人通過node.js加載測試繼續呼叫我的API服務

-1

我有一個VPS在Softlayer上託管。一些白癡繼續通過node.js負載測試調用api服務,使得每秒接近50 api的調用次數。有人通過node.js加載測試繼續呼叫我的API服務

它正在放慢我的服務器。攻擊者的IP是35.193.63.213。它本身就是一個node.js api測試程序。

如果你的API應該不爲公衆任何幫助將是明顯的

+0

聯繫主機提供商並報告他們的IP,他們可能會阻止它。 –

+0

@ this.lau_我已與他們聯繫。 IP也被封鎖,但它仍在不斷地調用API。我該怎麼辦? – vitorio

沙发
0
0

,只需添加CORS跨域限制您的應用程序的NodeJS。

先安裝CORS模塊:

npm install cors 

然後例如使用快遞,配置CORS允許只是其中服務託管的同一個域,或者是應該訪問的API域訪問。

var express = require('express') 
var cors = require('cors') 
var app = express() 

var corsOptions = { 
    origin: 'http://example.com', 
    optionsSuccessStatus: 200 // some legacy browsers (IE11, various SmartTVs) choke on 204 
} 

app.get('/products/:id', cors(corsOptions), function (req, res, next) { 
    res.json({msg: 'This is CORS-enabled for only example.com.'}) 
}) 

app.listen(80, function() { 
    console.log('CORS-enabled web server listening on port 80') 
}) 
0
votes
answers
7 views
+10

中級和根證書文件

0

我最近在我的Apache安裝上設置了SSL。當我從我的供應商處收到證書時,有兩個文件:root.crt和intermediate.crt。我的理解是,在VirtualHost中,您應該只參考中間證書。它是否正確?如果是這樣,根版本的目的是什麼?中級和根證書文件

謝謝。

沙发
0
1

您只需要在服務器中配置中間證書。不應該包含根證書。假定客戶端必須在其信任庫中具有根證書。

將它包含在認證鏈中將無意義地增加SSL握手的大小,因爲它必須被客戶端忽略

0
votes
answers
7 views
+10

Apache Spark中的分層數據處理

0

我在Spark(v2.1.1)中有一個包含分層數據的3列(如下所示)的數據集。Apache Spark中的分層數據處理

  • 我的目標的目標是增量編號分配給基礎上,父子層次的每一行。從圖形上可以說,分層數據是一個樹的集合。
  • 根據下表,我已經有基於'Global_ID'分組的行。現在我想以 的增量順序生成'Value'列,但是基於 'Parent'和'Child'列的數據層次結構。

表格表示(數值是所需的輸出):

+-----------+--------+-------+   +-----------+--------+-------+-------+ 
    |  Current Dataset  |   |  Desired Dataset (Output)  | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 
    | Global_ID | Parent | Child |   | Global_ID | Parent | Child | Value | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 
    |  111 | 111 | 123 |   |  111 | 111 | 111 |  1 | 
    |  111 | 135 | 246 |   |  111 | 111 | 123 |  2 | 
    |  111 | 123 | 456 |   |  111 | 123 | 789 |  3 | 
    |  111 | 123 | 789 |   |  111 | 123 | 456 |  4 | 
    |  111 | 111 | 111 |   |  111 | 111 | 135 |  5 | 
    |  111 | 135 | 468 |   |  111 | 135 | 246 |  6 | 
    |  111 | 135 | 268 |   |  111 | 135 | 468 |  7 | 
    |  111 | 268 | 321 |   |  111 | 135 | 268 |  8 | 
    |  111 | 138 | 139 |   |  111 | 268 | 321 |  9 | 
    |  111 | 111 | 135 |   |  111 | 111 | 138 | 10 | 
    |  111 | 111 | 138 |   |  111 | 138 | 139 | 11 | 
    |  222 | 222 | 654 |   |  222 | 222 | 222 | 12 | 
    |  222 | 654 | 721 |   |  222 | 222 | 987 | 13 | 
    |  222 | 222 | 222 |   |  222 | 222 | 654 | 14 | 
    |  222 | 721 | 127 |   |  222 | 654 | 721 | 15 | 
    |  222 | 222 | 987 |   |  222 | 721 | 127 | 16 | 
    |  333 | 333 | 398 |   |  333 | 333 | 333 | 17 | 
    |  333 | 333 | 498 |   |  333 | 333 | 398 | 18 | 
    |  333 | 333 | 333 |   |  333 | 333 | 498 | 19 | 
    |  333 | 333 | 598 |   |  333 | 333 | 598 | 20 | 
    +-----------+--------+-------+   +-----------+--------+-------+-------+ 

樹表示(期望值旁邊的每個節點表示):

     +-----+           +-----+ 
        1 | 111 |          17 | 333 | 
         +--+--+           +--+--+ 
         |             | 
     +---------------+--------+-----------------+   +----------+----------+ 
     |      |     |   |   |   | 
     +--v--+     +--v--+   +--v--+  +--v--+ +--v--+ +--v--+ 
    2 | 123 |    5 | 135 |  10 | 138 |  | 398 | | 498 | | 598 | 
     +--+--+     +--+--+   +--+--+  +--+--+ +--+--+ +--+--+ 
    +-----+-----+   +--------+--------+  |   18   19   20 
    |   |   |  |  |  | 
+--v--+  +--v--+ +--v--+ +--v--+ +--v--+ +--v--+ 
| 789 |  | 456 | | 246 | | 468 | | 268 | | 139 |     +-----+ 
+-----+  +-----+ +-----+ +-----+ +--+--+ +-----+    12 | 222 | 
    3   4   6  7  8 |  11     +--+--+ 
             +--v--+        | 
             | 321 |      +------+-------+ 
             +--+--+      |    | 
              9      +--v--+  +--v--+ 
                   13 | 987 | 14 | 654 | 
                    +--+--+  +--+--+ 
                        | 
                       +--v--+ 
                      15 | 721 | 
                       +--+--+ 
                        | 
                       +--v--+ 
                      16 | 127 | 
                       +--+--+ 

代碼段:

Dataset<Row> myDataset = spark 
       .sql("select Global_ID, Parent, Child from RECORDS"); 

JavaPairRDD<Row,Long> finalDataset = myDataset.groupBy(new Column("Global_ID")) 
    .agg(functions.sort_array(functions.collect_list(new Column("Parent").as("parent_col"))), 
     functions.sort_array(functions.collect_list(new Column("Child").as("child_col")))) 
    .orderBy(new Column("Global_ID")) 
    .withColumn("vars", functions.explode(<Spark UDF>) 
    .select(new Column("vars"),new Column("parent_col"),new Column("child_col")) 
    .javaRDD().zipWithIndex(); 


// Sample UDF (TODO: Actual Implementation) 
spark.udf().register("computeValue", 
       (<Column Names>) -> <functionality & implementation>, 
       DataTypes.<xxx>); 

經過大量的調查研究,並通過博客,許多建議去,我曾嘗試下面的方法,但無濟於事,以實現我的方案的結果。

技術堆棧:

  • Apache的火花(V2.1。1)

  • 爪哇8

  • AWS EMR集羣(火花應用部署)


數據量:

  • 大約?Dataset中

20000000點方法下的行嘗試:

  1. 星火GraphX + GraphFrames:

  2. 星火GraphX預凝膠API:


替代品的任何建議(或)在當前的方法修改將是很有益的,因爲我搞清楚這個用例的解決方案完全丟失。

感謝您的幫助!謝謝!

沙发
0
0

注意:下面的解決方案是scala spark。您可以輕鬆轉換爲Java代碼。

檢查了這一點。我試着用Spark Sql來做這件事,你可以得到一個想法。基本上的想法是在對它們進行聚合和分組的同時對孩子,父母和全球身份進行排序。一旦按globalid進行分組和排序,則展開其餘部分。你會得到有序的結果表到以後你可以zipWithIndex添加等級(值)

import org.apache.spark.sql.SQLContext 
    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.expressions.UserDefinedFunction 
    import org.apache.spark.sql.functions.udf 

    val sqlContext = new SQLContext(sc) 
    import sqlContext.implicits._ 

    val t = Seq((111,111,123), (111,111,111), (111,123,789), (111,268,321), (222,222,654), (222,222,222), (222,721,127), (333,333,398), (333,333,333), (333,333,598)) 
    val ddd = sc.parallelize(t).toDF 
    val zip = udf((xs: Seq[Int], ys: Seq[Int]) => xs zip ys) 
    val dd1 = ddd 
    .groupBy($"_1") 
    .agg(sort_array(collect_list($"_2")).as("v"), 
     sort_array(collect_list($"_3")).as("w")) 
    .orderBy(asc("_1")) 
    .withColumn("vars", explode(zip($"v", $"w"))) 
    .select($"_1", $"vars._1", $"vars._2").rdd.zipWithIndex 

    dd1.collect 

輸出

res24: Array[(org.apache.spark.sql.Row, Long)] = Array(([111,111,111],0), ([111,111,123],1), ([111,123,321],2), 
([111,268,789],3), ([222,222,127],4), ([222,222,222],5), ([222,721,654],6),([333,333,333],7), ([333,333,398],8), ([333,333,598],9)) 
+0

這似乎是一個非常可行的解決方案。感謝您的代碼!我肯定會試試這個,但只是有點關注'收藏列表',因爲我的數據大約在2000萬行左右,但它應該是好的。如果您可以提供與Scala代碼完全相同的Java代碼,那就太好了,因爲我是Scala的新手。再次感謝! – Sridher

+0

@Sridher我認爲你可以很容易地將其轉換爲java代碼。這裏主要要注意的是你可以在java中複製的spark代碼。 –

+0

我將大部分代碼轉換爲與Java相當的代碼,但是面臨着您使用過的Spark UDF的一些問題。你能幫我解決UDF嗎?請參閱我編輯過的代碼片段。 – Sridher

0
votes
answers
7 views
+10

將Apache重定向到/ media/...路徑後出現Apache錯誤403

0

從我的主磁盤(安裝操作系統的位置)移動文件到第二個磁盤(其他存儲)時,我試圖將Apache重定向到新的磁盤,但是這返回error number 403。我正在運行Ubuntu 16.04。將Apache重定向到/ media/...路徑後出現Apache錯誤403

我本來重定向Apache以/home/user/Documents通過添加以下行的相應文件:

/etc/apache2/apache2.conf

<Directory /home/jawa/Documents/> 
#<Directory /media/jawa/R2D2/Webpages> 
     Options Indexes FollowSymLinks 
     AllowOverride None 
     Require all granted 
</Directory> 

/etc/apache2/sites-enabled/000-default.conf

DocumentRoot /home/jawa/Documents 
#DocumentRoot /media/jawa/R2D2/Webpages 

這工作得很好,但因爲我有將文件移動到新的位置,我還必須重定向Apache。 嘗試將Apache重定向到/media/jawa/R2D2/Webpages時,我切換上述註釋並使用sudo service apache2 restart重新啓動Apache。在瀏覽器中訪問localhost返回error 403並且使用命令cat /var/log/apache2/error.log提供了以下日誌:

[Wed Dec 27 17:24:48.583062 2017] [core:error] [pid 22343] (13)Permission denied: [client ::1:35704] AH00035: access to/denied (filesystem path '/media/jawa/R2D2') because search permissions are missing on a component of the path 

權限和這兩個文件夾的所有者:

drwxr-xr-x 2 jawa jawa 4096 dec 27 17:12 Documents 
drwxrwxrwx 1 jawa jawa  0 dec 27 16:46 Webpages 

是什麼原因造成的錯誤,怎麼能解決嗎?

+0

您能提供以下輸出: ls -ld/home// home/jawa/home/jawa/Documents ls -ld/media// media/jawa// media/jawa/R2D2/media/jawa/R2D2 /網頁 – bubbly

+0

'/ media/jawa /'權限錯誤,'chmod 755'解決了這個問題。也許你可以解釋一下,爲什麼'/ media /'和其他所有內容被設置爲755或更高,但'/ media/jawa /'不是。不要回複評論,但寫一個正式的答案,所以我可以標記回答的問題。謝謝,亞歷克斯 – Alexander

沙发
0
1

因爲我們知道一個作品和其他沒有,同時列出文件夾結構的權限:

ls -ld /home/ /home/jawa /home/jawa/Documents 
ls -ld /media/ /media/jawa/ /media/jawa/R2D2 /media/jawa/R2D2/Webpages 

一旦上市比較兩個。
默認權限由umask控制,這可能會導致權限不同。

0
votes
answers
7 views
+10

PySpark:數據幀 - 轉換結構數組

0

我有以下結構的數據幀:PySpark:數據幀 - 轉換結構數組

root 
|-- index: long (nullable = true) 
|-- text: string (nullable = true) 
|-- topicDistribution: struct (nullable = true) 
| |-- type: long (nullable = true) 
| |-- values: array (nullable = true) 
| | |-- element: double (containsNull = true) 
|-- wiki_index: string (nullable = true) 

我需要將其更改爲:

root 
|-- index: long (nullable = true) 
|-- text: string (nullable = true) 
|-- topicDistribution: array (nullable = true) 
| |-- element: double (containsNull = true) 
|-- wiki_index: string (nullable = true) 

請問我該怎麼辦呢?

非常感謝。

沙发
0
3

我認爲你正在尋找

df.withColumn("topicDistribution", col("topicDistribution").getField("values")) 
0
votes
answers
7 views
+10

pyspark RDD - 在一些指標加元組的列表

0

我有一個RDD看起來像這樣pyspark RDD - 在一些指標加元組的列表

[(3,6,7), (2,5,7), (4,3,7)] 

我想獲得的平均第一要素,以及第二個元素之和的總和第三要素。這是輸出是什麼樣子:

(3,14,21) 

是否有可能做到這一點使用pyspark?

沙发
0
3

可以轉換成數據幀,並使用groupBy

spark.version 
# u'2.2.0' 

# toy data 
rdd = sc.parallelize([(3,6,7), (2,5,7), (4,3,7)]) 
df = spark.createDataFrame(rdd,("x1", "x2", "x3")) 

(df.groupBy().avg("x1").collect()[0][0], 
df.groupBy().sum('x2').collect()[0][0], 
df.groupBy().sum('x3').collect()[0][0]) 
# (3.0, 14, 21) 

或者你可以組2個sum操作:

ave = df.groupBy().avg("x1").collect() 
sums = df.groupBy().sum("x2","x3").collect() 
(ave[0][0], sums[0][0], sums[0][1]) 
# (3.0, 14, 21) 

UPDATE(後評論):user8371915的建議導致了一個更優雅的解決方案:

from pyspark.sql.functions import avg, sum 

num_cols = len(df.columns) # number of columns 
res = df.groupBy().agg(avg("x1"), sum("x2"), sum("x3")).first() 
[res[i] for i in range(num_cols)] 
# [3.0, 14, 21] 
板凳
0
-1

是的,它可能在pyspark。您可以使用數據框功能來獲取所有這些值。請在下面嘗試。

from pyspark.sql.functions import * 

my_rdd=sc.parallelize([(3,6,7), (2,5,7), (4,3,7)]) 
df = sqlContext.createDataFrame(my_rdd,("fld1", "fld2", "fld3")) 
df.groupBy().agg(avg(col("fld1")),sum(col("fld2")),sum(col("fld3"))).rdd.collect() 

做的另一種方式:

df.registerTempTable('mytable') 
df1=sqlContext.sql("select avg(fld1), sum(fld2), sum(fld3) from mytable") 
df1.rdd.collect() 

感謝, 馬努

+0

這給'AttributeError的: '據幀' 對象有沒有屬性「avg'' – desertnaut

+0

你可以試試它的意思。它也適用於數據框中的平均值。 –

+0

它確實,但不是'sum'之後 - 請再次檢查,並且包含您生成的輸出! – desertnaut

地板
0
2

隨着RDD您可以使用與NumPy陣列和統計:

import numpy as np 

stats = sc.parallelize([(3,6,7), (2,5,7), (4,3,7)]).map(np.array).stats() 
stats.mean()[0], stats.sum()[1], stats.sum()[2] 

# (3.0, 14.0, 21.0) 
0
votes
answers
7 views
+10

Pyspark:在UDF中通過動態列

0

嘗試在UDF中逐個發送列的列表,但使用for循環但出現錯誤,即數據框未找到col_name。目前在列表list_col我們有兩列,但它可以改變。所以我想寫一個代碼,它適用於列的每一個列表。在這段代碼中,我一次連接一列的行,行的值是結構格式即列表中的列表。對於每一個空,我必須給空間。Pyspark:在UDF中通過動態列

list_col=['pcxreport','crosslinediscount'] 
    def struct_generater12(row): 
    list3 = [] 
    main_str = '' 
    if(row is None): 
     list3.append(' ') 
    else: 
     for i in row: 
      temp = '' 
      if(i is None): 
       temp+= ' ' 
      else: 
       for j in i: 
        if (j is None): 
         temp+= ' ' 
        else: 
         temp+= str(j) 
      list3.append(temp) 
    for k in list3: 
     main_str +=k 
    return main_str 


    A = udf(struct_generater12,returnType=StringType()) 
    # z = addlinterestdetail_FDF1.withColumn("Concated_pcxreport",A(addlinterestdetail_FDF1.pcxreport)) 
    for i in range(0,len(list_col)-1): 
     struct_col='Concate_' 
     struct_col+=list_col[i] 
     col_name=list_col[i] 
     z = addlinterestdetail_FDF1.withColumn(struct_col,A(addlinterestdetail_FDF1.col_name)) 
     struct_col='' 

    z.show() 
沙发
0
1

addlinterestdetail_FDF1.col_name意味着列被命名爲"col_name",你沒有訪問包含在可變col_name的字符串。

當調用在列UDF,可以

  • 直接使用其字符串名稱:A(col_name)
  • 或使用pyspark SQL函數col

    import pyspark.sql.functions as psf 
    z = addlinterestdetail_FDF1.withColumn(struct_col,A(psf.col(col_name))) 
    

你應該考慮使用pyspark sql函數進行連接,而不是編寫UDF。首先,讓我們創建一個嵌套結構的樣本數據幀:

import json 
j = {'pcxreport':{'a': 'a', 'b': 'b'}, 'crosslinediscount':{'c': 'c', 'd': None, 'e': 'e'}} 
jsonRDD = sc.parallelize([json.dumps(j)]) 
df = spark.read.json(jsonRDD) 
df.printSchema() 
df.show() 

    root 
    |-- crosslinediscount: struct (nullable = true) 
    | |-- c: string (nullable = true) 
    | |-- d: string (nullable = true) 
    | |-- e: string (nullable = true) 
    |-- pcxreport: struct (nullable = true) 
    | |-- a: string (nullable = true) 
    | |-- b: string (nullable = true) 

    +-----------------+---------+ 
    |crosslinediscount|pcxreport| 
    +-----------------+---------+ 
    |  [c,null,e]| [a,b]| 
    +-----------------+---------+ 

我們將寫有嵌套列名的字典:

list_col=['pcxreport','crosslinediscount'] 
list_subcols = dict() 
for c in list_col: 
    list_subcols[c] = df.select(c+'.*').columns 

現在我們可以「平坦」 StructType,與' '取代None,和連結:

import itertools 
import pyspark.sql.functions as psf 
df.select([c + '.*' for c in list_col]) 
    .na.fill({c:' ' for c in list(itertools.chain.from_iterable(list_subcols.values()))}) 
    .select([psf.concat(*sc).alias(c) for c, sc in list_subcols.items()]) 
    .show() 

    +---------+-----------------+ 
    |pcxreport|crosslinediscount| 
    +---------+-----------------+ 
    |  ab|    c e| 
    +---------+-----------------+ 
+0

感謝隊友其爲我工作 –

+0

@RahulKumarSingh也許你應該考慮[接受答案](https://stackoverflow.com/help/someone-answers)。 – Prem

+0

在列表中我有很多數據幀我應該如何合併一個數據幀中的所有數據幀。名單的長度不固定...................謝謝先進 –