Home php c# sql c c++ javascript python java go android git linux asp.net django .net node.js ios xcode cocoa iphone mysql tomcat mongodb bash objective-c scala visual-studio apache elasticsearch jar eclipse jquery ruby-on-rails ruby rubygems android-studio spring lua sqlite emacs ubuntu perl docker swift amazon-web-services svn html ajax xml java-ee maven intellij-idea rvm macos unix css ipad postgresql css3 json windows-server vue.js typescript oracle hibernate internet-explorer github tensorflow laravel symfony redis html5 google-app-engine nginx firefox sqlalchemy lucene erlang flask vim solr webview facebook zend-framework virtualenv nosql ide twitter safari flutter bundle phonegap centos Register | Login | Edit Tags | New Questions | 繁体 | 简体


10 questions online user: 37

0
votes
answers
11 views
+10

Scala toString函數

-2

我有一個類,它有一個打印出字符串的方法。我想重寫toString函數,以便將字符串打印爲t | h | i | s。我怎麼做?Scala toString函數

+2

你應該真的提供更多的細節,或許你已經有了一些代碼。 – SergGr

沙发
0
0

我要解釋你的問題是「有回報字符串的方法」,而不是「有打印出一個字符串的方法」,因爲我只是猜測這就是你的意思,你問的問題將是一個更難的問題。

這裏是一個類Foo它有一個方法bar,返回一個字符串。類Foo覆蓋toString方法,使其返回值bar|字符穿插。

scala> class Foo(val bar: String) { 
    | override def toString: String = bar.mkString("|") 
    | } 
defined class Foo 

scala> val x = new Foo("this") 
x: Foo = t|h|i|s 

scala> x.bar 
res0: String = this 
+0

@jwvh正確的你,更新。 –

0
votes
answers
9 views
+10

Pureconfig類型安全配置與層次的根密鑰

0

我有以下application.conf並試圖找出定義我的課班上最好的方式來加載配置:Pureconfig類型安全配置與層次的根密鑰

