Home Php C# Sql C C++ Javascript Python Java Go Android Git Linux Asp.net Django .net Node.js Ios Xcode Cocoa Iphone Mysql Tomcat Mongodb Bash Objective-c Scala Visual-studio Apache Elasticsearch Jar Eclipse Jquery Ruby-on-rails Ruby Rubygems Android-studio Spring Lua Sqlite Emacs Ubuntu Perl Docker Swift Amazon-web-services Svn Html Ajax Xml Java-ee Maven Intellij-idea Rvm Macos Unix Css Ipad Postgresql Css3 Json Windows-server Vue.js Typescript Oracle Hibernate Internet-explorer Github Tensorflow Laravel Symfony Redis Html5 Google-app-engine Nginx Firefox Sqlalchemy Lucene Erlang Flask Vim Solr Webview Facebook Zend-framework Virtualenv Nosql Ide Twitter Safari Flutter Bundle Phonegap Centos Sphinx Actionscript Tornado Register | Login | Edit Tags | New Questions | 繁体 | 简体


7 questions online user: 11

0
votes
answers
30 views
+10

RabbitMQ是否支持按時間從隊列中執行進程?

-1

這次可以在RabbitMQ中爲呼叫處理程序存儲時間嗎? RabbitMQ是否支持這個?RabbitMQ是否支持按時間從隊列中執行進程?

沙发
0
1

是的,它支持,但只能通過額外的插件。

There is more dateiled about that

簡單地說,你需要安裝插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange,並添加新標題到消息:

byte[] messageBodyBytes = "delayed payload".getBytes(); 
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder(); 
headers = new HashMap<String, Object>(); 
headers.put("x-delay", 5000); 
props.headers(headers); 
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes); 

所以你需要把x-delay值與毫秒後應該處理此消息。

+0

也許你可以推薦我另一種Python的替代機制?當用戶可以將事件(日期)放入隊列以便進一步執行時? – Oleg

+0

如何從存儲中獲取日期並將其添加到Countdown EAT中?每次沒有停止時需要ping數據庫? – Oleg

+0

這樣你建議不支持datetime格式的延遲時間,只需要幾毫秒 – Oleg

0
votes
answers
48 views
+10

調度程序不排隊作業

0

我試圖測試Kubernetes上的氣流。調度程序,工作者,隊列和Web服務器都在不同的部署上,我使用Celery Executor來運行我的任務。調度程序不排隊作業

除了調度程序無法排隊工作以外,一切正常。當我從Web UI或CLI手動執行它時,Airflow能夠正常運行我的任務,但我試圖測試調度程序以使其工作。

我的配置幾乎一樣,因爲它是在單個服務器上:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]/db 
broker_url = amqp://user:[email protected]$RABBITMQ_SERVICE_HOST:5672/vhost 
celery_result_backend = amqp://user:[email protected]$RABBITMQ_SERVICE_HOST:5672/vhost 

我相信,隨着這些配置,我應該能夠讓它運行,但由於某些原因,只有工人能夠看到DAG和他們的狀態,但不看調度器,即使調度器能夠記錄他們的心跳就好。還有什麼我應該調試或看看?

+0

氣流有幾個已知的臭蟲芹菜,執行者有一些bug ... – Liqiang

+0

@李強你會推薦什麼呢?我相信Celery是分佈式任務執行的唯一方式 –

+0

是的,Celery是Python語言中唯一強大且穩定的分佈式任務執行,對於我的觀點,我建議不要使用airflow來運行,只需使用芹菜本身。 – Liqiang

沙发
0
0

首先,您將postgres用作氣流數據庫,不是嗎?你是否爲postgres部署了一個pod和服務?如果是的話,你請驗證您的配置文件,你必須:

sql_alchemy_conn = postgresql+psycopg2://username:[email protected]/db 

您可以使用此github。我在3周前用它進行了第一次測試,效果非常好。 入口點對驗證rabbitMq和Postgres是否配置良好很有用。

+0

