001/*
002 * Copyright (c) 2016 Chris K Wensel <chris@wensel.net>. All Rights Reserved.
003 * Copyright (c) 2007-2017 Xplenty, Inc. All Rights Reserved.
004 *
005 * Project and contact information: http://www.cascading.org/
006 *
007 * This file is part of the Cascading project.
008 *
009 * Licensed under the Apache License, Version 2.0 (the "License");
010 * you may not use this file except in compliance with the License.
011 * You may obtain a copy of the License at
012 *
013 *     http://www.apache.org/licenses/LICENSE-2.0
014 *
015 * Unless required by applicable law or agreed to in writing, software
016 * distributed under the License is distributed on an "AS IS" BASIS,
017 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
018 * See the License for the specific language governing permissions and
019 * limitations under the License.
020 */
021
022package cascading.flow.stream.duct;
023
024/**
025 *
026 */
027public class Fork<Incoming, Outgoing> extends Duct<Incoming, Outgoing>
028  {
029  protected final Duct[] allNext; // ordinal in array must match ordinal expected downstream
030
031  public Fork( Duct[] allNext )
032    {
033    this.allNext = allNext;
034    }
035
036  public Duct[] getAllNext()
037    {
038    return allNext;
039    }
040
041  @Override
042  public Duct getNext()
043    {
044    // doesn't matter, next after a fork always takes value fields, not grouping fields.
045    return allNext[ 0 ];
046    }
047
048  @Override
049  public void start( Duct previous )
050    {
051    for( int i = 0; i < allNext.length; i++ )
052      allNext[ i ].start( previous );
053    }
054
055  @Override
056  public void receive( Duct previous, int ordinal, Incoming incoming )
057    {
058    for( int i = 0; i < allNext.length; i++ )
059      allNext[ i ].receive( previous, i, incoming );
060    }
061
062  @Override
063  public void complete( Duct previous )
064    {
065    for( int i = 0; i < allNext.length; i++ )
066      allNext[ i ].complete( previous );
067    }
068  }