allKeys { 
    mysql { 
    dev { 
     host = <host1> 
     user = <user1> 
    } 
    prod { 
     host = <host1> 
     user = <user1> 
    } 
    hdfs { 
    endpoint = <host1> 
    port = <port1> 
    } 
} 

my case classes: 
    case class Settings(mysql: DbSettings, hdfs: HdfsSettings) 
    case class DbSettings(host: String, user: String) 
    case class HdfsSettings(endpoint: String, port: String) 

我在遇到問題的認識如何正確加載,以便它在hdfs中查找類似的鍵時不會失敗。

沙发
0
3

您需要定義您的案例類以適應配置結構。

case class HdfsConfig(endpoint: String, port: Int) 
case class DbConfig(host: String, user: String) 
case class MySqlConfig(dev: DbConfig, prod: DbConfig) 
case class AllConfigs(mysql: MySqlConfig, hdfs: HdfsConfig) 

case class MyConfig(allKeys: AllConfigs) 

現在你可以閱讀這些作爲,

loadConfig[MyConfig](conf) 
+0

完美。謝謝。 – horatio1701d

0
votes
answers
8 views
+10

如何模擬上下文(ActorContext)?

0

我正在使用TestKit來測試Akka Actor。我有一個Demo類,它有方法getActorRef,它將輸入作爲字符串並返回一個ActorRef。如何模擬上下文(ActorContext)?

class Demo @Inject()(a: A. b: B, context: ActorContext) { 
    def getActorRef(id: String): ActorRef 
} 

我在創建Demo.Now對象時嘲笑了A,B,我正面臨着如何模擬上下文的問題。

我做了什麼嘲笑它? val context = mock [ActorContext]

但它沒有奏效。

+0

是否必須嘲笑?爲什麼不使用akka測試包來提供'ActorContext'?類似於https://stackoverflow.com/questions/36945414/how-do-i-supply-an-implicit-value-for-an-akka-stream-materializer-when-sending-a/36985091#36985091 –

沙发
0
0

目前尚不清楚你想要做什麼,或者遇到什麼錯誤。我是猜測您正在使用ActorContextgetActorRef內部獲得ActorRef。如果是這樣,你是否嘗試過使用Mockito來創建一個模擬ActorContext,當你在測試中構造它時將它傳遞到Demo,然後將所調用的ActorContext方法截掉,以便它返回你想要的值(例如,測試探頭,還是一些這樣的東西)?

我希望能夠工作,假設這就是你想要做的。

0
votes
answers
8 views
+10

Scala中Tuple成員的重新排序

0

如何重新排列元組的成員?我有2元組的列表如下Scala中Tuple成員的重新排序

((115,vp,London,1001),(2,ZIP1,ZIP2)) 

我要重新排序的元組作爲

((vp,London), (115,1001,2,ZIP1,ZIP2)) 
+2

文字vp,倫敦總是會出現在相同的位置?如果是的話,那很簡單。你只需要編寫一個地圖功能! – sparkr

+0

是的,他們將處於相同的位置。我對scala很陌生,所以很難爲此寫一個函數。你能指出一個教程或一個例子嗎? – BobCoder

沙发
0
2

因此,基於你的假設,即數組的元素出現在相同的位置,你可以請執行以下操作:

val tpl = Seq(((115,"vp","London",1001),(2,"ZIP1","ZIP2"))) 

tpl.map { 
    case (elem1, elem2) => ((elem1._2, elem1._3), (elem1._1, elem1._4, elem2._1, elem2._2, elem2._3)) 
} 

更好的方法是使用case類而不是像這樣的元組!您可以將元組的元素放入案例類中,然後根據需要映射它!

0
votes
answers
7 views
+10

Akka Streams KillSwitch在alpakka jms

1

我有一個場景,我開始使用alpakka多個jmsSource(對於不同的隊列)。我還需要在任何時候卸下隊列。所以我已經添加KillSwitch到jms阿卡流,如下所示: -Akka Streams KillSwitch在alpakka jms

trait MessageListener { 

    lazy val jmsPipeline = jmsSource 
    .map { x => log.info(s"Received message ${x} from ${queue}"); x } 
    .viaMat(KillSwitches.single)(Keep.right) 
    .toMat(Sink.foreach { x => pipelineActorRef ! PreProcessorMessage(x) }) 
    (Keep.both) 
    .run() 

    def start(): Unit = { 
      log.info("Invoking listener : {}", queue) 
      jmsPipeline 
      log.info("listener : {} started", queue) 
      } 
    def stop():Unit =  jmsPipeline._1.shutdown() 

    def queue: String 

} 

object ListenerA extends MessageListener { 
    override def queue: String = "Queue_A" 
} 

object ListenerB extends MessageListener { 
    override def queue: String = "Queue_B" 
} 

..等等。

啓動應用程序後,所有的隊列連接並正常工作。但是,當我嘗試使用停止方法分離隊列時,並非所有隊列都斷開連接並且行爲是隨機的。我還檢查了killSwitch對所有聽衆都不同。

有人可以告訴我這裏有什麼問題嗎?

沙发
0
0

您的日誌支持您連接到具有不同流的多個隊列的錯覺,但是您有多個可能連接到同一隊列的流。在這兩個監聽器對象中,記錄器都會記錄覆蓋的queue名稱,但該隊列名稱不用於配置jmsSource

您沒有顯示jmsSource的定義;顯然它是在MessageListener性狀之外的某處定義的,在這種情況下,ListenerAListenerB都使用相同的jmsSource。換言之,而ListenerAListenerB具有jmsPipeline不同實例(這就是爲什麼殺開關是不同的),這兩個jmsPipeline實例由相同jmsSource實例衍生(除非jmsSourcedef即在每次調用創建一個不同的Source ,但即使情況如此,基本問題仍然存在:queue未在配置中使用)。

在Alpakka,JMS隊列上JmsSourceSettings配置,所以jmsSource可能看起來像下面這樣:

val jmsSource: Source[String, NotUsed] = JmsSource.textSource(
    JmsSourceSettings(connectionFactory).withBufferSize(10).withQueue("MyQueue") 
)      // the queue is configured here^

ListenerA.start(),例如,被調用時,以下記錄:

Invoking listener : Queue_A 
listener : Queue_A started 

再次,在以上日誌語句中的"Queue_A"ListenerA中被重寫的def queue: String成員的值;它不一定是在jmsSource(上例中的"MyQueue")中實際配置的隊列。與ListenerB以及您在map組合器中登錄的消息一樣。

一個簡單的解決方法是把的jmsSource及其JmsSourceSettings的定義MessageListener特質內部和這些設置實際使用queue

0
votes
answers
6 views
+10

重定向stdout在另一個線程

1

我想寫一個測試,將重定向的主要方法stdout,但似乎一旦我調用主,它似乎開始在另一個線程,我無法捕獲輸出。下面是代碼:重定向stdout在另一個線程

這工作:

val baos = new ByteArrayOutputStream 
val ps = new PrintStream(baos) 
System.setOut(ps) 
print("123") 
Assert.assertEquals("123", baos.toString) 

這不:

val baos = new ByteArrayOutputStream 
val ps = new PrintStream(baos) 
System.setOut(ps) 
GameRunner.main(_) 
Assert.assertEquals("123", baos.toString) 

....

object GameRunner { 
    def main(args: Array[String]) { 
     print("123") 

我如何能趕上在調用print我的測試?

*我也嘗試scala.Console.setOut

編輯

我也注意到,運行GameRunner.main(_)甚至沒有列出任何控制檯時,我不重定向。這是什麼造成的?

沙发
0
2

print確實是Predef.print,它調用Console.print。即使您致電System.setOut我不知道這是否對Console.print有影響。嘗試致電Console.setOut或嘗試:

Console.withOut(ps)(GameRunner.main(null)) 

另一種可能性是,通過調用GameRunner.main(_)你不執行任何(因爲可能它只是返回函數(args: Array[String]) => GameRunner.main(args)要快快的裁決? 。出

編輯沒錯:

scala> object A { def main(args: Array[String]) { println("1") } } 
defined module A 
scala> A.main(null) 
1 
scala> A.main(_) 
res1: Array[String] => Unit = <function1> 
+0

正如我所編輯,'Console.setOut'沒有工作,但是,'Console.withOut'工作 – 2013-05-05 02:59:02

+0

右鍵,在第二個帳戶,也有一次,我使用main(null)而不是main(_)調用,然後運行,我猜我曾以爲這會採取類似行動,因爲我不使用參數...我將不得不進一步瞭解這一點:)非常感謝! – 2013-05-05 03:01:18

0
votes
answers
6 views
+10

單獨的數據庫或新表

0

有我的應用程序,例如不同的模塊 -單獨的數據庫或新表

  1. 內容管理
  2. 費用管理和支付
  3. 調度管理

是什麼爲上述場景創建架構的最佳方法 -

  1. 有單獨的表
  2. 有獨立的DB每個

什麼是可擴展性,代碼的可維護性和企業的角度而言的優勢/劣勢?

沙发
0
1

根據以下問題做出決定。

這些不同的數據集有多相關。

你想對數據做什麼?它是出於某種目的(如分析)還是僅通過某些應用程序通過Web提供內容?

現在aws或gcp本身爲內容管理和調度提供了相當多的服務,您可能需要根據自己的數據庫設置和所有內容自行構建費用管理。

0
votes
answers
6 views
+10

轉化油滑流數據,並使用阿卡的Http

1

目的是從數據庫流數據,在這個組塊數據的執行一些計算(該計算返回一些情況下類的未來)發送分塊響應,併發送該數據作爲分塊的響應給用戶。目前,我能夠流式傳輸數據併發送響應,而無需執行任何計算。但是,我無法執行此計算,然後傳輸結果。轉化油滑流數據,並使用阿卡的Http

這是我實施的路線。

def streamingDB1 = 
path("streaming-db1") { 
    get { 
    val src = Source.fromPublisher(db.stream(getRds)) 
    complete(src) 
    } 
} 

函數getRds返回映射到案例類(使用光滑)的表的行。現在考慮功能計算這需要每一行作爲輸入並返回另一個案例類的未來。喜歡的東西

def compute(x: Tweet) : Future[TweetNew] = ? 

我如何能實現可變SRC此功能,並將其發送給計算用戶的分塊響應(如流)。

沙发
0
1

你可以使用變換的mapAsync來源:

val src = 
    Source.fromPublisher(db.stream(getRds)) 
     .mapAsync(parallelism = 3)(compute) 

complete(src) 

調整並行的水平需要。

+0

這是行不通的。我運行curl命令打端點。然而,連接被關閉。 – user3294786

0
votes
answers
6 views
+10

星火羣組rdd由配對RDD上的鑰匙和羣組組成,並從每個羣組中挑選最新的

0

Spark和Scala的新手。試圖達到以下。我的消息看起來像以下(鑰匙,ID,版本,dataObject時)星火羣組rdd由配對RDD上的鑰匙和羣組組成,並從每個羣組中挑選最新的

val transformedRDD = processedMessages.flatMap(message => { 
    message.isProcessed match { 
     case true => Some(message.key, message.id, message.version, message) 
     case false => None 
    } 
    }).groupByKey 

我想組由ID對每個消息並獲得最新版本的消息,然後groupbykey,然後調用它看起來像下面

預定方法
Ingest(key,RDD[dataObject]) 
+0

這並不回答你的問題,但也許會幫助你選擇正確的模塊,以滿足您的需求。爲什麼你使用Spark Streaming?如果你是_「新的spark和scala。」_ ?! –

+0

我在問題本身中沒有看到任何Spark Stream參考。 –

沙发
0
0

在大多數情況下,您應該避免groupByKey,因爲它可能導致重新洗牌,這可能非常昂貴。在您的使用案例中,您不需要groupByKey,而是可以使用reduceByKey

val transformedRDD = processedMessages 
    // notice that we will have Rdd[(String, Message)] or PairRdd after this flatMap 
    .flatMap(message => message.isProcessed match { 
    case true => Some((message.id, message)) 
    case false => None 
    }) 
    // after this reduction we will have latest message for each id 
    .reduceByKey((m1: Message, m2: Message) => m1.version >= m2.version match { 
    case true => m1 
    case false => m2 
    }) 
    // now we just want to keep message 
    .map({ case (id, message) => message }) 
+0

感謝您的信息...我必須做一步,除了找到每個id的最新版本,我必須按上面提到的鍵來分組,以收集所有rdds去單個表 – user2526641

+0

Rdd去單個表嗎? ??那是什麼意思?哪把鑰匙? –

+0

對不起。我知道如何寫。非常感謝 – user2526641

0
votes
answers
6 views
+10

批量插入記錄Slick

1

我想向Task表中插入10多行數據,所以我創建了這些記錄的列表。除了我的表,我有一個存儲庫,其中包含諸如創建,更新等功能。所以我可以使用創建一個函數來添加一條記錄,但我想用這個函數批量插入數據。批量插入記錄Slick

case class Task (
    idTask: Option[Long], 
    ownerId: Long, 
    name: String, 
    finished: Boolean 
) 

class TaskTable(tag: Tag) extends Table[Task](tag, "tasks"){ 
    val idTask = column[Long]("id_task", O.PrimaryKey) 
    val ownerId = column[Long]("owner") 
    val name = column[String]("name") 
    val finished = column[Boolean]("finished") 

    val ownerFk = foreignKey("owner_id_fk", ownerId, TableQuery[UserTable])(_.idUser) 

    def * = (idTask.?, ownerId, name, finished) <> (Task.apply _ tupled, Task.unapply) 
} 

object TaskTable{ 
    lazy val table = TableQuery[TaskTable] 
} 

class TaskRepository(db: Database) { 
    val taskTableQuery = TableQuery[TaskTable] 
    def create(task: Task): Future[Task] = 
    db.run(taskTableQuery returning taskTableQuery += task) 

def createTasks(tasks: List[Task]): Future[Option[Task]] = 
    db.run(taskTableQuery ++= tasks) 

    def update(task: Task): Future[Int] = 
    db.run(taskTableQuery.filter(_.idTask === task.idTask).update(task)) 

    def delete(task: Task): Future[Int] = 
    db.run(taskTableQuery.filter(_.idTask === task.idTask).delete) 

    def getById(task: Task): Future[Option[Task]] = 
    db.run(taskTableQuery.filter(_.idTask === task.idTask).result.headOption) 
} 

我試圖做這樣的事情:

val tasks = List(
    Task(Some(1), 1,"Analyze logs with Spark", false), 
    Task(Some(2), 1,"Clean and process data", false), 
... 
) 
val createTasks = tasks.map(taskRepository.create(_)) 

但這createTasks值具有類型單位,我不能db.runAwait.result運行它。由於返回類型不匹配,我的方法createTasks不能編譯。 那麼如何創建批量插入的方法或更改現有的方法? 我會非常感謝任何幫助!

+0

因爲你所描述的計算,不執行他們你需要使用'map'而不是'foreach'。 –

+0

@SeanVieira你的意思是''users.map(userRepository.create(_))''?但它仍然返回用戶列表作爲''Future [List [User]]'',我不能將它傳遞給''db.run''或''Await.result''函數 – Cassie

沙发
0
2

如果你知道在編譯時的動作,使用DBIO.seq()到鏈中的DB操作:

db.run(
    DBIO.seq(taskTableQuery += Task(....), 
      taskTableQuery += Task(....), 
      taskTableQuery += Task(....)... 
).transactionally 
) 

否則,使用DBIO.sequence

val taskSeq:Seq[Task] = ... however you get the tasks 
db.run( 
    DBIO.sequence(taskSeq.map(t=>taskTableQuery+=t)).transactionally 
) 
+0

這不完全是我希望我更願意通過任務列表。但是,無論如何謝謝 – Cassie

+0

有道理。請參閱更新的答案。 –

+0

謝謝。這很棒! – Cassie