是的,我一直在使用該圖像(實際上是叉[版本](https://github.com/Stibbons/kube-airflow/tree/helm_chart),因爲我試圖寫一個頭盔圖)。一切工作正常,從我可以看到心臟跳動到Postgres分貝,但沒有任何東西仍在計劃中 –

+0

您是否已驗證您的dag是否未被暫停?他們在用戶界面上的狀態是什麼?你有沒有嘗試在氣流艙(kubectl exec ...)中手動運行一個DAG? (對不起,也許愚蠢的問題)。 – pcc

+0

我如何驗證我的DAG是否未被暫停?我打開了它們,調度程序正在發送心跳。它們在用戶界面上的狀態反映了我不得不在UI上單獨運行任務的時間,因此有些正在運行,有些已完成。但我其實並沒有手動運行它們,所以謝謝你的建議! –

0
votes
answers
54 views
+10

如何刪除rabbitmq中的消息(pika)

0

如何刪除發佈和接收的消息,在rabbitmq 我正在用python測試rabitmq,同樣的消息還在繼續發佈和接收,如何避免這種情況。 我怎樣才能刪除公佈並收到如何刪除rabbitmq中的消息(pika)

+0

你必須發送確認返回其中收到並設置的消息是自動刪除選項,或者在接收到確認消息時執行自定義代碼以刪除消息。 –

+0

是否可以在rabbitmq的配置中設置它 – user3640571

+0

你可以將'''task_id.revoke(termination = True)'''設置爲params。或清除所有任務https://stackoverflow.com/questions/24899772/how-do-i-permanently-remove-a-celery-task-from-rabbitmq?answertab=active#tab-top –

沙发
0
0

請遵循這樣的消息:

https://www.rabbitmq.com/tutorials/tutorial-two-python.html

最幸運的你是不是發送ACK:

def callback(ch, method, properties, body): 
    print " [x] Received %r" % (body,) 
    time.sleep(body.count('.')) 
    print " [x] Done" 
    ch.basic_ack(delivery_tag = method.delivery_tag) ### <--- this one 

channel.basic_consume(callback, 
         queue='hello') 
0
votes
answers
54 views
+10

如何對RabbitMQ Cluster進行性能測試以做進一步的微調?

0

我已經創建了一個RabbitMQ羣集,它正在對由應用程序生成的消息進行排隊。我需要對集??羣進行性能測試,以便找出集羣的整體效率,並做出決定進行進一步的微調以提高性能。我們用PerfTest java工具嘗試過。但實現不了多少。如何對RabbitMQ Cluster進行性能測試以做進一步的微調?

+0

你應該詳細說明你有什麼確切的問題。還有什麼意思是「無法實現的」? –

沙发
0
0

我想這個問題從你想要測試哪個界面開始?這將決定你的支持該接口的工具。

您是否正在推送和流行?

有多少個隊列?

有多少生產者和多少消費者?您是否會對消費者產生輕微的負面影響,以影響總是或接近空的隊列集?

您將如何定義效率?這是由隊列中的項目數量,從隊列中推送或彈出的時間還是以前的一些組合來定義的?

???

0
votes
answers
41 views
+10

如何註冊一個隊列,並且它是RabbitMQ和Spring的獨佔使用者/偵聽器?

0

我有以下問題需要解決: 我想實現一個簡單的使用RabbitMQ消息傳遞的延遲重試機制。我有一個基礎設施,可以讓我延遲傳遞信息。我可以有任何想要在運行時利用這種延遲重試機制的感興趣的參與者。如何註冊一個隊列,並且它是RabbitMQ和Spring的獨佔使用者/偵聽器?

參與者只想給我提供2個細節和消息: 1.隊列名稱,他們希望在延遲T秒後傳遞消息。 2.隊列的消費者(比如消息的消費者。)

我試圖做到以下幾點:

private void startSeparateListener(final Object messageConsumer, 
            final Queue queue) { 
     SimpleMessageListenerContainer simpleMessageListenerContainer 
        = myCustomeSimpleMessageListenerFactory.create(); 
     simpleMessageListenerContainer.setRabbitAdmin(rabbitAdmin); 
     simpleMessageListenerContainer.setQueues(queue); 
     simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(messageConsumer)); 
     simpleMessageListenerContainer.start(); 

    } 

請注意,隊列已被創建並與rabbitadmin已註冊並且對象使用者有一個名爲handleMessage的方法來偵聽隊列。

這是在運行時爲隊列的消息使用者動態註冊隊列的正確方法嗎?

注: 春天已經提供了類型SimpleMessageListenerContainer一樣的豆,但會使用bean添加隊列和消費者的動態會導致發言權Q1的無意識消費的問題,被稱爲另一個隊列的接收器的一部分,說Q2,其內容類型可能與Q1相同?

我嘗試了很多關於它的搜索,但無法獲得任何具體的解釋。如果它是一個重複的問題和任何天真的話,事先道歉。

沙发
0
0

我不能編譯你的問題,但我可以告訴你已經有了RabbitMQ和Sring AMQP supports的Dealyed Exchange解決方案。

我建議遠離動態添加SimpleMessageListenerContainer:它不像它看起來那麼簡單。有沒有像addQueueNames()一個選項:

/** 
* Add queue(s) to this container's list of queues. The existing consumers 
* will be cancelled after they have processed any pre-fetched messages and 
* new consumers will be created. The queue must exist to avoid problems when 
* restarting the consumers. 
* @param queueName The queue to add. 
*/ 
@Override 
public void addQueueNames(String... queueName) { 

所以,你可能會考慮不增加新的集裝箱,但添加新的隊列,以現有的一個。 amqp_consumerQueue的下游路由可能有助於區分來自不同隊列的消息。

+0

尊敬的@ artem-bilan,感謝您的回覆。我的道歉是我不能很好地解決問題。我知道延遲交換插件,但是我已經創建了自己的延遲基礎結構,使用交換和隊列以及適當定義的x-message-ttl和x-deadletter-exchange。這整個infra發送所有的消息到匯兌。我想創建一個隊列,將它綁定到交換機上,並在隊列中註冊一個消費者,全部在運行時。因此,我創建一個新的SimpleMessageListenerContainer並使用它的問題。我希望我能更好地解釋它。 –

0
votes
answers
37 views
+10

RabbitMQ連接錯誤「沒有指定的端點可達」

0

我在服務器和我的系統上安裝了rabbitmq服務。 我想使用RPC模式:RabbitMQ連接錯誤「沒有指定的端點可達」

var factory = new ConnectionFactory() { 
    HostName = "158.2.14.42", 
    Port = Protocols.DefaultProtocol.DefaultPort, 
    UserName = "Administrator", 
    Password = "@[email protected]", 
    VirtualHost = "/" 
    ContinuationTimeout = new TimeSpan(10, 0, 0, 0) 
}; 

connection = factory.CreateConnection(); 

我有創造與此消息連接的錯誤:
無指定端點分別可達

當我使用它在服務器上的本地主機比如,它工程,但是當我創建從本地到該服務器的連接時,它返回錯誤。 它不適用於我本地計算機的本地IP和用戶名和密碼。


任何人都可以幫助我嗎?

+0

[RabbitMQ C#連接使用用戶名和密碼時出現問題]的可能重複(https://stackoverflow.com/questions/4987438/rabbitmq-c-sharp-connection-trouble-when-using-a-username-and -password) –

+0

@RazvanDumitru,我這樣做,但仍然有錯誤。 和代碼無法識別** FromEnvironment ** – parsa

+0

@RazvanDumitru謝謝你,你的指導幫助了我。 – parsa

沙发
0
0

謝謝大家。 由於這樣的:
RabbitMQ C# connection trouble when using a username and password
安裝的RabbitMQ後,我啓用了管理工具在服務器和我的本地計算機與此有關:

rabbitmq-plugins enable rabbitmq_management 

然後我重新啓動的RabbitMQ服務從services.msc
我能看到的RabbitMQ管理在http://localhost:15672
我登錄到兔子管理與用戶:來賓和通行證:來賓
我添加了我最喜歡的用戶通行證與管理員訪問,所以它的工作。

板凳
0
0

這意味着客戶端無法訪問服務器158.2.14.42和默認虛擬主機/

也許防火牆配置

+0

我在服務器的widnows防火牆上添加** 5672 **端口,但沒有解決。 – parsa

+0

有什麼建議嗎?有什麼解決方案,我必須這樣做? – parsa

地板
0
0

默認情況下,你說的RabbitMQ將監聽5672但是這是可以改變的,如果你看看你的配置(在MQ服務器上),你應該有一節:

rabbit.tcp_listeners

它將詳細說明正在使用的端口。檢查它是你的想法。

另外,你使用的是IPv4還是IPv6?您可能需要有其他配置才能支持這兩種配置。

對此有讀:

https://www.rabbitmq.com/networking.html

+0

它表示默認情況下,RabbitMQ將在所有可用接口上監聽端口5672。 在配置文件上,我必須改變什麼? – parsa

0
votes
answers
57 views
+10

Apache進程不會在與RabbitMQ斷開連接後死亡

0

我試圖在我的項目中使用Server Side Events機制。 (這類似於類固醇上的長輪詢)Apache進程不會在與RabbitMQ斷開連接後死亡

來自「Sending events from the server」字幕的例子效果非常好。幾秒鐘後,斷開連接,apache進程終止。此方法工作正常。

但是!如果我嘗試使用RabbitMQ,則在瀏覽器從服務器斷開連接後,Apache不會導致進程中斷(es.close())。並且進程保持原樣並在docker容器重新啓動後纔會被殺死。

connection_abortedconnection_status根本不起作用。 connection_aborted僅返回0,即使斷開連接,connection_status也返回CONNECTION_NORMAL。只有當我使用RabbitMQ時纔會發生。沒有RMQ這個功能運作良好。

ignore_user_abort(false)也不起作用。

代碼示例:

<?php 
use PhpAmqpLibChannelAMQPChannel; 
use PhpAmqpLibConnectionAbstractConnection; 
use PhpAmqpLibExceptionAMQPTimeoutException; 
use PhpAmqpLibMessageAMQPMessage; 

class RequestsRabbit 
{ 
    protected $rabbit; 

    /** @var AMQPChannel */ 
    protected $channel; 

    public $exchange = 'requests.events'; 

    public function __construct(AbstractConnection $rabbit) 
    { 
     $this->rabbit = $rabbit; 
    } 

    public function getChannel() 
    { 
     if ($this->channel === null) { 
      $channel = $this->rabbit->channel(); 

      $channel->exchange_declare($this->exchange, 'fanout', false, false, false); 

      $this->channel = $channel; 
     } 

     return $this->channel; 
    } 

    public function send($message) 
    { 
     $channel = $this->getChannel(); 

     $message = json_encode($message); 

     $channel->basic_publish(new AMQPMessage($message), $this->exchange); 
    } 

    public function subscribe(callable $callable) 
    { 
     $channel = $this->getChannel(); 

     list($queue_name) = $channel->queue_declare('', false, false, true, false); 

     $channel->queue_bind($queue_name, $this->exchange); 

     $callback = function (AMQPMessage $msg) use ($callable) { 
      call_user_func($callable, json_decode($msg->body)); 
     }; 

     $channel->basic_consume($queue_name, '', false, true, false, false, $callback); 

     while (count($channel->callbacks)) { 
      if (connection_aborted()) { 
       break; 
      } 

      try { 
       $channel->wait(null, true, 5); 
      } catch (AMQPTimeoutException $exception) { 
      } 
     } 

     $channel->close(); 
     $this->rabbit->close(); 
    } 
} 

會發生什麼:

  • 瀏覽器建立連接SSE到服務器。 var es = new EventSource(url);
  • Apache2產生新的進程來處理這個請求。
  • PHP生成一個新的隊列並連接到它。
  • 瀏覽器關閉連接es.close()
  • Apache2不會終止進程並保持原樣。 RabbitMQ隊列不會被刪除。如果我做了一些重新連接,它會產生一堆進程和一堆隊列(1重新連接= 1進程= 1隊列)。
  • 我關閉所有選項卡 - 活動進程。我關閉瀏覽器 - 相同的情況。

尋找某種類型的PHP錯誤。還是Apach2?

我用什麼:

一些截圖:

RabbitMQ queues

Processes

請幫我弄清楚是怎麼回事...

附:對不起我的英語不好。如果您可以找到錯誤或錯字,請在評論中指出它。我會很感激:)

沙发
0
0

,如果你在你的服務器端事件使用send()subscribe()(或兩者)你不說。假設你正在使用subscribe()沒有錯誤。這個循環:

while (count($channel->callbacks)) { 
    if (connection_aborted()) { 
     break; 
    } 

    try { 
     $channel->wait(null, true, 5); 
    } catch (AMQPTimeoutException $exception) { 
    } 
} 

將運行,直到進程被殺死或連接遠離的RabbitMQ關閉。收聽排隊消息時這是正常的。如果你需要在某個時候停止循環,你可以設置一個變量來檢查循環,或者在SSE結束時拋出異常(儘管我覺得這很尷尬)。

+0

編號進程不會被apache2殺死。這是自2004年以來最古老的錯誤 - https://bugs.php.net/bug.php?id=30301。 'connection_aborted'和'connection_status'函數不起作用。而這些無法解決它。對我來說,最好使用Websocketd(最後用D) –