1+ <?php namespace App \Worker ;
2+ /*
3+ * Copyright 2024 OpenStack Foundation
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ * http://www.apache.org/licenses/LICENSE-2.0
8+ * Unless required by applicable law or agreed to in writing, software
9+ * distributed under the License is distributed on an "AS IS" BASIS,
10+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+ * See the License for the specific language governing permissions and
12+ * limitations under the License.
13+ **/
14+ use Doctrine \DBAL \Exception ;
15+ use Doctrine \ORM \EntityManagerInterface ;
16+ use Doctrine \ORM \Exception \ORMException ;
17+ use Illuminate \Contracts \Debug \ExceptionHandler ;
18+ use Illuminate \Contracts \Events \Dispatcher ;
19+ use Illuminate \Contracts \Queue \Factory as QueueManager ;
20+ use Illuminate \Contracts \Queue \Job ;
21+ use Illuminate \Queue \Worker as IlluminateWorker ;
22+ use Illuminate \Queue \WorkerOptions ;
23+ use Illuminate \Support \Facades \Log ;
24+ use Throwable ;
25+
26+ /**
27+ * Class DoctrineWorker
28+ * @package App\Worker
29+ */
30+ class DoctrineWorker extends IlluminateWorker
31+ {
32+ /**
33+ * @var EntityManagerInterface
34+ */
35+ private $ entityManager ;
36+
37+ public function __construct (
38+ QueueManager $ manager ,
39+ Dispatcher $ events ,
40+ EntityManagerInterface $ entityManager ,
41+ ExceptionHandler $ exceptions ,
42+ callable $ isDownForMaintenance
43+ ) {
44+ $ this ->entityManager = $ entityManager ;
45+
46+ parent ::__construct ($ manager , $ events , $ exceptions , $ isDownForMaintenance );
47+ }
48+
49+ /**
50+ * @throws Throwable
51+ */
52+ protected function runJob ($ job , $ connectionName , WorkerOptions $ options ): void
53+ {
54+ try {
55+ $ this ->assertEntityManagerIsOpen ();
56+ $ this ->ensureDatabaseConnectionIsOpen ();
57+ $ this ->ensureEntityManagerIsClear ();
58+
59+ parent ::runJob ($ job , $ connectionName , $ options );
60+ } catch (Throwable $ exception ) {
61+ Log::error (sprintf ("DoctrineWorker::runJob error %s " , $ exception ->getMessage ()));
62+ // It's safe to assume that any exceptions caught by this block are a result of our assertions or setup,
63+ // since the parent runJob method catches all exceptions that occur during job execution.
64+ $ this ->exceptions ->report ($ exception );
65+ $ this ->requeueJob ($ job );
66+ $ this ->signalWorkerProcessShouldStop ();
67+ }
68+ }
69+
70+ /**
71+ * Asserts that the EntityManager is not closed.
72+ *
73+ * @throws ORMException If the EntityManager is closed.
74+ */
75+ private function assertEntityManagerIsOpen (): void
76+ {
77+ if ($ this ->entityManager ->isOpen ()) {
78+ return ;
79+ }
80+
81+ throw new ORMException ('The entity manager is closed. ' );
82+ }
83+
84+ /**
85+ * Pings the EntityManager's database connection to ensure that it is still open. If the connection is not open,
86+ * this method will attempt to re-open the connection.
87+ *
88+ * @throws Exception
89+ */
90+ private function ensureDatabaseConnectionIsOpen (): void
91+ {
92+ $ connection = $ this ->entityManager ->getConnection ();
93+
94+ // This replicates what the deprecated ping() function used to do.
95+ try {
96+ $ connection ->executeQuery ($ connection ->getDatabasePlatform ()->getDummySelectSQL ());
97+ $ ping = true ;
98+ } catch (Exception $ e ) {
99+ Log::warning (sprintf ("DoctrineWorker::ensureDatabaseConnectionIsOpen error %s " , $ e ->getMessage ()));
100+ $ ping = false ;
101+ }
102+
103+ if (!$ ping ) {
104+ $ connection ->close ();
105+ $ connection ->connect ();
106+ }
107+ }
108+
109+ /**
110+ * Clears the EntityManager to ensure that nothing persists between job runs.
111+ */
112+ private function ensureEntityManagerIsClear (): void
113+ {
114+ $ this ->entityManager ->clear ();
115+ }
116+
117+ /**
118+ * Immediately places the job back on the queue, so it can be handled by a different worker process (or the same
119+ * worker process if it restarts before the job is processed). We don't respect the configured "backoff" option
120+ * for the job here, since if we reach this point it means the job was never actually processed.
121+ */
122+ private function requeueJob (Job $ job ): void
123+ {
124+ if (!$ job ->isDeleted () && !$ job ->isReleased () && !$ job ->hasFailed ()) {
125+ $ job ->release ();
126+ }
127+ }
128+
129+ /**
130+ * Kills the worker process, so it can be restarted by a process supervisor.
131+ */
132+ private function signalWorkerProcessShouldStop (): void
133+ {
134+ $ this ->shouldQuit = true ;
135+ }
136+ }
0 commit comments